【第6篇】Netty對Executor實現(xiàn)機制分析

ThreadPerTaskExecutor

  • ThreadPerTaskExecutor每一個任務的執(zhí)行器(代理和命令模式)線程解耦(執(zhí)行線程和創(chuàng)建線程)
public final class ThreadPerTaskExecutor implements Executor {
   //定義一個私有的線程工廠
    private final ThreadFactory threadFactory;
   //定義一個ThreadPerTaskExecutor構(gòu)造方法
    public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        this.threadFactory = threadFactory;
    }

  //這個是一個命令模式,這個execute方法的作用是:在將來的某個時間執(zhí)行給定的命令。 該命令可以在Executor實現(xiàn)的判斷下在新線程,池化線程或調(diào)用線程中執(zhí)行
    @Override
    public void execute(Runnable command) {
        threadFactory.newThread(command).start();
    }
}

ExecutorService

  • ExecutorService 繼承了Executor接口,增加了對自身生命周期管理的方法,同時提供了一個Future給命令者去獲取命令的執(zhí)行結(jié)果
public interface ExecutorService extends Executor {

    /**
     * 啟動一個有序關閉,其中先前提交的任務將被執(zhí)行,但不會接受任何新任務。 如果已經(jīng)關閉,調(diào)用沒有其他影響。此方法不會等待先前提交的任務完成執(zhí)行。 使用awaitTermination來做到這一點。
     */
    void shutdown();

    /**
     * 嘗試停止所有正在執(zhí)行的任務,停止等待任務的處理,并返回等待執(zhí)行的任務列表。此方法不等待主動執(zhí)行任務終止。 使用awaitTermination來做到這一點。除盡力嘗試停止處理主動執(zhí)行任務之外,沒有任何保證。 例如,典型的實現(xiàn)將通過Thread.interrupt取消,因此任何無法響應中斷的任務都可能永遠不會終止。
     */
    List<Runnable> shutdownNow();

    /**
     * 如果此執(zhí)行程序已關閉,則返回true。
     */
    boolean isShutdown();

    /**
     * 如果關閉后所有任務都已完成,則返回true。 請注意,除非先調(diào)用shutdown或shutdownNow,否則isTerminated永遠不會為真。
     */
    boolean isTerminated();

    /**
     * 阻止所有任務在關閉請求之后完成執(zhí)行,或者發(fā)生超時,或者當前線程被中斷,以先發(fā)生者為準。
     */
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 提交值返回任務以執(zhí)行并返回表示任務的掛起結(jié)果的Future。 Future的get方法將在成功完成后返回任務的結(jié)果。
如果您想立即阻止等待任務,可以使用結(jié)構(gòu)形式為result = exec.submit(aCallable).get();
注意:Executors類包含一組方法,這些方法可以將一些其他常見的類似閉包的對象(例如,java.security.PrivilegedAction)轉(zhuǎn)換為Callable形式,以便可以提交它們。
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * 提交Runnable任務以執(zhí)行并返回表示該任務的Future。 Future的get方法將在成功完成后返回給定的結(jié)果。
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * 提交Runnable任務以執(zhí)行并返回表示該任務的Future。 Future的get方法將在成功完成后返回null。
     */
    Future<?> submit(Runnable task);

    /*
執(zhí)行給定的任務,返回完成所有狀態(tài)和結(jié)果的Futures列表。 對于返回列表的每個元素,F(xiàn)uture.isDone都為true。 請注意,已完成的任務可能正常終止或通過拋出異常終止。 如果在此操作正在進行時修改了給定集合,則此方法的結(jié)果是不確定的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

    /**
     *執(zhí)行給定的任務,返回一個Futures列表,其中包含所有完成或超時到期時的狀態(tài)和結(jié)果,以先發(fā)生者為準。 對于返回列表的每個元素,F(xiàn)uture.isDone都為true。 返回時,未完成的任務將被取消。 請注意,已完成的任務可能正常終止或通過拋出異常終止。 如果在此操作正在進行時修改了給定集合,則此方法的結(jié)果是不確定的。
     */
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    /**
     * 執(zhí)行給定的任務,返回已成功完成的任務的結(jié)果(即,不拋出異常),如果有的話。 在正?;蛱厥馔素洉r,未完成的任務將被取消。 如果在此操作正在進行時修改了給定集合,則此方法的結(jié)果是不確定的。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

    /**
     * 執(zhí)行給定的任務,返回已成功完成的任務的結(jié)果(即,不拋出異常),如果在給定的超時之前已經(jīng)執(zhí)行了任何操作。 在正?;蛱厥馔素洉r,未完成的任務將被取消。 如果在此操作正在進行時修改了給定集合,則此方法的結(jié)果是不確定的。
     */
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • invokeAny 被哪些類調(diào)用。


