有順序的多線程操作

又到了天橋說書的時候了,今天來聊一聊多線程的一些東西。假如,我是說假如,突然有一天給了來了這樣一個需求:有多組任務,組與組之間的任務是順序執(zhí)行的,組內的任務是并發(fā)執(zhí)行的。文字說明如下:

組1: 任務A   任務B
組2:任務A1  任務B1  任務C1
組3:    任務A3  
...

就像上面文字所描述的,組1的任務是第一個執(zhí)行的,任務A和任務B是并發(fā)去執(zhí)行的。組2的任務要等到組1的所有任務執(zhí)行完畢之后,再去并發(fā)執(zhí)行任務A1 任務B1 任務C1,就這樣以此類推,一直往下執(zhí)行。

是不是一聽,就很簡單呢,可以寫個循環(huán),遍歷每一組,每一組的各個任務包裝為一個Callable對象去扔到線程池去執(zhí)行,并且將放入線程池的返回對象Future對象放入列表,循環(huán)遍歷這個Future列表,但凡有一個future.get()是阻塞的,那么就說明任務沒有執(zhí)行完畢,那就繼續(xù)等著,直到循環(huán)遍歷完畢,就說明所有的組內任務執(zhí)行完畢了。下一組的任務如法炮制。

不得不說,這是一個取巧的辦法,就是一次執(zhí)行一點點的任務,根據(jù)結果再來決定是否繼續(xù)執(zhí)行。但總感覺這種方法太low了,不夠高大上。為啥要高大上呢?一切都是為了裝逼。

我更希望的是這些任務一股腦的扔進線程池,然后根據(jù)某種機制,去保證上面要求的順序。這樣一來,題目的難度是不是就高了那么一點點呢。

首先說一下思路吧:線程間的等待,可以使用CountDownLatch來做。每個任務的包裝類Callable均持有兩個CountDownLatch的引用。我們分別叫他為prev和current吧。prev這個是上一組任務的CountDownLatch,上一組任務每次執(zhí)行完畢,都countdown一次。prev這個就在執(zhí)行方法體第一行進行await,當上一組所有方法執(zhí)行完,該線程就不再阻塞,可以執(zhí)行了。current是本組內的CountDownLatch,當前任務執(zhí)行完畢,就countdown一次。

對應的偽代碼如下:

public class TestCallable implements Callable {

    private Data data;

    private CountDownLatch prevCountDownLatch;

    private CountDownLatch currentCountDownLatch;

    public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
        this.data = data;
        this.prevCountDownLatch = prevCountDownLatch;
        this.currentCountDownLatch = currentCountDownLatch;
    }

    @Override
    public Object call() throws Exception e{
        if(prevCountDownLatch != null){
            prevCountDownLatch.await();
        }
        try {
            //執(zhí)行業(yè)務邏輯
        }finally {
            if(currentCountDownLatch != null){
                currentCountDownLatch.countDown();
            }
            return result;
        }
    }
}

值得注意的是,構造哪個是當前的CountDownLatch,哪一個是上一個的CountDownLatch,在循環(huán)中,應該注意:

CountDownLatch prevCountDownLatch = null;
CountDownLatch currentCountDownLatch = null;
for (int i = 0; i < list.size(); i++) {
  List<Data> dataList = list.get(i);
  currentCountDownLatch = new CountDownLatch(dataList.size());
  for(int j = 0; j < dataList.size(); j++){
    Data data = dataList.get(j);
    TestCallable callable = new TestCallable(data,prevCountDownLatch,currentCountDownLatch);
    //放入線程池略...

  }
  prevCountDownLatch = currentCountDownLatch;
}

到這里,基本上就好了,你可以享受一下窗外的湖光山色,聲色犬馬一番...等等,突然需求又增加了:如果前面組內的任何一個任務失敗或者出錯了,后續(xù)的任務都要取消執(zhí)行。

