Netty源碼解析—— EventLoop(二)之 EventLoopGroup

Netty源碼解析—— EventLoop(二)之 EventLoopGroup

1.類結構圖

NioEventLoopGroup.png

2. EventExecutorGroup

EventExecutorGroup 實現(xiàn) ScheduledExecutorService 、Iterable接口,這兩個接口都是jdk原生接口,具體看EventExecutorGroup接口中的方法,代碼如下:

   // ========== 自定義接口 ===================================
   
    //是否正在關閉
    boolean isShuttingDown();

    //優(yōu)雅關閉線程池
    Future<?> shutdownGracefully();
  
    Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit);

    //返回線程池終止時的異步結果
    Future<?> terminationFuture();

    //選擇一個 EventExecutor 對象
    EventExecutor next();


  // ========== 實現(xiàn)自 Iterable 接口 ==========
    @Override
    Iterator<EventExecutor> iterator();


// ========== 實現(xiàn)自 ExecutorService 接口 ==========

    @Override
    @Deprecated
    void shutdown();

   
    @Override
    @Deprecated
    List<Runnable> shutdownNow();


    @Override
    Future<?> submit(Runnable task);

    @Override
    <T> Future<T> submit(Runnable task, T result);

    @Override
    <T> Future<T> submit(Callable<T> task);

    // ========== 實現(xiàn)自 ScheduledExecutorService 接口 ==========


    @Override
    ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    @Override
    <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    @Override
    ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
  • 重點關注next()方法,該方法的功能是從線程池中選擇一個線程
  • 比較特殊的是,接口方法返回類型為 Future 不是 Java 原生的 java.util.concurrent.Future ,而是 Netty 自己實現(xiàn)的 Future 接口,如下代碼:
public interface Future<V> extends java.util.concurrent.Future<V> 

public interface ScheduledFuture<V> extends Future<V>, java.util.concurrent.ScheduledFuture<V> 

3. AbstractEventExecutorGroup

io.netty.util.concurrent.AbstractEventExecutorGroup ,實現(xiàn) EventExecutorGroup 接口,EventExecutor ( 事件執(zhí)行器 )的分組抽象類。

3.1 submit

#submit(...) 方法,提交一個普通任務到 EventExecutor 中, 提交的 EventExecutor ,通過 #next() 方法選擇

   @Override
    public Future<?> submit(Runnable task) {
        return next().submit(task);
    }

    @Override
    public <T> Future<T> submit(Runnable task, T result) {
        return next().submit(task, result);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        return next().submit(task);
    }

3.2 schedule

#schedule(...) 方法,提交一個定時任務到 EventExecutor 中,提交的 EventExecutor ,通過 #next() 方法選擇。代碼如下:

@Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
        return next().schedule(command, delay, unit);
    }

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        return next().schedule(callable, delay, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
        return next().scheduleAtFixedRate(command, initialDelay, period, unit);
    }

    @Override
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        return next().scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }

3.3 execute

#execute(...) 方法,在 EventExecutor 中執(zhí)行一個普通任務,不需要返回結果,代碼如下:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

3.4 invokeAll

#invokeAll(...) 方法,在 EventExecutor 中執(zhí)行多個普通任務, 多個任務使用同一個 EventExecuto。代碼如下:

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
            throws InterruptedException {
        return next().invokeAll(tasks);
    }

    @Override
    public <T> List<java.util.concurrent.Future<T>> invokeAll(
            Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
        return next().invokeAll(tasks, timeout, unit);
    }

3.5 invokeAny

#invokeAll(...) 方法,在 EventExecutor 中執(zhí)行多個普通任務,有一個執(zhí)行完成即可,多個任務使用同一個 EventExecutor 。代碼如下:

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
        return next().invokeAny(tasks);
    }

    @Override
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException {
        return next().invokeAny(tasks, timeout, unit);
    }

3.6 shutdown

#shutdown(...) 方法,關閉 EventExecutorGroup 。代碼如下:

    @Override
    public Future<?> shutdownGracefully() {
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }

    
    @Override
    @Deprecated
    public abstract void shutdown();

  
    @Override
    @Deprecated
    public List<Runnable> shutdownNow() {
        shutdown();
        return Collections.emptyList();
    }

  • 具體的 #shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit)#shutdown() 方法,由子類實現(xiàn)。

