Netty之IdleStateHandler源碼閱讀

如何使用

1.我們構(gòu)造netty服務(wù)端的時候,在childHandler里,先獲取到pipeline,然后p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));

p.addLast(new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS));
public IdleStateHandler(long readerIdleTime, long writerIdleTime,
                        long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

2.我們還需要寫一個handler,來實(shí)現(xiàn)超時后需要做的事.netty把超時和超時后任務(wù)的觸發(fā)解耦了.(這是不是觀察者模式的具體應(yīng)用呢?)

HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
    //沒有并發(fā)問題,因?yàn)槊總€連接都會單獨(dú)new一個HeartBeatServerHandler對象.
    private int lossConnectCount = 0;
        
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("已經(jīng)5秒未收到客戶端的消息了!");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                lossConnectCount++;
                if (lossConnectCount > 2) {
                    System.out.println("關(guān)閉這個不活躍通道!");
                    ctx.channel().close();
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        //重置計(jì)數(shù)器
        lossConnectCount = 0;
        System.out.println("client says: " + msg.toString());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

類注釋

Triggers an IdleStateEvent when a Channel has not performed read, write, or both operation for a while.

(觸發(fā)一個IdleStateEvent當(dāng)一個Channel一段時間內(nèi)沒有進(jìn)行讀或者寫,或者讀寫)

一些問題

假設(shè)netty里沒有這個相關(guān)的功能,需要我們自己設(shè)計(jì)一個IdleStateHandler,該怎么做呢?

需求分析:當(dāng)客戶端和服務(wù)端建立連接的時候,如果客戶端一段時間沒有操作,讀或者寫,那么我們就可以自定義的進(jìn)行一些操作.

問題1.初始化該執(zhí)行什么操作?

問題2.如何判斷超時?

問題3.讀寫如何分開判斷?

問題4.判斷超時該用哪個線程?netty的io線程還是自定義線程?

我們帶著這些問題來看看netty的設(shè)計(jì).

netty的設(shè)計(jì)

我們先看一下整體的流程.

1.當(dāng)我們new IdleStateHandler(30, 0, 0, TimeUnit.SECONDS)時,發(fā)生了什么?

這個就是把handler加入到pipeline里,后面有io事件觸發(fā)時,就被handler攔截.然后這個構(gòu)造器里會初始化一些值.

核心的就是三個時間:

readerIdleTimeNanos

writerIdleTimeNanos

allIdleTimeNanos

  public IdleStateHandler(boolean observeOutput,
                            long readerIdleTime, long writerIdleTime, long allIdleTime,
                            TimeUnit unit) {
        ObjectUtil.checkNotNull(unit, "unit");

        this.observeOutput = observeOutput;
                //會有個和最小時間比較的邏輯.
        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }

2.客戶端第一次創(chuàng)建連接的時候發(fā)生了什么?

//IdleStateHandler的channelActive()方法在socket通道建立時被觸發(fā)
//ctx的傳遞要搞清楚
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // This method will be invoked only if this handler was added
    // before channelActive() event is fired.  If a user adds this handler
    // after the channelActive() event, initialize() will be called by beforeAdd().
    initialize(ctx);//schedule
    super.channelActive(ctx);//傳播事件
}

這里相當(dāng)于一個任務(wù)的生產(chǎn)者.任務(wù)的執(zhí)行就委托給io線程了.具體看SingleThreadEventExecutor里面的邏輯

//重要的入口,會開啟定時任務(wù)
private void initialize(ChannelHandlerContext ctx) {
    // Avoid the case where destroy() is called before scheduling timeouts.
    // See: https://github.com/netty/netty/issues/143
    switch (state) {
        case 1:
        case 2:
            return;
    }

    state = 1;
    initOutputChanged(ctx);
        //
    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

獲取execut然后執(zhí)行schedule.

ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
    //從cts獲取執(zhí)行器.然后調(diào)度.
    return ctx.executor().schedule(task, delay, unit);
}

AbstractScheduledEventExecutor里.

//調(diào)度.
private <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
    //當(dāng)前的線程是否eventLoop里的線程.
    if (inEventLoop()) {
        scheduleFromEventLoop(task);
    } else {//什么時候會出現(xiàn)執(zhí)行的線程跟綁定的線程不一致呢?
        //獲取deadline
        final long deadlineNanos = task.deadlineNanos();
        // task will add itself to scheduled task queue when run if not expired
        if (beforeScheduledTaskSubmitted(deadlineNanos)) {
            execute(task);
        } else {
            lazyExecute(task);
            // Second hook after scheduling to facilitate race-avoidance
            if (afterScheduledTaskSubmitted(deadlineNanos)) {
                execute(WAKEUP_TASK);
            }
        }
    }
    return task;
}

