AbstractQueuedSynchronizer簡稱AQS
Java并發(fā)編程核心在于java.concurrent.util包而juc當中的大多數(shù)同步器 實現(xiàn)都是圍繞著共同的基礎行為,比如等待隊列、條件隊列、獨占獲取、共享獲 取等,而這個行為的抽象就是基于AbstractQueuedSynchronizer簡稱AQS,AQS定 義了一套多線程訪問共享資源的同步器框架,是一個依賴狀態(tài)(state)的同步器。
AQS具備特性
- 阻塞等待隊列
- 共享/獨占
- 公平/非公平
- 可重入
- 允許中斷
例如Java.concurrent.util當中同步器的實現(xiàn)如Lock,Latch,Barrier等,都是基 于AQS框架實現(xiàn)
- 一般通過定義內(nèi)部類Sync繼承AQS
- 將同步器所有調(diào)用都映射到Sync對應的方法
AQS內(nèi)部維護屬性 volatile int state
? state表示資源的可用狀態(tài)
State三種訪問方式
? getState()、setState()、compareAndSetState()
AQS定義兩種資源共享方式
- Exclusive-獨占,只有一個線程能執(zhí)行,如ReentrantLock
- Share-共享,多個線程可以同時執(zhí)行,如Semaphore/CountDownLatch
AQS定義兩種隊列
- 同步等待隊列
- 條件等待隊列
不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只 需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護 (如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了。自定義同步器 實現(xiàn)時主要實現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。只有用到 condition才需要去實現(xiàn)它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗 則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗 則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數(shù)表示失敗; 0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許 喚醒后續(xù)等待結點返回true,否則返回false。
同步等待隊列
AQS當中的同步等待隊列也稱CLH隊列,CLH隊列是Craig、Landin、 Hagersten三人發(fā)明的一種基于雙向鏈表數(shù)據(jù)結構的隊列,是FIFO先入先出線程等待隊列,Java中的CLH隊列是原CLH隊列的一個變種,線程由原自旋機制改為阻 塞機制。

條件等待隊列
Condition是一個多線程間協(xié)調(diào)通信的工具類,使得某個,或者某些線程一 起等待某個條件(Condition),只有當該條件具備時,這些等待線程才會被喚 醒,從而重新爭奪鎖

ReentrantLock簡介
ReentrantLock 重入鎖實現(xiàn)了 Lock和 java.io.Serializable接口,并提供了與 synchronized相同的互斥性和內(nèi)存可見性,ReentrantLock 提供了可重入的加鎖語義,能夠?qū)蚕碣Y源能夠重復加鎖,即當前線程獲取該鎖再次獲取不會被阻塞,并且與synchronized相比,它還為處理鎖的不可用性提供了更高的靈活性,與此同時,ReentrantLock 還支持公平鎖和非公平鎖兩種方式。
ReentrantLock類層次結構:

