Cyclicbarrier也是處理多線(xiàn)程并發(fā)的手段之一,就是多個(gè)線(xiàn)程同時(shí)執(zhí)行,要等到彼此都執(zhí)行完后再進(jìn)行下一步操作,使用方式很簡(jiǎn)單,網(wǎng)上例子很多,這里我貼一個(gè):
https://www.baeldung.com/java-cyclic-barrier
我們這里是要對(duì)這個(gè)類(lèi)的實(shí)現(xiàn)源碼進(jìn)行解讀,看看它是怎么實(shí)現(xiàn)的,首先看看Cyclicbarrier有哪些功能(方法,只看public的,因?yàn)橹挥衟ublic才是對(duì)外的功能,并且忽略構(gòu)造函數(shù)):
await:這個(gè)是最最主要的方法,功能是等到所有線(xiàn)程(代碼中變量為parties)執(zhí)行完畢。
isBroken: 次要的方法,檢查該Cyclicbarrier的狀態(tài)是否破損
reset:特色方法,用于重置整個(gè)Cyclicbarrier的狀態(tài),因?yàn)镃yclicbarrier會(huì)維持一些變量,這些變量會(huì)在對(duì)象方法執(zhí)行過(guò)程中有變化,這個(gè)方法會(huì)把這些變量重置到初始化狀態(tài),這樣這個(gè)Cyclicbarrier對(duì)象就可以重復(fù)使用了。
getNumberWaiting:返回正在等待的線(xiàn)程的數(shù)量,就是有多少個(gè)線(xiàn)程已經(jīng)完成了操作,正在等待所有線(xiàn)程執(zhí)行完畢。
好噠,明眼人都看得出來(lái)await這個(gè)方法是關(guān)鍵,看懂了這個(gè)方法就完全搞清楚這個(gè)類(lèi)啦。
await是多態(tài)方法有兩種形式,有參數(shù)和沒(méi)參數(shù)的:
public intawait()throwsInterruptedException,BrokenBarrierException {
try{
returndowait(false,0L);
???}catch(TimeoutException toe) {
throw newError(toe);// cannot happen
???}
}
和
public intawait(longtimeout,TimeUnit unit)
throwsInterruptedException,
??????????BrokenBarrierException,
??????????TimeoutException {
returndowait(true,unit.toNanos(timeout));
}
第二種其實(shí)就是給await加個(gè)超時(shí),就是說(shuō)在某些情況下線(xiàn)程無(wú)法等到所有其他線(xiàn)程都執(zhí)行完,因?yàn)槟承┚€(xiàn)程可能不能等太久,于是就加個(gè)超時(shí),常規(guī)操作嘛,可以看到它們其實(shí)都調(diào)用的dowait方法:
private intdowait(booleantimed, longnanos)
throwsInterruptedException,BrokenBarrierException,
??????????TimeoutException {
finalReentrantLock lock =this.lock;
???lock.lock();
??? try{
finalGeneration g =generation;
??????? if(g.broken)
throw newBrokenBarrierException();
??????? if(Thread.interrupted()) {
breakBarrier();
??????????? throw newInterruptedException();
???????}
intindex = --count;
??????? if(index ==0) {// tripped
???????????booleanranAction =false;
??????????? try{
finalRunnable command =barrierCommand;
??????????????? if(command !=null)
command.run();
???????????????ranAction =true;
???????????????nextGeneration();
??????????????? return0;
???????????}finally{
if(!ranAction)
breakBarrier();
???????????}
??????? }
// loop until tripped, broken, interrupted, or timed out
???????for(;;) {
try{
if(!timed)
trip.await();
??????????????? else if(nanos >0L)
nanos =trip.awaitNanos(nanos);
???????????}catch(InterruptedException ie) {
if(g ==generation&& ! g.broken) {
breakBarrier();
??????????????????? throwie;
???????????????}else{
// We're about to finish waiting even if we had not
??????????????????? // been interrupted, so this interrupt is deemed to
??????????????????? // "belong" to subsequent execution.
???????????????????Thread.currentThread().interrupt();
???????????????}
??????????? }
if(g.broken)
throw newBrokenBarrierException();
??????????? if(g !=generation)
returnindex;
??????????? if(timed && nanos <=0L) {
breakBarrier();
??????????????? throw newTimeoutException();
???????????}
??????? }
}finally{
lock.unlock();
???}
}
好幾十行哈,看著挺嚇人,但其實(shí)里面的異常和錯(cuò)誤處理都占了一大半=。=,我們平時(shí)寫(xiě)代碼異常處理太少的,站出來(lái)反思一下哈,代碼的魯棒性肯定不強(qiáng)。。。。
我來(lái)手動(dòng)剔除所有的異常和錯(cuò)誤處理,來(lái)看清它主要做的事情:
private intdowait(booleantimed, longnanos)
throwsInterruptedException,BrokenBarrierException,
???????TimeoutException {
finalReentrantLock lock =this.lock;
???lock.lock();
??? try{
finalGeneration g = generation;
??????? intindex = --count;
??????? if(index ==0) {// tripped
???????????finalRunnable command = barrierCommand;
??????????? if(command !=null)
command.run();
???????????nextGeneration();
??????????? return0;
???????}
// loop until tripped, broken, interrupted, or timed out
???????for(; ;) {
if(!timed)
trip.await();
??????????? else if(nanos >0L)
nanos = trip.awaitNanos(nanos);
???????}
}finally{
lock.unlock();
???}
}
是不是短了一大半(當(dāng)然那些異常錯(cuò)誤處理都是必須的)?ok, 首先我們線(xiàn)程第一步就是要獲取CyclicBarrier對(duì)象的鎖,因?yàn)榫€(xiàn)程會(huì)涉及到對(duì)CyclicBarrier對(duì)象里的變量進(jìn)行操作,這里是用鎖機(jī)制來(lái)保證同步。
忘了把CyclicBarrier里的所有變量貼出來(lái)了:
private static classGeneration {
booleanbroken=false;
}
/** The lock for guarding barrier entry */
private finalReentrantLocklock=newReentrantLock();
/** Condition to wait on until tripped */
private finalConditiontrip=lock.newCondition();
/** The number of parties */
private final intparties;
/* The command to run when tripped */
private finalRunnablebarrierCommand;
/** The current generation */
privateGenerationgeneration=newGeneration();
/**
* Number of parties still waiting. Counts down from parties to 0
* on each generation.? It is reset to parties on each new
* generation or when broken.
*/
private intcount;
上面說(shuō)的鎖就是ReentrantLock,我有一篇專(zhuān)門(mén)寫(xiě)ReentrantLock源碼解析的文章:
http://www.itdecent.cn/p/b43c9f62ceb1
好的, 獲取了鎖以后,我們會(huì)去判斷當(dāng)前線(xiàn)程是不是最后一個(gè)等待的線(xiàn)程,如果只剩它一個(gè)了的話(huà),就會(huì)停止了,然后執(zhí)行傳到CyclicBarrier對(duì)象里的線(xiàn)程,并且開(kāi)啟一個(gè)新的循環(huán)(其實(shí)也就是把CyclicBarrier對(duì)象的所有參數(shù)設(shè)置為初始化時(shí)的狀態(tài))。然后就完事了~
錯(cuò)誤處理就幾句話(huà)簡(jiǎn)單帶過(guò),就是當(dāng)有線(xiàn)程被中斷后,其他線(xiàn)程就會(huì)停止等待了,CyclicBarrier對(duì)象也會(huì)被重置,不用擔(dān)心永久等待的問(wèn)題。
所以對(duì)比CountDownLatch,?CyclicBarrier的好處就是CyclicBarrier對(duì)象可以復(fù)用~所以在需要復(fù)用的情況下(目前我沒(méi)遇到過(guò))就用CyclicBarrier吧~