前言
線程并發(fā)系列文章:
Java 線程基礎(chǔ)
Java 線程狀態(tài)
Java “優(yōu)雅”地中斷線程-實踐篇
Java “優(yōu)雅”地中斷線程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有誤
Java Unsafe/CAS/LockSupport 應(yīng)用與原理
Java 并發(fā)"鎖"的本質(zhì)(一步步實現(xiàn)鎖)
Java Synchronized實現(xiàn)互斥之應(yīng)用與源碼初探
Java 對象頭分析與使用(Synchronized相關(guān))
Java Synchronized 偏向鎖/輕量級鎖/重量級鎖的演變過程
Java Synchronized 重量級鎖原理深入剖析上(互斥篇)
Java Synchronized 重量級鎖原理深入剖析下(同步篇)
Java并發(fā)之 AQS 深入解析(上)
Java并發(fā)之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 詳解
Java 并發(fā)之 ReentrantLock 深入分析(與Synchronized區(qū)別)
Java 并發(fā)之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(應(yīng)用篇)
最詳細(xì)的圖文解析Java各種鎖(終極篇)
線程池必懂系列
前面分析了基于AQS的獨占鎖ReentrantLock、共享鎖/獨占鎖ReentrantReadWriteLock,它們內(nèi)部都實現(xiàn)了Lock 接口。而AQS還有其它常用的子類封裝器,它們雖然沒有實現(xiàn)Lock接口,但可以用來做線程間的同步,接下來將要來深入了解它們。
通過本篇文章,你將了解到:
1、Semaphore 原理分析
2、CountDownLatch 原理分析
3、CyclicBarrier 原理分析
1、Semaphore 原理分析
場景引入
ReentrantReadWriteLock 里有讀鎖和寫鎖,其中讀鎖是共享鎖,其核心是對AQS里的"state"變量進(jìn)行操作,每獲取一次鎖,將state加1,釋放鎖將state減1。從這里可以看出,將state作為共享資源能夠?qū)崿F(xiàn)線程間的協(xié)作。
現(xiàn)在有個需求:資源是共享的,但是數(shù)量有限,因此沒拿到資源的需要等待別人釋放資源。
將state作為標(biāo)記共享資源的數(shù)量,那么就有:
1、線程占有資源后將state減1,線程釋放資源后將state加1。
2、若線程沒拿到資源(資源都被其它線程占有了),那么掛起等待。
3、線程釋放資源后,喚醒其它等待該資源的線程。

這樣子,不用synchronized+wait/notify與ReentrantLock+await/signal,也依然能夠?qū)崿F(xiàn)線程間同步。
具體到現(xiàn)實場景:
如停車場只能容納一定數(shù)量的車子,當(dāng)停車場停滿了車(入場許可發(fā)放完了),其它想進(jìn)來的車子必須等待有其它車從停車場開出(釋放入場許可)。
Semaphore 構(gòu)造
指定初始的許可個數(shù):
#Semaphore.java
public Semaphore(int permits) {
//默認(rèn)是非公平
sync = new NonfairSync(permits);
}
Sync(int permits) {
setState(permits);
}
可以看出許可的個數(shù)就是state的值。
Semaphore 占有許可
通過調(diào)用acquire(xx)占有許可:
#Semaphore.java
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
//交給AQS處理,可中斷
sync.acquireSharedInterruptibly(permits);
}
#AbstractQueuedSynchronizer.java
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//發(fā)生了中斷,直接返回
if (Thread.interrupted())
throw new InterruptedException();
//嘗試修改state(減)
if (tryAcquireShared(arg) < 0)
//修改失敗,則掛起等待
doAcquireSharedInterruptibly(arg);
}
每次可以占有多個許可,若占有成功則直接返回,否則掛起等待。
具體的操作state在tryAcquireShared(xx)里實現(xiàn),此處以非公平模式說明:
#Semaphore.java
final int nonfairTryAcquireShared(int acquires) {
//死循環(huán)確保修改state成功,或者state已經(jīng)獲取完了
for (;;) {
//獲取state
int available = getState();
//減少state
int remaining = available - acquires;
if (remaining < 0 ||
//CAS 操作
compareAndSetState(available, remaining))
return remaining;
}
}
Semaphore 釋放許可
占有許可做了相應(yīng)的任務(wù)后,就可以釋放許可了。
通過調(diào)用release(xx)釋放許可:
#Semaphore.java
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
//AQS 實現(xiàn)
sync.releaseShared(permits);
}
#AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
//嘗試修改state(加)
if (tryReleaseShared(arg)) {
//成功修改state,喚醒后繼節(jié)點
doReleaseShared();
return true;
}
//修改失敗
return false;
}
具體的操作state在tryReleaseShared(xx)里實現(xiàn):
#Semaphore.java
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取state
int current = getState();
//增加
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//修改
if (compareAndSetState(current, next))
return true;
}
}
可以看出:
釋放許可,增加state,占有許可,減少state。
另外,Semaphore 占有許可可分為公平與非公平模式,占有許可過程可中斷/不可中斷。

