Netty源碼分析——心跳服務(wù)之IdleStateHandler

基于Netty源代碼版本:netty-all-4.1.33.Final

什么是心跳機(jī)制?

心跳說的是在客戶端和服務(wù)端在互相建立ESTABLISH狀態(tài)的時(shí)候,如何通過發(fā)送一個(gè)最簡(jiǎn)單的包來保持連接的存活,還有監(jiān)控另一邊服務(wù)的可用性等。

心跳機(jī)制

  • 心跳是在TCP長(zhǎng)連接中,客戶端和服務(wù)端定時(shí)向?qū)Ψ桨l(fā)送數(shù)據(jù)包通知對(duì)方自己還在線,保證連接的有效性的一種機(jī)制
  • 在服務(wù)器和客戶端之間一定時(shí)間內(nèi)沒有數(shù)據(jù)交互時(shí), 即處于 idle 狀態(tài)時(shí), 客戶端或服務(wù)器會(huì)發(fā)送一個(gè)特殊的數(shù)據(jù)包給對(duì)方, 當(dāng)接收方收到這個(gè)數(shù)據(jù)報(bào)文后, 也立即發(fā)送一個(gè)特殊的數(shù)據(jù)報(bào)文, 回應(yīng)發(fā)送方, 此即一個(gè) PING-PONG 交互. 自然地, 當(dāng)某一端收到心跳消息后, 就知道了對(duì)方仍然在線, 這就確保 TCP 連接的有效性

心跳實(shí)現(xiàn)

  • 使用TCP協(xié)議層的Keeplive機(jī)制,但是該機(jī)制默認(rèn)的心跳時(shí)間是2小時(shí),依賴操作系統(tǒng)實(shí)現(xiàn)不夠靈活;
  • 應(yīng)用層實(shí)現(xiàn)自定義心跳機(jī)制,比如基于Netty的IdleStateHandler實(shí)現(xiàn)心跳機(jī)制;

心跳包的作用

  • ?;?br> Q:為什么說心跳機(jī)制能保持連接的存活,它是集群中或長(zhǎng)連接中最為有效避免網(wǎng)絡(luò)中斷的一個(gè)重要的保障措施?
    A:之所以說是“避免網(wǎng)絡(luò)中斷的一個(gè)重要保障措施”,原因是:我們得知公網(wǎng)IP是一個(gè)寶貴的資源,一旦某一連接長(zhǎng)時(shí)間的占用并且不發(fā)數(shù)據(jù),這怎能對(duì)得起網(wǎng)絡(luò)給此連接分配公網(wǎng)IP,這簡(jiǎn)直是對(duì)網(wǎng)絡(luò)資源最大的浪費(fèi),所以基本上所有的NAT路由器都會(huì)定時(shí)的清除那些長(zhǎng)時(shí)間沒有數(shù)據(jù)傳輸?shù)挠成浔眄?xiàng)。一是回收IP資源,二是釋放NAT路由器本身內(nèi)存的資源,這樣問題就來了,連接被從中間斷開了,雙發(fā)還都不曉得對(duì)方已經(jīng)連通不了了,還會(huì)繼續(xù)發(fā)數(shù)據(jù),這樣會(huì)有兩個(gè)結(jié)果:a) 發(fā)方會(huì)收到NAT路由器的RST包,導(dǎo)致發(fā)方知道連接已中斷;b) 發(fā)方?jīng)]有收到任何NAT的回執(zhí),NAT只是簡(jiǎn)單的drop相應(yīng)的數(shù)據(jù)包
    通常我們測(cè)試得出的是第二種情況會(huì)多些,就是客戶端是不知道自己應(yīng)經(jīng)連接斷開了,所以這時(shí)候心跳就可以和NAT建立關(guān)聯(lián)了,只要我們?cè)贜AT認(rèn)為合理連接的時(shí)間內(nèi)發(fā)送心跳數(shù)據(jù)包,這樣NAT會(huì)繼續(xù)keep連接的IP映射表項(xiàng)不被移除,達(dá)到了連接不會(huì)被中斷的目的。

  • 檢測(cè)另一端服務(wù)是否可用
    TCP的斷開可能有時(shí)候是不能瞬時(shí)探知的,甚至是不能探知的,也可能有很長(zhǎng)時(shí)間的延遲,如果前端沒有正常的斷開TCP連接,四次握手沒有發(fā)起,服務(wù)端無(wú)從得知客戶端的掉線,這個(gè)時(shí)候我們就需要心跳包來檢測(cè)另一端服務(wù)是否還存活可用。

