使用線程池時一定要注意的五個點(diǎn)

一、使用線程池在流量突發(fā)期間能夠平滑地服務(wù)降級

很多場景下應(yīng)用程序必須能夠處理一系列傳入請求,簡單的處理方式是通過一個線程順序的處理這些請求,如下圖:

單線程策略的優(yōu)勢和劣勢都非常明顯:

優(yōu)勢:設(shè)計(jì)和實(shí)現(xiàn)簡單;劣勢:這種方式會帶來處理效率的問題,單線程的處理能力是有限,不能發(fā)揮多核處理器優(yōu)勢。

在這種場景下我們就需要考慮并發(fā),一個簡單的并發(fā)策略就是Thread-Per-Message模式,即為每個請求使用一個新的線程。

Thread-Per-Message策略的優(yōu)勢和劣勢也非常明顯:

優(yōu)勢:設(shè)計(jì)和實(shí)現(xiàn)比較簡單,能夠同時處理多個請求,提升響應(yīng)效率;

劣勢:主要在兩個方面

1.資源消耗 引入了在串行執(zhí)行中所沒有的開銷,包括線程創(chuàng)建和調(diào)度,任務(wù)處理,資源分配和回收以及頻繁上下文切換所需的時間和資源。2.安全

  • 攻擊者可以通過一次進(jìn)行大量請求使系統(tǒng)癱瘓并且拒絕服務(wù) (DoS),從而導(dǎo)致系統(tǒng)立即不響應(yīng)而不是平滑地退出。
  • 從安全角度來看,一個組件可能由于連續(xù)的錯誤而耗盡所有資源,因此使所有其他組件無法獲得資源。

有沒有一種方式可以并發(fā)執(zhí)行又可以克服Thread-Per-Message的問題?

采用線程池的策略,線程池通過控制并發(fā)執(zhí)行的工作線程的最大數(shù)量來解決Thread-Per-Message帶來的問題??梢娤聢D,請求來臨時先放入線程池的隊(duì)列

線程池可以接受一個Runnable或Callable<T>任務(wù),并將其存儲在臨時隊(duì)列中,當(dāng)有空閑線程時可以從隊(duì)列中拿到一個任務(wù)并執(zhí)行。

反例(使用 Thread-Per-Message 策略)

class Helper {
    public void handle(Socket socket) {
        // do something
    }
}

final class RequestHandler {
    private final Helper helper = new Helper();

    //......
    private RequestHandler(int port) throws IOException {
        //do something
    }

    public void handleRequest() {
        new Thread(new Runnable() {
            public void run() {
                try {
                    helper.handle(server.accept());
                } catch (IOException e) {
                    // Forward to handler
                }
            }
        }).start();
    }
}

正例(使用 線程池 策略)

class Helper {
    public void handle(Socket socket) {
        // do something
    }
}

final class RequestHandler {
    private final Helper helper = new Helper();
    private final ServerSocket server;
    private final ExecutorService exec;

    private RequestHandler(int port, int poolSize) throws IOException {
        server = new ServerSocket(port);
        exec = Executors.newFixedThreadPool(poolSize);
    }

    public static RequestHandler newInstance(int poolSize) throws IOException {
        return new RequestHandler(0, poolSize);
    }

    public void handleRequest() {
        Future<?> future = exec.submit(new Runnable() {
            @Override
            public void run() {
                try {
                    helper.handle(server.accept());
                } catch (IOException e) {
                    // Forward to handler
                }
            }
        });
    }
    // ... Other methods such as shutting down the thread pool
    // and task cancellation ...
}

JAVA 中(JDK 1.5+)線程池的種類:

  • newFixedThreadPool()
  • newCachedThreadPool()
  • newSingleThreadExecutor()
  • newScheduledThreadPool()線程池的詳細(xì)使用方法可參見Java API文檔

二、不要在有界線程池中執(zhí)行相互依賴的任務(wù)

程序不能使用來自有界線程池的線程來執(zhí)行依賴于線程池中其他任務(wù)的任務(wù)。

有兩個場景:

  1. 當(dāng)線程池中正在執(zhí)行的線程阻塞在依賴于線程池中其他任務(wù)的完成上,這樣就會出現(xiàn)稱為線程饑餓(threadstarvation)死鎖的死鎖形式。
  2. 線程饑餓死鎖還會發(fā)生在當(dāng)前執(zhí)行的任務(wù)向線程池提交其他任務(wù)并等待這些任務(wù)完成的時候,然而此時線程池缺乏一次容納所有任務(wù)的能力。

要緩解上面兩個場景產(chǎn)生的問題有兩個簡單的辦法:

  1. 擴(kuò)大線程池中的線程數(shù),以容納更多的任務(wù),但 決定一個線程池合適的大小可能是困難的甚至不可能的。
  2. 線程池中的隊(duì)列改為無界,由于系統(tǒng)資源有限,無界隊(duì)列只能說是盡可能容納任務(wù) 但饑餓死鎖的現(xiàn)象無法消除。

