Netty源碼(二)線(xiàn)程模型EventLoop

前言

Netty源碼(一)Netty架構(gòu)解析分析了netty的基本原理和工作流程,其中EventLoop是netty中最核心的組件,本文將會(huì)分析EventLoop的主要設(shè)計(jì)思想,至于具體的代碼細(xì)節(jié)下一篇會(huì)具體分析。本文將主要分析NIOEventLoop基于4.1.49.Final版本。

核心思想

  1. 單線(xiàn)程
    之前介紹過(guò)EventLoop在它的生命周期內(nèi)只和一個(gè) Thread 綁定,一個(gè)EventLoop管理一個(gè)或者多個(gè)Channel。為什么是單線(xiàn)程?首先多線(xiàn)程編程是很難很麻煩的事情,而且為了保證線(xiàn)程安全加鎖或者CAS都會(huì)影響執(zhí)行效率,某些場(chǎng)景下線(xiàn)程數(shù)量越多導(dǎo)致的效率還會(huì)更低。采用單線(xiàn)程模型第一不用考慮線(xiàn)程安全,第二沒(méi)有線(xiàn)程上下文切換帶來(lái)的損耗。
  2. 執(zhí)行流程
    與EventLoop所綁定的Channel的事件都由EventLoop所擁有的Thread處理,實(shí)現(xiàn)的核心流程如下:
        //當(dāng)前調(diào)用線(xiàn)程是否EventLoop所屬的Thread
        if (executor.inEventLoop()) {
            //do something
         } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                     //do something
                }
            });

如果當(dāng)前調(diào)用線(xiàn)程正是支撐 EventLoop 的線(xiàn)程,那么所提交的任務(wù)會(huì)直接執(zhí)行。否則,EventLoop 將調(diào)度該任務(wù)以便稍后執(zhí)行,并將它放入到內(nèi)部隊(duì)列中。EventLoop下次處理它的事件時(shí),它會(huì)執(zhí)行隊(duì)列中的那些任務(wù)。這樣的任務(wù)執(zhí)行流程保證,Channel上的所有任務(wù)都是由它所注冊(cè)的EventLoop所擁有的Thread執(zhí)行,不會(huì)出現(xiàn)并發(fā)問(wèn)題。

構(gòu)造參數(shù)

NioEventLoop構(gòu)造參數(shù):

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
                 EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
                rejectedExecutionHandler);
        this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
        this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
        final SelectorTuple selectorTuple = openSelector();
        this.selector = selectorTuple.selector;
        this.unwrappedSelector = selectorTuple.unwrappedSelector;
    }
  • parent:當(dāng)前EventLoop所屬的NioEventLoopGroup;
  • executor:為EventLoop創(chuàng)建線(xiàn)程,并且執(zhí)行管理該線(xiàn)程生命周期的任務(wù),默認(rèn)為T(mén)hreadPerTaskExecutor;
  • selectorProvider:NIO中selector的幫助類(lèi);
  • selectStrategy:對(duì)selector.select()提供選擇策略;
  • queueFactory:創(chuàng)建存放任務(wù)隊(duì)列的工廠(chǎng)。

從構(gòu)造參數(shù)可以大概知道NioEventLoop的職責(zé),前面提過(guò)一個(gè)NioEventLoopGroup管理一個(gè)或者多個(gè)NioEventLoop,parent指向NioEventLoop所屬的NioEventLoopGroup。
NioEventLoop的Thread由executor去創(chuàng)建管理,這個(gè)thread只做三件事情:

  1. select: 返回EventLoop所持有的selector上已經(jīng)準(zhǔn)備就緒的Channel;
  2. processSelectedKeys: 處理準(zhǔn)備就緒的IO操作;
  3. ranTasks: 執(zhí)行隊(duì)列里的任務(wù);

由selectorProvider提供IO操作,queueFactory創(chuàng)建隊(duì)列存放待執(zhí)行的任務(wù)。

Executor

EventLoop類(lèi)圖

