Java進(jìn)階-Netty-進(jìn)階

一、Reactor線程

??源碼基于4.1.6.Final版本。

1.1 Reactor線程啟動(dòng)

??NioEventLoop的run方法是reactor線程的主體,在第一次添加任務(wù)的時(shí)候被啟動(dòng)。

  • 入口:NioEventLoop父類SingleThreadEventExecutor的execute方法
@Override
public void execute(Runnable task) {
    ...
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        ...
    }
    ...
}
  • netty的reactor線程在添加一個(gè)任務(wù)的時(shí)候被創(chuàng)建,該線程實(shí)體為FastThreadLocalThread。
  • 最后線程執(zhí)行主體為NioEventLoop的run方法。

1.2 Reactor線程執(zhí)行

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                default:
                    // fallthrough
            }
            processSelectedKeys();
            runAllTasks(...);
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        ...
    }
image.png
  • 輪詢注冊到reactor線程對應(yīng)的selector上的所有的channel的IO事件
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
      selector.wakeup();
}

定時(shí)任務(wù)截止事時(shí)間快到了,中斷本次輪詢
輪詢過程中發(fā)現(xiàn)有任務(wù)加入,中斷本次輪詢
阻塞式select操作

??netty會(huì)在每次進(jìn)行selector.select(timeoutMillis)之前記錄一下開始時(shí)間currentTimeNanos,在select之后記錄一下結(jié)束時(shí)間,判斷select操作是否至少持續(xù)了timeoutMillis秒,如果持續(xù)的時(shí)間大于等于timeoutMillis,說明就是一次有效的輪詢,重置selectCnt標(biāo)志,否則,表明該阻塞方法并沒有阻塞這么長時(shí)間,可能觸發(fā)了jdk的空輪詢bug,當(dāng)空輪詢的次數(shù)超過一個(gè)閥值的時(shí)候,默認(rèn)是512,就開始重建selector。

  • 處理產(chǎn)生網(wǎng)絡(luò)IO事件的channel
processSelectedKeys();

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

取出IO事件以及對應(yīng)的netty channel類
處理該channel
判斷是否該再來次輪詢

對于boss NioEventLoop來說,輪詢到的是基本上就是連接事件,后續(xù)的事情就通過他的pipeline將連接扔給一個(gè)worker NioEventLoop處理
對于worker NioEventLoop來說,輪詢到的基本上都是io讀寫事件,后續(xù)的事情就是通過他的pipeline將讀取到的字節(jié)流傳遞給每個(gè)channelHandler來處理SelectedSelectionKeySet

??netty使用數(shù)組替換掉jdk原生的HashSet來保證IO事件的高效處理,每個(gè)SelectionKey上綁定了netty類AbstractChannel對象作為attachment,在處理每個(gè)SelectionKey的時(shí)候,就可以找到AbstractChannel,然后通過pipeline的方式將處理串行到ChannelHandler,回調(diào)到用戶方法。

  • 處理任務(wù)隊(duì)列

用戶自定義普通任務(wù)
非當(dāng)前reactor線程調(diào)用channel的各種方法
用戶自定義定時(shí)任務(wù):1)若干時(shí)間后執(zhí)行一次 2)每隔一段時(shí)間執(zhí)行一次 3)每次執(zhí)行結(jié)束,隔一定時(shí)間再執(zhí)行一次

??taskQueue在NioEventLoop中默認(rèn)是mpsc隊(duì)列,mpsc隊(duì)列,即多生產(chǎn)者單消費(fèi)者隊(duì)列,netty使用mpsc,方便的將外部線程的task聚集,在reactor線程內(nèi)部用單線程來串行執(zhí)行。

??reactor線程task調(diào)度:

  • 從scheduledTaskQueue轉(zhuǎn)移到期的定時(shí)任務(wù)到taskQueue(mpsc queue)
  • 計(jì)算本次任務(wù)循環(huán)的截止時(shí)間
  • 執(zhí)行任務(wù)
  • 收尾

