注:(本例用submit實(shí)現(xiàn)會(huì)更簡(jiǎn)單,可以參見ThreadPoolExecutor execute 和 submit)
昨日上線了我的并發(fā)編程改造后的代碼,速率提升十分明顯,原本需要運(yùn)行將近30小時(shí)的任務(wù)縮短到十分之一(我開啟了最大十個(gè)并發(fā)),不過,雖然達(dá)到了提升速率的目的,但是結(jié)合日志我還是發(fā)現(xiàn)了兩個(gè)潛在Bug。上一篇傳送門:ThreadPoolExecutor + CountDownLatch 實(shí)際應(yīng)用
首先,我在線程池中捕獲了異常,并且記錄日志。
try {
// 模擬耗時(shí)
Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (InterruptedException e) {
// 實(shí)際應(yīng)用在此記錄日志
log.error("ThreadPoolExecutor error", e);
}
接著我在日志中發(fā)現(xiàn)的確有報(bào)錯(cuò),這種情況下,我應(yīng)該判斷任務(wù)運(yùn)行結(jié)果為失敗,然而數(shù)據(jù)庫中最終任務(wù)運(yùn)行狀態(tài)是成功。再一看代碼就明白了,因?yàn)槲抑慌袛嗔司€程池中子線程的運(yùn)行狀態(tài)是否完成,但是沒有判斷邏輯結(jié)果是不是對(duì)的,再加上線程內(nèi)異常被吞了,最后任務(wù)的狀態(tài)肯定是成功的,哪怕所有線程都報(bào)錯(cuò),結(jié)果也能輸出,只不過是錯(cuò)的。解決方法如下:
1、創(chuàng)建內(nèi)部類,作為狀態(tài)標(biāo)識(shí)變量(還可以用其他引用類型變量代替)
static class Flag {
private boolean f;
public Flag() {
f = true;// 初始設(shè)為true
}
public boolean isF() {
return f;
}
public void setF(boolean f) {
this.f = f;
}
}
2、初始化Flag
final Flag flag = new Flag();
3、在異常捕獲中設(shè)置為false
...
catch (Exception e) {
// 如果子線程報(bào)錯(cuò),狀態(tài)標(biāo)識(shí)為false
flag.setF(false);
}
4、最后判斷狀態(tài)是否為true
if (flag.isF()) {
// 打印計(jì)數(shù)
System.out.println("結(jié)束:" + totalRows.get());
} else {
System.out.println("有子線程報(bào)錯(cuò),結(jié)果不準(zhǔn)確");
}
另外,我還發(fā)現(xiàn)了一個(gè)潛在Bug。我的任務(wù)核心部分是在while 循環(huán)內(nèi)請(qǐng)求第三方api下載數(shù)據(jù),如果第三方返回結(jié)果為空,手動(dòng)break退出循環(huán),但是未在catch中退出。這樣就有問題了,一旦循環(huán)體內(nèi)拋出運(yùn)行時(shí)異常,代碼未執(zhí)行到判斷是否break時(shí)就被異常了,但是catch未執(zhí)行退出,將可能導(dǎo)致死循環(huán)。好比示例代碼中:
while (true) {
// 此處發(fā)送請(qǐng)求拉取數(shù)據(jù),一旦報(bào)錯(cuò),將跳過下面的break 判斷,而catch中未退出,將導(dǎo)致死循環(huán)
String result = download();
// 判斷是否break
if (result == null) {
break;
}
} catch (Exception e) {
e.printStackTrace();
// 未執(zhí)行退出
}
}
修改后
boolean loop = true;
while (loop) {
String result = download();
// 判斷是否break
if (result == null) {
loop = false;
}
} catch (Exception e) {
// 退出循環(huán)
loop = false;
}
}
完整示例(while循環(huán)內(nèi)用次數(shù)模擬和真實(shí)業(yè)務(wù)邏輯還是有區(qū)別的)
package com.yzy.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Main1 {
static class Flag {
private boolean f;
public Flag() {
f = true;
}
public boolean isF() {
return f;
}
public void setF(boolean f) {
this.f = f;
}
}
public static void main(String[] args) throws InterruptedException {
// 線程安全的計(jì)數(shù)器
AtomicInteger totalRows = new AtomicInteger(0);
// 創(chuàng)建線程池,其中核心線程10,也是我期望的最大并發(fā)數(shù),最大線程數(shù)和隊(duì)列大小都為30,即我的總?cè)蝿?wù)數(shù)
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
// 初始化CountDownLatch,大小為30
CountDownLatch countDownLatch = new CountDownLatch(30);
// 記錄狀態(tài)
final Flag flag = new Flag();
// 模擬遍歷參數(shù)集合
for (int i = 0; i < 30; i++) {
// 往線程池提交任務(wù)
executor.execute(new Runnable() {
@Override
public void run() {
int times = 0;
boolean loop = true;
// 模擬數(shù)據(jù)拉取過程可能需要分頁
while (loop) {
// 模擬每個(gè)任務(wù)需要分頁5次
if (times >= 5) {
break;
}
times++;
// 模擬計(jì)數(shù)
totalRows.incrementAndGet();
try {
// 模擬耗時(shí)
Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (Exception e) {
// 如果子線程報(bào)錯(cuò),退出循環(huán)
loop = false;
// 如果子線程報(bào)錯(cuò),狀態(tài)標(biāo)識(shí)為false
flag.setF(false);
}
}
// 子線程完成,countDownLatch執(zhí)行countDown
countDownLatch.countDown();
}
});
// 打印線程池運(yùn)行狀態(tài)
System.out.println("線程池中線程數(shù)目:" + executor.getPoolSize() + ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" +
executor.getQueue().size() + ",已執(zhí)行結(jié)束的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());
}
// 標(biāo)記多線程關(guān)閉,但不會(huì)立馬關(guān)閉
executor.shutdown();
// 阻塞當(dāng)前線程,知道所有子線程都執(zhí)行countDown方法才會(huì)繼續(xù)執(zhí)行
countDownLatch.await();
// 打印線程池運(yùn)行狀態(tài)
System.out.println("線程池中線程數(shù)目:" + executor.getPoolSize() + ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" +
executor.getQueue().size() + ",已執(zhí)行結(jié)束的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());
if (flag.isF()) {
// 打印計(jì)數(shù)
System.out.println("結(jié)束:" + totalRows.get());
} else {
System.out.println("有子線程報(bào)錯(cuò),結(jié)果不準(zhǔn)確");
}
}
}