4. MultithreadEventExecutorGroup

4.1 構造方法

/**
     * EventExecutor 數(shù)組
     */
    private final EventExecutor[] children;
    /**
     * 不可變( 只讀 )的 EventExecutor 數(shù)組
     */
    private final Set<EventExecutor> readonlyChildren;
    /**
     * 已終止的 EventExecutor 數(shù)量
     */
    private final AtomicInteger terminatedChildren = new AtomicInteger();
    /**
     * 用于終止 EventExecutor 的異步 Future
     */
    private final Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
    /**
     * EventExecutor 選擇器
     */
    private final EventExecutorChooserFactory.EventExecutorChooser chooser;

   
    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }

   
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
        this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
    }

    
    protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        // 創(chuàng)建執(zhí)行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        // 創(chuàng)建 EventExecutor 數(shù)組
        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            // 是否創(chuàng)建成功
            boolean success = false;
            try {
                // 創(chuàng)建 EventExecutor 對象,newChild抽象方法,具體有子類實現(xiàn)
                children[i] = newChild(executor, args);
                // 標記創(chuàng)建成功
                success = true;
            } catch (Exception e) {
                // 創(chuàng)建失敗,拋出 IllegalStateException 異常
                // TODO: Think about if this is a good exception type
                throw new IllegalStateException("failed to create a child event loop", e);
            } finally {
                // 創(chuàng)建失敗,關閉所有已創(chuàng)建的 EventExecutor
                if (!success) {
                    // 優(yōu)雅的關閉所有已創(chuàng)建的 EventExecutor,只負責關閉線程,并不知道關閉的結果
                    for (int j = 0; j < i; j ++) {
                        children[j].shutdownGracefully();
                    }
                    // 確保所有已創(chuàng)建的 EventExecutor 已關閉
                    for (int j = 0; j < i; j ++) {
                        EventExecutor e = children[j];
                        try {
                            //isTerminated() 若關閉后所有任務都已完成,則返回true。注意除非首先調(diào)用shutdown或shutdownNow,否則isTerminated永不為true。
                            // 返回:若關閉后所有任務都已完成,則返回true。
                            while (!e.isTerminated()) {
                                //等所有已提交的任務(包括正在跑的和隊列中等待的)執(zhí)行完
                                //或者等超時時間到
                                //或者線程被中斷,拋出InterruptedException
                                e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                            }
                        } catch (InterruptedException interrupted) {
                            // Let the caller handle the interruption.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                }
            }
        }

        // 創(chuàng)建 EventExecutor 選擇器
        chooser = chooserFactory.newChooser(children);

        // 創(chuàng)建監(jiān)聽器,用于 EventExecutor 終止時的監(jiān)聽
        //回調(diào)的具體邏輯是,當所有 EventExecutor 都終止完成時,
        // 通過調(diào)用 Future#setSuccess(V result) 方法,通知監(jiān)聽器們。至于為什么設置的值是 null ,因為監(jiān)聽器們不關注具體的結果。
        final FutureListener<Object> terminationListener = new FutureListener<Object>() {
            @Override
            public void operationComplete(Future<Object> future) throws Exception {
                 // 線程池中的線程每終止一個增加記錄數(shù),直到全部終止設置線程池異步終止結果為成功
                if (terminatedChildren.incrementAndGet() == children.length) {// 全部關閉
                    terminationFuture.setSuccess(null);// 設置結果,并通知監(jiān)聽器們。
                }
            }
        };

        // 設置監(jiān)聽器到每個 EventExecutor 上
        for (EventExecutor e: children) {
            e.terminationFuture().addListener(terminationListener);
        }

        // 創(chuàng)建不可變( 只讀 )的 EventExecutor 數(shù)組
        Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
        Collections.addAll(childrenSet, children);
        //設置不可變的EventExecutor集合
        readonlyChildren = Collections.unmodifiableSet(childrenSet);
    }

4.2 ThreadPerTaskExecutor

  • 創(chuàng)建執(zhí)行器的代碼如下:
// 創(chuàng)建執(zhí)行器
        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

具體看下 ThreadPerTaskExecutor這個類,代碼如下:


/**
 * 實現(xiàn) Executor 接口,每個任務一個線程的執(zhí)行器實現(xiàn)類
 */
public final class ThreadPerTaskExecutor implements Executor {
    /**
     * 線程工廠對象
     * Netty 實現(xiàn)自定義的 ThreadFactory 類,為 io.netty.util.concurrent.DefaultThreadFactory
     */
    private final ThreadFactory threadFactory;

    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }
    /**
     * 執(zhí)行任務
     *
     * @param command 任務
     *
     *  通過 ThreadFactory#newThread(Runnable) 方法,創(chuàng)建一個 Thread ,然后調(diào)用 Thread#start() 方法,啟動線程執(zhí)行任務
     */
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

  • io.netty.util.concurrent.ThreadPerTaskExecutor ,實現(xiàn) Executor 接口,每個任務一個線程的執(zhí)行器實現(xiàn)類

  • threadFactory 屬性,線程工程實例,通過構造方法來初始化,Netty 實現(xiàn)自定義的 ThreadFactory 類,為 io.netty.util.concurrent.DefaultThreadFactory 具體的創(chuàng)建看如下方法,創(chuàng)建默認的線程工廠類

  • /**
        * 創(chuàng)建線程工廠對象,并且使用類名作為 poolType
        * @return
        */
       protected ThreadFactory newDefaultThreadFactory() {
           return new DefaultThreadFactory(getClass());
       }
    
  • #execute(Runnable command) 方法,通過 ThreadFactory#newThread(Runnable) 方法,創(chuàng)建一個 Thread ,然后調(diào)用 Thread#start() 方法,啟動線程執(zhí)行任務

4.3 DefaultThreadFactory

4.4 EventExecutorChooserFactory

io.netty.util.concurrent.EventExecutorChooserFactory ,EventExecutorChooser 工廠接口。代碼如下:

/**
 * Factory that creates new {@link EventExecutorChooser}s.
 *
 * EventExecutorChooser 工廠接口
 */
@UnstableApi
public interface EventExecutorChooserFactory {

    /**
     * Returns a new {@link EventExecutorChooser}.
     *
     * 創(chuàng)建一個 EventExecutorChooser 對象
     */
    EventExecutorChooser newChooser(EventExecutor[] executors);

    /**
     * Chooses the next {@link EventExecutor} to use.
     *
     * EventExecutor 選擇器接口
     */
    @UnstableApi
    interface EventExecutorChooser {

        /**
         * Returns the new {@link EventExecutor} to use.
         *
         * 選擇下一個 EventExecutor 對象
         */
        EventExecutor next();
    }
}
  • #newChooser(EventExecutor[] executors) 方法,創(chuàng)建一個 EventExecutorChooser 對象;
  • EventExecutorChooser 接口,EventExecutor 選擇器接口。
    • #next() 方法選擇下一個 EventExecutor對象;

4.4.1 DefaultEventExecutorChooserFactory

io.netty.util.concurrent.DefaultEventExecutorChooserFactory ,實現(xiàn) EventExecutorChooserFactory 接口,默認 EventExecutorChooser 工廠實現(xiàn)類。代碼如下


/**
 * Default implementation which uses simple round-robin to choose next {@link EventExecutor}.
 *
 * 實現(xiàn) EventExecutorChooserFactory 接口,默認 EventExecutorChooser 工廠實現(xiàn)類
 */
@UnstableApi
public final class DefaultEventExecutorChooserFactory implements EventExecutorChooserFactory {
    /**
     * 單例
     */
    public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();

    private DefaultEventExecutorChooserFactory() { }

    @SuppressWarnings("unchecked")
    @Override
    public EventExecutorChooser newChooser(EventExecutor[] executors) {
        if (isPowerOfTwo(executors.length)) {// 是否為 2 的冪次方
            return new PowerOfTwoEventExecutorChooser(executors);
        } else {
            return new GenericEventExecutorChooser(executors);
        }
    }

    /**
     * 是否為 2 的冪次方
     * @param val
     * @return
     */
    private static boolean isPowerOfTwo(int val) {
        return (val & -val) == val;
    }  
}

  • DefaultEventExecutorChooserFactory是個單例;
  • Netty實現(xiàn)了兩個線程選擇器,雖然代碼不一致,功能都是一樣的:每次選擇索引為上一次所選線程索引+1的線程
  • #newChooser(EventExecutor[] executors)創(chuàng)建具體的選擇器,根據(jù)#isPowerOfTwo(executors.length)方法來判斷,創(chuàng)建哪種選擇器,EventExecutor 數(shù)組的大小是否為 2 的冪次方,如果是,創(chuàng)建PowerOfTwoEventExecutorChooser選擇器,不是則創(chuàng)建GenericEventExecutorChooser選擇器;
  • #isPowerOfTwo(int val) 方法,為什么 (val & -val) == val 可以判斷數(shù)字是否為 2 的冪次方呢?

? 我們以 8 來舉個例子:

      - 8 的二進制為 `1000` 
      - -8 的二進制使用補碼表示。所以,先求反生成反碼為 `0111` ,然后加一生成補碼為 `1000` 
      - 8 和 -8 與操作后,還是 8 。與操作是都為1則為1,其他都為0,所以結果還是1000&1000還是1000;
      - 實際上,以 2 為冪次方的數(shù)字,都是最高位為 1 ,剩余位為 0 ,所以對應的負數(shù),求完補碼還是自己

4.4.2 PowerOfTwoEventExecutorChooser

PowerOfTwoEventExecutorChooser 實現(xiàn) EventExecutorChooser 接口,基于 EventExecutor 數(shù)組的大小為 2 的冪次方的 EventExecutor 選擇器實現(xiàn)類。這是一個優(yōu)化的實現(xiàn),線程池數(shù)量使用2的冪次方,這樣線程池選擇線程時使用位操作,能使性能最高,PowerOfTwoEventExecutorChooser 是 DefaultEventExecutorChooserFactory 的靜態(tài)內(nèi)部類,代碼如下:

 /**
     *  實現(xiàn) EventExecutorChooser 接口,基于 EventExecutor 數(shù)組的大小為 2 的冪次方的 EventExecutor 選擇器實現(xiàn)類
     */
    private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
        /**
         * 自增序列
         */
        private final AtomicInteger idx = new AtomicInteger();
        /**
         * EventExecutor 數(shù)組
         */
        private final EventExecutor[] executors;

        PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        /**
         * 因為 - ( 二元操作符 ) 的計算優(yōu)先級高于 & ( 一元操作符 ) 。
         *
         * 因為 EventExecutor 數(shù)組的大小是以 2 為冪次方的數(shù)字,那么減一后,除了最高位是 0 ,剩余位都為 1          ( 例如 8 減一后等于 7 ,而 7 的二進制為 0111 。),
         * 那么無論 idx 無論如何遞增,再進行 & 并操作,都不會超過 EventExecutor 數(shù)組的大小。并且,還能保            證順序遞增。
         * @return
         */
        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }
    }