真正解決此類方法還是需要梳理線程池執(zhí)行業(yè)務(wù)流程,不要在有界線程池中執(zhí)行相互依賴的任務(wù),防止出現(xiàn)競爭和死鎖。

三、確保提交到線程池的任務(wù)可中斷

向線程池提交的任務(wù)需要支持中斷。從而保證線程可以中斷,線程池可以關(guān)閉。線程池支持 java.util.concurrent.ExecutorService.shutdownNow() 方法,該方法嘗試停止所有正在執(zhí)行的任務(wù),停止等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)的列表。

但是 shutdownNow() 除了盡力嘗試停止處理主動執(zhí)行的任務(wù)之外不能保證一定能夠停止。例如,典型的實(shí)現(xiàn)是通過Thread.interrupt()來停止,因此任何未能響應(yīng)中斷的任務(wù)可能永遠(yuǎn)不會終止,也就造成線程池?zé)o法真正的關(guān)閉。

反例:

public final class Worker implements Runnable { // Thread‐safe class
    private AtomBoolean flag = new AtomBoolean(true);

    public Worker() throws IOException {
        //do something
    }

    // Only one thread can use the socket at a particular time
    @Override
    public void run() {
        try {
            while (flag.get()) {
                // do something
            }
        } catch (IOException ie) {
            // Forward to handler
        }
    }

    public void shutdown() {
        this.flag.set(false);
    }
}

正例:

public final class Worker implements Runnable { // Thread‐safe class
    public Worker() throws IOException {
        //do something
    }

    // Only one thread can use the socket at a particular time
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                // do something
            }
        } catch (IOException ie) {
            // Forward to handler
        }
    }
}

四、確保在線程池中執(zhí)行的任務(wù)不能悄無聲息地失敗

線程池中的所有任務(wù)必須提供機(jī)制,如果它們異常終止,則需要通知應(yīng)用程序.

如果不這樣做不會導(dǎo)致資源泄漏,但由于池中的線程仍然被會重復(fù)使用,使故障診斷非常困難或不可能。

在應(yīng)用程序級別處理異常的最好方法是使用異常處理。異常處理可以執(zhí)行診斷操作,清理和關(guān)閉Java虛擬機(jī),或者只是記錄故障的詳細(xì)信息。

也就是說在線程池里執(zhí)行的任務(wù)也需要能夠拋出異常并被捕獲處理。

任務(wù)恢復(fù)或清除操作可以通過重寫 java.util.concurrent.ThreadPoolExecutor 類的 afterExecute() 鉤子來執(zhí)行。

當(dāng)任務(wù)通過執(zhí)行其 run() 方法中的所有語句并且成功結(jié)束任務(wù),或者由于異常而導(dǎo)致任務(wù)停止時,將調(diào)用此鉤子。

可以通過自定義 ThreadPoolExecutor 服務(wù)來重載 afterExecute()鉤子。

還可以通過重載 terminated() 方法來釋放線程池獲取的資源,就像一個finally塊。

反例:

final class PoolService {
    private final ExecutorService pool = Executors.newFixedThreadPool(10);

    public void doSomething() {
        pool.execute(new Task());
    }
}

final class Task implements Runnable {
    @Override
    public void run() {
        // do something
        throw new NullPointerException();
    }
}

任務(wù)意外終止時作為一個運(yùn)行時異常,無法通知應(yīng)用程序。此外,它缺乏恢復(fù)機(jī)制。因此,如果Task拋出一個NullPointerException ,異常將被忽略。

正例:

class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    // ... Constructor ...
    public CustomThreadPoolExecutor(
        int corePoolSize, int maximumPoolSize, long keepAliveTime,
                TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
                }
    @Override
    public void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            // Exception occurred, forward to handler
        }
            // ... Perform task‐specific cleanup actions
        }
    @Override
    public void terminated() {
        super.terminated();
        // ... Perform final clean‐up actions
    }
}

另外一種方式是使用 ExecutorService.submit() 方法(代替 execute() 方法)將任務(wù)提交到線程池并獲取 Future 對象。

當(dāng)通過 ExecutorService.submit() 提交任務(wù)時,拋出的異常并未到達(dá)未捕獲的異常處理機(jī)制,因?yàn)閽伋龅漠惓1徽J(rèn)為是返回狀態(tài)的一部分,因此被包裝在ExecutionException ,并由Future.get() 返回。

Future<?> future = threadPool.submit(new Task());
try {
    future.get();
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // Reset interrupted status
} catch (ExecutionException e) {
    Throwable exception = e.getCause();
    // Forward to exception reporter
}

五、確保在使用線程池時重新初始化ThreadLocal變量

java.lang.ThreadLocal 類提供線程內(nèi)的本地變量。根據(jù)Java API

這些變量與其它正常變量不同,每個線程訪問(通過其get或set方法)都有其屬于各自線程的,獨(dú)立初始化的變量拷貝。ThreadLocal實(shí)例通常是一些希望將狀態(tài)與線程(例如,用戶ID或事務(wù)ID)相關(guān)聯(lián)的類中的私有靜態(tài)字段。

