目標
分析aqs中頂層方法實現,主要包括方法:
- acquire 外層獲取鎖方法
- acquireQueued 節(jié)點以自旋的方式獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,要掛起線程
- addWaiter 將當前線程包裝成node節(jié)點添加到等待隊列
- shouldParkAfterFailedAcquire 處理當前線程是否應該掛起
- cancelAcquire 對于最終未成功獲取到同步鎖狀態(tài)的情況進行移除等待隊列處理
主流程時序圖

分析下
aqs頂層方法的入口為acquire,看下源碼:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
尋著流程圖,看到首先調用的tryAcquire方法(但此方法是其具體鎖實現類實現的,如我們常用的ReentrantLock類,但此方法我們先略過不管),此方法含義為當前線程去嘗試獲取資源,當獲取失敗時調用acquireQueued方法進入等待隊列,相當于進入鎖池,看下acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//1.不斷自旋重試或線程等待,直到結果出來
for (;;) {
final Node p = node.predecessor();
//2. 當前節(jié)點為頭節(jié)點,調用tryAcquire嘗試獲取資源,如果成功直接返回,并把當前節(jié)點置為頭節(jié)點
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//3. 上一步未成功,判斷當前階段是否應該進入線程等待狀態(tài),如果線程進入等待,進入等待池.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//4. 等待被喚醒后標注此線程被中斷過
interrupted = true;
}
} finally {
if (failed)
//5. 不斷試驗后獲取鎖失敗,進行放棄等待邏輯,將此節(jié)點從隊列中移除
cancelAcquire(node);
}
}
看下shouldParkAfterFailedAcquire方法,什么情況下滿足線程等待:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//1. 前節(jié)點狀態(tài)為Node.SIGNAL,代表此節(jié)點已經標記,當他資源釋放后,喚起后面的節(jié)點,所以本節(jié)點可以安全park
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
//2. 當狀態(tài)>0,表述前節(jié)點已經放棄等待,所以遞歸前移,找到可用節(jié)點
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//3. 其他情況,cas嘗試將前節(jié)點狀態(tài)設置為Node.SIGNAL狀態(tài),表示前節(jié)點結束后喚醒當前節(jié)點,但本次并不阻塞.
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
上面就是嘗試獲取隊列,失敗進行阻塞的邏輯.其中還有個addWaiter(Node.EXCLUSIVE)方法,此方法用于將當前線程包裝成一個Node,并放入隊列中:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 1. 嘗試將當前節(jié)點設為尾節(jié)點,成功后,返回此節(jié)點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//2. 如果設置失敗,此方法進行不斷自旋(不斷循環(huán)到原有邏輯進行重試),嘗試加入尾幾點
enq(node);
return node;
}
對應自旋邏輯 enq(node);
private Node enq(final Node node) {
//不斷循環(huán)自旋重試
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//當隊列為空時,必須有個首節(jié)點,進行初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//嘗試再次設置尾節(jié)點,失敗后將會循環(huán)重試,成功將返回此節(jié)點
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
好了,上面就是頂層aqs抽象類定義的方法.
引申出tryAcquire方法:
tryAcquire方法在頂層并未實現,具體實現是各個具體鎖實現的,如ReentrantLock中通過兩個內部類NonfairSync(非公平鎖)和FairSync(公平鎖)來實現,下面對比下這兩個類的此方法:
1. 公平鎖獲取方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//1. 獲取當前資源狀態(tài)
int c = getState();
if (c == 0) {
//2.通過hasQueuedPredecessors方法判斷是否有其他線程排在前邊等待隊列
if (!hasQueuedPredecessors() &&
//3. 如果當前此線程排在最前邊,則將當前隊列狀態(tài)值賦值, acquires在獨占鎖中一直是1,所以就是0到1點變化
compareAndSetState(0, acquires)) {
//4.如果狀態(tài)修改成功,則將當前資源擁有者改成當前線程,返回true
setExclusiveOwnerThread(current);
return true;
}
}
//5. 上一步判斷c!=0代表已有線程擁有資源,判斷當前線程是否為此線程
else if (current == getExclusiveOwnerThread()) {
//6.如果是當前線程獲取的資源,此處重新獲取鎖,因為為可重入鎖,所以鎖次數加1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上面的分析,看出了公平鎖的實現,其中還有個重點,hasQueuedPredecessors方法是怎么判斷是否有其他隊列排在前邊:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//1. 此方法需要返回false才代表沒有排在前邊的線程等待
//2. 如果h!=t表示等待隊列有其他節(jié)點存在,否則h==t表示只有一個節(jié)點,直接返回false,表示當前線程可以獲取資源
//3. 如果h!=t,則表示隊列中有線程在使用資源,繼續(xù)判斷(s = h.next) == null表示第二個節(jié)點為null,說明h的下一個節(jié)點異常,可能是在其他線程處理線程隊列過程中,直接返回true
//4.最后一個判斷s.thread != Thread.currentThread(),在上面所有可能后,判斷s線程不是當前線程,則說明已有其他線程索取到鎖了
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
2. 好了,公平鎖完了,下面看下非公平鎖怎么實現:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//.非公平鎖,不用判斷隊列中是否有其他節(jié)點,直接嘗試修改資源狀態(tài),如果成功,代表獲取到資源,直接返回
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
我們現在非公平鎖實現和公平鎖邏輯類似,只是少了一個hasQueuedPredecessors方法,不在去判斷是否前面有線程排隊而已.
引申出參數(int acquires)
在獨占鎖中, acquires一直為1,表示占有鎖到永遠只有一個,看下ReentrantLock的寫法:
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
而在CountDownLatch等共享鎖到實現中,此值也是1,但其實現類中,通過構造器添加共享鎖中的資源可用數.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
...........................................
//初始設值,選擇共享鎖容量
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
好了,上面就分析完了aqs實現的重點邏輯.
AQS研究系列(一)--Unsafe使用
AQS研究系列(二)--線程狀態(tài)和interrupt()、interrupted()、isInterrupted等方法學習