4.4.3 GenericEventExecutorChooser

GenericEventExecutorChooser 實現(xiàn) EventExecutorChooser 接口,通用的 EventExecutor 選擇器實現(xiàn)類。代碼如下:

GenericEventExecutorChooser 內(nèi)嵌在 DefaultEventExecutorChooserFactory 類中。

?

/**
     * GenericEventExecutorChooser 內(nèi)嵌在 DefaultEventExecutorChooserFactory 類中。
     *  實現(xiàn) EventExecutorChooser 接口,通用的 EventExecutor 選擇器實現(xiàn)類
     */
    private static final class GenericEventExecutorChooser implements EventExecutorChooser {
        private final AtomicInteger idx = new AtomicInteger();
        private final EventExecutor[] executors;

        GenericEventExecutorChooser(EventExecutor[] executors) {
            this.executors = executors;
        }

        /**
         * 使用 idx 自增,并使用 EventExecutor 數(shù)組的大小來取余
         * @return
         */
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }
    }

4.5 next

#next() 方法,選擇下一個 EventExecutor 對象。代碼如下:

 /**
     * 選擇下一個 EventExecutor 對象
     * @return
     */
    @Override
    public EventExecutor next() {
        return chooser.next();
    }

4.6 iterator

 /**
     * 獲得 EventExecutor 數(shù)組的迭代器
     * 為了避免調(diào)用方,獲得迭代器后,對 EventExecutor 數(shù)組進行修改,
     * 所以返回是不可變的 EventExecutor 數(shù)組 readonlyChildren 的迭代器
     * @return
     */
    @Override
    public Iterator<EventExecutor> iterator() {
        return readonlyChildren.iterator();
    }