基于TCP的keepalive機(jī)制實(shí)現(xiàn)

基于TCP的keepalive機(jī)制,由具體的TCP協(xié)議棧來實(shí)現(xiàn)長(zhǎng)連接的維持。如在netty中可以在創(chuàng)建channel的時(shí)候,指定SO_KEEPALIVE參數(shù)來實(shí)現(xiàn):



存在的問題:Netty只能控制SO_KEEPALIVE這個(gè)參數(shù),其他參數(shù),則需要從系統(tǒng)的sysctl中讀取,其中比較關(guān)鍵的是tcp_keepalive_time,發(fā)送心跳包檢測(cè)的時(shí)間間隔,默認(rèn)為7200s,即空閑后,每2小時(shí)檢測(cè)一次。如果客戶端在這2小時(shí)內(nèi)斷開了,那么服務(wù)端也要維護(hù)這個(gè)連接2小時(shí),浪費(fèi)服務(wù)端資源;另外就是對(duì)于需要實(shí)時(shí)傳輸數(shù)據(jù)的場(chǎng)景,客戶端斷開了,服務(wù)端也要2小時(shí)后才能發(fā)現(xiàn)。服務(wù)端發(fā)送心跳檢測(cè),具體可能出現(xiàn)的情況如下:

  • (1)連接正常:客戶端仍然存在,網(wǎng)絡(luò)連接狀況良好。此時(shí)客戶端會(huì)返回一個(gè) ACK 。 服務(wù)端接收到ACK后重置計(jì)時(shí)器,在2小時(shí)后再發(fā)送探測(cè)。如果2小時(shí)內(nèi)連接上有數(shù)據(jù)傳輸,那么在該時(shí)間基礎(chǔ)上向后推延2個(gè)小時(shí);
  • (2)連接斷開:客戶端異常關(guān)閉,或是網(wǎng)絡(luò)斷開。在這兩種情況下,客戶端都不會(huì)響應(yīng)。服務(wù)器沒有收到對(duì)其發(fā)出探測(cè)的響應(yīng),并且在一定時(shí)間(系統(tǒng)默認(rèn)為 1000 ms )后重復(fù)發(fā)送 keep-alive packet ,并且重復(fù)發(fā)送一定次數(shù)。
  • (3)客戶端曾經(jīng)崩潰,但已經(jīng)重啟:這種情況下,服務(wù)器將會(huì)收到對(duì)其存活探測(cè)的響應(yīng),但該響應(yīng)是一個(gè)復(fù)位,從而引起服務(wù)器對(duì)連接的終止。

基于Netty的IdleStateHandler實(shí)現(xiàn)

什么是 IdleStateHandler

當(dāng)連接的空閑時(shí)間(讀或者寫)太長(zhǎng)時(shí),將會(huì)觸發(fā)一個(gè) IdleStateEvent 事件。然后,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。

如何使用?

IdleStateHandler 既是出站處理器也是入站處理器,繼承了 ChannelDuplexHandler 。通常在 initChannel 方法中將 IdleStateHandler 添加到 pipeline 中。然后在自己的 handler 中重寫 userEventTriggered 方法,當(dāng)發(fā)生空閑事件(讀或者寫),就會(huì)觸發(fā)這個(gè)方法,并傳入具體事件。
這時(shí),你可以通過 Context 對(duì)象嘗試向目標(biāo) Socekt 寫入數(shù)據(jù),并設(shè)置一個(gè) 監(jiān)聽器,如果發(fā)送失敗就關(guān)閉 Socket (Netty 準(zhǔn)備了一個(gè) ChannelFutureListener.CLOSE_ON_FAILURE 監(jiān)聽器用來實(shí)現(xiàn)關(guān)閉 Socket 邏輯)。
這樣,就實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的心跳服務(wù)。

先通過一段代碼來學(xué)習(xí)下IdleStateHandler的用法:

