Netty源碼(三)NioEventLoop三部曲

前言

本文將會具體分析NioEventLoop中的thread,它的啟動時機,以及所履行的職責(zé)。還會分析一些netty的實現(xiàn)細(xì)節(jié),比如解決NIO的bug和一些優(yōu)化等。

thread啟動

之前說到NioEventLoop是由一個thread處理I/O事件和提交的任務(wù)。先看一下這個thread啟動的流程。

execute 簡化流程

private void execute(Runnable task, boolean immediate) {
       //是當(dāng)前線程調(diào)用,直接加入隊列
        boolean inEventLoop = inEventLoop();
        addTask(task);
        if (!inEventLoop) {
            //啟動線程
            startThread();
         // ......
        }

        if (!addTaskWakesUp && immediate) {
            wakeup(inEventLoop);
        }
    }

可以看出啟動thread是一個延遲加載的過程,在執(zhí)行第一個任務(wù)的時候才會啟動thread。跟進去看startThread()

    private void startThread() {
      //判斷線程狀態(tài)是否已啟動
        if (state == ST_NOT_STARTED) {
           //CAS設(shè)置線程狀態(tài)為已啟動
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                boolean success = false;
                try {
                  //真正去啟動線程
                    doStartThread();
                    success = true;
                } finally {
                    if (!success) {
                        STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
                    }
                }
            }
        }
    }

doStartThread