當(dāng)前reactor線程調(diào)用當(dāng)前eventLoop執(zhí)行任務(wù),直接執(zhí)行,否則,添加到任務(wù)隊(duì)列稍后執(zhí)行
netty內(nèi)部的任務(wù)分為普通任務(wù)和定時(shí)任務(wù),分別落地到MpscQueue和PriorityQueue
netty每次執(zhí)行任務(wù)循環(huán)之前,會(huì)將已經(jīng)到期的定時(shí)任務(wù)從PriorityQueue轉(zhuǎn)移到MpscQueue
netty每隔64個(gè)任務(wù)檢查一下是否該退出任務(wù)循環(huán)

二、服務(wù)端啟動(dòng)

b.bind(8888).sync();

public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
} 

public ChannelFuture bind(SocketAddress localAddress) {
    validate();
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    return doBind(localAddress);
}

private ChannelFuture doBind(final SocketAddress localAddress) {
    //...
    final ChannelFuture regFuture = initAndRegister();
    //...
    final Channel channel = regFuture.channel();
    //...
    doBind0(regFuture, channel, localAddress, promise);
    //...
    return promise;
}
  • new一個(gè)channel
image.png

用戶調(diào)用方法Bootstrap.bind(port)第一步就是通過反射的方式new一個(gè)NioServerSocketChannel對象,并且在new的過程中創(chuàng)建了一系列的核心組件。

  • init這個(gè)channel

設(shè)置option和attr
設(shè)置新接入channel的option和attr
加入新連接處理器

  • 將這個(gè)channel register到某個(gè)對象

1)設(shè)置啟動(dòng)類參數(shù),最重要的就是設(shè)置channel創(chuàng)建server對應(yīng)的channel,創(chuàng)建各大組件,包括ChannelConfig,ChannelId,ChannelPipeline,ChannelHandler,Unsafe等
2)初始化server對應(yīng)的channel,設(shè)置一些attr,option,以及設(shè)置子channel的attr,option,給server的channel添加新channel接入器,并觸發(fā)addHandler,register等事件
3)調(diào)用jdk底層做端口綁定,并觸發(fā)active事件,active觸發(fā)的時(shí)候,真正做服務(wù)端口綁定

三、新連接接入

所有的channel底層都會(huì)有一個(gè)與unsafe綁定,每種類型的channel實(shí)際的操作都由unsafe來實(shí)現(xiàn)

image.png

??流水線的開始就是HeadContxt,流水線的結(jié)束就是TailConext,HeadContxt中調(diào)用Unsafe做具體的操作,TailConext中用于向用戶拋出pipeline中未處理異常以及對未處理消息的警告。

  • 檢測到有新連接進(jìn)入
  • 將新的連接注冊到worker線程組
  • 注冊新連接的讀事件
//NioEventLoop.java
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    int readyOps = k.readyOps();
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
    }
}
  • boos reactor線程輪詢到有新的連接進(jìn)入
  • 通過封裝jdk底層的channel創(chuàng)建NioSocketChannel以及一系列的netty核心組件
  • 將該條連接通過chooser,選擇一條worker reactor線程綁定上去
  • 注冊讀事件,開始新連接的讀寫

四、pipeline

4.1 pipeline 初始化

??pipeline是channel其中的一員,在AbstractChannel中被創(chuàng)建

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

??pipeline中保存了channel的引用,默認(rèn)情況下,一條pipeline會(huì)有兩個(gè)節(jié)點(diǎn),head和tail。

image.png

4.2 pipeline添加節(jié)點(diǎn)

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});
image.png
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.檢查是否有重復(fù)handler
        checkMultiplicity(handler);
        // 2.創(chuàng)建節(jié)點(diǎn)
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加節(jié)點(diǎn)
        addLast0(newCtx);
    }
   
    // 4.回調(diào)用戶方法
    callHandlerAdded0(handler);
    
    return this;
}