MyServerHandler:(負(fù)責(zé)監(jiān)測(cè)通道的各種狀態(tài)并處理空閑事件IdleStateEvent)

public class MyServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            String eventType = null;
            switch (event.state()){
                case READER_IDLE:
                    eventType = "讀空閑";
                    break;
                case WRITER_IDLE:
                    eventType = "寫空閑";
                    break;
                case ALL_IDLE:
                    eventType = "讀寫空閑";
                    break;
            }
            System.out.println(ctx.channel().remoteAddress() + "超時(shí)事件:" + eventType);
            ctx.channel().close();
        }else{
            super.userEventTriggered(ctx, evt);
        }
    }
}

服務(wù)器代碼:

public class MyServer {

    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new MyServerInitializer());
            ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

public class MyServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new IdleStateHandler(20,40,60, TimeUnit.SECONDS));
        pipeline.addLast(new MyServerHandler());
    }
}

客戶端代碼:

public class MyClient {

    public static void main(String[] args) throws Exception {
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                    .handler(new MyClientInitializer());
            Channel channel = bootstrap.connect("localhost", 8899).sync().channel();
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            for(;;){
                channel.writeAndFlush(reader.readLine() + "\r\n");
            }
        } finally {
            eventLoopGroup.shutdownGracefully();
        }
    }
}

public class MyClientInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new DelimiterBasedFrameDecoder(4096, Delimiters.lineDelimiter()));
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));
        pipeline.addLast(new MyClientHandler());
    }
}

public class MyClientHandler extends SimpleChannelInboundHandler<String> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println(msg);
    }
}

源碼分析

構(gòu)造方法

IdleStateHandler構(gòu)造器

  • readerIdleTime讀空閑超時(shí)時(shí)間設(shè)定,如果channelRead()方法超過readerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法;
  • writerIdleTime寫空閑超時(shí)時(shí)間設(shè)定,如果write()方法超過writerIdleTime時(shí)間未被調(diào)用則會(huì)觸發(fā)超時(shí)事件調(diào)用userEventTrigger()方法;
  • allIdleTime所有類型的空閑超時(shí)時(shí)間設(shè)定,包括讀空閑和寫空閑;
  • unit時(shí)間單位,包括時(shí)分秒等;
public IdleStateHandler(
        long readerIdleTime, long writerIdleTime, long allIdleTime,
        TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

該類有 3 個(gè)構(gòu)造方法,主要對(duì)一下 4 個(gè)屬性賦值:

private final boolean observeOutput;// 是否考慮出站時(shí)較慢的情況。默認(rèn)值是false(不考慮)。
private final long readerIdleTimeNanos; // 讀事件空閑時(shí)間,0 則禁用事件
private final long writerIdleTimeNanos;// 寫事件空閑時(shí)間,0 則禁用事件
private final long allIdleTimeNanos; //讀或?qū)懣臻e時(shí)間,0 則禁用事件

可以分別控制讀,寫,讀寫超時(shí)的時(shí)間,單位為秒,如果是0表示不檢測(cè),所以如果全是0,則相當(dāng)于沒添加這個(gè)IdleStateHandler,連接是個(gè)普通的短連接。

handlerAdded 方法

IdleStateHandler是在創(chuàng)建IdleStateHandler實(shí)例并添加到ChannelPipeline時(shí)添加定時(shí)任務(wù)來進(jìn)行定時(shí)檢測(cè)的,具體在initialize(ctx)方法實(shí)現(xiàn);同時(shí)在從ChannelPipeline移除或Channel關(guān)閉時(shí),移除這個(gè)定時(shí)檢測(cè),具體在destroy()實(shí)現(xiàn)

public class IdleStateHandler extends ChannelDuplexHandler {   
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            // channelActive() event has been fired already, which means this.channelActive() will
            // not be invoked. We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelActive() event has not been fired yet.  this.channelActive() will be invoked
            // and initialization will occur there.
        }
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        destroy();
    }
}

initialize

private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        // 這里的 schedule 方法會(huì)調(diào)用 eventLoop 的 schedule 方法,將定時(shí)任務(wù)添加進(jìn)隊(duì)列中
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

