Netty NioEventLoop源碼解讀

Netty NioEventLoop

Reactor 模型

Netty實(shí)現(xiàn)并擴(kuò)展了Reactor模型,為了更好的了解EventLoop,我們有必要先看一下Reactor模型的定義。

Reactor.png

在wiki對reactor pattern的定義中,指出了一下集中角色:

  • Resource:資源指的是提供系統(tǒng)輸入或者消費(fèi)系統(tǒng)輸出的資源。在Netty中它指的是SocketChannel,它們應(yīng)支持select。
  • Demultiplexer:事件分離器負(fù)責(zé)對資源進(jìn)行輪尋等待,當(dāng)資源ready的時(shí)候,分離器負(fù)責(zé)將數(shù)據(jù)發(fā)送給Dispatcher。
  • Dispatcher:處理Handler的注冊和反注冊。當(dāng)資源到達(dá)時(shí)負(fù)責(zé)把資源分發(fā)到相應(yīng)的Handler中。
  • Handler:負(fù)責(zé)處理數(shù)據(jù)。

在Netty中EventLoop兼負(fù)了Demultiplexer以及Dispatcher兩個(gè)角色。下邊我們通過來看NioEventLoop的源碼學(xué)習(xí)學(xué)習(xí)并了解Netty中的EventLoop。

EventLoop源碼

NioEventLoop的核心方法是run()方法,一旦Netty程序啟動(dòng)之后,這個(gè)就一直循環(huán)跑下去,不間斷的查詢IO和處理task。

protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));

                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                        // fallthrough
                }
        }
        ///
}

關(guān)于selectionStrategy

首先我們來看第一個(gè)邏輯Select Strategy。這段邏輯主要控制這次循環(huán)是執(zhí)行:跳過;select操作;還是fall through。判斷依據(jù)是這樣的:

public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

如果當(dāng)前EventLoop中有未處理的task,則執(zhí)行selectorNowSupplier。selectorNowSupplier調(diào)用了selectNow。selectNow調(diào)用的是Selector的selectNow這個(gè)非阻塞方法。執(zhí)行完selectNow則跳出switch運(yùn)行下邊的processSelectedKeys邏輯。

為了高效的利用CPU,EventLoop中只要有未消費(fèi)的task則優(yōu)先消費(fèi)task。

Nio中Selector.select()是阻塞的,直到某個(gè)selection key可用select方法才會(huì)返回。Selector.selectNow()則檢查自從上次select到現(xiàn)在有沒有可用的selection key,然后立即返回。

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();
    }
};

int selectNow() throws IOException {
    try {
        return selector.selectNow();
    } finally {
        // restore wakeup state if needed
        if (wakenUp.get()) {
            selector.wakeup();
        }
    }
}

select操作

select操作主要是檢查當(dāng)前的selection key,看哪些已a(bǔ)vailable。

上邊我們說到了Selector.select操作是阻塞的,那么如果我不想等了,可以中斷它嗎?可以,Selector.wakeup可以喚醒正在阻塞的select()操作。但是如果當(dāng)前沒有select操作,執(zhí)行了wakeUp操作,那么下次執(zhí)行的select()或者selectNow()操作將被立即喚醒。

但是Selector.wakeup是開銷比較大的操作,不能每次都直接調(diào)用wakeup,于是NioEventLoop中聲明了wakenUp(AtomicBoolean)字段,用于控制selector.wakeup()的調(diào)用。調(diào)用wakeup之前先wakenUp.compareAndSet(false, true),如果set成功才執(zhí)行Selector.wakeup()操作。

當(dāng)用戶提交新的任務(wù)時(shí)executor.execute(...),會(huì)觸發(fā)wakeup操作。

select(wakenUp.getAndSet(false));

if (wakenUp.get()) {
    selector.wakeup();
}

這段代碼有一段非常長的注釋,解釋了為什么這段邏輯這樣實(shí)現(xiàn)。并且給出了什么情況下會(huì)產(chǎn)生競態(tài)條件:

wakenUp.set(false)
selector.select(...)

wakenUp.set(false)執(zhí)行后,用戶出發(fā)了wakeup操作,然后執(zhí)行select操作,這時(shí)select將立即返回。直到下次循環(huán)把wakenUp重置為false,期間所有的wakenUp.compareAndSet(false, true)都是執(zhí)行失敗的,因?yàn)楝F(xiàn)在wakenUp的值是true。所以接下來的select()都不能被wakeup。

select 內(nèi)部邏輯

接下來我們看select是如何實(shí)現(xiàn)的:

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) { // 1
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }

                if (hasTasks() && wakenUp.compareAndSet(false, true)) { // 2
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // 3
                    break;
                }

                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
                            selectCnt, selector);

                    // 重建Selector,舊的Selector中的Selection Key要拷貝到新的Selector中
                    rebuildSelector();
                    selector = this.selector;

                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }
        ///    
    }

