java - ThreadPoolExecutor如何實(shí)現(xiàn)線程復(fù)用及超時(shí)銷毀

1. 線程復(fù)用

我們知道Thread.start執(zhí)行之后,線程就不能再次執(zhí)行了,那ThreadPoolExecutor是如何做到線程復(fù)用的呢?
原理很簡(jiǎn)單,在實(shí)際執(zhí)行的線程外部套一個(gè)Thread,外層Threadrun方法while循環(huán)執(zhí)行實(shí)際執(zhí)行線程的run方法,實(shí)現(xiàn)線程的復(fù)用并且執(zhí)行之后不銷毀。下面是偽代碼:

// 任務(wù)等待隊(duì)列
BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue();

new Thread(() -> {
    for (;;){
        Runnable runnable = taskQueue.poll();//隊(duì)列里拿
        runnable.run();//同步執(zhí)行
    }
}).start();// 異步while執(zhí)行

下面是ThreadPoolExecutor的重點(diǎn)代碼:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 阻塞獲取等待任務(wù)
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run(); // 實(shí)際阻塞執(zhí)行
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

2. 線程銷毀

我們知道,在創(chuàng)建線程池的時(shí)候有超時(shí)參數(shù)keepAliveTime,那么線程池是如何實(shí)現(xiàn)精確的超時(shí)銷毀呢?
這個(gè)是結(jié)合BlockingQueue的阻塞超時(shí)來(lái)實(shí)現(xiàn)的,下面是源碼:

 /**
 * ...
 * @return task, or null if the worker must exit, in which case workerCount is decremented
 * 翻譯: 返回task,如果worker必須退出,則返回null,在這種情況下workerCount遞減
 */
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 重點(diǎn)在這,如果超時(shí)沒(méi)有獲取到任務(wù),則返回null,銷毀線程。
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

3. 為什么用BlockingQueue

  1. 獲取等待任務(wù)的時(shí)候,直接用阻塞代替通知輪詢,提高性能,減少代碼復(fù)雜度。
  2. 復(fù)用阻塞超時(shí)獲取等待任務(wù)實(shí)現(xiàn)線程超時(shí)銷毀,設(shè)計(jì)精巧。
  3. 本身就是支持并發(fā)操作的,不用額外維護(hù)線程安全。

參考

  1. 一. 線程池簡(jiǎn)介
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容