只要給定的參數(shù)大于0,就創(chuàng)建一個(gè)定時(shí)任務(wù),每個(gè)事件都創(chuàng)建。同時(shí),將 state 狀態(tài)設(shè)置為 1,防止重復(fù)初始化。調(diào)用 initOutputChanged 方法,初始化 “監(jiān)控出站數(shù)據(jù)屬性”,代碼如下:

private void initOutputChanged(ChannelHandlerContext ctx) {
    if (observeOutput) {
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 記錄了出站緩沖區(qū)相關(guān)的數(shù)據(jù),buf 對(duì)象的 hash 碼,和 buf 的剩余緩沖字節(jié)數(shù)
        if (buf != null) {
            lastMessageHashCode = System.identityHashCode(buf.current());
            lastPendingWriteBytes = buf.totalPendingWriteBytes();
        }
    }
}

讀事件的 run 方法

代碼如下:

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

    ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;
        }

        if (nextDelay <= 0) {
            // Reader is idle - set a new timeout and notify the callback.
            // 用于取消任務(wù) promise
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;

            try {
                // 再次提交任務(wù)
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                // 觸發(fā)用戶 handler use
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

nextDelay的初始化值為超時(shí)秒數(shù)readerIdleTimeNanos,如果檢測(cè)的時(shí)候沒有正在讀,且計(jì)算多久沒讀了:nextDelay -= 當(dāng)前時(shí)間 - 上次讀取時(shí)間,如果小于0,說明左邊的readerIdleTimeNanos小于空閑時(shí)間(當(dāng)前時(shí)間 - 上次讀取時(shí)間)了,則超時(shí)了
則創(chuàng)建IdleStateEvent事件,IdleState枚舉值為READER_IDLE,然后調(diào)用channelIdle方法分發(fā)給下一個(gè)ChannelInboundHandler,通常由用戶自定義一個(gè)ChannelInboundHandler來捕獲并處理

總的來說,每次讀取操作都會(huì)記錄一個(gè)時(shí)間,定時(shí)任務(wù)時(shí)間到了,會(huì)計(jì)算當(dāng)前時(shí)間和最后一次讀的時(shí)間的間隔,如果間隔超過了設(shè)置的時(shí)間,就觸發(fā) UserEventTriggered 方法。就是這么簡(jiǎn)單。

寫事件的 run 方法

寫任務(wù)的邏輯基本和讀任務(wù)的邏輯一樣,唯一不同的就是有一個(gè)針對(duì) 出站較慢數(shù)據(jù)的判斷。

private final class WriterIdleTimeoutTask extends AbstractIdleTask {

    WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {

        long lastWriteTime = IdleStateHandler.this.lastWriteTime;
        long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
        if (nextDelay <= 0) {
            // Writer is idle - set a new timeout and notify the callback.
            writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstWriterIdleEvent;
            firstWriterIdleEvent = false;

            try {
                //如果這個(gè)方法返回 true,就不執(zhí)行觸發(fā)事件操作了,即使時(shí)間到了??纯丛摲椒▽?shí)現(xiàn):
                if (hasOutputChanged(ctx, first)) {
                    return;
                }

                IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Write occurred before the timeout - set a new timeout with shorter delay.
            writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

如果這個(gè)方法返回 true,就不執(zhí)行觸發(fā)事件操作了,即使時(shí)間到了??纯丛摲椒▽?shí)現(xiàn):

private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
    if (observeOutput) {

        // We can take this shortcut if the ChannelPromises that got passed into write()
        // appear to complete. It indicates "change" on message level and we simply assume
        // that there's change happening on byte level. If the user doesn't observe channel
        // writability events then they'll eventually OOME and there's clearly a different
        // problem and idleness is least of their concerns.
        // 如果最后一次寫的時(shí)間和上一次記錄的時(shí)間不一樣,說明寫操作進(jìn)行過了,則更新此值
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            lastChangeCheckTimeStamp = lastWriteTime;

            // But this applies only if it's the non-first call.
            // 但如果,在這個(gè)方法的調(diào)用間隙修改的,就仍然不觸發(fā)事件
            if (!first) {
                return true;
            }
        }

        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 如果出站區(qū)有數(shù)據(jù)
        if (buf != null) {
            // 拿到出站緩沖區(qū)的 對(duì)象 hashcode
            int messageHashCode = System.identityHashCode(buf.current());
            // 拿到這個(gè) 緩沖區(qū)的 所有字節(jié)
            long pendingWriteBytes = buf.totalPendingWriteBytes();
            // 如果和之前的不相等,或者字節(jié)數(shù)不同,說明,輸出有變化,將 "最后一個(gè)緩沖區(qū)引用" 和 “剩余字節(jié)數(shù)” 刷新
            if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                lastMessageHashCode = messageHashCode;
                lastPendingWriteBytes = pendingWriteBytes;
                // 如果寫操作沒有進(jìn)行過,則任務(wù)寫的慢,不觸發(fā)空閑事件
                if (!first) {
                    return true;
                }
            }
        }
    }

    return false;
}
  • 1、如果用戶沒有設(shè)置了需要觀察出站情況。就返回 false,繼續(xù)執(zhí)行事件。
  • 2、反之,繼續(xù)向下, 如果最后一次寫的時(shí)間和上一次記錄的時(shí)間不一樣,說明寫操作剛剛做過了,則更新此值,但仍然需要判斷這個(gè) first 的值,如果這個(gè)值還是 false,說明在這個(gè)寫事件是在兩個(gè)方法調(diào)用間隙完成的 / 或者是第一次訪問這個(gè)方法,就仍然不觸發(fā)事件。
  • 3、如果不滿足上面的條件,就取出緩沖區(qū)對(duì)象,如果緩沖區(qū)沒對(duì)象了,說明沒有發(fā)生寫的很慢的事件,就觸發(fā)空閑事件。反之,記錄當(dāng)前緩沖區(qū)對(duì)象的 hashcode 和 剩余字節(jié)數(shù),再和之前的比較,如果任意一個(gè)不相等,說明數(shù)據(jù)在變化,或者說數(shù)據(jù)在慢慢的寫出去。那么就更新這兩個(gè)值,留在下一次判斷。
  • 4、繼續(xù)判斷 first ,如果是 fasle,說明這是第二次調(diào)用,就不用觸發(fā)空閑事件了。

所有事件的 run 方法

這個(gè)類叫做 AllIdleTimeoutTask ,表示這個(gè)監(jiān)控著所有的事件。當(dāng)讀寫事件發(fā)生時(shí),都會(huì)記錄。代碼邏輯和寫事件的的基本一致,除了這里:

private final class AllIdleTimeoutTask extends AbstractIdleTask {

    AllIdleTimeoutTask(ChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    protected void run(ChannelHandlerContext ctx) {

        long nextDelay = allIdleTimeNanos;
        if (!reading) {
            // 當(dāng)前時(shí)間減去 最后一次寫或讀 的時(shí)間 ,若大于0,說明超時(shí)了
            nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
        }
        if (nextDelay <= 0) {
            // Both reader and writer are idle - set a new timeout and
            // notify the callback.
            allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

            boolean first = firstAllIdleEvent;
            firstAllIdleEvent = false;

            try {
                if (hasOutputChanged(ctx, first)) {
                    return;
                }

                IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Either read or write occurred before the timeout - set a new
            // timeout with shorter delay.
            allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
}

這里的時(shí)間計(jì)算是取讀寫事件中的最大值來的。然后像寫事件一樣,判斷是否發(fā)生了寫的慢的情況。最后調(diào)用 ctx.fireUserEventTriggered(evt) 方法。

通常這個(gè)使用的是最多的。構(gòu)造方法一般是:

pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

讀寫都是 0 表示禁用,30 表示 30 秒內(nèi)沒有任務(wù)讀寫事件發(fā)生,就觸發(fā)事件。注意,當(dāng)不是 0 的時(shí)候,這三個(gè)任務(wù)會(huì)重疊。

  • 心跳檢測(cè)也是一種Handler,在啟動(dòng)時(shí)添加到ChannelPipeline管道中,當(dāng)有讀寫操作時(shí)消息在其中傳遞;
socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
  • IdleStateHandler的channelActive()方法在socket通道建立時(shí)被觸發(fā)
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    initialize(ctx);
    super.channelActive(ctx);
}
  • channelActive()方法調(diào)用Initialize()方法,根據(jù)配置的readerIdleTime,WriteIdleTIme等超時(shí)事件參數(shù)往任務(wù)隊(duì)列taskQueue中添加定時(shí)任務(wù)task ;
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
    case 1:
    case 2:
        return;
    }

    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}
  • 定時(shí)任務(wù)添加到對(duì)應(yīng)線程EventLoopExecutor對(duì)應(yīng)的任務(wù)隊(duì)列taskQueue中,在對(duì)應(yīng)線程的run()方法中循環(huán)執(zhí)行
    • 用當(dāng)前時(shí)間減去最后一次channelRead方法調(diào)用的時(shí)間判斷是否空閑超時(shí);
    • 如果空閑超時(shí)則創(chuàng)建空閑超時(shí)事件并傳遞到channelPipeline中;
@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
  • 在管道中傳遞調(diào)用自定義的userEventTrigger()方法
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
    ctx.fireUserEventTriggered(evt);
}

總結(jié):

  • IdleStateHandler心跳檢測(cè)主要是通過向線程任務(wù)隊(duì)列中添加定時(shí)任務(wù),判斷channelRead()方法或write()方法是否調(diào)用空閑超時(shí),如果超時(shí)則觸發(fā)超時(shí)事件執(zhí)行自定義userEventTrigger()方法;
  • Netty通過IdleStateHandler實(shí)現(xiàn)最常見的心跳機(jī)制不是一種雙向心跳的PING-PONG模式,而是客戶端發(fā)送心跳數(shù)據(jù)包,服務(wù)端接收心跳但不回復(fù),因?yàn)槿绻?wù)端同時(shí)有上千個(gè)連接,心跳的回復(fù)需要消耗大量網(wǎng)絡(luò)資源;如果服務(wù)端一段時(shí)間內(nèi)沒有收到客戶端的心跳數(shù)據(jù)包則認(rèn)為客戶端已經(jīng)下線,將通道關(guān)閉避免資源的浪費(fèi);在這種心跳模式下服務(wù)端可以感知客戶端的存活情況,無(wú)論是宕機(jī)的正常下線還是網(wǎng)絡(luò)問題的非正常下線,服務(wù)端都能感知到,而客戶端不能感知到服務(wù)端的非正常下線;
  • 要想實(shí)現(xiàn)客戶端感知服務(wù)端的存活情況,需要進(jìn)行雙向的心跳;Netty中的channelInactive()方法是通過Socket連接關(guān)閉時(shí)揮手?jǐn)?shù)據(jù)包觸發(fā)的,因此可以通過channelInactive()方法感知正常的下線情況,但是因?yàn)榫W(wǎng)絡(luò)異常等非正常下線則無(wú)法感知;