4.7 executorCount

 /**
     * Return the number of {@link EventExecutor} this implementation uses. This number is the maps
     * 1:1 to the threads it use.
     *
     * 獲得 EventExecutor 數(shù)組的大小
     */
    public final int executorCount() {
        return children.length;
    }

4.8 newChild

   /**
     * Create a new EventExecutor which will later then accessible via the {@link #next()}  method. This method will be
     * called for each thread that will serve this {@link MultithreadEventExecutorGroup}.
     *
     * 抽象方法,子類實現(xiàn)該方法,創(chuàng)建其對應的 EventExecutor 實現(xiàn)類的對象
     *
     */
    protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;

4.9 shutdownGracefully

    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        return terminationFuture();
    }

  • 優(yōu)雅的關閉EventExecutor線程組,返回terminationFuture,在構造方法中由于已經(jīng)設置了監(jiān)聽,如下代碼,通過success屬性來判斷是否全部都關閉;

  • final FutureListener<Object> terminationListener = new FutureListener<Object>() {
               @Override
               public void operationComplete(Future<Object> future) throws Exception {
                   // 線程池中的線程每終止一個增加記錄數(shù),直到全部終止設置線程池異步終止結果為成功
                   if (terminatedChildren.incrementAndGet() == children.length) {// 全部關閉
                       terminationFuture.setSuccess(null);// 設置結果,并通知監(jiān)聽器們。
                   }
               }
           };
    

