1 概述
哪里使用AQS?我們最常用的ReentrantLock類其實就是使用CAS和AQS來實現(xiàn)的。
ReentrantLock的構造方法中,sync對象其實就是繼承了AbstractQueuedSynchronizer(AQS)。
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
....
}
public ReentrantLock() {
sync = new NonfairSync();
}
AQS翻譯過來是,抽象隊列同步器,其定義了一套多線程訪問共享變量的同步器框架。
內(nèi)部類Sync就是自定義同步器,采用內(nèi)部類的方式也是作者建議的。
AbstractQueuedSynchronizer的源碼里作者還給了一個圖。
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
畫詳細點

2 state
state我們可以理解成為表示當前同步情況的一個狀態(tài)變量。state是使用volatile修飾的一個變量,它的值代表鎖的狀態(tài),state > 0表示鎖被使用,state < 0表示鎖被釋放。
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
getState用于獲取state的值,比如一個線程調(diào)用該方法查看state的值是不是0,來判斷當前是不是鎖定的狀態(tài)。
setState用來直接將state的值修改成其他,比如重入鎖重入的時候將state的值+1。
compareAndSetState使用CAS的方式修改state的值,比如線程獲取到鎖嘗試修改state的值為1。
state有兩種共享方式:Exclusive(獨占,只能有一個線程使用state)和 Share(共享,多個線程可以同時使用state)。
3 CLH同步隊列
這里的CLH同步隊列是一個FIFO(先進先出的)雙向隊列,元素的節(jié)點類型為Node,并且通過head和tail來記錄隊首和隊尾的元素。Node節(jié)點對象用來維護需要獲取鎖的線程,當前一個節(jié)點釋放鎖的時候,該節(jié)點的線程就會被喚醒。
private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final boolean isShared() {
return nextWaiter == SHARED;
}
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
說明一下幾個重要的常量和變量:
SHARED:表示被標記的線程是因為以共享的方式獲取state失敗而進入阻塞隊列的;
EXCLUSIV:表示被標記的線程是因為以獨占的方式獲取state失敗而進入阻塞隊列的;
-
waitStatus:表示線程等待的狀態(tài)
static final int CANCELLED = 1; 表示線程因為中斷或者等待超時,需要從等待隊列中取消等待;
static final int SIGNAL = -1; 鎖被占用,隊列中的head(僅僅代表頭結點,里面沒有存放線程引用)的后繼結點node1處于等待狀態(tài),如果已占有鎖的線程釋放鎖或被CANCEL之后就會通知這個結點node1去獲取鎖執(zhí)行。
static final int CONDITION = -2; 表示結點在等待隊列中(這里指的是等待在某個lock的condition上,關于Condition的原理下面會寫到),當持有鎖的線程調(diào)用了Condition的signal()方法之后,結點會從該condition的等待隊列轉移到該lock的同步隊列上,去競爭lock。(注意:這里的同步隊列就是我們說的AQS維護的FIFO隊列,等待隊列則是每個condition關聯(lián)的隊列)
static final int PROPAGATE = -3; 表示下一次共享狀態(tài)獲取將會傳遞給后繼結點獲取這個共享同步狀態(tài)。
我們可以通過繼承AbstractQueuedSynchronizer類來自己實現(xiàn)一個鎖,使用的時候需要重寫一些指定的方法(模板方法模式)。
//獨占式的獲取同步狀態(tài),實現(xiàn)該方法需要查詢當前狀態(tài)并判斷同步狀態(tài)是否符合預期,然后再進行CAS設置同步狀態(tài)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//獨占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機會獲取同步狀態(tài)
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的獲取同步狀態(tài)
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//嘗試將狀態(tài)設置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用。
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占
protected int isHeldExclusively(int arg) { throw new UnsupportedOperationException();}
4 使用ReentrantLock分析
我們通過ReentrantLock來幫助了解一下加鎖和釋放鎖時state和CLH隊列的具體操作情況。
4.1 先來看lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
很好理解,state沒有被占用的時候,當前線程使用鎖,并將獨占鎖的線程持有者設置為自己。
否則,調(diào)用acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第一個條件
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//(1)獲取當前線程
final Thread current = Thread.currentThread();
//(2)獲得當前同步狀態(tài)state
int c = getState();
//(3)如果state==0,表示沒有線程獲取
if (c == 0) {
//(3-1)那么就嘗試以CAS的方式更新state的值
if (compareAndSetState(0, acquires)) {
//(3-2)如果更新成功,就設置當前獨占模式下同步狀態(tài)的持有者為當前線程
setExclusiveOwnerThread(current);
//(3-3)獲得成功之后,返回true
return true;
}
}
//(4)這里是重入鎖的邏輯
else if (current == getExclusiveOwnerThread()) {
//(4-1)判斷當前占有state的線程就是當前來再次獲取state的線程之后,就計算重入后的state
int nextc = c + acquires;
//(4-2)這里是風險處理
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//(4-3)通過setState無條件的設置state的值,(因為這里也只有一個線程操作state的值,即
//已經(jīng)獲取到的線程,所以沒有進行CAS操作)
setState(nextc);
return true;
}
//(5)沒有獲得state,也不是重入,就返回false
return false;
}
第一個條件總結來說就是再次嘗試占用state,如果成功當前線程使用鎖,如果失敗并且當前線程和正在使用鎖的線程是同一個,那么重入該鎖。如果都不是,返回false。
第二個條件又分為幾個方法,我們都看一下
private Node addWaiter(Node mode) {
//(1)將當前線程以及阻塞原因(是因為SHARED模式獲取state失敗還是EXCLUSIVE獲取失敗)構造為Node結點
Node node = new Node(Thread.currentThread(), mode);
//(2)這一步是快速將當前線程插入隊列尾部
Node pred = tail;
if (pred != null) {
//(2-1)將構造后的node結點的前驅結點設置為tail
node.prev = pred;
//(2-2)以CAS的方式設置當前的node結點為tail結點
if (compareAndSetTail(pred, node)) {
//(2-3)CAS設置成功,就將原來的tail的next結點設置為當前的node結點。這樣這個雙向隊
//列就更新完成了
pred.next = node;
return node;
}
}
//(3)執(zhí)行到這里,說明要么當前隊列為null,要么存在多個線程競爭失敗都去將自己設置為tail結點,
//那么就會有線程在上面(2-2)的CAS設置中失敗,就會到這里調(diào)用enq方法
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
//(4)還是先獲取當前隊列的tail結點
Node t = tail;
//(5)如果tail為null,表示當前同步隊列為null,就必須初始化這個同步隊列的head和tail(建
//立一個哨兵結點)
if (t == null) {
//(5-1)初始情況下,多個線程競爭失敗,在檢查的時候都發(fā)現(xiàn)沒有哨兵結點,所以需要CAS的
//設置哨兵結點
if (compareAndSetHead(new Node()))
tail = head;
}
//(6)tail不為null
else {
//(6-1)直接將當前結點的前驅結點設置為tail結點
node.prev = t;
//(6-2)前驅結點設置完畢之后,還需要以CAS的方式將自己設置為tail結點,如果設置失敗,
//就會重新進入循環(huán)判斷一遍
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//在這樣一個循環(huán)中嘗試tryAcquire同步狀態(tài)
for (;;) {
//獲取前驅結點
final Node p = node.predecessor();
//(1)如果前驅結點是頭節(jié)點,就嘗試取獲取同步狀態(tài),這里的tryAcquire方法相當于還是調(diào)
//用NofairSync的tryAcquire方法,在上面已經(jīng)說過
if (p == head && tryAcquire(arg)) {
//如果前驅結點是頭節(jié)點并且tryAcquire返回true,那么就重新設置頭節(jié)點為node
setHead(node);
p.next = null; //將原來的頭節(jié)點的next設置為null,交由GC去回收它
failed = false;
return interrupted;
}
//(2)如果不是頭節(jié)點,或者雖然前驅結點是頭節(jié)點但是嘗試獲取同步狀態(tài)失敗就會將node結點
//的waitStatus設置為-1(SIGNAL),并且park自己,等待前驅結點的喚醒。至于喚醒的細節(jié)
//下面會說到
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//(1)獲取前驅結點的waitStatus
int ws = pred.waitStatus;
//(2)如果前驅結點的waitStatus為SINGNAL,就直接返回true
if (ws == Node.SIGNAL)
//前驅結點的狀態(tài)為SIGNAL,那么該結點就能夠安全的調(diào)用park方法阻塞自己了。
return true;
if (ws > 0) {
//(3)這里就是將所有的前驅結點狀態(tài)為CANCELLED的都移除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//CAS操作將這個前驅節(jié)點設置成SIGHNAL。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
模擬一下,
- 假設線程A已經(jīng)持有鎖,并將state修改成1;
- 線程B通過lock方法進入到addWaiter,此時還沒有維護隊列,tail=nill;
- 進入enq方法的自旋中;
- 第一次循環(huán) t = tail = null,進入if條件,head = tail = new Node();
- 第二次循環(huán) t = tail = new Node(),進入else,將node(線程B)的上一個節(jié)點設置為t,將node設置為新的tail,將t的下一個節(jié)點設置為node,退出循環(huán);
- 進入到acquireQueued方法,還是自旋,先獲取node的上一個結點;
- 第一次循環(huán),判斷上一個節(jié)點是head并且當前線程B可以獲取到state;
- 第7步通過,將head設置為node,返回false;
- 顯然第7步不通過,進入下面的條件,先進入shouldParkAfterFailedAcquire方法;
- 判斷上一個節(jié)點(pred)的waitStatus不是 -1,進入else,將上一個節(jié)點(pred)的waitStatus設置為 -1,返回false;
- 第二次循環(huán),進入shouldParkAfterFailedAcquire方法,由于上一次循環(huán)修改waitStatus為-1,直接返回true;
- 再看parkAndCheckInterrupt方法,將線程B pack阻塞;
- 自旋并沒有結束,只是被掛起了,這個自旋在后面還有用。
這時,隊列是這樣的

再來一個線程C,變成這樣

4.2 再來看unlock方法
public void unlock() {
sync.release(1); //這里ReentrantLock的unlock方法調(diào)用了AQS的release方法
}
public final boolean release(int arg) {
//這里調(diào)用了子類的tryRelease方法,即ReentrantLock的內(nèi)部類Sync的tryRelease方法
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先我們能看到,重入鎖在獲取鎖和釋放鎖的時候對state的操作參數(shù)都是1,這也就是為什么lock了幾次就要unlock幾次的原因。
protected final boolean tryRelease(int releases) {
//(1)獲取當前的state,然后減1,得到要更新的state
int c = getState() - releases;
//(2)判斷當前調(diào)用的線程是不是持有鎖的線程,如果不是拋出IllegalMonitorStateException
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//(3)判斷更新后的state是不是0
if (c == 0) {
free = true;
//(3-1)將當前鎖持者設為null
setExclusiveOwnerThread(null);
}
//(4)設置當前state=c=getState()-releases
setState(c);
//(5)只有state==0,才會返回true
return free;
}
private void unparkSuccessor(Node node) {
//(1)獲得node的waitStatus
int ws = node.waitStatus;
//(2)判斷waitStatus是否小于0
if (ws < 0)
//(2-1)如果waitStatus小于0需要將其以CAS的方式設置為0
compareAndSetWaitStatus(node, ws, 0);
//(2)獲得s的后繼結點,這里即head的后繼結點
Node s = node.next;
//(3)判斷后繼結點是否已經(jīng)被移除,或者其waitStatus==CANCELLED
if (s == null || s.waitStatus > 0) {
//(3-1)如果s!=null,但是其waitStatus=CANCELLED需要將其設置為null
s = null;
//(3-2)會從尾部結點開始尋找,找到離head最近的不為null并且node.waitStatus的結點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//(4)node.next!=null或者找到的一個離head最近的結點不為null
if (s != null)
//(4-1)喚醒這個結點中的線程
LockSupport.unpark(s.thread);
}
我們還是分析一下代碼
- 進入tryRelease方法,將state的值 -1 ,如果state = 0 就返回true,否則返回 false;
- 進入unparkSuccessor方法,先將head的waitStatus修改成0,在判斷head的后續(xù)節(jié)點的waitStatus是不是 < 0 ;
- 小于 0 就喚醒,否則一直向下查找直到出現(xiàn)waitStatus小于 0 的節(jié)點,并且將它喚醒;
后面程序是怎么執(zhí)行的呢?
- 回憶一下上面的lock邏輯,這時候線程B被喚醒了,繼續(xù)自旋;
- 線程B獲取state成功,就會從acquireQueued方法中退出,最后執(zhí)行自己鎖住的代碼塊中的程序。