本文適合AQS有一定基礎(chǔ)的伙伴進行閱讀,對其中比較重點的內(nèi)容做一個簡單的總結(jié),本文不會對AQS基礎(chǔ)框架和源碼進行很詳細的分析,網(wǎng)上有很多這種資源,大家可以先深入了解一下,直接上干貨。
一、互斥鎖與共享鎖
AQS是鎖實現(xiàn)的基礎(chǔ)框架,AQS區(qū)別了互斥鎖與共享鎖的實現(xiàn)方式,互斥鎖就是只有一個線程同一時刻可以獲得鎖,共享鎖是同一時刻可以有多個線程可以獲得鎖(這里不要與讀與寫進行關(guān)聯(lián),我之前有一段時間也有這種誤區(qū)),但是這兩種鎖是否能夠獲得到鎖,是實現(xiàn)鎖功能的類實現(xiàn)的如(ReentrantLock、ReentrantReadWriteLock),一般都需要實現(xiàn)如下幾個方法。

1.互斥鎖
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先調(diào)用實現(xiàn)鎖的tryAcquire來判斷是否能獲得鎖,若不能獲得鎖則初始化一個EXCLUSIVE類型的Node節(jié)點,并以CAS配合自旋的方式放入同步隊列尾部,然后再看該節(jié)點是否在同步隊列的首節(jié)點后,如果是再次調(diào)用tryAcquire,如果不能獲得到鎖
該線程就會阻塞( LockSupport.park實現(xiàn))。
二、共享鎖
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
調(diào)用實現(xiàn)鎖的tryAcquireShared方法,如果<0代表沒有獲得鎖,直接調(diào)用doAcquireShared方法,該方法的大部分與互斥鎖很相似,但是其中的不同是setHeadAndPropagate方法,該方法是在獲得鎖后實現(xiàn)了一個"傳播行為",依次喚醒同步隊列中的SHARE節(jié)點。
二、公平鎖與非公平鎖
公平鎖與非公平鎖是針對ASQ框架所實現(xiàn)的鎖(不是AQS自身實現(xiàn),這點很重要),我個人的理解是公平鎖是按照申請鎖的順序獲取鎖,而非公平鎖就沒有這樣嚴(yán)格的要求,那么我們知道要實現(xiàn)鎖,都需要實現(xiàn)AQS的核心方法(源碼可以查看ReentrantLock中的NonfairSync和FairSync),那我們看一下公平鎖與非公平鎖的具體實現(xiàn)。
1.非公平鎖
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
根據(jù)state==0的條件(狀態(tài))來嘗試上鎖,通過CAS的方式成功更改了state則獲得到鎖,且支持重入,如果沒有獲得到鎖返回false,后面的操作和ASQ的互斥鎖的流程一致,這里不做過多的介紹了。
2.公平鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
整段方法大家可以看到處理過程中多了一個hasQueuedPredecessors方法,其他大部分都很相似,這也是公平鎖與非公平鎖的控制的關(guān)鍵,主要實現(xiàn)的就是判斷當(dāng)前是否已經(jīng)存在等待的線程,如果存在則不能獲得鎖,老老實實的在AQS的同步隊列中進行排隊。
三、讀寫鎖
在ReentrantReadWriteLock中有兩個鎖,一個是ReadLock、一個是WriteLock,一般適用于多讀少寫的一些場景,之前我提過所有的鎖都是基于AQS進行的實現(xiàn),所以讀寫鎖也一樣,只是針對于這種場景(可以同一時間并發(fā)讀,但是只要有寫就阻塞)進行了處理。
1.讀鎖
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
在讀寫鎖中state32位,高16位為讀鎖獲取的次數(shù),低16位是互斥鎖獲取的次數(shù),如果獲取讀鎖時已經(jīng)有其他線程獲得了寫鎖那就直接返回獲得鎖失敗。如果沒有人獲得寫鎖,那就直接修改state的值,代表本線程獲得到了讀鎖。如果r==0說明沒有人獲得讀鎖,則給firstReader設(shè)置為當(dāng)前線程,重入次數(shù)為1,如果firstReader已經(jīng)是自身線程,則重入次數(shù)加1(支持重入),如果都不是,則從緩存中獲得重入次數(shù)并自增,如果本方法沒有獲得鎖那還是根據(jù)AQS共享鎖的原理在同步隊列等待。
讀鎖的釋放這里不做過多的描述,大概的思路是如果當(dāng)前線程是firstReader且重入次數(shù)為1則設(shè)置firstReader為空,如果不是則獲取緩存中的重入數(shù)量減1,如果減完后為0則返回true,進行同步隊列中線程的喚醒,其他時候返回false。
2.寫鎖
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
如果state!=0且w==0,說明有讀鎖的獲取次不為0,getExclusiveOwnerThread說明其他線程獲得了寫鎖,以上兩種情況都返回獲取鎖失敗,如果該線程通過重入獲取鎖成功則重新設(shè)置state的值,返回true。如果在上一步state為0則直接嘗試獲得寫鎖,獲得到返回true,獲取不到鎖返回false,后續(xù)流程就與AQS是互斥鎖一致了。
釋放鎖的過程也類似,修改state的值(減releases),如果重入為0則設(shè)置OwnerThread為null,返回true,喚醒同步隊列中的線程。
四、CountDownLatch
在CountDownLatch進行初始化的時候會設(shè)置state的個數(shù),每當(dāng)執(zhí)行countDown方式的時候就會調(diào)用tryReleaseShared方法將state減1,如果最后state不為0則一直都返回false,一旦state為0,則會喚醒等待隊列中的線程。
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
上面簡述了喚醒等待線程的過程,那這些進程是如何進入到隊列中的,其實是調(diào)用了CountDownLatch的await方法,await方法內(nèi)部調(diào)用了acquireSharedInterruptibly方法,如果state不為0也就是還有線程沒有運行,則所有的await線程要進入到等待隊列中。
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
五、Semaphore
該鎖也成為信號量,同一時刻最多只有N個線程可以同時工作,可以實現(xiàn)簡單的限流,底層的實現(xiàn)也是基于AQS,在該鎖中也可以實現(xiàn)公平鎖與非公平鎖兩種方式。
1.獲得許可(以非公平鎖為例)
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
當(dāng)線程想獲得一個鎖的時候,該鎖實現(xiàn)了可中斷和非終端兩種方式,nonfairTryAcquireShared方法是一個自旋,根據(jù)state獲得現(xiàn)在還剩下的許可數(shù),然后減想獲得的許可數(shù),如果剩余的許可小于0則返回一個負數(shù),該線程進入到AQS是同步隊列中。如果剩余的許可數(shù)大于0,則會更新state的許可數(shù),然后返回一個正數(shù),該線程獲得鎖。
2.釋放許可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
釋放一個許可也很簡單,獲得當(dāng)前的許可數(shù)state,將釋放的許可數(shù)疊在state上(可以提供這種方式動態(tài)的增加許可的數(shù)量),然后對state進行更新,如果更新成功則返回true,依次喚醒同步隊列中的線程獲取鎖。
小結(jié):AQS的核心就是互斥鎖和共享鎖的實現(xiàn)流程,至于其他出現(xiàn)的鎖,都是在AQS的基礎(chǔ)上實現(xiàn)的,比如ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphore等,我看源碼的時候最后就是充分的理解了這句話才對整個AQS理解更加的深刻。