可見(jiàn)EventLoop繼承自Executor,Executor的核心思想是將任務(wù)調(diào)用,和任務(wù)自身的執(zhí)行邏輯給分離開(kāi),使用Executor去執(zhí)行任務(wù),調(diào)用方只需要關(guān)注自身任務(wù)的邏輯,把任務(wù)執(zhí)行交給Executor去處理,將職責(zé)分離開(kāi)。
JDK中自帶的線(xiàn)程池采用池化的思想,基本思路是從池中取一個(gè)空閑的線(xiàn)程去執(zhí)行任務(wù),執(zhí)行完之后將線(xiàn)程返回空閑列表,讓其可以重復(fù)使用。因?yàn)榫€(xiàn)程是很珍貴的資源,采用池化的技術(shù)可以減少線(xiàn)程創(chuàng)建、銷(xiāo)毀的損耗,但不能減少線(xiàn)程上下文切換所帶來(lái)的消耗,隨著線(xiàn)程數(shù)量的增加這種消耗會(huì)更多。所以EventLoop采用單線(xiàn)程的模型去實(shí)現(xiàn)Executor避免多線(xiàn)程的上下文切換,和多線(xiàn)程的并發(fā)編程問(wèn)題。

Feature

Netty中所有操作都是異步進(jìn)行的,實(shí)現(xiàn)異步編程很關(guān)鍵的對(duì)象就是Feature,因?yàn)楫惒讲僮鞫际橇⒓捶祷氐牟粫?huì)等待任務(wù)完成,所以需要有一種機(jī)制去表示異步執(zhí)行的結(jié)果 ,這種機(jī)制就是Feature。
JDK Feature

public interface Future<V> {
    //取消任務(wù)
    boolean cancel(boolean mayInterruptIfRunning);
    //任務(wù)是否取消
    boolean isCancelled();
    //任務(wù)是否執(zhí)行完成
    boolean isDone();
    //阻塞獲取執(zhí)行結(jié)果
    V get() throws InterruptedException, ExecutionException;
    //指定時(shí)間內(nèi)獲取執(zhí)行結(jié)果
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看出jdk的Feature接口提供了對(duì)異步執(zhí)行結(jié)果的幾個(gè)操作,取消、獲取結(jié)果、判斷是否完成等。但是這有個(gè)缺點(diǎn)就是調(diào)用方其實(shí)不知道任務(wù)什么時(shí)候結(jié)束,沒(méi)有一種機(jī)制去通知,只有自己不斷的去檢測(cè)或者阻塞等待執(zhí)行完成,這都不是很好的方法。所以netty自己實(shí)現(xiàn)了一套Feature,相比jdk的Feature對(duì)重要的就是提供了一種通知機(jī)制。

Netty Feature

public interface Future<V> extends java.util.concurrent.Future<V> {
    //省略其他方法
    ......
    //添加監(jiān)聽(tīng)器
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);

    //添加多個(gè)監(jiān)聽(tīng)器
    Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

     //移除監(jiān)聽(tīng)器
    Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);

   //移除多個(gè)監(jiān)聽(tīng)器
    Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
}

Netty的Feature與jdk自帶的Feature,最大的不同就是Listener。使用是一種觀(guān)察者模式,將執(zhí)行結(jié)果通知出去。

GenericFutureListener

public interface GenericFutureListener<F extends Future<?>> extends EventListener {

    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future  the source {@link Future} which called this callback
     */
    void operationComplete(F future) throws Exception;
}

GenericFutureListener只有一個(gè)operationComplete方法,通過(guò)實(shí)現(xiàn)GenericFutureListener接口的operationComplete調(diào)用方就能知道異步執(zhí)行的結(jié)果。

MpscUnboundedArrayQueue

前面提到,NioEventLoop中有一個(gè)用于存放任務(wù)的隊(duì)列,如果沒(méi)有指定特定的queueFactory,默認(rèn)為MpscUnboundedArrayQueue
MpscUnboundedArrayQueue是JCTools所提供的的一種隊(duì)列,JCTools提供了幾種目前jdk沒(méi)有實(shí)現(xiàn)的隊(duì)列MpscUnboundedArrayQueue就是其中一種。從名字可知(MPSC - Multi Producer Single Consumer )多生產(chǎn)者單消費(fèi)者,是一種使用于多生產(chǎn)者單消費(fèi)者場(chǎng)景的隊(duì)列。而EventLoop正符合這種多生產(chǎn)者單消費(fèi)者的場(chǎng)景,所以采用MpscUnboundedArrayQueue隊(duì)列來(lái)存放任務(wù)。
下面說(shuō)一下MpscUnboundedArrayQueue的幾個(gè)好處,具體細(xì)節(jié)就不深入研究了
1.緩存行填充
為什么要有緩存行填充?首先要知道“偽共享”的概念,偽共享和 CPU 內(nèi)部的 Cache 有關(guān),Cache 內(nèi)部是按照緩存行(Cache Line)管理的,緩存行的大小通常是 64 個(gè)字節(jié)。CPU加載數(shù)據(jù)是按照一個(gè)緩存行為單位加載的。

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