    invokeAny

ScheduledExecutorService

  • ScheduledExecutorService 繼承了ExecutorService接口,增加了對定時任務的支持。
//創(chuàng)建和執(zhí)行在給定延遲后啟用的一次性操作。
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
//創(chuàng)建并執(zhí)行一個周期性操作,該操作在給定的初始延遲后首先啟用,隨后在給定的時間段內(nèi)啟用; 即執(zhí)行將在initialDelay之后開始,然后是initialDelay + period,然后是initialDelay + 2 * period,依此類推。 如果任務的任何執(zhí)行遇到異常,則后續(xù)執(zhí)行被禁止。 否則,任務將僅通過取消或終止執(zhí)行者來終止。 如果此任務的執(zhí)行時間超過其周期,則后續(xù)執(zhí)行可能會延遲,但不會同時執(zhí)行。
public ScheduledFuture<?> scheduleAtFixedRate(@org.jetbrains.annotations.NotNull Runnable command, long initialDelay,  long period,TimeUnit unit)

EventExecutorGroup

  • EventExecutorGroup 繼承了ScheduledExecutorService接口,對原來的ExecutorService的關閉接口提供了增強,提供了優(yōu)雅的關閉接口。從接口名稱上可以看出它是對多個EventExecutor的集合,提供了對多個EventExecutor的迭代訪問接口。
EventExecutorGroup

SingleThreadEventExcutor

  • SingleThreadEventExcutor(單線程)實現(xiàn)了ScheduledExecutorService接口,支持執(zhí)行定時任務。得有個地方存放定時任務信息。類中的實現(xiàn)是delayedTaskQueue,它是一個PriorityQueue,也是一個BlockingQueue。不過它里面的元素不是按照先來后到的順序存取的,而是按照各個元素的優(yōu)先級判斷的SingleThreadEventExecutor類中有一個實例變量Thread,它引用的就是當前Executor所擁有的那個thread對象

SingleThreadEventExcutor類用到AtomicIntegerFieldUpdater基于反射的實用程序,可以對指定類的指定volatile int字段進行原子更新。 此類設計用于原子數(shù)據(jù)結(jié)構(gòu),其中同一節(jié)點的多個字段獨立地受原子更新的影響。
請注意,此類中compareAndSet方法的保證比其他原子類弱。 因為此類無法確保該字段的所有使用都適用于原子訪問的目的,所以它只能保證在compareAndSet的其他調(diào)用和同一更新程序上設置的原子性。

 @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
       // 這行是判斷是否在循環(huán)事件里面,點進去會跳到AbstractEventExecutor的inEventLoop方法
        boolean inEventLoop = inEventLoop();
        //如果inEventLoop為true就把任務添加一個任務隊列里
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();  //啟動線程
            addTask(task);//添加任務隊列
            //判斷是否關閉和移除任務
            if (isShutdown() && removeTask(task)) {
                reject();//調(diào)用拒絕方法
            }
        }
        //不是addTaskWakesUp 并且是wakesUpForTask
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop); //調(diào)用wakeup方法
        }
    }
  • startThread方法代碼
