通過本文檔你將學習到
- 線程初始化的幾種方式。
- 線程的相關概念
- 線程池怎么玩
- 線程池的常見面試題
1 線程回顧知識
1.1 線程初始化的幾種方式
1)、繼承 Thread
2)、實現 Runnable 接口
3)、實現 Callable 接口 + FutureTask (可以拿到返回結果,可以處理異常)
4)、線程池
常見線程方式一
public static class Thread01 extends Thread {
@Override
public void run() {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運行結果:" + i);
}
}
Thread thread = new Thread01();
thread.start();
常見線程方式二
public static class Runable01 implements Runnable {
@Override
public void run() {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運行結果:" + i);
}
}
Runable01 runable01 = new Runable01();
new Thread(runable01).start();
說到上面兩種,大家非常非常的熟悉,就不多扯淡了,接下來,將引出第三種Callable。
Thread thread = new Thread01();
thread.start();
相信這個大家再熟悉不過了,剛入門的時候,就是這么學習的。接下來我們就打開API

https://www.matools.com/api/java8
Callable,帶返回值的
我們先看下自己超級熟悉的Thread() 構造方法,第一第二種方式都得創(chuàng)建Thread()
Thread()
分配一個新的 Thread對象。
Thread(Runnable target)
分配一個新的 Thread對象。
Thread(Runnable target, String name)
分配一個新的 Thread對象。
一般情況下我們都是使用 Thread(Runnable target, String name)吧
廢話不多說 要是有一個這構造方法 Thread(Callable target, String name) 那不是直接搞起么
那么說明,我們一拍腦袋半秒就能想到的問題,這sum Oracle公司的天才們都想不到,
他們就是蠢貨一個,馬上寫信給他們說,讓我來拯救JAVA代碼
那么接下來你有兩種做法,馬上發(fā)郵件告訴sum Oracle公司的天才們
1 我發(fā)現了BUG,你們沒有擴展一個方法 Thread(Callable target, String name)
2 自動誕生的時候天才們就想到了擴展方法,根本不用多余的加個構造方法,最后發(fā)現小丑竟然是自己
你覺得是哪一個呢?
回歸正題:本來我只能接受Runnable ,現在Callable 說,我也想勾搭下Thread,那么你準備怎么搞?
你想想搞定Thread,但是我不認識,那我就先搞定它周人的人,間接不就搞定了么?
你想想我們寫代碼的時候參數都是寫的list 然后傳遞list的子類ArrayList對吧。更底層的代碼直接傳遞的Collection,有時候大神寫代碼你看著更無語,直接傳遞一個接口,這個接口還沒有任何方法,繼承后也不用實現,這種混個身份,到時候可以透傳的,如果你這是無語了,。你要說這人是傻逼,到最后發(fā)現小丑竟是我自己對吧?因為我們能力還達不到人家的境界,無法理解。等你能力提高后,你才會發(fā)現人家為什么這么寫底層代碼?哦明白了,看著好自然,對,就該這么搞??粗a像大自然一樣自然
FutureTask 就是中間人了,幫你撮合
Thread(Callable target, String name) 沒有 這個構造,那么中間人過來
Thread(中間人 FutureTask , String name)
new Thread(futureTask , '"我的名字叫線程110號").star()
在此之前也得介紹下:Future
這個是一個接口,獲取異步任務的結果,取消任務的執(zhí)行,判斷任務是否被取消,判斷任務執(zhí)行是否完畢。
主線程讓子線程去執(zhí)行其它事情。Future接口的實現類FutureTask 。

JDK.5以后常見方式三:
public static class Callable01 implements Callable <Integer> {
@Override
public Integer call() throws Exception {
System.out.println("當前線程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("運行結果:" + i);
return I;
}
}
我啟動了1000個線程去跑對賬任務,現在編號999線程出錯了,你給我返回錯誤,我想看下為什么?
FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
new Thread(futureTask).start();
System.out.println(futureTask.get());
方式4 直接提交給線程池即可
public static ExecutorService executor = Executors.newFixedThreadPool(10);
// service.execute(new Runable01());
// Future<Integer> submit = service.submit(new Callable01());
// submit.get();
futureTask優(yōu)勢明顯呀,你看搭配線程池用的飛起,那么它又缺點嗎?
當然有呀,人無完人,代碼也有缺點
1、futureTask.get()阻塞 也可以傳遞x秒 等待幾秒
2、那我用isDone()方法。

這樣也不行呀,浪費cpu字段,你一直while true 要有個回調通知那多好呀。
一般情況下我們都是通過輪詢的方式獲取結果,進來不要阻塞。
那有沒有什么方式解決上面的缺點呢?
答案是肯定的,有的,我們不用這個類,我們有一個更好的選擇 CompletableFuture 這個比Future還能干,Future能干的CompletableFuture也可以,CompletableFuture還能干Future不能干的。爽不爽?
CompletableFuture基本介紹。
阻塞的方式和異步編程的設計理念相違背,而輪詢的方式會消耗無畏的CPU資源。因此,JDK8設計出CompletableFuture
public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
在Java 8中, Complet able Future提供了非常強大的Future的擴展功能, 可以幫助我們簡化異步編程的復雜性, 并且提供了函數式編程的能力, 可以通過回調的方式處理計算結果, 也提供了轉換和組合Complet able Future的方法。
它可能代表一個明確完成的Future, 也有可能代表一個完成階段(Completion Stage) , 它支持在計算完成以后觸發(fā)一些函數或執(zhí)行某些動作。
?核心的四個靜態(tài)方法
public static CompletableFuture<Void> runAsync(Runnable runnable
runAsync+線程池
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor)
supplyAsync 有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
supplyAsync+線程池
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,
Executor executor)
CompletableFuture優(yōu)點總結
異步任務結束時,會自動回調某個對象的方法;
主線程設置好后,不再關心異步任務的執(zhí)行,異步任務之間可以順序執(zhí)行
異步任務出錯時,會自動回調某個對象的方法
1.2 為什么要使用線程池
1、2、 不能得到返回值
3 可以得到返回值但是他們都不能控制資源
實際玩的過程中我們都用線程池 第四方式。
1.3 線程池的介紹
線程池做的工作主要是控制運行的線程的數量,處理過程中將任務加入隊列,然后在線程創(chuàng)建后啟動這些任務,如果線程超過了最大數量,超出的數量的線程排隊等候,等其他線程執(zhí)行完畢,再從隊列中取出任務來執(zhí)行.
他的主要特點為:線程復用:控制最大并發(fā)數:管理線程.
第一:降低資源消耗.通過重復利用自己創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗.
第二: 提高響應速度.當任務到達時,任務可以不需要等到線程和粗昂就愛你就能立即執(zhí)行.
第三: 提高線程的可管理性.線程是稀缺資源,如果無限的創(chuàng)建,不僅會消耗資源,還會較低系統(tǒng)的穩(wěn)定性,使用線程池可以進行統(tǒng)一分配,調優(yōu)和監(jiān)控.
1.4 線程池如何使用?
Java中的線程池是通過Executor框架實現的,該框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor這幾個類.

1.5 面試重點 說下常見的線程池唄
Executors.newFixedThreadPool(int)
執(zhí)行一個長期的任務,性能好很多
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
主要特點如下:
1.創(chuàng)建一個定長線程池,可控制線程的最大并發(fā)數,超出的線程會在隊列中等待.
2.newFixedThreadPool創(chuàng)建的線程池corePoolSize和MaxmumPoolSize是 相等的,它使用的的LinkedBlockingQueue
Executors.newSingleThreadExecutor()
一個任務一個線程執(zhí)行的任務場景
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
主要特點如下:
1.創(chuàng)建一個單線程化的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有任務都按照指定順序執(zhí)行.
2.newSingleThreadExecutor將corePoolSize和MaxmumPoolSize都設置為1,它使用的的LinkedBlockingQueue
Executors.newCachedThreadPool()
適用:執(zhí)行很多短期異步的小程序或者負載較輕的服務器
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
主要特點如下:
1.創(chuàng)建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則創(chuàng)建新線程.
2.newCachedThreadPool將corePoolSize設置為0MaxmumPoolSize設置為Integer.MAX_VALUE,它使用的是SynchronousQUeue,也就是說來了任務就創(chuàng)建線程運行,如果線程空閑超過60秒,就銷毀線程
1.6 面試重點 說下常見的線程池的七大參數唄
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
從源碼中可以看出,線程池的構造函數有7個參數,分別是corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler。下面會對這7個參數一一解釋。
一、corePoolSize 線程池核心線程大小
線程池中會維護一個最小的線程數量,即使這些線程處理空閑狀態(tài),他們也不會被銷毀,除非設置了allowCoreThreadTimeOut。這里的最小線程數量即是corePoolSize。
1.在創(chuàng)建了線程池后,當有請求任務來之后,就會安排池中的線程去執(zhí)行請求任務,近視理解為今日當值線程
2.當線程池中的線程數目達到corePoolSize后,就會把到達的任務放入到緩存隊列當中.
二、maximumPoolSize 線程池最大線程數量
一個任務被提交到線程池以后,首先會找有沒有空閑存活線程,如果有則直接將任務交給這個空閑線程來執(zhí)行,如果沒有則會緩存到工作隊列(后面會介紹)中,如果工作隊列滿了,才會創(chuàng)建一個新線程,然后從工作隊列的頭部取出一個任務交由新線程來處理,而將剛提交的任務放入工作隊列尾部。線程池不會無限制的去創(chuàng)建新線程,它會有一個最大線程數量的限制,這個數量即由maximunPoolSize指定。
三、keepAliveTime 空閑線程存活時間
一個線程如果處于空閑狀態(tài),并且當前的線程數量大于corePoolSize,那么在指定時間后,這個空閑線程會被銷毀,這里的指定時間由keepAliveTime來設定
默認情況下:
只有當線程池中的線程數大于corePoolSize時keepAliveTime才會起作用,知道線程中的線程數不大于corepoolSIze,
四、unit 空閑線程存活時間單位
keepAliveTime的計量單位
五、workQueue 工作隊列
新任務被提交后,會先進入到此工作隊列中,任務調度時再從隊列中取出任務。jdk中提供了四種工作隊列:
①ArrayBlockingQueue
基于數組的有界阻塞隊列,按FIFO排序。新任務進來后,會放到該隊列的隊尾,有界的數組可以防止資源耗盡問題。當線程池中線程數量達到corePoolSize后,再有新任務進來,則會將任務放入該隊列的隊尾,等待被調度。如果隊列已經是滿的,則創(chuàng)建一個新線程,如果線程數量已經達到maxPoolSize,則會執(zhí)行拒絕策略。
②LinkedBlockingQuene
基于鏈表的無界阻塞隊列(其實最大容量為Interger.MAX),按照FIFO排序。由于該隊列的近似無界性,當線程池中線程數量達到corePoolSize后,再有新任務進來,會一直存入該隊列,而不會去創(chuàng)建新線程直到maxPoolSize,因此使用該工作隊列時,參數maxPoolSize其實是不起作用的。
③SynchronousQuene
一個不緩存任務的阻塞隊列,生產者放入一個任務必須等到消費者取出這個任務。也就是說新任務進來時,不會緩存,而是直接被調度執(zhí)行該任務,如果沒有可用線程,則創(chuàng)建新線程,如果線程數量達到maxPoolSize,則執(zhí)行拒絕策略。
④PriorityBlockingQueue
具有優(yōu)先級的無界阻塞隊列,優(yōu)先級通過參數Comparator實現。
六、threadFactory 線程工廠
創(chuàng)建一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等等
七、handler 拒絕策略
當工作隊列中的任務已到達最大限制,并且線程池中的線程數量也達到最大限制,這時如果有新任務提交進來,該如何處理呢。這里的拒絕策略,就是解決這個問題的,jdk中提供了4中拒絕策略:
AbortPolicy(默認):直接拋出RejectedException異常阻止系統(tǒng)正常運行
CallerRunPolicy:"調用者運行"一種調節(jié)機制,該策略既不會拋棄任務,也不會拋出異常,而是將任務分給調用線程來執(zhí)行
DiscardOldestPolicy:拋棄隊列中等待最久的任務,然后把當前任務加入隊列中嘗試再次提交
DiscardPolicy:直接丟棄任務,不予任何處理也不拋出異常.如果允許任務丟失,這是最好的拒絕策略
以上內置策略均實現了RejectExecutionHandler接口
一句圖總結中心思想:面試必問

1.7 坑你沒商量
你在工作中單一的/固定數的/可變你的三種創(chuàng)建線程池的方法,你用哪個多?超級大坑
答案是一個都不用,我們生產上只能使用自定義的
Executors中JDK給你提供了為什么不用?
【強制】線程資源必須通過線程池提供,不允許在應用中自行顯式創(chuàng)建線程。
說明:使用線程池的好處是減少在創(chuàng)建和銷毀線程上所消耗的時間以及系統(tǒng)資源的開銷,解決資源不足的問題。如果不使用線程池,有可能造成系統(tǒng)創(chuàng)建大量同類線程而導致消耗完內存或者“過度切換”的問題。
【強制】線程池不允許使用Executors去創(chuàng)建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規(guī)則,規(guī)避資源耗盡的風險。說明:Executors返回的線程池對象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM。
2)CachedThreadPool和ScheduledThreadPool:允許的創(chuàng)建線程數量為Integer.MAX_VALUE,可能會創(chuàng)建大量的線程,從而導致OOM。
1.7 你在工作中是如何創(chuàng)建線程池的,是否自定義過線程池使用
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(3),
Executors.defaultThreadFactory(),
//默認拋出異常
//new ThreadPoolExecutor.AbortPolicy()
//回退調用者
//new ThreadPoolExecutor.CallerRunsPolicy()
//處理不來的不處理
//new ThreadPoolExecutor.DiscardOldestPolicy()
new ThreadPoolExecutor.DiscardPolicy()
);
//模擬10個用戶來辦理業(yè)務 沒有用戶就是來自外部的請求線程.
try {
for (int i = 1; i <= 10; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 辦理業(yè)務");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
//threadPoolInit();
}
private static void threadPoolInit() {
/**
* 一池5個處理線程
*/
//ExecutorService threadPool= Executors.newFixedThreadPool(5);
/**
* 一池一線程
*/
//ExecutorService threadPool= Executors.newSingleThreadExecutor();
/**
* 一池N線程
*/
ExecutorService threadPool = Executors.newCachedThreadPool();
//模擬10個用戶來辦理業(yè)務 沒有用戶就是來自外部的請求線程.
try {
for (int i = 1; i <= 20; i++) {
threadPool.execute(() -> {
System.out.println(Thread.currentThread().getName() + "\t 辦理業(yè)務");
});
try {
TimeUnit.MICROSECONDS.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
此處看下RMQ的源碼以及自己項目中的使用情況
1.7 合理配置線程池你是如何考慮的?

CPU 密集型
核心數+1。
但是為什么要加一呢?《Java并發(fā)編程實戰(zhàn)》一書中給出的原因是:即使當計算(CPU)密集型的線程偶爾由于頁缺失故障或者其他原因而暫停時,這個“額外”的線程也能確保 CPU 的時鐘周期不會被浪費。
看不懂是不是?沒關系我也看不懂。反正把它理解為一個備份的線程就行了。
IO密集型
cpu*2

既是IO密集型也是CPU密集型
合理分配即可
以上都是面試答案,但是
線程池使用面臨的核心的問題在于:線程池的參數并不好配置。
一方面線程池的運行機制不是很好理解,配置合理需要強依賴開發(fā)人員的個人經驗和知識;
另一方面,線程池執(zhí)行的情況和任務類型相關性較大,IO密集型和CPU密集型的任務運行起來的情況差異非常大。
這導致業(yè)界并沒有一些成熟的經驗策略幫助開發(fā)人員參考。
https://blog.csdn.net/qq_41893274/article/details/112424767
1.8數據賞析
RMQ的BrokerController類等等
//5、啟動定時調度,每10秒鐘掃描所有Broker,檢查存活狀態(tài)
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"NSScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> NamesrvController.this.routeInfoManager.scanNotActiveBroker(), 5, 10, TimeUnit.SECONDS);
//不是說不能用嗎?你還用?為什么?
//周期性向nameserv發(fā)心跳,如果有變化則同步broker信息
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl(
"BrokerControllerScheduledThread"));
this.scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
Broker向所有的nameServer注冊自己的戶口本信息。
private BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList) {
brokerOuterExecutor.execute(() -> {
try {
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
if (result != null) {
registerBrokerResultList.add(result);
}
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
} catch (Exception e) {
log.warn("registerBroker Exception, {}", namesrvAddr, e);
} finally {
countDownLatch.countDown();
}
});
}
try {
countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}