二、netty源碼分析之EventLoopGroup

一、EventLoopGroup功能概述

EventLoopGroup是netty中一個比較核心的組件,想要知道EventLoopGroup的功能,我們先看一下EventLoopGroup的類圖關(guān)系:

EventLoopGroup

Exector是java的JUC包中定義的一個接口,我們可以看一下具體定義:

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

從代碼的注釋中我們可以看到,Exector定義的execute方法的作用。即執(zhí)行提交的任務(wù)。執(zhí)行可以是在一個的線程中、或者在線程池中、也可以在方法調(diào)用線程中。總的來說,Exector開放了提交任務(wù)執(zhí)行的能力。

接下來是ExecutorService接口。它繼承自Exector,新定義了以下方法:

public interface ExecutorService extends Executor {
    /**
     * 停止接受新提交的任務(wù),已經(jīng)提交的等待中的任務(wù)會執(zhí)行完成
     */
    void shutdown();
     /**
     * 停止提交新的任務(wù),已經(jīng)提交的等待中的任務(wù)
     * 也會停止等待,返回等待中的任務(wù)表
     */
    List<Runnable> shutdownNow();
    /**
     * 返回執(zhí)行器是否被停止
     */
    boolean isShutdown();
    /**
     * 返回執(zhí)行器在shutdown之后,是否所有的任務(wù)都被執(zhí)行完。
     *shutdown()或者shutdownNow()被調(diào)用后才會終止。
     */
    boolean isTerminated();
    /**
     * 阻塞直到所有的任務(wù)都執(zhí)行完,或者超時,或者線程受到interrupted信號
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    /**
     * 提交有返回值的任務(wù)
     */
    <T> Future<T> submit(Callable<T> task);
    /**
     * 提交一個任務(wù),任務(wù)成功Future返回給定的result
     */
    <T> Future<T> submit(Runnable task, T result);
    /**
     * 提交一個任務(wù),任務(wù)成功Future返回null
     */
    Future<?> submit(Runnable task);
    /**
     * 執(zhí)行給定的任務(wù)列表,當(dāng)執(zhí)行完成后返回持有執(zhí)行狀態(tài)和執(zhí)行結(jié)果的Future列表
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
   /**
     * 執(zhí)行給定的任務(wù)列表,當(dāng)執(zhí)行完或者超時后返回
     *持有執(zhí)行狀態(tài)和執(zhí)行結(jié)果的Future列表
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
   /**
     * 執(zhí)行給定的任務(wù)列表,如果所有任務(wù)都執(zhí)行成功,返回其中的一個結(jié)果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
   /**
     * 執(zhí)行給定的任務(wù)列表,如果所有任務(wù)都在給定的時
     * 間內(nèi)執(zhí)行成功,返回其中的一個結(jié)果
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以看到ExecutorService這個接口開放了提交有返回結(jié)果的任務(wù)的能力,同時開放了停止執(zhí)行器的能力。

接下來是ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {
   /**
     * 創(chuàng)建并且執(zhí)行一個一次性的任務(wù),這個任務(wù)在給定的延遲事件之后才可以被執(zhí)行
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

      /**
     * 創(chuàng)建并且執(zhí)行一個帶有返回值的一次性的任務(wù),
     * 這個任務(wù)在給定的延遲事件之后才可以被執(zhí)行
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
   /**
     * 創(chuàng)建并且執(zhí)行一個周期性的任務(wù),任務(wù)的開始時間是固定間隔的
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
   /**
     * 創(chuàng)建并且執(zhí)行一個周期性的任務(wù),一個任務(wù)執(zhí)行完成到
     * 下個任務(wù)執(zhí)行開始之間的時間間隔是固定的
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);
}

我們可以看到,ScheduledExecutorService具有延遲執(zhí)行和周期執(zhí)行任務(wù)的能力。

接下來是EventExecutorGroup接口,EventExecutorGroup的方法比較多,我們這里只列出部分關(guān)鍵方法:

public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor> {

    boolean isShuttingDown();

    Future<?> shutdownGracefully();

    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    /**
     * Returns the {@link Future} which is notified when all {@link EventExecutor}s managed by this
     * {@link EventExecutorGroup} have been terminated.
     */
    Future<?> terminationFuture();

    /**
     * Returns one of the {@link EventExecutor}s managed by this {@link EventExecutorGroup}.
     */
    EventExecutor next();

    @Override
    Iterator<EventExecutor> iterator();
}

我們可以看到EventExecutorGroup主要提供了兩個能力:一是優(yōu)雅停機的能力,優(yōu)雅停機這塊我們本篇先不去分析,放到接下來的筆記中去分析;第二個就是執(zhí)行器調(diào)度的能力,通過next()方法來返回下一個要執(zhí)行任務(wù)的EventExecutor。