netty中用兩個(gè)字段來表示這個(gè)channelHandlerContext屬于inBound還是outBound,或者兩者都是。

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}
image.png
  • 檢查是否有重復(fù)handler
  • 創(chuàng)建節(jié)點(diǎn)
  • 添加節(jié)點(diǎn)
  • 回調(diào)用戶方法
private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}
image.png

4.3 pipeline刪除節(jié)點(diǎn)

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}
  • 找到待刪除的節(jié)點(diǎn)
  • 調(diào)整雙向鏈表指針刪除
  • 回調(diào)用戶函數(shù)
image.png

4.4 pipeline其他

  • 一個(gè)Channel對應(yīng)一個(gè)Unsafe,Unsafe處理底層操作,NioServerSocketChannel對應(yīng)NioMessageUnsafe, NioSocketChannel對應(yīng)NioByteUnsafe
  • inBound事件從head節(jié)點(diǎn)傳播到tail節(jié)點(diǎn),outBound事件從tail節(jié)點(diǎn)傳播到head節(jié)點(diǎn)
  • 異常傳播只會(huì)往后傳播,而且不分inbound還是outbound節(jié)點(diǎn),不像outBound事件一樣會(huì)往前傳播

五、writeAndFlush

  • pipeline中的標(biāo)準(zhǔn)鏈表結(jié)構(gòu)
image.png
  • Encoder節(jié)點(diǎn)分配一個(gè)ByteBuf,調(diào)用encode方法,將java對象根據(jù)自定義協(xié)議寫入到ByteBuf,然后再把ByteBuf傳入到下一個(gè)節(jié)點(diǎn)。
  • pipeline中的編碼器原理是創(chuàng)建一個(gè)ByteBuf,將java對象轉(zhuǎn)換為ByteBuf,然后再把ByteBuf繼續(xù)向前傳遞
  • 調(diào)用write方法并沒有將數(shù)據(jù)寫到Socket緩沖區(qū)中,而是寫到了一個(gè)單向鏈表的數(shù)據(jù)結(jié)構(gòu)中,flush才是真正的寫出
  • writeAndFlush等價(jià)于先將數(shù)據(jù)寫到netty的緩沖區(qū),再將netty緩沖區(qū)中的數(shù)據(jù)寫到Socket緩沖區(qū)中,寫的過程與并發(fā)編程類似,用自旋鎖保證寫成功
  • netty中的緩沖區(qū)中的ByteBuf為DirectByteBuf

六、拆包器

  • 不斷從TCP緩沖區(qū)中讀取數(shù)據(jù),每次讀取完都需要判斷是否是一個(gè)完整的數(shù)據(jù)包
  • 如果當(dāng)前讀取的數(shù)據(jù)不足以拼接成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,那就保留該數(shù)據(jù),繼續(xù)從tcp緩沖區(qū)中讀取,直到得到一個(gè)完整的數(shù)據(jù)包
  • 如果當(dāng)前讀到的數(shù)據(jù)加上已經(jīng)讀取的數(shù)據(jù)足夠拼接成一個(gè)數(shù)據(jù)包,那就將已經(jīng)讀取的數(shù)據(jù)拼接上本次讀取的數(shù)據(jù),夠成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包傳遞到業(yè)務(wù)邏輯,多余的數(shù)據(jù)仍然保留,以便和下次讀到的數(shù)據(jù)嘗試拼接

netty中的拆包內(nèi)部會(huì)有一個(gè)累加器,每次讀取到數(shù)據(jù)都會(huì)不斷累加,然后嘗試對累加到的數(shù)據(jù)進(jìn)行拆包,拆成一個(gè)完整的業(yè)務(wù)數(shù)據(jù)包,這個(gè)基類叫做ByteToMessageDecoder

netty將具體如何拆包抽象出一個(gè)decode方法,不同的拆包器實(shí)現(xiàn)不同的decode方法,就能實(shí)現(xiàn)不同協(xié)議的拆包

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容