ThreadPoolExecutor + CountDownLatch 實(shí)際應(yīng)用補(bǔ)充

注:(本例用submit實(shí)現(xiàn)會(huì)更簡(jiǎn)單,可以參見ThreadPoolExecutor execute 和 submit

昨日上線了我的并發(fā)編程改造后的代碼,速率提升十分明顯,原本需要運(yùn)行將近30小時(shí)的任務(wù)縮短到十分之一(我開啟了最大十個(gè)并發(fā)),不過,雖然達(dá)到了提升速率的目的,但是結(jié)合日志我還是發(fā)現(xiàn)了兩個(gè)潛在Bug。上一篇傳送門:ThreadPoolExecutor + CountDownLatch 實(shí)際應(yīng)用

首先,我在線程池中捕獲了異常,并且記錄日志。

try {
    // 模擬耗時(shí)
    Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (InterruptedException e) {
    // 實(shí)際應(yīng)用在此記錄日志
    log.error("ThreadPoolExecutor error", e);
}

接著我在日志中發(fā)現(xiàn)的確有報(bào)錯(cuò),這種情況下,我應(yīng)該判斷任務(wù)運(yùn)行結(jié)果為失敗,然而數(shù)據(jù)庫中最終任務(wù)運(yùn)行狀態(tài)是成功。再一看代碼就明白了,因?yàn)槲抑慌袛嗔司€程池中子線程的運(yùn)行狀態(tài)是否完成,但是沒有判斷邏輯結(jié)果是不是對(duì)的,再加上線程內(nèi)異常被吞了,最后任務(wù)的狀態(tài)肯定是成功的,哪怕所有線程都報(bào)錯(cuò),結(jié)果也能輸出,只不過是錯(cuò)的。解決方法如下:

1、創(chuàng)建內(nèi)部類,作為狀態(tài)標(biāo)識(shí)變量(還可以用其他引用類型變量代替)
static class Flag {
    private boolean f;

    public Flag() {
        f = true;// 初始設(shè)為true
    }

    public boolean isF() {
        return f;
    }

    public void setF(boolean f) {
        this.f = f;
    }
}
2、初始化Flag
final Flag flag = new Flag();
3、在異常捕獲中設(shè)置為false
...
catch (Exception e) {
    // 如果子線程報(bào)錯(cuò),狀態(tài)標(biāo)識(shí)為false
    flag.setF(false);
}
4、最后判斷狀態(tài)是否為true
if (flag.isF()) {
    // 打印計(jì)數(shù)
    System.out.println("結(jié)束:" + totalRows.get());
} else {
    System.out.println("有子線程報(bào)錯(cuò),結(jié)果不準(zhǔn)確");
}

另外,我還發(fā)現(xiàn)了一個(gè)潛在Bug。我的任務(wù)核心部分是在while 循環(huán)內(nèi)請(qǐng)求第三方api下載數(shù)據(jù),如果第三方返回結(jié)果為空,手動(dòng)break退出循環(huán),但是未在catch中退出。這樣就有問題了,一旦循環(huán)體內(nèi)拋出運(yùn)行時(shí)異常,代碼未執(zhí)行到判斷是否break時(shí)就被異常了,但是catch未執(zhí)行退出,將可能導(dǎo)致死循環(huán)。好比示例代碼中:

while (true) {
    // 此處發(fā)送請(qǐng)求拉取數(shù)據(jù),一旦報(bào)錯(cuò),將跳過下面的break 判斷,而catch中未退出,將導(dǎo)致死循環(huán)
    String result = download();
    // 判斷是否break
    if (result  == null) {
      break;
    }
} catch (Exception e) {
      e.printStackTrace();
      // 未執(zhí)行退出
    }
}
修改后
boolean loop = true;
while (loop) {
    String result = download();
    // 判斷是否break
    if (result  == null) {
      loop = false;
    }
} catch (Exception e) {
      // 退出循環(huán)
      loop = false;
    }
}
完整示例(while循環(huán)內(nèi)用次數(shù)模擬和真實(shí)業(yè)務(wù)邏輯還是有區(qū)別的)
package com.yzy.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main1 {
    static class Flag {
        private boolean f;

        public Flag() {
            f = true;
        }

        public boolean isF() {
            return f;
        }

        public void setF(boolean f) {
            this.f = f;
        }
    }

    public static void main(String[] args) throws InterruptedException {

        // 線程安全的計(jì)數(shù)器
        AtomicInteger totalRows = new AtomicInteger(0);

        // 創(chuàng)建線程池,其中核心線程10,也是我期望的最大并發(fā)數(shù),最大線程數(shù)和隊(duì)列大小都為30,即我的總?cè)蝿?wù)數(shù)
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

        // 初始化CountDownLatch,大小為30
        CountDownLatch countDownLatch = new CountDownLatch(30);

        // 記錄狀態(tài)
        final Flag flag = new Flag();

        // 模擬遍歷參數(shù)集合
        for (int i = 0; i < 30; i++) {
            // 往線程池提交任務(wù)
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    int times = 0;
                    boolean loop = true;
                    // 模擬數(shù)據(jù)拉取過程可能需要分頁
                    while (loop) {
                        // 模擬每個(gè)任務(wù)需要分頁5次
                        if (times >= 5) {
                            break;
                        }
                        times++;

                        // 模擬計(jì)數(shù)
                        totalRows.incrementAndGet();
                        try {
                            // 模擬耗時(shí)
                            Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
                        } catch (Exception e) {
                            // 如果子線程報(bào)錯(cuò),退出循環(huán)
                            loop = false;
                            // 如果子線程報(bào)錯(cuò),狀態(tài)標(biāo)識(shí)為false
                            flag.setF(false);
                        }
                    }
                    // 子線程完成,countDownLatch執(zhí)行countDown
                    countDownLatch.countDown();
                }
            });
            // 打印線程池運(yùn)行狀態(tài)
            System.out.println("線程池中線程數(shù)目:" + executor.getPoolSize() + ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" +
                    executor.getQueue().size() + ",已執(zhí)行結(jié)束的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());
        }
        // 標(biāo)記多線程關(guān)閉,但不會(huì)立馬關(guān)閉
        executor.shutdown();

        // 阻塞當(dāng)前線程,知道所有子線程都執(zhí)行countDown方法才會(huì)繼續(xù)執(zhí)行
        countDownLatch.await();

        // 打印線程池運(yùn)行狀態(tài)
        System.out.println("線程池中線程數(shù)目:" + executor.getPoolSize() + ",隊(duì)列中等待執(zhí)行的任務(wù)數(shù)目:" +
                executor.getQueue().size() + ",已執(zhí)行結(jié)束的任務(wù)數(shù)目:" + executor.getCompletedTaskCount());

        if (flag.isF()) {
            // 打印計(jì)數(shù)
            System.out.println("結(jié)束:" + totalRows.get());
        } else {
            System.out.println("有子線程報(bào)錯(cuò),結(jié)果不準(zhǔn)確");
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容