最后就是EventLoopGroup了。我們看一下這個接口的代碼:

public interface EventLoopGroup extends EventExecutorGroup {
    /**
     * Return the next {@link EventLoop} to use
     */
    @Override
    EventLoop next();

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The returned {@link ChannelFuture}
     * will get notified once the registration was complete.
     */
    ChannelFuture register(Channel channel);

    /**
     * Register a {@link Channel} with this {@link EventLoop} using a {@link ChannelFuture}. The passed
     * {@link ChannelFuture} will get notified once the registration was complete and also will get returned.
     */
    ChannelFuture register(ChannelPromise promise);

    /**
     * Register a {@link Channel} with this {@link EventLoop}. The passed {@link ChannelFuture}
     * will get notified once the registration was complete and also will get returned.
     *
     * @deprecated Use {@link #register(ChannelPromise)} instead.
     */
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
}

從上面的代碼可以看到,EventLoopGroup又新增了注冊Channel的方法。到這里為止,我們可以給EventLoopGroup下個定義:有優(yōu)雅停機功能、可以注冊Channel的事件執(zhí)行器。到現(xiàn)在我們只看了EventLoopGroup這個接口,也許會對這個接口的理解比較模糊,接下來我們就結(jié)合EventLoopGroup這個接口的一個實現(xiàn)類來看看EventLoopGroup這個接口在netty中究竟是扮演者一個什么角色。我們下面拿我們開發(fā)中比較常用的NioEventLoopGroup來具體分析一下。

從NioEventLoopGroup來看EventLoopGroup在netty中扮演的角色

我們看一下NioEventLoopGroup的部分內(nèi)容:

public class NioEventLoopGroup extends MultithreadEventLoopGroup {

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler,
                             final EventLoopTaskQueueFactory taskQueueFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                rejectedExecutionHandler, taskQueueFactory);
    }

    /**
     * Sets the percentage of the desired amount of time spent for I/O in the child event loops.  The default value is
     * {@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks.
     */
    public void setIoRatio(int ioRatio) {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).setIoRatio(ioRatio);
        }
    }

    /**
     * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
     * around the  infamous epoll 100% CPU bug.
     */
    public void rebuildSelectors() {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }

    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        EventLoopTaskQueueFactory queueFactory = args.length == 4 ? (EventLoopTaskQueueFactory) args[3] : null;
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2], queueFactory);
    }
}

NioEventLoopGroup的內(nèi)容其實并沒有多少,邏輯大部分都在其父類之中。除了構(gòu)造方法之外,只有三個方法:
1、setIoRatio(int ioRatio)設(shè)置IO運行率,方法內(nèi)容也很簡單,就是遍歷自身持有的NioEventLoop對象,并且設(shè)置NioEventLoop的ioRatio參數(shù)。
2、rebuildSelectors()遍歷自身持有的NioEventLoop對象,調(diào)用NioEventLoop對象的rebuildSelector()方法。
3、EventLoop newChild(Executor executor, Object... args)創(chuàng)建一個NioEventLoop對象。這是一個父類的模板方法。

父類MultithreadEventLoopGroup是一個抽象類,這個抽象類實現(xiàn)了幾個重載的register方法。方法內(nèi)容都是調(diào)用父類的next()方法獲取自身的一個EventLoop對象,然后把需要注冊的Channel、ChannelPromise參數(shù)注冊到EventLoop。

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

next()方法就是實現(xiàn)EventLoopGroupEventLoop調(diào)度的方法(EventLoop是真正執(zhí)行任務(wù)的執(zhí)行器,后面我們會說到)。next()方法是怎么實現(xiàn)對EventLoop調(diào)度的呢?我們可以看到這個方法調(diào)用了EventExecutorChooser的實現(xiàn)類的next()方法。netty默認(rèn)提供了兩個實現(xiàn)類,一個是GenericEventExecutorChooser,另一個是PowerOfTwoEventExecutorChooser,這兩個選擇器都是輪詢EventLoopGroup持有的EventLoop。這兩個選擇器都是輪詢EventLoop,有什么區(qū)別呢?我們從這兩個實現(xiàn)類可以看出netty對性能追求的極致之處:

// GenericEventExecutorChooser的選擇方法
return executors[Math.abs(idx.getAndIncrement() % executors.length)];

// PowerOfTwoEventExecutorChooser的選擇方法
return executors[idx.getAndIncrement() & executors.length - 1];

GenericEventExecutorChooser是通用的輪詢方法;而PowerOfTwoEventExecutorChooser是專門用來處理當(dāng)EventLoop數(shù)量是2的次方數(shù)時的情況,用位運算取idx的低位(低log2 (executors.length) 位)。netty不會放過哪怕這一點點對性能的優(yōu)化!??!

