netty極簡教程(七):Netty中的Selector是如何配合NioEventLoop工作的

上一節(jié)我們找到了ServerSocketChannel的生成,注冊Selector,綁定端口啟動等等:netty極簡教程(六):Netty是如何屏蔽ServerSocketChannel啟動的,
接下來接續(xù)驗(yàn)證在Netty中Selector的生成使用以及我們jdk 原生工作線程再netty中是怎么啟動工作的: NioEventLoopGroup


示例源碼: https://github.com/jsbintask22/netty-learning

NioEventLoopGroup

image

還記得我們前面粗談的boss以及work線程嗎,它是一個線程池,既然它是一個線程池,那它內(nèi)部肯定有成員變量用來訪問它所持有的線程,就在NioEventLoopGroup的構(gòu)造函數(shù)中:
image

從這里我們就知道這個線程池內(nèi)部有一個SelectProvider,接著繼續(xù)往下面走:
image

我們知道這里的executor肯定是null的,所以它默認(rèn)構(gòu)造了一個ThreadPerTaskExecutor
image

這里值得注意的是,它沒有做其他任何操作,直接就是new了一個線程并且start了,所以這個使用這個executor提交的任務(wù)它直接就是使用線程并且直接啟動了;


接著繼續(xù)往下面走,它將children全部實(shí)例化并且該children是一個NioEventLoop實(shí)例數(shù)組(將上面的executor丟了進(jìn)去);


image

所以現(xiàn)在關(guān)鍵地方在于,這個NioEventLoop是什么時候往ThreadPerTaskExecutor丟了一個任務(wù),我們繼續(xù)追蹤它NioEventLoop

image

觀察它的構(gòu)造方法: 這里指的注意的是,我們在NioEventLoop的構(gòu)造方法中發(fā)現(xiàn)了Selector已經(jīng)生成,
image

也就是說:children數(shù)組多大就有多少個線程就有多少個Selector,到這里,我們第5節(jié)用jdk寫的reactor子線程都對應(yīng)了一個selector在這里得以驗(yàn)證;


我們回到上一節(jié)解析的registerAndInit()方法

image

最后有一個register的步驟,我們上一節(jié)說在這個方法中注冊ServerChannel到Selector,其實(shí)除此之外,還有另外的步驟:
image

由于剛啟動時是在main線程中,所以當(dāng)前線程不等于boss線程:

public boolean inEventLoop(Thread thread) {
    return thread == this.thread;
}

即先走下面的

eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});

最終使用executor提交開啟了一個新線程(忘記的回憶一下ThreadPerTaskExecutor):

image

最終調(diào)用NioEventLoop.run()方法,開啟Selector的無限select操作:
image

select到準(zhǔn)備好的事件或者任務(wù)隊(duì)列中有任務(wù)時(我們一開始的Channel注冊就添加到了任務(wù)隊(duì)列),開始執(zhí)行processSelectedKeys處理事件:
image

和我們寫的原生jdk一樣,開始遍歷selectedKey,并且根據(jù)不同的事件類型在processSelectedKey中處理:
image

所以,如果我們進(jìn)入的accept事件,說明channel是ServerSocketChannel,則執(zhí)行NioMessageUnsafe.read ()方法,接著調(diào)用NioServerSocketChannel的doReadMessages從而接受一個新連接:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

最后會觸發(fā)SererSocketChannel中pipeline中的read方法,此時我們在上一節(jié)已經(jīng)埋好伏筆,就是ServerBootstrapAcceptor的read會觸發(fā),從而將新接受的連接SocketChannel再次注冊到selector,并且work子線程也會開始無限循環(huán)并進(jìn)行上面的操作:

image

如圖,上面的ch.eventLoop().execute我們已經(jīng)說過會直接開啟一個新線程并且執(zhí)行接著再初始化我們的子Socket應(yīng)該初始化的各種Handler(具體示什么后面詳解); 這樣,work線程也會就會開始工作。所有對應(yīng)原生JDK的啟動操作步驟就全部找出;

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容