想想也挺合理,但是,這些任務可是放入了線程池啊,我如何去把它們捉出來,然后殘忍的殺死呢?或者換一種思路,我需要有某種標記,去標記當前任務是否可以執(zhí)行,如果不可以執(zhí)行,就返回。那么后續(xù)的任務呢,可以在返回之前,將直接的后續(xù)任務標記為不可執(zhí)行。每個任務執(zhí)行前,都要判斷可執(zhí)行標記。偽代碼如下:

public class TestCallable implements Callable {

    private Data data;

    private CountDownLatch prevCountDownLatch;

    private CountDownLatch currentCountDownLatch;

    private volatile boolean isCancel = false;//是否取消執(zhí)行  注意volatile關鍵字,這是線程可見的重要手段

    /**
     * 內部持有的引用  用來 控制這些的內部標記  isCancel。當上一組的任意一個任務執(zhí)行失敗,后續(xù)的就要取消執(zhí)行
     * 不可以以對這些內部引用callable進行其他操作!
     */
    private List<TestCallable> nextCallableList;

    public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
        this.data = data;
        this.prevCountDownLatch = prevCountDownLatch;
        this.currentCountDownLatch = currentCountDownLatch;
    }

    @Override
    public Object call(){
        if(prevCountDownLatch != null){
            try {
                prevCountDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
                return this.doCancel();
            }
        }

        //注意放在prevCountDownLatch.await()  不要放在這之前
        if(this.isCancel){//如果當前線程是取消執(zhí)行標記  那么執(zhí)行取消的邏輯
            return this.doCancel();
        }

        boolean result = false;
        
        try {
            //邏輯操作
            result = true;
        }catch (Exception e){
            e.printStackTrace();
            result = false;
            //將所有下一組的線程的取消標記置為true
            this.cancelAllNext();
        }finally {
            if(currentCountDownLatch != null){
                currentCountDownLatch.countDown();
            }
            return result;
        }
    }

    public List<TestCallable> getNextCallableList() {
        return nextCallableList;
    }

    public void setNextCallableList(List<TestCallable> nextCallableList) {
        this.nextCallableList = nextCallableList;
    }

    public Data getData() {
        return data;
    }

    private void cancel(){
        this.isCancel = true;
    }

    private void cancelAllNext(){
        if(CollectionUtils.isEmpty(nextCallableList)){
            return;
        }
        for (int i = 0; i < nextCallableList.size(); i++) {
            TestCallable nextCallable = nextCallableList.get(i);
            nextCallable.cancel();
        }
    }

    private Object doCancel(){
        //首先將所有下一組的線程的取消標記置為true
        this.cancelAllNext();
        //當前線程繼續(xù)countdown  這樣下一組線程才輪的到執(zhí)行
        if(currentCountDownLatch != null){
            currentCountDownLatch.countDown();
        }
        return false;
    }
}

這些Callable的list引用如何構造呢:

//key: 第幾組任務  value:對應組的任務包裝為callable的列表
Map<Integer,List<TestCallable>> callableMap = new HashMap<>(); 
CountDownLatch prevCountDownLatch = null;
CountDownLatch currentCountDownLatch = null;
for (int i = 0; i < list.size(); i++) {
  List<Data> dataList = list.getChildList();
  currentCountDownLatch = new CountDownLatch(dataList.size());
  for(int j = 0; j < dataList.size(); j++){
    Data data = dataList.get(j);
    List<TestCallable> callableList = callableMap.get(i);
    if(CollectionUtils.isEmpty(callableList)){
      callableList = new ArrayList<>();
    }
    callableList.add(callable);
    callableMap.put(i,callableList);
  }
  prevCountDownLatch = currentCountDownLatch;
}

for (int i = 0; i < list.size(); i++) {
  List<TestCallable> callableList = callableMap.get(i);
  List<TestCallable> nextCallableList = callableMap.get(i + 1);
  for (int j = 0; j < callableList.size(); j++) {
    TestCallable callable = callableList.get(j);
    callable.setNextCallableList(nextCallableList);
    threadPool.submit(callable);
  }
}

好了,就完事了。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容