我們繼續(xù)按圖索驥看一下MultithreadEventLoopGroup的父類MultithreadEventExecutorGroup。我們可以看到,MultithreadEventLoopGroup實現(xiàn)了停機的功能,不過都是調(diào)用的持有的EventExecutor的對應(yīng)方法,我們這里就不詳細分析了。MultithreadEventExecutorGroup最主要的邏輯在它的構(gòu)造方法中,構(gòu)造方法實現(xiàn)了對EventExecutor的初始化等工作,我們詳細看一下這塊內(nèi)容:

    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);
                success = true;
            } catch (Exception e) {
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                if (!success) {
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }

                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            while (!e.isTerminated()) {
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        chooser = chooserFactory.newChooser(children);

        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                if (terminatedChildren.incrementAndGet() == children.length) {
                    terminationFuture.setSuccess(null);
                }
            }
        };

        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

首先,判斷參數(shù)傳入的Executor執(zhí)行器對象是否是空,如果是空,則初始化一個默認(rèn)的實現(xiàn)類ThreadPerTaskExecutor,我們從這個類的名字可以看出它的能力,每個任務(wù)一個線程,我們看它的實現(xiàn)也確實如此:

    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }

每次提交任務(wù)的時候,都會調(diào)用線程工廠創(chuàng)建一個線程來執(zhí)行任務(wù)!這個Executor對象是最終用來執(zhí)行任務(wù)的,不過卻不是EventLoopGroup調(diào)度的內(nèi)容。我們繼續(xù)往下看MultithreadEventExecutorGroup的構(gòu)造方法。初始化EventExecutor[] children這個成員變量,新建一個大小為nThreads的數(shù)組,然后循環(huán)為數(shù)組元素賦值。這個EventExecutor[] children才是EventLoopGroup真正調(diào)度的內(nèi)容。在填充數(shù)組內(nèi)容的時候,調(diào)用了子類的模板方法newChild(),在之前我們在NioEventLoopGroup中看到的newChild()方法就是這個模板方法的一個實現(xiàn)。

children數(shù)組填充完成之后,是初始化成員變量chooser的操作,chooser就是用來調(diào)度所有執(zhí)行器,我們上面已經(jīng)分析過了,這里不再贅述。
接下來是對EventExecutor[] children中的每個執(zhí)行器添加終止監(jiān)聽器,確保EventExecutor[] children中的所有執(zhí)行器都終止后,會調(diào)用設(shè)置成員變量terminationFuture的狀態(tài)。

最后是對readonlyChildren這個成員變量賦值,看這個變量的名字我們也能猜出來這個成員變量是children變量的只讀版本,事實也的確如此。

復(fù)盤

我們分析了NioEventLoop的核心源碼,整個內(nèi)容也許有點繞,我們這里再把NioEventLoop的類圖貼出來幫助大家理解:

NioEventLoopGroup

我們可以看到,EventExecutorGroup以及其父接口,開放了任務(wù)提交、任務(wù)執(zhí)行、優(yōu)雅停機等能力。而實現(xiàn)類AbstractEventExecutorGroup這個抽象類實現(xiàn)了任務(wù)提交的基本方法,MultithreadEventExecutorGroup則實現(xiàn)了多線程任務(wù)調(diào)度的基本方法。子類MultithreadEventLoopGroup不僅繼承了MultithreadEventExecutorGroup這個抽象了,具備了多線程執(zhí)行調(diào)度的基本能力,而且還實現(xiàn)了EventLoopGroup接口,具備注冊Channel的能力。其對外開放的變化點僅僅有創(chuàng)建執(zhí)行器這個功能。我們繼承MultithreadEventExecutorGroup僅僅可以通過實現(xiàn)newChild這個抽象方法來定制自己的EventLoop事件處理器(不考慮方法重寫)。最后,NioEventLoopGroup繼承了MultithreadEventExecutorGroup,通過實現(xiàn)newChild方法,來指定事件處理器是NioEventLoop,下篇我們會分析NioEventLoop的功能。

到這里我們基于NioEventLoopGroup分析了EventLoopGroup的基本能力。EventLoopGroup可以注冊Channel,然后具有任務(wù)執(zhí)行、選擇執(zhí)行器去執(zhí)行任務(wù),也就是執(zhí)行器調(diào)度的能力。也許現(xiàn)在大家對EventLoopGroup的功能還是比較模糊,不過沒關(guān)系,加下來我們會繼續(xù)分析EventLoop、BootStrap等關(guān)鍵組件。等分析完這些內(nèi)容,大家對netty的理解就會更清晰。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容