就到了io線程的

IdleStateHandler

這個類繼承了ChannelDuplexHandler.(這個類細(xì)節(jié)比較多,后面單獨(dú)拎出來講一下)

ChannelHandler implementation which represents a combination out of a ChannelInboundHandler and the ChannelOutboundHandler. It is a good starting point if your ChannelHandler implementation needs to intercept operations and also state updates.

內(nèi)部類

AbstractIdleTask
private abstract static class AbstractIdleTask implements Runnable {
        //需要ctx來獲取executor.
    private final ChannelHandlerContext ctx;

    AbstractIdleTask(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }
        //模板的run方法.
    @Override
    public void run() {
        if (!ctx.channel().isOpen()) {
            return;
        }
        run(ctx);
    }

    protected abstract void run(ChannelHandlerContext ctx);
}

下面的三個類無非就是實(shí)現(xiàn)自己的run方法.把變化的點(diǎn)抽象出來,由子類來實(shí)現(xiàn).其實(shí)也就是初始化的時間的不同,其他都是一樣的.當(dāng)chanelIdle的時候,就會fireUserEventTriggered,這時候就完成一次超時的處理了.

AllIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    long nextDelay = allIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
    }
    if (nextDelay <= 0) {
        // Both reader and writer are idle - set a new timeout and
        // notify the callback.
        allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstAllIdleEvent;
        firstAllIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }

            IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Either read or write occurred before the timeout - set a new
        // timeout with shorter delay.
        allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}
ReaderIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {
    //下一次超時的時間.
    long nextDelay = readerIdleTimeNanos;
    //剛開始讀的時候是true,讀完變成false.
    if (!reading) {
        //下一次超時的時間減去當(dāng)前時間和上一次讀的時間差. why?
        nextDelay -= (ticksInNanos() - lastReadTime);
    }
        //如果小于0,已經(jīng)超時了
    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
                //
        boolean first = firstReaderIdleEvent;
        //
        firstReaderIdleEvent = false;
                //
        try {
                        //創(chuàng)建一個event
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

channel空閑了.fireUserEventTriggered,而我們的業(yè)務(wù)處理剛好實(shí)現(xiàn)了這個方法.

    /**
     * Is called when an {@link IdleStateEvent} should be fired. This implementation calls
     * {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
     */
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        //see.
        ctx.fireUserEventTriggered(evt);
    }
WriterIdleTimeoutTask
@Override
protected void run(ChannelHandlerContext ctx) {

    long lastWriteTime = IdleStateHandler.this.lastWriteTime;
    long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
    //已經(jīng)超時.
    if (nextDelay <= 0) {
        // Writer is idle - set a new timeout and notify the callback.
        writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstWriterIdleEvent;
        firstWriterIdleEvent = false;

        try {
            if (hasOutputChanged(ctx, first)) {
                return;
            }
                        //
            IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
            //已經(jīng)idle了.
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Write occurred before the timeout - set a new timeout with shorter delay.
        writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

ReadTimeoutHandler

ReadTimeoutHandler extends IdleStateHandler

讀超時的handler,會報(bào)ReadTimeoutException的錯,當(dāng)一定時間內(nèi)沒有讀到數(shù)據(jù).

WriteTimeoutHandler

當(dāng)寫操作不能在一定的時間完成的化,報(bào)WriteTimeoutException錯.

這個類并沒有繼承IdleStateHandler,就不在這里講了,有興趣的可以去看看,也很簡單.

IdleStateEvent

可以理解為netty內(nèi)部把這個空閑狀態(tài)事件封裝好了,傳個最終的業(yè)務(wù)調(diào)用方法.

源碼沒什么特殊的邏輯就不貼了.

IdleState

簡單的枚舉值.

public enum IdleState {
    /**
     * No data was received for a while.
     */
    READER_IDLE,
    /**
     * No data was sent for a while.
     */
    WRITER_IDLE,
    /**
     * No data was either received or sent for a while.
     */
    ALL_IDLE
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容