4.10 terminationFuture

    /**
     * 返回用于終止 EventExecutor 的異步 Future
     * @return
     */
    @Override
    public Future<?> terminationFuture() {
        return terminationFuture;
    }

4.11 shutdown

   /**
     * 廢棄的方法,EventExecutor線程組關閉
     */
    @Override
    @Deprecated
    public void shutdown() {
        for (EventExecutor l: children) {
            l.shutdown();
        }
    }

4.12 isShuttingDown

    /**
     * 判斷所有的EventExecutor是否在優(yōu)雅的關閉,或者已經(jīng)關閉,
     * 任何一個EventExecutor沒有關閉則返回false
     * @return
     */
    @Override
    public boolean isShuttingDown() {
        for (EventExecutor l: children) {
            if (!l.isShuttingDown()) {
                return false;
            }
        }
        return true;
    }

4.13 isShutdown

    /**
     * 判斷所有的EventExecutor是否都關閉
     * @return
     */
    @Override
    public boolean isShutdown() {
        for (EventExecutor l: children) {
            if (!l.isShutdown()) {
                return false;
            }
        }
        return true;
    }

4.14 isTerminated

    /**
     * EventExecutor線程組關閉后,所有任務是否都已完成
     * @return
     */
    @Override
    public boolean isTerminated() {
        for (EventExecutor l: children) {
            if (!l.isTerminated()) {
                return false;
            }
        }
        return true;
    }

4.15 awaitTermination

/**
     * 等待所有的EventExecutor任務都執(zhí)行完或者等待時間超時,返回任務是否都已經(jīng)執(zhí)行完
     * @param timeout
     * @param unit
     * @return
     * @throws InterruptedException
     */
    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        loop: for (EventExecutor l: children) {
            for (;;) {
                //超時則跳出loop循環(huán)
                long timeLeft = deadline - System.nanoTime();
                if (timeLeft <= 0) {
                    break loop;
                }
                //等所有已提交的任務(包括正在跑的和隊列中等待的)執(zhí)行完
                //或者等超時時間到
                //或者線程被中斷,拋出InterruptedException
                //跳出for(;;)循環(huán)
                if (l.awaitTermination(timeLeft, TimeUnit.NANOSECONDS)) {
                    break;
                }
            }
        }
        return isTerminated();
    }