private void doStartThread() {
        assert thread == null;
        //調(diào)用傳入?yún)?shù)的executor的execute方法,
        //executor會新建一個線程去執(zhí)行任務(wù)
        executor.execute(new Runnable() {
            @Override
            public void run() {
                //將執(zhí)行該任務(wù)的線程賦值給thread 
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    //執(zhí)行任務(wù)
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                // ......
            }
    }

前文分析了executor為ThreadPerTaskExecutor,執(zhí)行execute方法時候為新建一個線程去執(zhí)行任務(wù),NioEventLoop的thread就是在此時賦值。
thread的啟動流程簡化為,首先thread啟動是一個懶加載的過程,在第一次執(zhí)行任務(wù)才會啟動。在啟動的過程中,會有一個CAS的狀態(tài)判斷當(dāng)前線程是否已經(jīng)被啟動,如果thread沒有啟動,則通過傳入的executor對象去創(chuàng)建thread對象,并執(zhí)行SingleThreadEventExecutor.this.run()這個方法。

下面分析SingleThreadEventExecutor.this.run()這個方法,

    /**
     * Run the tasks in the {@link #taskQueue}
     */
    protected abstract void run();

可以看見是一個抽象方法,然后找到文本分析的NioEventLoop對于run的實現(xiàn),這里做一個將代碼做一個簡化,只有主要流程

    protected void run() {
        int selectCnt = 0;
        for (; ; ) {
            //1、檢測IO事件
            select();
            try {
              //2、處理準(zhǔn)備就緒的IO事件
                processSelectedKeys();
            } finally {
                // 3、執(zhí)行隊列里的任務(wù)
                final long ioTime = System.nanoTime() - ioStartTime;
                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

        }
    }

NioEventLoop的職責(zé)只有三個,1、檢測IO事件 ;2、處理準(zhǔn)備就緒的IO事件;3、執(zhí)行隊列里的任務(wù),用一個死循環(huán)去不斷執(zhí)行這三件事情。如之前畫的圖所示:


run

接下來就著重分析這三個步驟。

select

select步驟的核心是調(diào)用通過NIO中的selector的select()方法,返回selector上所監(jiān)聽到IO事件。

                    case SelectStrategy.SELECT:
                        // 獲取當(dāng)前任務(wù)隊列的延遲時間
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                        if (curDeadlineNanos == -1L) {
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            //當(dāng)前任務(wù)隊列為空,監(jiān)聽IO事件
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }

select方法

    private int select(long deadlineNanos) throws IOException {
        if (deadlineNanos == NONE) {
            return selector.select();
        }
        // Timeout will only be 0 if deadline is within 5 microsecs
        long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
    }

流程整體比較簡單,如果時間參數(shù)deadlineNanos為NONE,就調(diào)用selector.select()方法,這個方法會一直阻塞直到有IO事件返回。否則再判斷deadlineNanos是否小于等于0,如果是調(diào)用selectNow()會立即返回當(dāng)前selector上準(zhǔn)備就緒的IO事件,否則調(diào)用selector.select(timeoutMillis)方法,會在指定時間內(nèi)返回,不管是否有IO事件發(fā)生。然后跟select()方法,找到實現(xiàn)類io.netty.channel.nio.SelectedSelectionKeySetSelector,

    public int select() throws IOException {
        selectionKeys.reset();
        return delegate.select();
    }

一共有兩步操作,第一步是將之前的selectionKeys清空,檢測到就緒的IO事件都會放入selectionKeys中,這里表示新的一輪IO循環(huán)開始,所以要將之前的清空(selectionKeys后續(xù)會在詳細(xì)介紹)。第二步是調(diào)用NIO中的Selector對象的select(),將最后底層的IO實現(xiàn)委托給它。

processSelectedKeys

processSelectedKeys這一步將會處理監(jiān)測到的IO事件,比如連接、讀寫的IO操作。

    private void processSelectedKeys() {
        if (selectedKeys != null) {
            processSelectedKeysOptimized();
        } else {
            processSelectedKeysPlain(selector.selectedKeys());
        }
    }

這里有個細(xì)節(jié),處理優(yōu)化過后的selectedKeys還是處理原生的selectedKeys。所謂優(yōu)化的selectedKeys就是將原生的selectedKeys的HashSet替換成數(shù)組實現(xiàn),提高空間利用率和遍歷的效率,待會兒會詳細(xì)將到是怎么替換的selectedKeys。

然后跟進去看processSelectedKeysOptimized()的具體實現(xiàn):

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

整體流程就是在遍歷selectedKeys,將綁定在SelectionKey上的Channel取下來,然后做對應(yīng)的IO操作,最后再判斷是否需要重置selectedKeys。下面我會逐步分析里面的細(xì)節(jié)

第一: selectedKeys.keys[i] = null;

將SelectionKey取出之后把數(shù)組這個位置的地方置為null。為什么這么做?https://github.com/netty/netty/issues/2363描述的很清楚,簡單來說就是我們并不會去清空selectedKeys數(shù)組,這就會導(dǎo)致在Channel關(guān)閉之后,依然會保持SelectionKey的強引用。

selectedKeys.jpg

如上圖所示,假如數(shù)組原有長度為2,一次高峰期的IO事件導(dǎo)致數(shù)組擴容到8,之后新的IO事件的數(shù)量又達不到之前數(shù)組的位置,為導(dǎo)致上圖坐標(biāo)[6]、[7]位置會長時間持有已經(jīng)關(guān)閉的Channel的引用,所以這里將其置為null,有助于GC。

第二: processSelectedKey

            
            final Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } 

首先是將SelectionKey綁定的屬性取下來,判斷是否是AbstractNioChannel的類型。這里可以追蹤一下netty是什么時候?qū)bstractNioChannel設(shè)置進去的。在AbstractNioChannel的doRegister方法

  //最后一個參數(shù)就是att
  selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

其Channel注冊到底層jdk的組件中,然后將AbstractNioChannel作為參數(shù)傳遞進去,后續(xù)輪詢出IO事件之后,再將AbstractNioChannel取出做后續(xù)操作。
具體處理IO事件
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
這里貼一點核心流程,主要是判斷當(dāng)前Channel的操作類型,是連接還是讀、寫

           int readyOps = k.readyOps();
            //連接事件
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }
           //寫事件
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }
            //讀事件
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

這里面的內(nèi)部流程就不具體分析了,大致分為兩個部分bossGroup監(jiān)聽的連接事件,將接受到的Channel轉(zhuǎn)交給workGroup,然后workGroup處理讀寫事件,然后將事件通過ChannelPipeline將事件傳播出去。具體細(xì)節(jié)可以看AbstractNioMessageChannel和AbstractNioByteChannel的read()方法,后續(xù)可能會具體分析這里的代碼。

第三: needsToSelectAgain

最后一個步驟,重新設(shè)置selectedKeys

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);
                selectAgain();
                i = -1;
            }

