????AbstractQueuedSynchronizer類(簡稱AQS)依賴于內(nèi)部的FIFO隊列,提供了一個可以實現(xiàn)阻塞鎖和同步機制的框架。依賴于AQS實現(xiàn)的具有同步機制和鎖功能的類,需要實現(xiàn)它定義的protect方法(tryAcquire,tryRelease等方法),通過改變state字段的狀態(tài)來保證同步機制。
AQS的使用場景
1、ReentrantLock類提供的同步鎖的功能也是基于AQS類來實現(xiàn)的,ReentrantLock提供的公平鎖(sync)和非公平鎖(NofairSync)兩種鎖機制,都是通過重寫tryAcquire和tryRelease方法來實現(xiàn)的。
ReentrantLock重寫的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 非公平的獲取鎖方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 提供了鎖的可重入功能,當已經(jīng)獲取鎖的線程再次請求獲取鎖時
// 將state的值加一。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock重寫的tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 判斷當前請求釋放鎖的線程與當前持有鎖的線程是不是同一個線程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 因為提供了鎖重入的功能,這里會判斷當前線程是否已經(jīng)釋放完所有持有的鎖
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
Reentrant基于AQS加鎖的執(zhí)行過程

2、CyclicBarrier類
應用場景:創(chuàng)建一組任務,它們并行地執(zhí)行工作,然后在進行下一個步驟之前等待,直至所有任務都完成,它使得所有并行任務都將在柵欄處列隊,因此可以一致的向前移動。
CyclicBarrier類提供的功能也是基于AQS類來實現(xiàn)的,主要是利用AQS類中ConditionObject類提供的功能。
// 喚醒所有等待的線程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 喚醒所有線程的方法,循環(huán)隊列中等待的線程,調(diào)用transferForSignal方法喚醒在等待中的線程。
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal方法,會對當前節(jié)點狀態(tài)校驗,然后調(diào)用LockSupport.unpark方法喚醒指定的線程。
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
await方法:當CyclicBarrier類提供的計數(shù)器沒有減到等于0時,提前進入的線程會調(diào)用AQS中ConditionObject類提供的await方法,將當前線程加入到等待隊列中。
public final void await() throws InterruptedException {
// 如果當前線程被中斷,會拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 將新節(jié)點添加到等待隊列中去,節(jié)點的狀態(tài)為 CONDITON。
Node node = addConditionWaiter();
// 釋放當前線程持有的鎖,如果失敗則將當前節(jié)點的狀態(tài)改為cancelled(表示當前節(jié)點失效)。
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判斷當前節(jié)點是否在CLH隊列中,CONDITION狀態(tài)的節(jié)點,只能在CONDITION隊列中。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 出了while循環(huán),說明線程被喚醒,并且已經(jīng)將該node節(jié)點從CONDITION隊列transfer到了CLH隊列中,或者發(fā)生了中斷。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 處理異?;蛘咧袛嗟墓?jié)點
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
下面來看一下CyclicBarrier類的執(zhí)行過程