5. EventLoopGroup

io.netty.channel.EventExecutorGroup ,繼承 EventExecutorGroup 接口,EventLoop 的分組接口。代碼如下:

 // ========== 實現(xiàn)自 EventExecutorGroup 接口 ==========
    /**
     * Return the next {@link EventLoop} to use
     * 覆蓋父類接口的方法,選擇下一個 EventLoop 對象
     */
    @Override
    EventLoop next();

    // ========== 自定義接口 ==========

    /**
     * 注冊 Channel 到 EventLoopGroup 中的一個線程上。實際上,EventLoopGroup 會分配一個 EventLoop 給該 Channel 注冊
     */
    ChannelFuture register(Channel channel);

  
    ChannelFuture register(ChannelPromise promise);

   
    @Deprecated
    ChannelFuture register(Channel channel, ChannelPromise promise);
  • #next() 方法,覆蓋父類接口的方法,選擇下一個 EventLoop 對象
  • #register(...) 方法,注冊 Channel 到 EventLoopGroup 中的一個線程上。實際上,EventLoopGroup 會分配一個 EventLoop 給該 Channel 注冊

6.MultithreadEventLoopGroup

io.netty.channel.MultithreadEventLoopGroup ,實現(xiàn) EventLoopGroup 接口,繼承 MultithreadEventExecutorGroup 抽象類,基于多線程的 EventLoop 的分組抽象類。

6.1 構造方法

/**
     * 默認 EventLoop 線程數(shù)
     * EventLoopGroup 默認擁有的 EventLoop 數(shù)量。因為一個 EventLoop 對應一個線程,所以為 CPU 數(shù)量 * 2 。
     * 為什么會 * 2 呢?因為目前 CPU 基本都是超線程,一個 CPU 可對應 2 個線程。
     * 在構造方法未傳入 nThreads 方法參數(shù)時,使用 DEFAULT_EVENT_LOOP_THREADS 。
     */
    private static final int DEFAULT_EVENT_LOOP_THREADS;

    /**
     * 初始化線程數(shù)
     */
    static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, ThreadFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
    }

    /**
     * @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor,
     * EventExecutorChooserFactory, Object...)
     */
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                                     Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, chooserFactory, args);
    }
  • 主要初始化了線程數(shù),然后調(diào)用父類的構造方法
  • 默認情況,線程數(shù)最小為1,如果配置了系統(tǒng)參數(shù)io.netty.eventLoopThreads,設置為該系統(tǒng)參數(shù)值,否則設置為核心數(shù)的2倍。

6.2 newDefaultThreadFactory

#newDefaultThreadFactory() 方法,創(chuàng)建線程工廠對象,覆蓋父類方法,增加了線程優(yōu)先級為 Thread.MAX_PRIORITY ,代碼如下:

    /**
     * 創(chuàng)建線程工廠對象
     *
     * 覆蓋父類方法,增加了線程優(yōu)先級為 Thread.MAX_PRIORITY 。
     * @return
     */
    @Override
    protected ThreadFactory newDefaultThreadFactory() {
        return new DefaultThreadFactory(getClass(), Thread.MAX_PRIORITY);
    }

6.3 next()

#next()方法,選擇下一個 EventLoop 對象,覆蓋父類方法,將返回值轉換成 EventLoop 類,代碼如下:

    /**
     * 選擇下一個 EventLoop 對象
     *
     * 覆蓋父類方法,將返回值轉換成 EventLoop 類
     * @return
     */
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

6.4 newChild