Lock lock = new ReentrantLock();
lock.lock();
try{
//更新對象狀態(tài)
//捕獲異常,并在必須時恢復不變性條件
}catch (Exception e){
e.printStackTrace();
} finally {
lock.unlock();
}
可重入性
可重入鎖,也叫做 遞歸鎖,從名字上理解,字面意思就是再進入的鎖,重入性是指任意線程在獲取到鎖之后能夠再次獲取該鎖而不會被鎖阻塞,首先他需要具備兩個條件:1) 線程再次獲取鎖:所需要去識別獲取鎖的線程是否為當前占據(jù)鎖的線程,如果是,則再次獲取成功 2) 鎖的最終釋放:線程重復n次獲取了鎖,隨后在第n次釋放該鎖后,其它線程能夠獲取到該鎖。鎖的最終釋放要求鎖對于獲取進行計數(shù)自增,計數(shù)表示當前線程被重復獲取的次數(shù),而被釋放時,計數(shù)自減,當計數(shù)為0時表示鎖已經(jīng)成功釋放。
鎖的實現(xiàn)
使用ReentrantLock 案例:
/**
* 無參的構造函數(shù)
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 有參構造函數(shù)
* 參數(shù)為布爾類型
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
上述代碼中是使用Lock接口的標準使用方式,這種形式比使用內(nèi)置鎖(synchronized )復雜一些,必須要在 finally 塊中釋放鎖,否則,如果在被保護的代碼中拋出了異常,那么這個鎖永遠都無法釋放。
ReentrantLock 源碼分析
在簡介中我們知道 ReentrantLock繼承自 Lock接口, Lock提供了一些獲取鎖和釋放鎖的方法,以及條件判斷的獲取的方法,通過實現(xiàn)它來進行鎖的控制,因為它是顯示鎖,所以需要顯示指定起始位置和終止位置,下面就來介紹一下 Lock接口的方法介紹:
| 方法名稱 | 方法描述 |
|---|---|
| lock | 用來獲取鎖,如果鎖已被其他線程獲取,則進行等待 |
| tryLock | 表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失?。存i已被其他線程獲取),則返回false,也就說這個方法無論如何都會立即返回。在拿不到鎖時不會一直在那等待 |
| tryLock(long time, TimeUnit unit) | 和tryLock()類似,區(qū)別在于它在拿不到鎖時會等待一定的時間,在時間期限之內(nèi)如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內(nèi)拿到了鎖,則返回true |
| lockInterruptibly | 獲取鎖,如果獲取鎖失敗則進行等到,如果等待的線程被中斷會相應中斷信息 |
| unlock | 釋放鎖的操作 |
| newCondition | 獲取Condition對象,該組件和當前的鎖綁定,當前線程只有獲得了鎖,才能調(diào)用該組件wait()方法,而調(diào)用后,當前線程釋放鎖 |
ReentrantLock 也實現(xiàn)了上面接口的內(nèi)容,同時 ReentrantLock 提供了 公平鎖和 非公平鎖兩種模式,如果沒有特別的去指定使用何種方式,那么 ReentrantLock 會默認為 非公平鎖,首先我們來看一下 ReentrantLock 的構造函數(shù):
/**
* 無參的構造函數(shù)
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 有參構造函數(shù)
* 參數(shù)為布爾類型
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
從上述源碼中我們可以看到:
? ReentrantLock 優(yōu)先使用的是無參構造函數(shù),也就是非公平鎖,但是當我們調(diào)用有參構造函數(shù)時,可以指定使用哪種鎖來進行操作(公平鎖還是非公平鎖),參數(shù)為布爾類型,如果指定為 false 的話代表 非公平鎖 ,如果指定為 true 的話代表的是 公平鎖
? Sync 類 是 ReentrantLock 自定義的同步組件,它是 ReentrantLock 里面的一個內(nèi)部類,它繼承自AQS(AbstractQueuedSynchronizer),Sync 有兩個子類:公平鎖 FairSync 和 非公平鎖 NonfairSync
? ReentrantLock 的獲取與釋放鎖操作都是委托給該同步組件來實現(xiàn)的。下面我們來看一看非公平鎖的 lock() 方法:
非公平鎖 NonfairSync.lock()
lock方法詳解
在初始化 ReentrantLock 的時候,如果我們不傳參,使用默認的構造函數(shù),那么默認使用非公平鎖,也就是 NonfairSync2) 當我們調(diào)用 ReentrantLock 的 lock() 方法的時候,實際上是調(diào)用了 NonfairSync 的 lock() 方法,代碼如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//這個方法先用CAS操作,去嘗試搶占該鎖
// 快速嘗試將state從0設置成1,如果state=0代表當前沒有任何一個線程獲得了鎖
if (compareAndSetState(0, 1))
//state設置成1代表獲得鎖成功
//如果成功,就把當前線程設置在這個鎖上,表示搶占成功,在重入鎖的時候需要
setExclusiveOwnerThread(Thread.currentThread());
else
//如果失敗,則調(diào)用 AbstractQueuedSynchronizer.acquire() 模板方法,等待搶占。
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
調(diào)用 acquire(1) 實際上使用的是 AbstractQueuedSynchronizer 的acquire() 方法,它是一套鎖搶占的模板,acquire() 代碼比較簡單:
public final void acquire(int arg) {
//先去嘗試獲取鎖,如果沒有獲取成功,就在CLH隊列中增加一個當前線程的節(jié)點,表示等待搶占。
//然后進入CLH隊列的搶占模式,進入的時候也會去執(zhí)行一次獲取鎖的操作,如果還是獲取不到,
//就調(diào)用LockSupport.park() 將當前線程掛起。那么當前線程什么時候會被喚醒呢?當
//持有鎖的那個線程調(diào)用 unlock() 的時候,會將CLH隊列的頭節(jié)點的下一個節(jié)點上的線程
//喚醒,調(diào)用的是 LockSupport.unpark() 方法。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire() 會先調(diào)用 tryAcquire() 這個鉤子方法去嘗試獲取鎖,這個方法就是在 NonfairSync.tryAcquire()下的 nonfairTryAcquire(),源碼如下:
//一個嘗試插隊的過程
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取state值
int c = getState();
//比較鎖的狀態(tài)是否為 0,如果是0,當前沒有任何一個線程獲取鎖
if (c == 0) {
//則嘗試去原子搶占這個鎖(設置狀態(tài)為1,然后把當前線程設置成獨占線程)
if (compareAndSetState(0, acquires)) {
// 設置成功標識獨占鎖
setExclusiveOwnerThread(current);
return true;
}
}
//如果當前鎖的狀態(tài)不是0 state!=0,就去比較當前線程和占用鎖的線程是不是一個線程
else if (current == getExclusiveOwnerThread()) {
//如果是,增加狀態(tài)變量的值,從這里看出可重入鎖之所以可重入,就是同一個線程可以反復使用它占用的鎖
int nextc = c + acquires;
//重入次數(shù)太多,大過Integer.MAX
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果以上兩種情況都不通過,則返回失敗false
return false;
}
tryAcquire() 一旦返回 false,就會則進入 acquireQueued() 流程,也就是基于CLH隊列的搶占模式,在CLH鎖隊列尾部增加一個等待節(jié)點,這個節(jié)點保存了當前線程,通過調(diào)用 addWaiter() 實現(xiàn),這里需要考慮初始化的情況,在第一個等待節(jié)點進入的時候,需要初始化一個頭節(jié)點然后把當前節(jié)點加入到尾部,后續(xù)則直接在尾部加入節(jié)點。
代碼如下:
//AbstractQueuedSynchronizer.addWaiter()
private Node addWaiter(Node mode) {
// 初始化一個節(jié)點,用于保存當前線程
Node node = new Node(Thread.currentThread(), mode);
// 當CLH隊列不為空的視乎,直接在隊列尾部插入一個節(jié)點
Node pred = tail;
if (pred != null) {
node.prev = pred;
//如果pred還是尾部(即沒有被其他線程更新),則將尾部更新為node節(jié)點(即當前線程快速設置成了隊尾)
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 當CLH隊列為空的時候,調(diào)用enq方法初始化隊列
enq(node);
return node;
}
private Node enq(final Node node) {
//在一個循環(huán)里不停的嘗試將node節(jié)點插入到隊尾里
for (;;) {
Node t = tail;
if (t == null) { // 初始化節(jié)點,頭尾都指向一個空節(jié)點
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
將節(jié)點增加到CLH隊列后,進入 acquireQueued() 方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//在一個循環(huán)里不斷等待前驅(qū)節(jié)點執(zhí)行完畢
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {// 通過tryAcquire獲得鎖,如果獲取到鎖,說明頭節(jié)點已經(jīng)釋放了鎖
setHead(node);//將當前節(jié)點設置成頭節(jié)點
p.next = null; // help GC//將上一個節(jié)點的next變量被設置為null,在下次GC的時候會清理掉
failed = false;//將failed標記設置成false
return interrupted;
}
//中斷
if (shouldParkAfterFailedAcquire(p, node) && // 是否需要阻塞
parkAndCheckInterrupt())// 阻塞,返回線程是否被中斷
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
如果嘗試獲取鎖失敗,就會進入 shouldParkAfterFailedAcquire() 方法,會判斷當前線程是否阻塞
/**
* 確保當前結點的前驅(qū)結點的狀態(tài)為SIGNAL
* SIGNAL意味著線程釋放鎖后會喚醒后面阻塞的線程
* 只有確保能夠被喚醒,當前線程才能放心的阻塞。
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前驅(qū)節(jié)點狀態(tài)為SIGNAL
//表明當前線程需要阻塞,因為前置節(jié)點承諾執(zhí)行完之后會通知喚醒當前節(jié)點
return true;
if (ws > 0) {//ws > 0 代表前驅(qū)節(jié)點取消了
do {
node.prev = pred = pred.prev;//不斷的把前驅(qū)取消了的節(jié)點移除隊列
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//初始化狀態(tài),將前驅(qū)節(jié)點的狀態(tài)設置成SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
當進入阻塞階段,會進入parkAndCheckInterrupt() 方法,則會調(diào)用 LockSupport.park(this) 將當前線程掛起。代碼如下:
// 從方法名可以看出這個方法做了兩件事
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//掛起當前的線程
// 如果當前線程已經(jīng)被中斷了,返回true,否則返回false
// 有可能在掛起階段被中斷了
return Thread.interrupted();
}
非公平鎖 NonfairSync.unlock()
調(diào)用 unlock() 方法,其實是直接調(diào)用 AbstractQueuedSynchronizer.release() 操作。
進入 release() 方法,內(nèi)部先嘗試 tryRelease() 操作,主要是去除鎖的獨占線程,然后將狀態(tài)減一,這里減一主要是考慮到可重入鎖可能自身會多次占用鎖,只有當狀態(tài)變成0,才表示完全釋放了鎖。
如果 tryRelease 成功,則將CHL隊列的頭節(jié)點的狀態(tài)設置為0,然后喚醒下一個非取消的節(jié)點線程。
一旦下一個節(jié)點的線程被喚醒,被喚醒的線程就會進入 acquireQueued() 代碼流程中,去獲取鎖。
代碼如下:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//嘗試在當前鎖的鎖定計數(shù)(state)值上減1,
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)//waitStatus!=0表明或者處于CANCEL狀態(tài),或者是SIGNAL表示下一個線程在等待其喚醒。也就是說waitStatus不為零表示它的后繼在等待喚醒。
unparkSuccessor(h);
//成功返回true
return true;
}
//否則返回false
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//如果waitStatus < 0 則將當前節(jié)點清零
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//若后續(xù)節(jié)點為空或已被cancel,則從尾部開始找到隊列中第一個waitStatus<=0,即未被cancel的節(jié)點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
當然在 release() 方法中不僅僅只是將 state - 1 這么簡單,-1 之后還需要進行一番處理,如果 -1 之后的 新state = 0 ,則表示當前鎖已經(jīng)被線程釋放了,同時會喚醒線程等待隊列中的下一個線程。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//判斷是否為當前線程在調(diào)用,不是拋出IllegalMonitorStateException異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//c == 0,釋放該鎖,同時將當前所持有線程設置為null
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
//設置state
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 從后往前找到離head最近,而且waitStatus <= 0 的節(jié)點
// 其實在ReentrantLock中,waitStatus應該只能為0和-1,需要喚醒的都是-1(Node.SIGNAL)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);// 喚醒掛起線程
}
重點:unlock最好放在finally中,因為如果沒有使用finally來釋放Lock,那么相當于啟動了一個定時炸彈,如果發(fā)生錯誤,我們很難追蹤到最初發(fā)生錯誤的位置,因為沒有記錄應該釋放鎖的位置和時間,這也就是 ReentrantLock 不能完全替代 synchronized 的原因,因為當程序執(zhí)行控制離開被保護的代碼塊時,不會自動清除鎖。
公平鎖 FairSync
FairSync相對來說就簡單很多,只有重寫的兩個方法跟NonfairSync不同
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&// 沒有前驅(qū)節(jié)點了
compareAndSetState(0, acquires)) {// 而且沒有鎖
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平鎖和非公平鎖的區(qū)別
- 鎖的公平性是相對于獲取鎖的順序而言的。
- 如果是一個公平鎖,那么鎖的獲取順序就應該符合請求的絕對時間順序,也就是FIFO,線程獲取鎖的順序和調(diào)用lock的順序一樣,能夠保證老的線程排隊使用鎖,新線程仍然排隊使用鎖。
- 非公平鎖只要CAS設置同步狀態(tài)成功,則表示當前線程獲取了鎖,線程獲取鎖的順序和調(diào)用lock的順序無關,全憑運氣,也就是老的線程排隊使用鎖,但是無法保證新線程搶占已經(jīng)在排隊的線程的鎖。
- ReentrantLock默認使用非公平鎖是基于性能考慮,公平鎖為了保證線程規(guī)規(guī)矩矩地排隊,需要增加阻塞和喚醒的時間開銷。如果直接插隊獲取非公平鎖,跳過了對隊列的處理,速度會更快。
ReenTrantLock(可重入鎖) 和 synchronized的區(qū)別
可重入性
- ReenTrantLock(可重入鎖) 的字面意思就是再進入的鎖,對于 synchronized 關鍵字所使用的鎖也是可重入的,兩者關于這個的區(qū)別不大。兩者都是同一個線程沒進入一次,鎖的計數(shù)器都自增1,所以要等到鎖的計數(shù)器下降為0時才能釋放鎖。
鎖的實現(xiàn)
- Synchronized 是依賴于JVM實現(xiàn)的,而 ReenTrantLock 是JDK實現(xiàn)的,有什么區(qū)別,說白了就類似于操作系統(tǒng)來控制實現(xiàn)和用戶自己敲代碼實現(xiàn)的區(qū)別。前者的實現(xiàn)是比較難見到的,后者有直接的源碼可供閱讀。
性能的區(qū)別
- 在Synchronized優(yōu)化以前,synchronized的性能是比ReenTrantLock差很多的,但是自從Synchronized引入了偏向鎖,輕量級鎖(自旋鎖)后,兩者的性能就差不多了,在兩種方法都可用的情況下,官方甚至建議使用synchronized,其實synchronized的優(yōu)化我感覺就借鑒了ReenTrantLock中的CAS技術。都是試圖在用戶態(tài)就把加鎖問題解決,避免進入內(nèi)核態(tài)的線程阻塞。
功能區(qū)別
便利性:很明顯Synchronized的使用比較方便簡潔,并且由編譯器去保證鎖的加鎖和釋放,而ReenTrantLock需要手工聲明來加鎖和釋放鎖,為了避免忘記手工釋放鎖造成死鎖,所以最好在finally中聲明釋放鎖。
鎖的細粒度和靈活度:很明顯 ReenTrantLock 優(yōu)于 Synchronized ,但是 ReenTrantLock 沒有辦法完全取代 Synchronized
ReenTrantLock獨有的能力
ReenTrantLock 可以指定是公平鎖還是非公平鎖。而 synchronized 只能是非公平鎖。所謂的公平鎖就是先等待的線程先獲得鎖。
ReenTrantLock 提供了一個 Condition(條件)類,用來實現(xiàn)分組喚醒需要喚醒的線程們,而不是像synchronized 要么隨機喚醒一個線程要么喚醒全部線程。
ReenTrantLock 提供了一種能夠中斷等待鎖的線程的機制,通過 lock.lockInterruptibly() 來實現(xiàn)這個機制。
總結
ReentrantLock 是java中非常重要的一個并發(fā)工具,相比于java原生的 synchronized 有著更好的性能,學習 ReentrantLock ,我們主要需要了解它,公平鎖 和 非公平鎖 的實現(xiàn),以及重入鎖的獲取與釋放的流程,還有最重要的就是要了解AQS(AbstractQueuedSynchronizer),這是實現(xiàn)重入鎖的基礎,ReentrantLock 是一個比較輕量級的鎖,而且使用面向?qū)ο蟮乃枷肴崿F(xiàn)了鎖的功能,比原來的synchronized關鍵字更加好理解。