又到了天橋說書的時候了,今天來聊一聊多線程的一些東西。假如,我是說假如,突然有一天給了來了這樣一個需求:有多組任務,組與組之間的任務是順序執(zhí)行的,組內的任務是并發(fā)執(zhí)行的。文字說明如下:
組1: 任務A 任務B
組2:任務A1 任務B1 任務C1
組3: 任務A3
...
就像上面文字所描述的,組1的任務是第一個執(zhí)行的,任務A和任務B是并發(fā)去執(zhí)行的。組2的任務要等到組1的所有任務執(zhí)行完畢之后,再去并發(fā)執(zhí)行任務A1 任務B1 任務C1,就這樣以此類推,一直往下執(zhí)行。
是不是一聽,就很簡單呢,可以寫個循環(huán),遍歷每一組,每一組的各個任務包裝為一個Callable對象去扔到線程池去執(zhí)行,并且將放入線程池的返回對象Future對象放入列表,循環(huán)遍歷這個Future列表,但凡有一個future.get()是阻塞的,那么就說明任務沒有執(zhí)行完畢,那就繼續(xù)等著,直到循環(huán)遍歷完畢,就說明所有的組內任務執(zhí)行完畢了。下一組的任務如法炮制。
不得不說,這是一個取巧的辦法,就是一次執(zhí)行一點點的任務,根據(jù)結果再來決定是否繼續(xù)執(zhí)行。但總感覺這種方法太low了,不夠高大上。為啥要高大上呢?一切都是為了裝逼。
我更希望的是這些任務一股腦的扔進線程池,然后根據(jù)某種機制,去保證上面要求的順序。這樣一來,題目的難度是不是就高了那么一點點呢。
首先說一下思路吧:線程間的等待,可以使用CountDownLatch來做。每個任務的包裝類Callable均持有兩個CountDownLatch的引用。我們分別叫他為prev和current吧。prev這個是上一組任務的CountDownLatch,上一組任務每次執(zhí)行完畢,都countdown一次。prev這個就在執(zhí)行方法體第一行進行await,當上一組所有方法執(zhí)行完,該線程就不再阻塞,可以執(zhí)行了。current是本組內的CountDownLatch,當前任務執(zhí)行完畢,就countdown一次。
對應的偽代碼如下:
public class TestCallable implements Callable {
private Data data;
private CountDownLatch prevCountDownLatch;
private CountDownLatch currentCountDownLatch;
public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
this.data = data;
this.prevCountDownLatch = prevCountDownLatch;
this.currentCountDownLatch = currentCountDownLatch;
}
@Override
public Object call() throws Exception e{
if(prevCountDownLatch != null){
prevCountDownLatch.await();
}
try {
//執(zhí)行業(yè)務邏輯
}finally {
if(currentCountDownLatch != null){
currentCountDownLatch.countDown();
}
return result;
}
}
}
值得注意的是,構造哪個是當前的CountDownLatch,哪一個是上一個的CountDownLatch,在循環(huán)中,應該注意:
CountDownLatch prevCountDownLatch = null;
CountDownLatch currentCountDownLatch = null;
for (int i = 0; i < list.size(); i++) {
List<Data> dataList = list.get(i);
currentCountDownLatch = new CountDownLatch(dataList.size());
for(int j = 0; j < dataList.size(); j++){
Data data = dataList.get(j);
TestCallable callable = new TestCallable(data,prevCountDownLatch,currentCountDownLatch);
//放入線程池略...
}
prevCountDownLatch = currentCountDownLatch;
}
到這里,基本上就好了,你可以享受一下窗外的湖光山色,聲色犬馬一番...等等,突然需求又增加了:如果前面組內的任何一個任務失敗或者出錯了,后續(xù)的任務都要取消執(zhí)行。
想想也挺合理,但是,這些任務可是放入了線程池啊,我如何去把它們捉出來,然后殘忍的殺死呢?或者換一種思路,我需要有某種標記,去標記當前任務是否可以執(zhí)行,如果不可以執(zhí)行,就返回。那么后續(xù)的任務呢,可以在返回之前,將直接的后續(xù)任務標記為不可執(zhí)行。每個任務執(zhí)行前,都要判斷可執(zhí)行標記。偽代碼如下:
public class TestCallable implements Callable {
private Data data;
private CountDownLatch prevCountDownLatch;
private CountDownLatch currentCountDownLatch;
private volatile boolean isCancel = false;//是否取消執(zhí)行 注意volatile關鍵字,這是線程可見的重要手段
/**
* 內部持有的引用 用來 控制這些的內部標記 isCancel。當上一組的任意一個任務執(zhí)行失敗,后續(xù)的就要取消執(zhí)行
* 不可以以對這些內部引用callable進行其他操作!
*/
private List<TestCallable> nextCallableList;
public TestCallable(Data data,CountDownLatch prevCountDownLatch,CountDownLatch currentCountDownLatch) {
this.data = data;
this.prevCountDownLatch = prevCountDownLatch;
this.currentCountDownLatch = currentCountDownLatch;
}
@Override
public Object call(){
if(prevCountDownLatch != null){
try {
prevCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
return this.doCancel();
}
}
//注意放在prevCountDownLatch.await() 不要放在這之前
if(this.isCancel){//如果當前線程是取消執(zhí)行標記 那么執(zhí)行取消的邏輯
return this.doCancel();
}
boolean result = false;
try {
//邏輯操作
result = true;
}catch (Exception e){
e.printStackTrace();
result = false;
//將所有下一組的線程的取消標記置為true
this.cancelAllNext();
}finally {
if(currentCountDownLatch != null){
currentCountDownLatch.countDown();
}
return result;
}
}
public List<TestCallable> getNextCallableList() {
return nextCallableList;
}
public void setNextCallableList(List<TestCallable> nextCallableList) {
this.nextCallableList = nextCallableList;
}
public Data getData() {
return data;
}
private void cancel(){
this.isCancel = true;
}
private void cancelAllNext(){
if(CollectionUtils.isEmpty(nextCallableList)){
return;
}
for (int i = 0; i < nextCallableList.size(); i++) {
TestCallable nextCallable = nextCallableList.get(i);
nextCallable.cancel();
}
}
private Object doCancel(){
//首先將所有下一組的線程的取消標記置為true
this.cancelAllNext();
//當前線程繼續(xù)countdown 這樣下一組線程才輪的到執(zhí)行
if(currentCountDownLatch != null){
currentCountDownLatch.countDown();
}
return false;
}
}
這些Callable的list引用如何構造呢:
//key: 第幾組任務 value:對應組的任務包裝為callable的列表
Map<Integer,List<TestCallable>> callableMap = new HashMap<>();
CountDownLatch prevCountDownLatch = null;
CountDownLatch currentCountDownLatch = null;
for (int i = 0; i < list.size(); i++) {
List<Data> dataList = list.getChildList();
currentCountDownLatch = new CountDownLatch(dataList.size());
for(int j = 0; j < dataList.size(); j++){
Data data = dataList.get(j);
List<TestCallable> callableList = callableMap.get(i);
if(CollectionUtils.isEmpty(callableList)){
callableList = new ArrayList<>();
}
callableList.add(callable);
callableMap.put(i,callableList);
}
prevCountDownLatch = currentCountDownLatch;
}
for (int i = 0; i < list.size(); i++) {
List<TestCallable> callableList = callableMap.get(i);
List<TestCallable> nextCallableList = callableMap.get(i + 1);
for (int j = 0; j < callableList.size(); j++) {
TestCallable callable = callableList.get(j);
callable.setNextCallableList(nextCallableList);
threadPool.submit(callable);
}
}
好了,就完事了。