#newChild(...) 抽象方法,創(chuàng)建 EventExecutor 對象,覆蓋父類方法,返回值改為 EventLoop 類。

    /**
     * 抽象方法,創(chuàng)建 EventExecutor 對象
     *
     * 覆蓋父類方法,返回值改為 EventLoop 類。
     * @param executor
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    protected abstract EventLoop newChild(Executor executor, Object... args) throws Exception;

6.5 register(

#register(...)方法,注冊 Channel 到 EventLoopGroup 中,通過#next() 方法來選擇一個EventLoop來注冊,也就是通過EventExecutorChooser選擇器從線程組中選擇一個;

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }

    @Override
    public ChannelFuture register(ChannelPromise promise) {
        return next().register(promise);
    }

    @Deprecated
    @Override
    public ChannelFuture register(Channel channel, ChannelPromise promise) {
        return next().register(channel, promise);
    }
}

7. NioEventLoopGroup

io.netty.channel.nio.NioEventLoopGroup ,繼承 MultithreadEventLoopGroup 抽象類,NioEventLoop 的分組實現(xiàn)類。

7.1 構造方法

    public NioEventLoopGroup() {
        this(0);
    }

    
    public NioEventLoopGroup(int nThreads) {
        this(nThreads, (Executor) null);
    }

  
    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
        this(nThreads, threadFactory, SelectorProvider.provider());
    }

    public NioEventLoopGroup(int nThreads, Executor executor) {
       
        this(nThreads, executor, SelectorProvider.provider());
    }

    
    public NioEventLoopGroup(
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
       
        this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
        final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
       
        super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(
            int nThreads, Executor executor, final SelectorProvider selectorProvider) {
        this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
    }

    public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory,
                RejectedExecutionHandlers.reject());
    }

    public NioEventLoopGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory,
                             final SelectorProvider selectorProvider,
                             final SelectStrategyFactory selectStrategyFactory,
                             final RejectedExecutionHandler rejectedExecutionHandler) {
        super(nThreads, executor, chooserFactory, selectorProvider, selectStrategyFactory, rejectedExecutionHandler);
    }

構造方法比較多,主要是明確了父構造方法的 Object ... args方法參數(shù)

  • 第一個參數(shù),selectorProvider ,java.nio.channels.spi.SelectorProvider ,用于創(chuàng)建 Java NIO Selector 對象。
  • 第二個參數(shù),selectStrategyFactory ,io.netty.channel.SelectStrategyFactory ,選擇策略工廠。詳細解析,見后續(xù)文章。
  • 第三個參數(shù),rejectedExecutionHandler ,io.netty.channel.SelectStrategyFactory ,拒絕執(zhí)行處理器。詳細解析,見后續(xù)文章。

7.2 newChild

#newChild(Executor executor, Object... args) 方法,創(chuàng)建 NioEventLoop 對象

 
    @Override
    protected EventLoop newChild(Executor executor, Object... args) throws Exception {
        return new NioEventLoop(this, executor, (SelectorProvider) args[0],
            ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
    }
  • 模板方法newChild(),用來創(chuàng)建線程池中的單個線程,現(xiàn)在我們知道MultithreadEventExecutorGroupEventExecutor[] children 保存的就是NioEventLoop

7.3 setIoRatio

#setIoRatio(int ioRatio) 方法,設置所有 EventLoop 的 IO 任務占用執(zhí)行時間的比例

public void setIoRatio(int ioRatio) {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).setIoRatio(ioRatio);
        }
    }

7.4 rebuildSelectors

#rebuildSelectors() 方法,重建所有 EventLoop 的 Selector 對象

   /**
     * Replaces the current {@link Selector}s of the child event loops with newly created {@link Selector}s to work
     * around the  infamous epoll 100% CPU bug.
     *
     * 重建所有 EventLoop 的 Selector 對象
     *
     * 因為 JDK 有 epoll 100% CPU Bug 。實際上,NioEventLoop 當觸發(fā)該 Bug 時,
     * 也會自動調(diào)用 NioEventLoop#rebuildSelector() 方法,進行重建 Selector 對象,以修復該問題。
     */
    public void rebuildSelectors() {
        for (EventExecutor e: this) {
            ((NioEventLoop) e).rebuildSelector();
        }
    }

源碼解析好文

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容