參考:
https://www.cnblogs.com/java-chen-hao/p/11453198.html

https://www.cnblogs.com/insaneXs/p/9776164.html

https://blog.csdn.net/u013967175/article/details/78591810

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文是Netty文集中“Netty 那些事兒”系列的文章。主要結(jié)合在開發(fā)實(shí)戰(zhàn)中,我們遇到的一些“奇奇怪怪”的問題,...
    tomas家的小撥浪鼓閱讀 7,974評(píng)論 2 17
  • 前言:Netty 提供的心跳介紹 Netty 作為一個(gè)網(wǎng)絡(luò)框架,提供了諸多功能,比如我們之前說的編解碼,Netty...
    莫那一魯?shù)?/span>閱讀 16,023評(píng)論 0 15
  • 什么是心跳機(jī)制? 心跳說的是在客戶端和服務(wù)端在互相建立ESTABLISH狀態(tài)的時(shí)候,如何通過發(fā)送一個(gè)最簡(jiǎn)單的包來保...
    tracy_668閱讀 5,176評(píng)論 1 5
  • 言語(yǔ)是這個(gè)世界最甜蜜的糖 也是最無(wú)力的解藥 沉默 看著花靜靜的落 你就只挨著我吧 一個(gè)人 看天空都在憂傷 明天 我就好了
    憐月戈閱讀 191評(píng)論 0 1
  • 趕在六月的尾巴,陰雨了許久的天氣終于放晴了。打掃廚房時(shí),我發(fā)現(xiàn)柜子里都有些生霉了呃,真希望天氣盡快晴好...
    小隱時(shí)光閱讀 476評(píng)論 0 3

友情鏈接更多精彩內(nèi)容