Netty源碼分析之eventLoop和eventLoopGroup

轉(zhuǎn):https://blog.csdn.net/lz710117239/article/details/77414726

Reactor單線程模型

1.作為NIO服務(wù)端,接收客戶端的TCP連接
2.作為NIO客戶端,向服務(wù)端發(fā)起TCP連接
3.讀取通信對(duì)端的請(qǐng)求或者應(yīng)答消
4.向通信對(duì)端發(fā)送消息請(qǐng)求或者應(yīng)答消息

Reactor單線程模型如下圖所示:


圖片.png

由于Reactor模式使用的是異步非阻塞I/O,所有的I/O操作都不會(huì)導(dǎo)致阻塞,理論上一個(gè)線程可以獨(dú)立處理所有I/O相關(guān)的操作。從架構(gòu)層面上看,一個(gè)NIO線程確實(shí)可以完成其承擔(dān)的職責(zé)。例如,通過(guò)Acceptor類接收客戶端的TCP連接請(qǐng)求信息,當(dāng)鏈路建立成功之后,通過(guò)Dispatch將對(duì)應(yīng)的ByteBuffer派發(fā)到指定的Handler上,進(jìn)行消息解碼。用戶線程消息編碼后通過(guò)NIO線程將消息發(fā)送給客戶端。

在一些小容量的應(yīng)用場(chǎng)景下,可以使用單線程模型。但是這對(duì)于高負(fù)載,大并發(fā)的應(yīng)用場(chǎng)景卻不合適,主要原因如下。

1.一個(gè)NIO線程同時(shí)處理成百上千的鏈路,性能上無(wú)法支撐,即便NIO線程的CPU負(fù)荷達(dá)到100%,也無(wú)法滿足海量消息的編碼,解碼,讀取和發(fā)送。
2.當(dāng)NIO線程負(fù)載過(guò)重之后,處理速度將變慢,這會(huì)導(dǎo)致大量客戶端連接超時(shí),超時(shí)之后往往會(huì)進(jìn)行重發(fā),這更加NIO線程的負(fù)載,最終會(huì)導(dǎo)致大量消息擠壓和處理超時(shí),成為系統(tǒng)的性能瓶頸。
3.可靠性問(wèn)題:一旦NIO線程意外跑飛,或者進(jìn)入死循環(huán),會(huì)導(dǎo)致整個(gè)系統(tǒng)通信模塊不可用,不能接收和處理外部消息,造成節(jié)點(diǎn)故障。

Reactor多線程模型

Reactor多線程模型與單線程模型最大的卻別就是有一組NIO線程來(lái)處理I/O操作,它的原理如圖所示:


圖片.png

Reactor多線程模型的特點(diǎn)如下所示:
1.有專門(mén)一個(gè)NIO線程——Acceptor線程用于監(jiān)聽(tīng)服務(wù)端,接收客戶端的TCP連接請(qǐng)求。
2.網(wǎng)絡(luò)I/O操作——讀,寫(xiě)等由一個(gè)NIO線程負(fù)責(zé),線程池可以采用標(biāo)準(zhǔn)的JDK線程池實(shí)現(xiàn),它包含一個(gè)任務(wù)隊(duì)列和N個(gè)可用的線程,由這些NIO線程負(fù)責(zé)消息的讀取,解碼,編碼和發(fā)送。
3.一個(gè)NIO線程可以同時(shí)處理N條鏈路,但是一個(gè)鏈路只對(duì)應(yīng)一個(gè)NIO線程,防止發(fā)生并發(fā)操作問(wèn)題。