//啟動線程方法
   private void startThread() {
        if (state == ST_NOT_STARTED) {
            //這行代碼是AtomicIntegerFieldUpdaterImpl#Unsafe的compareAndSwapInt方法
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }

private void doStartThread() {
        assert thread == null; //斷言一個線程變量,初始化一個null
        //Executor執(zhí)行者
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread(); //獲取當前線程
                if (interrupted) {
                    //終止線程(暴力處理)
                    thread.interrupt();
                }

                boolean success = false;
              //更新內(nèi)部時間戳,該時間戳指示最近執(zhí)行提交的任務的時間。 runAllTasks()和runAllTasks(long)自動更新此時間戳,因此通常不需要調(diào)用此方法。 但是,如果使用takeTask()或pollTask()手動執(zhí)行任務,則必須在任務執(zhí)行循環(huán)結(jié)束時調(diào)用此方法以進行準確的靜默期檢查。
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                     //死循環(huán)
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // 檢查在循環(huán)結(jié)束時是否調(diào)用了confirmShutdown()。
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // 運行所有剩余的任務并關閉掛鉤。
                        for (;;) {
                              //確認關閉
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();//清除
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();//釋放
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }
                            //
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }
  • 上面Thread內(nèi)部run方法執(zhí)行的是SingleThreadEventExecutor.this.run(),而這個run方法是一個抽象方法,留給了子類去實現(xiàn)了。不過可以肯定的是子類的run方法是不斷的去tasksQueue中取出task去執(zhí)行?,F(xiàn)在重點分析下finally塊中的代碼。
    1、首先更改狀態(tài)為正在關閉狀態(tài)。
    2、如果子類中的run方法中的loop執(zhí)行成功了,就得先調(diào)用confirmShutdown,確認任務隊列中的任務是否都已經(jīng)被執(zhí)行了。
    3、然后還得再次確認下任務隊列中是否已被執(zhí)行完畢,因為在關閉的過程中外部也是能添加任務的。
    4、最終執(zhí)行清理工作,更改狀態(tài)為已關閉,釋放信號量。
    5、如果這個時候還是有任務沒執(zhí)行完,那也只能是無奈了,記個log吧
    6、更新整個關閉過程為success
  • confirmShutdown代碼
protected boolean confirmShutdown() {
    // 如果state狀態(tài) state < ST_SHUTTING_DOWN則直接return false
    if (!isShuttingDown()) {
        return false;
    }
    // 這個方法必須從內(nèi)部調(diào)用,從修飾符 protected也可以看出
    if (!inEventLoop()) {
        throw new IllegalStateException("must be invoked from an event loop");
    }
    // 取消所有的定時任務
    cancelDelayedTasks();
    
    if (gracefulShutdownStartTime == 0) {
        // 標記shutdown處理的開始時間
        gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
    }
    // 運行tasksQueue或者shutdownHooks中的所有Runnable都處理完成
    if (runAllTasks() || runShutdownHooks()) {
        //分析了下源碼,isShutdown()這個只能是在外部線程調(diào)用了shutdown()接口的時候才會有可能成為true
        //但是現(xiàn)在這個方法已經(jīng)@Deprecated,所以這個if塊是不會進入的
        if (isShutdown()) {
            // shutdown 成功,沒有更多的runnable需要執(zhí)行
            return true;
        }

        // There were tasks in the queue. Wait a little bit more until no tasks are queued for the quiet period.
        wakeup(true);
        return false;
    }

    final long nanoTime = ScheduledFutureTask.nanoTime();
    // runAllTasks() 或者runAllTasks() + runShutdownHooks()方法執(zhí)行時間操作了最大限制
    if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
        return true;
    }
    // 現(xiàn)在時間與上個任務執(zhí)行完成的時間差小于quietPeriod時間,繼續(xù)檢測
    if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
        // Check if any tasks were added to the queue every 100ms.
        // TODO: Change the behavior of takeTask() so that it returns on timeout.
        wakeup(true);
        try {
            //內(nèi)部線程sleep 100ms
            Thread.sleep(100);
        } catch (InterruptedException e) {
            // Ignore
        }

        return false;
    }

    // No tasks were added for last quiet period - hopefully safe to shut down.
    // (Hopefully because we really cannot make a guarantee that there will be no execute() calls by a user.)
    return true;
}

這個WAKEUP_TASK什么也不做,為啥取名wakeup呢

private final Semaphore threadLock = new Semaphore(0);  

//threadLock的內(nèi)部permits設置為0,也就是說acquire()永遠獲取不到permit,會一直被阻塞著。
//那有什么用呢?另一種實現(xiàn)wait()/notify()。
SingleThreadEventExcutor

為什么需要AtomicInteger原子操作類

  • AtomicInteger 原子性類,對于Java中的運算操作,例如:自增或自減,若沒有進行額外的同步操作,在多線程環(huán)境下就是線程不安全的。num++解析為num=num+1,明顯,這個操作不具備原子性,多線程并發(fā)共享這個變量時必然會出現(xiàn)問題
  • num ++ 的原子性問題,num++的操作實際上分三個步驟"讀-改-寫"

int num = 10;
num =num++;//10

  • 臨時變量讀-改-寫

int temp = num;
num = num +1;
num = temp;

  • CAS
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容