selectCnt標(biāo)記select執(zhí)行的次數(shù),用于檢測NIO的epoll bug。在這個(gè)方法尾部有一個(gè)判斷:

 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {}

判斷select記次是否超過了伐值,如果是的話有可能觸發(fā)了Nio epoll bug,執(zhí)行重建selector的邏輯:新建一個(gè)Selector,把原來老的selection key都復(fù)制過去。重建完成之后再執(zhí)行一次selectNow。

因?yàn)閟elect操作是阻塞的,如果長時(shí)間沒有IO可用,就會(huì)造成NioEventLoop中的task積壓。因此每次執(zhí)行select操作都設(shè)定一個(gè)超時(shí):
1.查詢定時(shí)任務(wù)重最近要被執(zhí)行的task還有多長時(shí)間執(zhí)行.
2.這個(gè)時(shí)間加上0.5ms就是最大超時(shí)時(shí)間。

long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;

整體來看一下這個(gè)for循環(huán):

  • 第1個(gè)if:如果timeoutMillis小于0,則立即執(zhí)行一次異步的selectNow,跳出循環(huán)消費(fèi)task。
  • 第2個(gè)if:如果當(dāng)前taskQueue中有task,并且沒有被wakeup,則執(zhí)行一次異步的selectNow,跳出循環(huán)消費(fèi)task。
  • 接下來執(zhí)行select,并記次。
  • 第3個(gè)if:如果有available keys 或者 被用戶喚醒 或者 任務(wù)隊(duì)列定時(shí)隊(duì)列有任務(wù)則中斷。
  • 最后就是重建selector的過程。

processSelectedKeys

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
    try {
        processSelectedKeys();
    } finally {
        runAllTasks();
    }
} else {
    final long ioStartTime = System.nanoTime();
    try {
        processSelectedKeys();
    } finally {
        final long ioTime = System.nanoTime() - ioStartTime;
        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
    }
}

NioEventLoop.run方法的后半段邏輯主要是processSelectedKeys(處理IO)和runTasks(消費(fèi)任務(wù))。這里有一個(gè)參數(shù)用于控制處理這兩種任務(wù)的時(shí)間配比:ioRatio。

先來看一下processSelectedKeys,它的邏輯由processSelectedKeysOptimized和processSelectedKeysPlain實(shí)現(xiàn),調(diào)用那個(gè)函數(shù)取決于你是否開啟了DISABLE_KEYSET_OPTIMIZATION。如果開啟了Selection 優(yōu)化選項(xiàng),則在創(chuàng)建Selector的時(shí)候以反射的方式把SelectedSelectionKeySet selectedKeys設(shè)置到selector中。具體實(shí)現(xiàn)在openSelector中,代碼就不貼出來了。SelectedSelectionKeySet內(nèi)部是基于Array實(shí)現(xiàn)的,而Selector內(nèi)部selectedKeys是Set類型的,遍歷效率Array效率更好一下。

我們來分析processSelectedKeysPlain方法:

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    if (selectedKeys.isEmpty()) {
        return;
    }

    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        // 處理channel中的數(shù)據(jù)
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}

SelectionKey上邊可以掛載Attachment,一般情況下新的鏈接對象Channel會(huì)掛到attachment上。我們在遍歷selectedKeys時(shí),首先取出selection key上的attachment,key的類型可能是AbstractNioChannel和NioTask。根據(jù)不同的類型調(diào)用不同的處理函數(shù)。我們著重看處理channel的邏輯:

1.如果selection key是:SelectionKey.OP_CONNECT,那表明這是一個(gè)鏈接操作。對于鏈接操作,我們需要把這個(gè)selection key從intrestOps中清除掉,否則下次select操作會(huì)直接返回。接下來調(diào)用finishConnect方法。

2.如果selection key是:SelectionKey.OP_WRITE。則執(zhí)行flush操作,把數(shù)據(jù)刷到客戶端。

3.如果是read操作則調(diào)用unsafe.read()。這個(gè)操作就不展開了,等到接下來的文章,專門分析read操作。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

整體來看NioEventLoop的實(shí)現(xiàn)也不復(fù)雜,主要就干了兩件事情:select IO以及消費(fèi)task。因?yàn)閟elect操作是阻塞的(盡管設(shè)置了超時(shí)時(shí)間),每次執(zhí)行select時(shí)都會(huì)檢查是否有新的task,有則優(yōu)先執(zhí)行task。這么做也是做大限度的提高EventLoop的吞吐量,減少阻塞時(shí)間。

除了這兩件事兒呢,NioEventLoop還解決了JDK中注明的EPoll bug。到此NioEventLoop源碼分析完結(jié)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容