準(zhǔn)備
保證寫入時(shí),線程安全的List和Set結(jié)構(gòu)?
ConcurrentHashMap是線程安全的HashMap,CopyOnWriteArrayList是線程安全的ArrayList。
CopyOnWriteArraySet是線程安全的HashSet。考慮多線程處理任務(wù)點(diǎn)?
任務(wù)的類型: 計(jì)算能力復(fù)雜,IO操作;
任務(wù)是否異步: 同步,異步
每個(gè)子線程是否有依賴關(guān)系:有,沒有 (例如:使用多線程處理 從1累加到1萬(wàn))
如果是多個(gè)任務(wù),每個(gè)任務(wù)使用多線程處理,主線程需要等待子線程任務(wù)執(zhí)行完畢之后在進(jìn)行執(zhí)行: 在沒有CompletableFuture 使用CountDownLatch解決
(https://blog.csdn.net/qq_38599840/article/details/120708245)
使用CompletableFuture處理依賴任務(wù)
- 線程池的方式并行獲取數(shù)據(jù)弊端?
CPU資源大量浪費(fèi)在阻塞等待上,導(dǎo)致CPU資源利用率低。在Java 8之前,一般會(huì)通過回調(diào)的方式來(lái)減少阻塞,但是大量使用回調(diào),又引發(fā)臭名昭著的回調(diào)地獄問題,導(dǎo)致代碼可讀性和可維護(hù)性大大降低。
為了增加并發(fā)度,會(huì)引入更多額外的線程池,隨著CPU調(diào)度線程數(shù)的增加,會(huì)導(dǎo)致更嚴(yán)重的資源爭(zhēng)用,寶貴的CPU資源被損耗在上下文切換上,而且線程本身也會(huì)占用系統(tǒng)資源,且不能無(wú)限增加。
同步模型下,會(huì)導(dǎo)致硬件資源無(wú)法充分利用,系統(tǒng)吞吐量容易達(dá)到瓶頸。
- 保證了多線程寫入時(shí)安全,咋樣使用什么樣的多線程呢?
使用executor.submit(() -> {})處理RPC任務(wù);
使用future處理每一個(gè)子線程是否 終端還是跳過 邏輯
public class Test3 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
.setNameFormat("coustomThread %d")
.setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕獲到:" +t.getName()+"發(fā)生異常:"+e.getMessage()))
.build());
public static void main(String[] args) {
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6);//線程不安全List
List<String> test = test(list);
System.out.println("主線程:" + Thread.currentThread().getName() + ":" + test);
}
public static List<String> test(List<Integer> list) {
List<String> safeList = new CopyOnWriteArrayList();//線程安全的list
list.stream().map(i -> executor.submit(() -> {
//模擬rpc
try {
Thread.sleep(100);
if(i == 3) {
int j = i / 0;
}
safeList.add(i + "A");
System.out.println("子線程:" + Thread.currentThread().getName() + "返回參數(shù):" + i + "A");
return i + "A";
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})).collect(Collectors.toList()).forEach(future -> {
try {
if (future != null) {
Object o = future.get();
// System.out.println("子線程:" + Thread.currentThread().getName() + "返回參數(shù):" + o);
}
} catch (InterruptedException e) {
//如果不拋出異常,那么線程執(zhí)行不會(huì)終端;反之,如果拋出異常,則線程中斷
// throw new RuntimeException(e);
} catch (ExecutionException e) {
// throw new RuntimeException(e);
}
});
System.out.println(123);
return safeList;
}
``
4. 使用CountDownLatch主線程需要等待子線程任務(wù)執(zhí)行完畢之后在進(jìn)行執(zhí)行?
```java
public class Test4 {
static ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100), new ThreadFactoryBuilder()
.setNameFormat("coustomThread %d")
.setUncaughtExceptionHandler((t,e) -> System.out.println("UncaughtExceptionHandler捕獲到:" +t.getName()+"發(fā)生異常:"+e.getMessage()))
.build());
public static void main(String[] args) throws Exception{
System.out.println("主線程開始:" + Thread.currentThread().getName());
CountDownLatch downLatch = new CountDownLatch(2);
//任務(wù)1
AtomicReference<String> q1 = new AtomicReference<>("");//線程安全
Future<String> future1 = executor.submit(() -> {
try {
Thread.sleep(1500);
q1.set("任務(wù)1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
// downLatch.countDown();
return "任務(wù)1";
});
//任務(wù)2
AtomicReference<String> q2 = new AtomicReference<>("");//線程安全
Future<String> future2 = executor.submit(() -> {
try {
Thread.sleep(300);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
q2.set("任務(wù)2" + q1.get());
// downLatch.countDown();
return "任務(wù)2" + q1.get();
});
// downLatch.await();
Thread.sleep(600);
System.out.println("獲取任務(wù)1返回:" + future1.get());
System.out.println("獲取任務(wù)2返回:" + future2.get());
System.out.println("主線程結(jié)束:" + Thread.currentThread().getName());
}
/*
* 主線程開始:main
獲取任務(wù)1返回:任務(wù)1
獲取任務(wù)2返回:任務(wù)2任務(wù)1
主線程結(jié)束:main
* */
}
CompletableFuture
實(shí)例
public void testCompletableInfo() throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
//調(diào)用用戶服務(wù)獲取用戶基本信息
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() ->
//模擬查詢商品耗時(shí)500毫秒
{
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "用戶A";
});
//調(diào)用商品服務(wù)獲取商品基本信息
CompletableFuture<String> goodsFuture = CompletableFuture.supplyAsync(() ->
//模擬查詢商品耗時(shí)500毫秒
{
try {
Thread.sleep(400);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "商品A";
});
System.out.println("獲取用戶信息:" + userFuture.get());
System.out.println("獲取商品信息:" + goodsFuture.get());
//模擬主程序耗時(shí)時(shí)間
Thread.sleep(600);
System.out.println("總共用時(shí)" + (System.currentTimeMillis() - startTime) + "ms");
}
相關(guān)的方法使用
public class TestFuture1 {
public static void main(String[] args) throws InterruptedException, ExecutionException {
long startTime = System.currentTimeMillis();
//自定義線程池
ExecutorService executor = Executors.newFixedThreadPool(5);
//cf1 cf2是0依賴
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> "cf1", executor);
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("cf2");
//CF3,CF5分別依賴于CF1和CF2,一元依賴
CompletableFuture<String> cf3 = cf1.thenApply(res1 -> "cf3");
CompletableFuture<String> cf5 = cf2.thenApply(res2 -> "cf5");
//cf4 依賴 cf1和cf2, 二元依賴
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (res1, res2) -> "cf4");
//cf6 依賴 cf3, cf4, cf5, 多元依賴
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
//最終結(jié)果
CompletableFuture<String> result = cf6.thenApply(v -> "cf6");
// //模擬主程序耗時(shí)時(shí)間
// Thread.sleep(600);
// System.out.println("總共用時(shí)" + (System.currentTimeMillis() - startTime) + "ms");
}
}
注意事項(xiàng)
- fulture需要返回值,才能獲取異常信息
Future需要獲取返回值,才能獲取到異常信息。如果不加 get()/join()方法,看不到異常信息。
小伙伴們使用的時(shí)候,注意一下哈,考慮是否加try...catch...或者使用exceptionally方法。
CompletableFuture.get()方法是阻塞的(CompletableFuture.get(5, TimeUnit.SECONDS);)
CompletableFuture的get()方法是阻塞的,如果使用它來(lái)獲取異步調(diào)用的返回值,需要添加超時(shí)時(shí)間。不建議使用默認(rèn)的線程池(ForkJoinPool中的共用線程池CommonPool(CommonPool的大小是CPU核數(shù)-1,如果是IO密集的應(yīng)用,線程數(shù)可能成為瓶頸)。)
CompletableFuture代碼中又使用了默認(rèn)的「ForkJoin線程池」,處理的線程個(gè)數(shù)是電腦「CPU核數(shù)-1」。在大量請(qǐng)求過來(lái)的時(shí)候,處理邏輯復(fù)雜的話,響應(yīng)會(huì)很慢。一般建議使用自定義線程池,優(yōu)化線程池配置參數(shù);
前面提到,異步回調(diào)方法可以選擇是否傳遞線程池參數(shù)Executor,這里我們建議強(qiáng)制傳線程池,且根據(jù)實(shí)際情況做線程池隔離。
當(dāng)不傳遞線程池時(shí),會(huì)使用ForkJoinPool中的公共線程池CommonPool,這里所有調(diào)用將共用該線程池,核心線程數(shù)=處理器數(shù)量-1(單核核心線程數(shù)為1),所有異步回調(diào)都會(huì)共用該CommonPool,核心與非核心業(yè)務(wù)都競(jìng)爭(zhēng)同一個(gè)池中的線程,很容易成為系統(tǒng)瓶頸。手動(dòng)傳遞線程池參數(shù)可以更方便的調(diào)節(jié)參數(shù),并且可以給不同的業(yè)務(wù)分配不同的線程池,以求資源隔離,減少不同業(yè)務(wù)之間的相互干擾。
- 自定義線程池,注意飽和策略
CompletableFuture的get()方法是阻塞的,我們一般建議使用future.get(5, TimeUnit.SECONDS)。并且一般建議使用自定義線程池。
但是如果線程池拒絕策略是DiscardPolicy或者DiscardOldestPolicy,當(dāng)線程池飽和時(shí),會(huì)直接丟棄任務(wù),不會(huì)拋棄異常。因此建議,CompletableFuture線程池策略最好使用AbortPolicy,然后耗時(shí)的異步線程,做好線程池隔離哈。