在大多數(shù)場(chǎng)景下,Reactor多線程模型可以滿足性能需求。但是,在個(gè)別特殊場(chǎng)景中,一個(gè)NIO線程負(fù)責(zé)監(jiān)聽(tīng)和處理所有的客戶端鏈接可能會(huì)存在性能問(wèn)題。例如并發(fā)百萬(wàn)客戶端連接,或者服務(wù)端需要對(duì)客戶端握手進(jìn)行安全認(rèn)證,但是認(rèn)證本身非常損耗性能。在這類場(chǎng)景下,單獨(dú)一個(gè)Acceptor線程可能會(huì)存在性能不足的問(wèn)題,為了解決性能問(wèn)題,產(chǎn)生了第三種Reactor線程模型——主從Reactor多線程模型。

主從Reactor多線程模型

主從Reactor線程模型的特點(diǎn)是:服務(wù)端用于接收客戶端鏈接的不在是一個(gè)單獨(dú)的NIO線程,而是一個(gè)獨(dú)立的NIO線程池。Acceptor接收到客戶端TCP鏈接請(qǐng)求并處理完成后(可能包含接入認(rèn)證等),將新創(chuàng)建的SocketChannel注冊(cè)到I/O線程池(sub reacotr線程池)的某個(gè)I/O線程上,由它負(fù)責(zé)SocketChannel的讀寫(xiě)和編碼工作。Acceptor線程池僅僅用于客戶端登錄,握手,和安全認(rèn)證,一旦鏈路建立成功,就將鏈路注冊(cè)到后端subReactor線程池的I/O線程上,有I/O線程負(fù)責(zé)后續(xù)的I/O操作。


圖片.png

利用主從NIO線程模型,可以解決一個(gè)服務(wù)端監(jiān)聽(tīng)線程無(wú)法有效處理所有客戶端連接的性能不足問(wèn)題。因此,在Netty的官方Demo中,推薦使用該線程模型。

Netty的線程模型

Netty的線程模型并不是一成不變的,它實(shí)際取決于用戶的啟動(dòng)參數(shù)配置。通過(guò)設(shè)置不同的啟動(dòng)參數(shù),Netty可以同時(shí)支持Reactor單線程模型,多線程模型和主從Reactor多線程模型。如下圖:


圖片.png

通過(guò)服務(wù)端啟動(dòng)代碼來(lái)了解它的線程模型:

//配置服務(wù)端的NIO線程組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());
            //綁定端口,同步等待成功
            ChannelFuture f = b.bind(port).sync();
            //等待服務(wù)端監(jiān)聽(tīng)端口關(guān)閉
            f.channel().closeFuture().sync();

        }finally {
            //優(yōu)雅退出,釋放線程池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

服務(wù)端啟動(dòng)的時(shí)候,創(chuàng)建了兩個(gè)NioEventLoopGroup,它們實(shí)際是兩個(gè)獨(dú)立的Reactor線程池。一個(gè)用于接收客戶端的TCP連接,另一個(gè)用于處理I/O相關(guān)的讀寫(xiě)操作,或者執(zhí)行系統(tǒng)Task,定時(shí)任務(wù)Task等。
Netty用于接收客戶端請(qǐng)求的線程池職責(zé)如下。
(1)接收客戶端TCP連接欸,初始化Channel參數(shù);
(2)將鏈路狀態(tài)變更事件通知給ChannelPipeline。
Netty處理I/O操作的Reactor線程池職責(zé)如下。
(1)異步讀取通信對(duì)端的數(shù)據(jù)報(bào),發(fā)送讀事件到ChannelPipeline;
(2)異步發(fā)送消息到通信對(duì)端,調(diào)用ChannelPipeline的消息發(fā)送接口;
(3)執(zhí)行系統(tǒng)調(diào)用Task;
(4)執(zhí)行定時(shí)任務(wù)Task,例如鏈路空閑狀態(tài)檢測(cè)定時(shí)任務(wù)。
通過(guò)調(diào)整線程池的線程個(gè)數(shù),是否共享線程池等方式,Netty的Reactor線程模型可以在單線程,多線程和主從多線程間切換,這種靈活的配置方式可以最大程度地滿足不同用戶的個(gè)性化定制。

為了盡可能地提升性能,Netty在很多地方進(jìn)行了無(wú)鎖化的設(shè)計(jì),例如在I/O線程內(nèi)部進(jìn)行穿行操作,避免多線程競(jìng)爭(zhēng)導(dǎo)致的性能下降問(wèn)題。表面上看,串行化似乎CPU利用率不高,并發(fā)程度不夠。但是,通過(guò)調(diào)整NIO線程池的線程參數(shù),可以同hi啟動(dòng)多個(gè)串行化的線程并行運(yùn)行,這種局部無(wú)鎖化的串行線程設(shè)計(jì)相比一個(gè)隊(duì)列——多個(gè)工作線程的模型性能更好。

它的設(shè)計(jì)原理如下如所示:


圖片.png

Netty的NioEventLoop讀取到消息之后,直接調(diào)用ChannelPipeline的fireChannelRead(Object msg)。只要用戶不主動(dòng)切換線程,一直都是由NioEventLoop調(diào)用用戶的handler,期間不進(jìn)行線程切換。這種串行化處理方式避免了多線程操作導(dǎo)致的鎖的競(jìng)爭(zhēng),從性能角度看是最優(yōu)的。

NioEventLoop源碼分析

NioEventLoop設(shè)計(jì)原理

Netty的NioEventLoop并不是要給純粹的I/O線程,它除了負(fù)責(zé)I/O的讀寫(xiě)之外,還兼顧處理以下兩類任務(wù)。

系統(tǒng)Task:通過(guò)調(diào)用NioeventLoop的execute(Runnable task)方法實(shí)現(xiàn),Netty有很多系統(tǒng)Task,創(chuàng)建它們的主要原因是:當(dāng)I/O線程和用戶線程同時(shí)操作網(wǎng)絡(luò)資源時(shí),為了防止并發(fā)操作導(dǎo)致的鎖競(jìng)爭(zhēng),將用戶線程的操作封裝成Task放入消息隊(duì)列中,由I/O線程負(fù)責(zé)執(zhí)行,這樣就實(shí)現(xiàn)了局部無(wú)鎖化。
定時(shí)任務(wù):調(diào)用NioEventLoop的schedule(Runnable command,long delay,TimeUnit unit)方法實(shí)現(xiàn)。
正是因?yàn)镹ioEventLoop具備多種職責(zé),所以它的實(shí)現(xiàn)比較特殊,它并不是簡(jiǎn)單的Runnable,我們來(lái)看下它們的繼承關(guān)系。如下:


圖片.png

它實(shí)現(xiàn)了EventLoop接口、EventExecutorGroup接口和ScheduledExecutorService接口,正是因?yàn)檫@種設(shè)計(jì),導(dǎo)致NioEventLoop和其父類功能實(shí)現(xiàn)非常復(fù)雜。下面我們重點(diǎn)分析下它的源碼實(shí)現(xiàn):

NioEventLoop

作為NIO框架的Reactor線程,NioEventLoop需要處理網(wǎng)絡(luò)I/O讀寫(xiě)事件,因此它必須聚合一個(gè)多路復(fù)用器對(duì)象,看下它的selector定義:

Selector selector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;

直接調(diào)用Selector.open()就能創(chuàng)建并打開(kāi)一個(gè)新的Selector。
然后通過(guò)反射對(duì)selectedKeys進(jìn)行優(yōu)化:

try {
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

            Class<?> selectorImplClass =
                    Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());

            // Ensure the current selector implementation is what we can instrument.
            if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
                return selector;
            }

            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

            selectedKeysField.setAccessible(true);
            publicSelectedKeysField.setAccessible(true);

            selectedKeysField.set(selector, selectedKeySet);
            publicSelectedKeysField.set(selector, selectedKeySet);

            selectedKeys = selectedKeySet;
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);