如上圖如果出隊(duì)索引和入隊(duì)索引用32位的int類(lèi)型,CPU會(huì)把takeIndex、putIndex加載到一個(gè)緩存行。當(dāng)一個(gè)入隊(duì)操作的時(shí)候會(huì)修改putIndex,導(dǎo)致整個(gè)緩存行失效,需要重新加載到緩存中,但是入隊(duì)操作并不會(huì)修改takeIndex,由于putIndex與takeIndex共享同一個(gè)緩存行,所以也會(huì)導(dǎo)致takeIndex失效,這種情況就是偽共享。MpscUnboundedArrayQueue采用讓變量獨(dú)占緩存行的形式解決偽共享的問(wèn)題。

abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue
{
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}
  1. 無(wú)鎖
    首先offer方法,這里只分析關(guān)鍵部分
while (true)
        {
          //省略代碼
          ......
          //cas設(shè)置生產(chǎn)者索引
            if (casProducerIndex(pIndex, pIndex + 2))
            {
                break;
            }
        }
        // INDEX visible before ELEMENT
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        //放置元素
        soRefElement(buffer, offset, e); // release element e
        return true;
    }

offer方法采用cas的方式去放入元素,而poll由于是單線(xiàn)程消費(fèi),無(wú)需加鎖和cas操作。

EventLoopGroup

EventLoopGroup是對(duì)EventLoop進(jìn)行管理,那么EventLoopGroup是如何對(duì)EventLoop進(jìn)行管理的呢?先看一個(gè)簡(jiǎn)單的execute方法

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

是調(diào)用next返回一個(gè)對(duì)象后執(zhí)行的execute方法,跟進(jìn)去看一下next方法

    /**
     * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
     */
    EventExecutor next();

返回一個(gè)EventExecutorGroup所管理的EventExecutor對(duì)象,對(duì)于EventLoopGroup來(lái)說(shuō)next返回的就是所管理的EventLoop對(duì)象,next方法就是EventLoopGroup管理的EventLoop關(guān)鍵方法。
最后找一下next的實(shí)現(xiàn),在MultithreadEventExecutorGroup類(lèi)中

    @Override
    public EventExecutor next() {
        return chooser.next();
    }

chooser變量

private final EventExecutorChooserFactory.EventExecutorChooser chooser;

最后再看一下EventExecutorChooserFactory的實(shí)現(xiàn),只有一個(gè)實(shí)現(xiàn)類(lèi)EventExecutorChooserFactory

/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {

    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }

    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }
}

很簡(jiǎn)單的實(shí)現(xiàn),按照管理executors數(shù)量的奇偶數(shù)返回不同的策略,核心邏輯就是循環(huán)依次返回executors中的executor。

總結(jié)

本文主要分析了Netty的線(xiàn)程模型EventLoop,最重要的要關(guān)注的點(diǎn)就是EventLoop是單線(xiàn)程模型,所有提交給EventLoop都需要判斷調(diào)用線(xiàn)程是否為EventLoop所持有的thread,如果是直接執(zhí)行,否則添加到EventLoop的任務(wù)隊(duì)列里面。這一部分個(gè)人感覺(jué)是netty中設(shè)計(jì)很精妙的一點(diǎn),如果要使用多線(xiàn)程就用EventLoopGroup去管理EventLoop,EventLoopGroup中的EventLoop又互相隔離,互不影響也不會(huì)出現(xiàn)線(xiàn)程安全的問(wèn)題。
本文主要分析了EventLoop的繼承關(guān)系和一些設(shè)計(jì)思想,并沒(méi)有具體去分析一些代碼細(xì)節(jié),下篇文章會(huì)具體分析EventLoop的thread所負(fù)責(zé)的三件事情:

  1. select: 返回EventLoop所持有的selector上已經(jīng)準(zhǔn)備就緒的Channel;
  2. processSelectedKeys: 處理準(zhǔn)備就緒的IO操作;
  3. ranTasks: 執(zhí)行隊(duì)列里的任務(wù)。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容