ThreadLocal對象需要關(guān)注那些對象被線程池中的多個線程執(zhí)行的類。

線程池緩存技術(shù)允許線程重用以減少線程創(chuàng)建開銷,或者當(dāng)創(chuàng)建無限數(shù)量的線程時可以降低系統(tǒng)的可靠性。

當(dāng) ThreadLocal 對象在一個線程中被修改,隨后變得可重用時,在重用的線程上執(zhí)行的下一個任務(wù)將能看到該線程上執(zhí)行過的上一個任務(wù)修改的ThreadLocal 對象的狀態(tài)。

所以要在使用線程池時重新初始化的ThreadLocal對象實(shí)例。

反例:

public enum Day {
    MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY;
}

public final class Diary {
    private static final ThreadLocal<Day> days = new ThreadLocal<Day>() {
        // Initialize to Monday
        protected Day initialValue() {
            return Day.MONDAY;
        }
    };
    private static Day currentDay() {
        return days.get();
    }
    public static void setDay(Day newDay) {
        days.set(newDay);
    }
    // Performs some thread‐specific task
    public void threadSpecificTask() {
        // Do task ...
    }
}

public final class DiaryPool {
    final int numOfThreads = 2; // Maximum number of threads allowed in pool
    final Executor exec;
    final Diary diary;

    DiaryPool() {
        exec = (Executor) Executors.newFixedThreadPool(numOfThreads);
        diary = new Diary();
    }

    public void doSomething1() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                diary.setDay(Day.FRIDAY);
                diary.threadSpecificTask();
            }
        });
    }

    public void doSomething2() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                diary.threadSpecificTask();
            }
        });
    }

    public static void main(String[] args) {
        DiaryPool dp = new DiaryPool();
        dp.doSomething1(); // Thread 1, requires current day as Friday
        dp.doSomething2(); // Thread 2, requires current day as Monday
        dp.doSomething2(); // Thread 3, requires current day as Monday
    }
}

DiaryPool類創(chuàng)建了一個線程池,它可以通過一個共享的無界的隊(duì)列來重用固定數(shù)量的線程。

在任何時候,不超過numOfThreads個線程正在處理任務(wù)。如果在所有線程都處于活動狀態(tài)時提交其他任務(wù),則 它們在隊(duì)列中等待,直到線程可用。

當(dāng)線程循環(huán)時,線程的線程局部狀態(tài)仍然存在。

下表顯示了可能的執(zhí)行順序:

時間任務(wù)線程池提交方法日期1t11doSomething1()星期五2t22doSomething2()星期一3t31doSomething3()星期五

在這個執(zhí)行順序中,期望從doSomething2() 開始的兩個任務(wù)( t 2和t 3 doSomething2() 將當(dāng)天視為星 期一。然而,因?yàn)槌鼐€程1被重用,所以t 3觀察到星期五。

解決方案(try-finally條款)

符合規(guī)則的方案removeDay() 方法添加到Diary類,并在try‐finally 塊中的實(shí)現(xiàn)doSomething1() 類的doSomething1() 方法的語句。finally 塊通過刪除當(dāng)前線程中的值來恢復(fù)threadlocal類型的days對象的初始狀態(tài)。

public final class Diary {
    // ...
    public static void removeDay() {
        days.remove();
    }
}

public final class DiaryPool {
    // ...
    public void doSomething1() {
        exec.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    Diary.setDay(Day.FRIDAY);
                    diary.threadSpecificTask();
                } finally {
                    Diary.removeDay(); // Diary.setDay(Day.MONDAY)
                    // can also be used
                }
            }
        });
    }
// ...
}

如果threadlocal變量再次被同一個線程讀取,它將使用initialValue()方法重新初始化 ,除非任務(wù)已經(jīng)明確設(shè)置了變量的值。這個解決方案將維護(hù)的責(zé)任轉(zhuǎn)移到客戶端( DiaryPool ),但是當(dāng)Diary類不能被修改時是一個好的選擇。

解決方案(beforeExecute())

使用一個自定義ThreadPoolExecutor 來擴(kuò)展 ThreadPoolExecutor并覆蓋beforeExecute() 方法。beforeExecute() 方法在Runnable 任務(wù)在指定線程中執(zhí)行之前被調(diào)用。該方法在線程 “t” 執(zhí)行任務(wù) “r” 之前重新初始化 threadlocal 變量。

class CustomThreadPoolExecutor extends ThreadPoolExecutor {
    public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                    long keepAliveTime, TimeUnit unit,
                                    BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void beforeExecute(Thread t, Runnable r) {
        if (t == null || r == null) {
            throw new NullPointerException();
        }
        Diary.setDay(Day.MONDAY);
        super.beforeExecute(t, r);
    }
}

public final class DiaryPool {
    // ...
    DiaryPool() {
        exec = new CustomThreadPoolExecutor(NumOfthreads, NumOfthreads,
                10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        diary = new Diary();
    }
    // ...
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容