上面代碼,如果開(kāi)啟了selectedKeys優(yōu)化功能,通過(guò)反射的方法從Selector實(shí)例中獲取selectedKeys和publicSelectedKeys將上述兩個(gè)成員變量設(shè)置為可寫(xiě),通過(guò)反射的方式使用Netty構(gòu)造的selectedKeys包裝類selectedKeySet將原JDK的selectedKeys替換掉。默認(rèn)為開(kāi)啟優(yōu)化功能。
下面看下run()方法的實(shí)現(xiàn):

for (;;) {
            oldWakenUp = wakenUp.getAndSet(false);
            try {
                if (hasTasks()) {
                    selectNow();
                } else {
                    select();

Selector的selectNow()方法會(huì)立即觸發(fā)Selector的選擇操作,如果有準(zhǔn)備就緒的Channel,則返回就緒的Channel集合。選擇完成后在此判斷用戶是否調(diào)用了Selector的wakeup方法,如果調(diào)用,執(zhí)行selector.wakeup()操作。下面我們返回到run()方法,繼續(xù)分析代碼。如果消息隊(duì)列中沒(méi)有消息需要處理,則執(zhí)行select()方法,有Selector多路復(fù)用器輪詢,看是否有準(zhǔn)備就緒的channel。它的實(shí)現(xiàn)如下:
取當(dāng)前系統(tǒng)的納秒時(shí)間,調(diào)用delayNanos()方法計(jì)算獲得NioEventLoop中定時(shí)任務(wù)的觸發(fā)時(shí)間。

private void select() throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks()) {
                    // Selected something,
                    // waken up by user, or
                    // the task queue has a pending task.
                    break;
                }

                if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);

                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

計(jì)算下一個(gè)將要觸發(fā)的定時(shí)任務(wù)的剩余超時(shí)時(shí)間,將它轉(zhuǎn)換為毫秒,為超時(shí)時(shí)間增加0.5毫秒的調(diào)整值。對(duì)剩余的超時(shí)時(shí)間進(jìn)行判斷,如果需要立即執(zhí)行或者已經(jīng)超時(shí),則調(diào)用selector.selectNow()進(jìn)行輪詢操作,將selectCnt設(shè)置為1,并退出當(dāng)前循環(huán)。
將定時(shí)任務(wù)剩余的超時(shí)時(shí)間作為參數(shù)進(jìn)行select操作,沒(méi)完成一次select操作,對(duì)select計(jì)數(shù)器selectCnt加1.
Select操作完成之后,需要對(duì)結(jié)果進(jìn)行判斷,如果存在下列任意一種情況,則退出當(dāng)前循環(huán)。
1.有Channel處于就緒狀態(tài),selectedKeys不為0,說(shuō)明有讀寫(xiě)事件需要處理。
2.oldWakenUp為true;
3.系統(tǒng)或者用戶調(diào)用了wakeup操作,喚醒當(dāng)前的多路復(fù)用器;
4.消息隊(duì)列中有新的任務(wù)需要處理。
如果本次Selector的輪詢結(jié)果為空,也沒(méi)有wakeup操作或者是新的消息需要處理,則說(shuō)明是個(gè)空輪詢,有可能觸發(fā)了JDK的epoll bug,它會(huì)導(dǎo)致Selector的空輪詢,使I/O線程一致處于100%狀態(tài)。這個(gè)問(wèn)題一直到JDK1.8才得到解決。
解決方式就是上面代碼最下面的那層if語(yǔ)句。
(1)對(duì)Selector的select周期進(jìn)行統(tǒng)計(jì);
(2)每完成一次select操作進(jìn)行一次計(jì)數(shù);
(3)當(dāng)select的操作達(dá)到一定次數(shù)后,rebuildSelector()重新輪詢。

 newSelector = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }

        // Register all channels to the new Selector.
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (key.channel().keyFor(newSelector) != null) {
                            continue;
                        }

                        int interestOps = key.interestOps();
                        key.cancel();
                        key.channel().register(newSelector, interestOps, a);
                        nChannels ++;
                    } catch (Exception e) {
                        logger.warn("Failed to re-register a Channel to the new Selector.", e);
                        if (a instanceof AbstractNioChannel) {
                            AbstractNioChannel ch = (AbstractNioChannel) a;
                            ch.unsafe().close(ch.unsafe().voidPromise());
                        } else {
                            @SuppressWarnings("unchecked")
                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                            invokeChannelUnregistered(task, key, e);
                        }
                    }
                }
            } catch (ConcurrentModificationException e) {
                // Probably due to concurrent modification of the key set.
                continue;
            }

            break;
        }

        selector = newSelector;