Semaphore 與Lock 區(qū)別
與ReentrantLock、ReentrantReadWriteLock 區(qū)別在于從不同的角度看待state:
1、ReentrantLock、ReentrantReadWriteLock 獲取鎖的過程是將state值增大,而Semaphore 占有許可是將state值減小。
2、ReentrantLock、ReentrantReadWriteLock 釋放鎖的過程是將state值減小,而Semaphore 釋放許可是將state值增大。
3、這也是AQS的靈活之處,將具體的"state"鎖代表的意義由子類實現(xiàn),可實現(xiàn)不同場景的應(yīng)用。
2、CountDownLatch 原理分析
場景引入
A、B、C三個線程協(xié)作:
A 等待B、C完成任務(wù)后再進(jìn)行下一步操作。
這場景我們可能會想到用Thread.join(),A調(diào)用B.join(),C.join(),A阻塞等待,當(dāng)B、C線程執(zhí)行結(jié)束后喚醒A。這種方式雖然能夠解決問題,但是有些不盡人意的地方:比如說A不一定要等待B、C執(zhí)行完成,而是B、C中途完成某個任務(wù)后通知A;又比如,B、C線程不止執(zhí)行一次任務(wù),而是一定的次數(shù)后才會喚醒A,這個時候使用Thread.join() 就無法解決問題了。
而CountDownLatch 可以很好地解決這問題。
CountDownLatch 構(gòu)造
#CountDownLatch.java
//初始化次數(shù)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
//設(shè)置state
setState(count);
}
可以看出,count的值最終反饋到state上。
CountDownLatch 等待
通過await(xx)等待state變?yōu)?:
#CountDownLatch.java
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
//超時返回
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
#AbstractQueuedSynchronizer.java
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
//該方法響應(yīng)中斷
if (Thread.interrupted())
throw new InterruptedException();
//主要工作在tryAcquireShared(xx)里
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
又是AQS的套路,具體的操作state在tryAcquireShared(xx)里實現(xiàn):
#CountDownLatch.java
protected int tryAcquireShared(int acquires) {
//若state == 0,則返回1,否則-1
//外層判斷>=0,說明當(dāng)前state還有數(shù)量,則需要阻塞等待,否則不阻塞
return (getState() == 0) ? 1 : -1;
}
與其它子類實現(xiàn)的tryAcquireShared(xx)方法不同的是,CountDownLatch里的Sync并沒有修改state的值,僅僅只是判斷state?=0進(jìn)而做具體的操作而已。
由此可知:CountDownLatch 是基于AQS的共享模式。
CountDownLatch 倒數(shù)計數(shù)
既然調(diào)用await(xx)可能會使得線程阻塞等待,那么勢必有其它線程喚醒它,調(diào)用的方法即是countDown():
#CountDownLatch.java
public void countDown() {
sync.releaseShared(1);
}
#AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
//子類實現(xiàn)
if (tryReleaseShared(arg)) {
//AQS里實現(xiàn),喚醒阻塞的線程
doReleaseShared();
return true;
}
return false;
}
同樣的,具體的操作state在tryReleaseShared(xx)里實現(xiàn):
#CountDownLatch.java
protected boolean tryReleaseShared(int releases) {
for (;;) {
//獲取state
int c = getState();
//若當(dāng)前state==0,說明已經(jīng)沒有可以釋放的了
if (c == 0)
return false;
int nextc = c-1;
//CAS修改
if (compareAndSetState(c, nextc))
//說明可以喚醒其它線程了
return nextc == 0;
}
}
也即是說,當(dāng)線程調(diào)用await(xx)阻塞后,其它線程通過countDown()修改state值,若是發(fā)現(xiàn)state最終變?yōu)?了,那么喚醒阻塞的線程。
用圖表示CountDownLatch主要結(jié)構(gòu)如下:

CountDownLatch 與其它AQS子類封裝器的區(qū)別
前面已經(jīng)分析了基于AQS的封裝器:ReentrantLock、ReentrantReadWriteLock、Semaphore,它們對state值的修改包括增加與減少,而CountDownLatch 只是減小state的值,用以實現(xiàn)倒數(shù)計數(shù)的功能。
可類比場景如下:
1、田徑運動場開始百米賽跑。
2、運動員在跑道上各就各位(多個線程調(diào)用await 阻塞等待)。
3、裁判喊倒數(shù)3、2、1(線程調(diào)用countDown)。
4、等待倒數(shù)結(jié)束,發(fā)令槍響,運動員就開始跑(線程被喚醒,繼續(xù)做事)。
可以看出,運動員不會去干涉裁判的倒數(shù)(修改state值)。
3、CyclicBarrier 原理分析
場景引入
在CountDownLatch 場景里說到運動員需要裁判,想想可以不需要裁判嗎?運動員之間自發(fā)倒數(shù),倒數(shù)結(jié)束就一起跑。
更普遍的場景是:
1、幾個驢友想去某個景點旅游,約定了在某個地方集合后再一起出發(fā)。
2、每個驢友到達(dá)集合點時打卡并看人都到齊了沒,沒到齊則等待。
3、若最后一個參與者過來后發(fā)現(xiàn)人到齊了,于是告訴大家不用等了,出發(fā)吧。
CyclicBarrier 可滿足該場景的需求。
CyclicBarrier 構(gòu)造
#CyclicBarrier.java
public CyclicBarrier(int parties, Runnable barrierAction) {
//必須要有參與者
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
//臨時變量count
this.count = parties;
//參與者都到達(dá)了后執(zhí)行的動作
this.barrierCommand = barrierAction;
}
可以看出,此處并沒有AQS介入,也就是沒有直接修改state。
CyclicBarrier是通過ReentrantLock + Condition 來實現(xiàn)線程間同步的:
#CyclicBarrier.java
//獨占鎖,為了互斥修改count
private final ReentrantLock lock = new ReentrantLock();
//線程等待條件
private final Condition trip = lock.newCondition();
//修改的共享變量
private int count;
CyclicBarrier 等待參與者
接著來分析,如何實現(xiàn)線程間的同步的。
#CyclicBarrier.java
public int await() throws InterruptedException, BrokenBarrierException {
try {
//實際調(diào)用doWait(),此處是不限時等待
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
//先鎖住
lock.lock();
try {
final Generation g = generation;
//等待過程被中斷
if (g.broken)
throw new BrokenBarrierException();
//中斷了線程
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//等待個數(shù)-1
int index = --count;
if (index == 0) {
//都到齊了,無需等待了
boolean ranAction = false;
try {
//執(zhí)行既定的方法
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//開始下一輪
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
//走到這,說明還需要等待
for (;;) {
try {
if (!timed)
//不限時等待
trip.await();
else if (nanos > 0L)
//限時等待,時間到了就返回
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
//等待過程被中斷,則拋出異常
if (g == generation && ! g.broken) {
//不等了,喚醒其它線程
breakBarrier();
throw ie;
} else {
...
Thread.currentThread().interrupt();
}
}
//醒來后發(fā)現(xiàn)已經(jīng)被中斷了,則直接拋出異常
if (g.broken)
throw new BrokenBarrierException();
//已經(jīng)開啟了下一輪,說明前面一輪都到齊了結(jié)束了
if (g != generation)
return index;
if (timed && nanos <= 0L) {
//超時了還是沒到齊,不等了,喚醒其它線程
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
線程先獲取獨占鎖,然后修改count值,若發(fā)現(xiàn)修改后count !=0,那么還需要等待,等待借助的是Condition.await(xx)方法。
有等待,自然有喚醒的地方:
#CyclicBarrier.java
private void breakBarrier() {
//置為true,表示已經(jīng)結(jié)束等待了
generation.broken = true;
//重置count,復(fù)用的關(guān)鍵
count = parties;
//喚醒其它在等待的線程
trip.signalAll();
}
用圖表示,等待/喚醒過程如下:

來看看CyclicBarrier 主要方法:

CyclicBarrier與CountDownLatch 區(qū)別
看到這,你也許已經(jīng)發(fā)現(xiàn)了CyclicBarrier 和CountDownLatch 實現(xiàn)的功能很相似,都是等待某個條件滿足后再進(jìn)行下一步的動作,兩者不同之處在于:
1、CountDownLatch 參與的線程分為兩類:一個是等待者,另一個是計數(shù)者;CyclicBarrier 參與的線程既是等待者,也是計數(shù)者。
2、CountDownLatch 完成一次完整的協(xié)作過程后不能再復(fù)用,CountDownLatch 可以復(fù)用(不用重新新建CountDownLatch 對象)。
3、CountDownLatch 的計數(shù)值與線程個數(shù)沒有必然聯(lián)系,CyclicBarrier 的初始計數(shù)值與線程個數(shù)一致。
4、CountDownLatch 基于AQS實現(xiàn),CyclicBarrier 基于ReentrantLock&Condition實現(xiàn)(內(nèi)部也是基于AQS)。
你可能還有疑惑,在下一篇應(yīng)用篇將會重點體現(xiàn)兩者區(qū)別。
下篇文章重點分析:Semaphore/CountDownLatch/CyclicBarrier 實際應(yīng)用。
本文基于jdk1.8。