什么時候需要重新select?找到needsToSelectAgain被設(shè)置為true的地方,只有唯一的一處cancel

    void cancel(SelectionKey key) {
        key.cancel();
        cancelledKeys ++;
        if (cancelledKeys >= CLEANUP_INTERVAL) {
            cancelledKeys = 0;
            needsToSelectAgain = true;
        }
    }

然后看cancel被調(diào)用的地方doDeregister

    protected void doDeregister() throws Exception {
        eventLoop().cancel(selectionKey());
    }

由上面的兩部分代碼分析可以知道,channel的關(guān)閉是通過移除在selector上的注冊實現(xiàn)的,同時會把cancelledKeys加一 。當(dāng)達到了閾值CLEANUP_INTERVAL(默認(rèn)256)后將cancelledKeys重置為0、needsToSelectAgain 為true。
當(dāng)needsToSelectAgain 為true之后,有兩個步驟:
1.selectedKeys清空 -> selectedKeys.reset(i + 1);

    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }
  1. 再次填充selectedKeys ->selectAgain
    private void selectAgain() {
        needsToSelectAgain = false;
        try {
            selector.selectNow();
        } catch (Throwable t) {
            logger.warn("Failed to update SelectionKeys.", t);
        }
    }

至于為什么需要重新去填充selectedKeys,可能是需要保持selectedKeys里面的Channel都隨時保持的是活躍的。

processSelectedKeys到這就分析完了,總共分為三步

  1. 遍歷selectedKeys
  2. 處理IO事件
  3. 是否需要重置selectedKeys

ranTasks

現(xiàn)在分析thread的最后一步工作ranTasks,執(zhí)行隊列里的任務(wù)。
1. 任務(wù)類型
NioEventLoop里的任務(wù)類型分為兩部分,一個是由taskQueue(MpscUnboundedArrayQueue)存放普通的任務(wù),還有一個scheduledTaskQueue存放定時任務(wù)的隊列。之前分析過EventLoop繼承自ScheduledExecutorService,所以也需要提供執(zhí)行定時任務(wù)的功能,而這里的定時任務(wù)是通過PriorityQueue來實現(xiàn)的。(定時任務(wù)的實現(xiàn)方式有很多,優(yōu)先隊列只是其中一種)ranTasks執(zhí)行的任務(wù)其實就是兩部分的內(nèi)容,一個是普通隊列中的任務(wù)和定時隊列中的任務(wù)。
2. ioRatio
在分析執(zhí)行細(xì)節(jié)之前,在提一個很重要的參數(shù)ioRatio,代表設(shè)置事件循環(huán)中I/O所需時間的百分比,意思就是在一次循環(huán)中,處理IO事件的時間與處理隊列任務(wù)所占時間做一個百分比的分配,范圍是1到100,當(dāng)設(shè)置為100時,這個參數(shù)就失效了,默認(rèn)參數(shù)為50。下面代碼就是對ioRatio的使用

                //等于100的時候,參數(shù)失效,不再平衡IO事件所占時間的比例
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }
                } else if (strategy > 0) {
                    //開始執(zhí)行IO事件的時間
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // 獲得IO執(zhí)行總共耗時
                        final long ioTime = System.nanoTime() - ioStartTime;
                        //按照ioRatio計算出將花費多少時間執(zhí)行ranTasks 
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }

3. runAllTasks

    protected boolean runAllTasks(long timeoutNanos) {
        //將scheduledTaskQueue隊列中的任務(wù)轉(zhuǎn)移到taskQueue中
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        //任務(wù)為空結(jié)束
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }
        //計算本次執(zhí)行任務(wù)最遲的時間
        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            //執(zhí)行任務(wù)
            safeExecute(task);

            runTasks ++;

            //每執(zhí)行64個任務(wù)之后判斷時間是否超出,若超出結(jié)束循環(huán)
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }
            //沒有任務(wù)結(jié)束循環(huán)
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