通過(guò)銷毀舊的、有問(wèn)題的多路復(fù)用器,使用新建的Selector就可以解決空輪詢Selector導(dǎo)致的I/O線程CPU占用100%的問(wèn)題。
如果輪詢到了處于就緒狀態(tài)的SocketChannel,則需要處理網(wǎng)絡(luò)I/O事件,相關(guān)代碼如下:

if (selectedKeys != null) {
                    processSelectedKeysOptimized(selectedKeys.flip());
                } else {
                    processSelectedKeysPlain(selector.selectedKeys());
                }

由于默認(rèn)開(kāi)啟了selectedKeys的優(yōu)化功能,所以會(huì)進(jìn)入processSelectedKeysOptimized分支執(zhí)行。進(jìn)入該方法,如果有需要處理的channel則進(jìn)入processSelectedKey方法中,處理I/O事件,其代碼如下:

final NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
            return;
        }

首先從NioServerSocketChannel或者NioSocketChannel中獲取其內(nèi)部類Unsafe,判斷當(dāng)前選擇鍵是否可用,如果不可用,調(diào)用Unsafe的close()方法,釋放連接資源。
如果選擇鍵可用,則繼續(xù)對(duì)網(wǎng)絡(luò)操作位進(jìn)行判斷,代碼如下:

int readyOps = k.readyOps();
            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
                if (!ch.isOpen()) {
                    // Connection already closed - no need to handle write.
                    return;
                }
            }

如果是讀或者連接操作,則調(diào)用Unsafe的read方法。此處Unsafe的實(shí)現(xiàn)是個(gè)多態(tài),對(duì)于NioServerSocketChannel,它的讀操作就是接收客戶端的TCP連接,相關(guān)代碼如下:

protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }

對(duì)于NioSocketChannel,它的讀操作就是從SocketChannel中讀取ByteBuffer,相關(guān)代碼如下:

    protected int doReadBytes(ByteBuf byteBuf) throws Exception {
        return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
    }

如果網(wǎng)絡(luò)操作位為寫(xiě),則說(shuō)明有半包消息尚未發(fā)送完成,需要繼續(xù)調(diào)用flush方法進(jìn)行發(fā)送,相關(guān)代碼如下:

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

如果網(wǎng)絡(luò)操作位為連接狀態(tài),則需要對(duì)連接結(jié)果進(jìn)行判讀,如下:

if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

需要注意的是,在進(jìn)行finishConnect之前,需要將網(wǎng)絡(luò)操作位進(jìn)行修改,注銷掉SelectionKey.OP_CONNECT。

處理完I/O事件后,NioEventLoop需要執(zhí)行非I/O操作的系統(tǒng)Task和定時(shí)任務(wù),代碼如下:

final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

由于NioEventLoop同時(shí)處理I/O事件和非I/O事件,Netty提供了兩者的比例。
Task的執(zhí)行時(shí)間根據(jù)本次I/O的執(zhí)行時(shí)間得到,方法如下

fetchFromDelayedQueue();
        Runnable task = pollTask();
        if (task == null) {
            return false;
        }

首先從定時(shí)任務(wù)消息隊(duì)列中彈出消息技能型處理,如果消息隊(duì)列為空,則退出循環(huán)。根據(jù)當(dāng)前的時(shí)間戳進(jìn)行判斷,如果該定時(shí)任務(wù)已經(jīng)或者正處于超時(shí)狀態(tài),則將其加入到執(zhí)行TaskQueue中,同時(shí)從延時(shí)隊(duì)列中刪除。定時(shí)任務(wù)如果沒(méi)有超時(shí),說(shuō)明本循環(huán)不需要處理,直接退出即可,如下:

private void fetchFromDelayedQueue() {
        long nanoTime = 0L;
        for (;;) {
            ScheduledFutureTask<?> delayedTask = delayedTaskQueue.peek();
            if (delayedTask == null) {
                break;
            }

            if (nanoTime == 0L) {
                nanoTime = ScheduledFutureTask.nanoTime();
            }

            if (delayedTask.deadlineNanos() <= nanoTime) {
                delayedTaskQueue.remove();
                taskQueue.add(delayedTask);
            } else {
                break;
            }
        }
    }

執(zhí)行Task Queue中原有的任務(wù)和從延時(shí)隊(duì)列中復(fù)制的已經(jīng)超時(shí)或者正處于超時(shí)狀態(tài)的定時(shí)任務(wù),
由于獲取系統(tǒng)納秒時(shí)間是個(gè)耗時(shí)的操作,每次循環(huán)都獲取當(dāng)前系統(tǒng)納秒時(shí)間進(jìn)行超時(shí)判斷會(huì)降低性能。為了提升性能,每執(zhí)行60次循環(huán)判斷一次,如果當(dāng)前系統(tǒng)已經(jīng)到了分配給非I/O操作的超時(shí)時(shí)間,則退出循環(huán)。這是為了防止由于I/O操作過(guò)多導(dǎo)致I/O操作被長(zhǎng)時(shí)間阻塞。

for (;;) {
            try {
                task.run();
            } catch (Throwable t) {
                logger.warn("A task raised an exception.", t);
            }

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

最后判斷系統(tǒng)是否進(jìn)入優(yōu)雅停機(jī)狀態(tài),如果處于關(guān)閉狀態(tài),則需要調(diào)用closeAll方法釋放資源,并讓NioEventLoop線程退出循環(huán),結(jié)束運(yùn)行。關(guān)閉方法就在NioEventLoop的runAllTasks之后,進(jìn)入其中,如下:

private void closeAll() {
        selectAgain();
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                channels.add((AbstractNioChannel) a);
            } else {
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }

        for (AbstractNioChannel ch: channels) {
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }

遍歷所有的channel,調(diào)用它的Unsafe.close()方法關(guān)閉所有鏈路,釋放線程池,ChannelPipeline和ChannelHandler等資源。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 何為Reactor線程模型? Reactor模式是事件驅(qū)動(dòng)的,有一個(gè)或多個(gè)并發(fā)輸入源,有一個(gè)Service Han...
    未名枯草閱讀 3,765評(píng)論 2 11
  • Netty的簡(jiǎn)單介紹 Netty 是一個(gè) NIO client-server(客戶端服務(wù)器)框架,使用 Netty...
    AI喬治閱讀 8,599評(píng)論 1 101
  • 本文是Netty文集中“Netty 那些事兒”系列的文章。主要結(jié)合在開(kāi)發(fā)實(shí)戰(zhàn)中,我們遇到的一些“奇奇怪怪”的問(wèn)題,...
    tomas家的小撥浪鼓閱讀 15,919評(píng)論 3 35
  • 該文章為轉(zhuǎn)載,原文章請(qǐng)點(diǎn)擊 1. 背景 1.1. Netty 3.X系列版本現(xiàn)狀 根據(jù)對(duì)Netty社區(qū)部分用戶的調(diào)...
    Pramyness閱讀 2,145評(píng)論 1 14
  • 今天許多人在看自己18歲時(shí)候的照片。 翻一翻我18歲那的照片,生日那天覺(jué)得自己已經(jīng)是成年人了,可以拿著身份證光明正...
    方思辰閱讀 677評(píng)論 11 17

友情鏈接更多精彩內(nèi)容