【netty學(xué)習(xí)筆記八】IdleStateHandler心跳檢測機(jī)制

在節(jié)點(diǎn)通信時(shí),經(jīng)常需要心跳機(jī)制來探測對(duì)方是否是存活的。在netty中,IdleStateHandler就能提供這種心跳檢測功能。讓我們先看看例子:
服務(wù)端添加的handler

.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
   ChannelPipeline p = ch.pipeline();
   //添加IdleStateHandler,5s檢測一次讀事件
   p.addLast(new IdleStateHandler(5, 0, 0));
   p.addLast(new TimeServerHandler());
   p.addLast(new HeartBeatServerHandler());
}

public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
    int lossConnectCount = 0;
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.READER_IDLE){
                lossConnectCount++;
                if (lossConnectCount>2){
                    System.out.println("關(guān)閉這個(gè)不活躍通道!");
                    ctx.channel().close();
                }
            }
        }else {
            super.userEventTriggered(ctx,evt);
        }
    }
}

服務(wù)端我們添加一個(gè)IdleStateHandler,若5s內(nèi)無讀事件則觸發(fā)心跳處理方法HeartBeatServerHandler#userEventTriggered,若連續(xù)2次無讀事件,則關(guān)閉這個(gè)客戶端channel。
客戶端:

.handler(new ChannelInitializer<SocketChannel>(){
@Override
protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
    ch.pipeline().addLast(new TimeClientHandler());
    }
});

在看IdleStateHandler實(shí)現(xiàn)前,我們先想一下如果自己實(shí)現(xiàn)的話會(huì)怎么實(shí)現(xiàn)呢?
首先,IdleStateHandler在每個(gè)客戶端接入時(shí),會(huì)生成一個(gè)對(duì)象。同時(shí)要初始化做一些事,比如對(duì)讀或?qū)懯录M(jìn)行定時(shí)判斷,看在指定的時(shí)間內(nèi)有沒有感興趣的讀/寫事件,沒有則觸發(fā)事件,做一些自定義的事情。同時(shí)上次讀/寫事件完成后需更新時(shí)間,以便定時(shí)任務(wù)能及時(shí)感知。定時(shí)判斷可以利用EventLoop父類自帶的schedule調(diào)度方法, 更新上次讀/寫事件完成后的時(shí)間需要監(jiān)聽讀完成、寫完成事件,需要實(shí)現(xiàn)入站、出站接口。
讓我們看看netty中的實(shí)現(xiàn):

public class IdleStateHandler extends ChannelDuplexHandler

繼承了ChannelDuplexHandler類,此類實(shí)現(xiàn)了入站、出站接口。繼續(xù)看構(gòu)造方法:

public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        this.observeOutput = observeOutput;

        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

初始化了讀、寫、讀或?qū)懙某瑫r(shí)參數(shù)。再看看初始化調(diào)度方法:

private void initialize(ChannelHandlerContext ctx) {
        // Avoid the case where destroy() is called before scheduling timeouts.
        // See: https://github.com/netty/netty/issues/143
        //state: 0 - none, 1 - initialized, 2 - destroyed
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        initOutputChanged(ctx);

        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

initialize方法在addLast -> handlerAdded中會(huì)調(diào)用。比如我們?cè)O(shè)置了檢測讀事件,那readerIdleTimeNanos>0,會(huì)執(zhí)行schedule方法。這里會(huì)傳一個(gè)ReaderIdleTimeoutTask對(duì)象過去,讓我們先看看ReaderIdleTimeoutTask做了什么:

private final class ReaderIdleTimeoutTask extends AbstractIdleTask {

        ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
            super(ctx);
        }

        @Override
        protected void run(ChannelHandlerContext ctx) {
            //讀超時(shí)時(shí)間
            long nextDelay = readerIdleTimeNanos;
            //如果還在讀時(shí)間進(jìn)行中則不進(jìn)行判斷
            if (!reading) {
                //判斷當(dāng)前時(shí)間-上次讀時(shí)間是否大于超時(shí)時(shí)間
                nextDelay -= ticksInNanos() - lastReadTime;
            }

            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
               
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                     //觸發(fā)fireUserEventTriggered方法
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }
private abstract static class AbstractIdleTask implements Runnable {

        private final ChannelHandlerContext ctx;

        AbstractIdleTask(ChannelHandlerContext ctx) {
            this.ctx = ctx;
        }

        @Override
        public void run() {
            if (!ctx.channel().isOpen()) {
                return;
            }
            //調(diào)用子類的run方法
            run(ctx);
        }

        protected abstract void run(ChannelHandlerContext ctx);
    }

再看看schedule方法:

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
        return ctx.executor().schedule(task, delay, unit);
    }
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

schedule方法最終會(huì)調(diào)用AbstractScheduledEventExecutor#schedule方法,將定時(shí)任務(wù)包裝成ScheduledFutureTask放入scheduledTaskQueue隊(duì)列中。在eventLoop的runAllTasks中會(huì)拉取task進(jìn)行調(diào)度(如果到了此延遲任務(wù)執(zhí)行的時(shí)候)。
再看看上次讀事件更新:

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            //讀事件進(jìn)行中,會(huì)將reading設(shè)為true,此時(shí)不會(huì)除非讀檢測事件。
            reading = true;
            firstReaderIdleEvent = firstAllIdleEvent = true;
        }
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        //讀事件完成后會(huì)更新讀時(shí)間
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

其他檢測事件也類似,這里就不分析了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容