主要流程為

  1. scheduledTaskQueue隊列中的任務(wù)轉(zhuǎn)移到taskQueue中;
  2. 安全的執(zhí)行任務(wù)(其實就是將任務(wù)try catch,以免任務(wù)執(zhí)行發(fā)生異常,影響其他任務(wù)執(zhí)行);
    protected static void safeExecute(Runnable task) {
        try {
            task.run();
        } catch (Throwable t) {
            logger.warn("A task raised an exception. Task: {}", task, t);
        }
    }
  1. 每執(zhí)行64個任務(wù)之后判斷執(zhí)行時間是否超出deadline,這里采用64個任務(wù)為一個批次,沒有每次任務(wù)執(zhí)行去判斷,也是對性能的一個優(yōu)化;
  2. 執(zhí)行afterRunningAllTasks方法,其實就是執(zhí)行tailTasks隊列中的任務(wù),然后記錄一下最后的執(zhí)行時間this.lastExecutionTime = lastExecutionTime;

一些細(xì)節(jié)

selectedKeySet

前面提到過netty將NIO中Selector的selectedKeys替換,這里分析一下為什么需要替換和么去替換的selectedKeys。

  1. 為什么替換

NIO原生的selectedKeys使用的是HashSet,而NioEventLoop將其替換成了SelectedSelectionKeySet

//SelectorImpl
protected Set<SelectionKey> selectedKeys = new HashSet();
//NioEventLoop
private SelectedSelectionKeySet selectedKeys;

SelectedSelectionKeySet構(gòu)造函數(shù)

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

SelectedSelectionKeySet使用的是數(shù)組存儲元素,而HashSet是基于HashMap去存儲數(shù)據(jù),采用數(shù)組使得空間利用率和遍歷的效率有所提高。

2.怎么替換

要在運行時替換掉類的屬性,很明顯是通過反射來做到的。

  • 獲取sun.nio.ch.SelectorImpl Class對象
        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });
  • 創(chuàng)建selectedKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
  • 設(shè)置屬性
    //獲取屬性
    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
    ......
    //將selectedKeySet設(shè)置到屬性中
    selectedKeysField.set(unwrappedSelector, selectedKeySet);
    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);

NIO空輪詢bug

NIO有一個很出名的bug就是epoll空輪詢的bug,這會導(dǎo)致CPU占有率到100%,java也并沒有修復(fù)這個bug,netty采用了一個很巧妙的方法來繞過這個bug。
主要思想就是,通過檢測發(fā)生空輪詢的次數(shù),當(dāng)超過一定的閾值之后,netty將會重新創(chuàng)建一個selector,并將之前selector上的channel轉(zhuǎn)移到新的selector上。通過重新創(chuàng)建selector的方式來解決NIO空輪詢的bug。

unexpectedSelectorWakeup

        //空輪詢的次數(shù)超過閾值,默認(rèn)為512
        if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
            // The selector returned prematurely many times in a row.
            // Rebuild the selector to work around the problem.
            //重新構(gòu)建selector
            rebuildSelector();
            return true;
        }

跟進去找到具體的實現(xiàn)方法rebuildSelector0

        final Selector oldSelector = selector;
        final SelectorTuple newSelectorTuple;
        try {
            //創(chuàng)建新的selector
            newSelectorTuple = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
        // Register all channels to the new Selector.
        int nChannels = 0;
        for (SelectionKey key: oldSelector.keys()) {
        //將舊的selector上的channel全部注冊到新的selector上
        }
        //賦值
        selector = newSelectorTuple.selector;
        unwrappedSelector = newSelectorTuple.unwrappedSelector;
        try {
            // 關(guān)閉舊的selector
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }

總結(jié)

本文分析NioEventLoop中所對應(yīng)的唯一的thread,啟動是一個懶加載的過程,當(dāng)?shù)谝淮稳蝿?wù)執(zhí)行的時候才會初始化。后續(xù)thread開始循環(huán)處理三件事件

  1. 檢測IO事件 ;
  2. 處理準(zhǔn)備就緒的IO事件;
  3. 執(zhí)行隊列里的任務(wù)

本文也對具體的代碼進行了分析,還有一些netty對NIO的優(yōu)化和bug處理,當(dāng)然netty的精妙之處遠不止本文分析的這些,更多的還需要自己去探索和學(xué)習(xí)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容