前言
JDK1.5以前只有synchronized同步鎖,并且效率非常低,因此大神Doug Lea自己寫了一套并發(fā)框架,這套框架的核心就在于AbstractQueuedSynchronizer類(即AQS),性能非常高,所以被引入JDK包中,即JUC。那么AQS是怎么實(shí)現(xiàn)的呢?本篇就是對AQS及其相關(guān)組件進(jìn)行分析,了解其原理,并領(lǐng)略大神的優(yōu)美而又精簡的代碼。
AbstractQueuedSynchronizer
AQS是JUC下最核心的類,沒有之一,所以我們先來分析一下這個(gè)類的數(shù)據(jù)結(jié)構(gòu)。
AQS內(nèi)部是使用了雙向鏈表將等待線程鏈接起來,當(dāng)發(fā)生并發(fā)競爭的時(shí)候,就會(huì)初始化該隊(duì)列并讓線程進(jìn)入睡眠等待喚醒,同時(shí)每個(gè)節(jié)點(diǎn)會(huì)根據(jù)是否為共享鎖標(biāo)記狀態(tài)為共享模式或獨(dú)占模式。這個(gè)數(shù)據(jù)結(jié)構(gòu)需要好好理解并牢牢記住,下面分析的組件都將基于此實(shí)現(xiàn)。
Lock
Lock是一個(gè)接口,提供了加/解鎖的通用API,JUC主要提供了兩種鎖,ReentrantLock和ReentrantReadWriteLock,前者是重入鎖,實(shí)現(xiàn)Lock接口,后者是讀寫鎖,本身并沒有實(shí)現(xiàn)Lock接口,而是其內(nèi)部類ReadLock或WriteLock實(shí)現(xiàn)了Lock接口。先來看看Lock都提供了哪些接口:
// 普通加鎖,不可打斷;未獲取到鎖進(jìn)入AQS阻塞
void lock();
// 可打斷鎖
void lockInterruptibly() throws InterruptedException;
// 嘗試加鎖,未獲取到鎖不阻塞,返回標(biāo)識(shí)
boolean tryLock();
// 帶超時(shí)時(shí)間的嘗試加鎖
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 解鎖
void unlock();
// 創(chuàng)建一個(gè)條件隊(duì)列
Condition newCondition();
看到這里讀者們可以先思考下,自己如何來實(shí)現(xiàn)上面這些接口。
ReentrantLock
加鎖
synchronized和ReentrantLock都是可重入的,后者使用更加靈活,也提供了更多的高級特性,但其本質(zhì)的實(shí)現(xiàn)原理是差不多的(據(jù)說synchronized是借鑒了ReentrantLock的實(shí)現(xiàn)原理)。ReentrantLock提供了兩個(gè)構(gòu)造方法:
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
有參構(gòu)造是根據(jù)參數(shù)創(chuàng)建公平鎖或非公平鎖,而無參構(gòu)造默認(rèn)則是非公平鎖,因?yàn)榉枪芥i性能非常高,并且大部分業(yè)務(wù)并不需要使用公平鎖。至于為什么非公平鎖性能很高,咱們接著往下看。
非公平鎖/公平鎖
lock
非公平鎖和公平鎖在實(shí)現(xiàn)上基本一致,只有個(gè)別的地方不同,因此下面會(huì)采用對比分析方法進(jìn)行分析。
從lock方法開始:
public void lock() {
sync.lock();
}
實(shí)際上是委托給了內(nèi)部類Sync,該類實(shí)現(xiàn)了AQS(其它組件實(shí)現(xiàn)方法也基本上都是這個(gè)套路);由于有公平和非公平兩種模式,因此該類又實(shí)現(xiàn)了兩個(gè)子類:FairSync和NonfairSync:
// 非公平鎖
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// 公平鎖
final void lock() {
acquire(1);
}
這里就是公平鎖和非公平鎖的第一個(gè)不同,非公平鎖首先會(huì)調(diào)用CAS將state從0改為1,如果能改成功則表示獲取到鎖,直接將exclusiveOwnerThread設(shè)置為當(dāng)前線程,不用再進(jìn)行后續(xù)操作;否則則同公平鎖一樣調(diào)用acquire方法獲取鎖,這個(gè)是在AQS中實(shí)現(xiàn)的模板方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire
這里兩種鎖唯一不同的實(shí)現(xiàn)就是tryAcquire方法,先來看非公平鎖的實(shí)現(xiàn):
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
state=0表示還沒有被線程持有鎖,直接通過CAS修改,能修改成功的就獲取到鎖,修改失敗的線程先判斷exclusiveOwnerThread是不是當(dāng)前線程,是則state+1,表示重入次數(shù)+1并返回true,加鎖成功,否則則返回false表示嘗試加鎖失敗并調(diào)用acquireQueued入隊(duì)。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 首尾不相等且頭結(jié)點(diǎn)線程不是當(dāng)前線程則表示需要進(jìn)入隊(duì)列
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
上面就是公平鎖的嘗試獲取鎖的代碼,可以看到基本和非公平鎖的代碼是一樣的,區(qū)別在于首次加鎖需要判斷是否已經(jīng)有隊(duì)列存在,沒有才去加鎖,有則直接返回false。
addWaiter
接著來看addWaiter方法,當(dāng)嘗試加鎖失敗時(shí),首先就會(huì)調(diào)用該方法創(chuàng)建一個(gè)Node節(jié)點(diǎn)并添加到隊(duì)列中去。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 尾節(jié)點(diǎn)不為null表示已經(jīng)存在隊(duì)列,直接將當(dāng)前線程作為尾節(jié)點(diǎn)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 尾結(jié)點(diǎn)不存在則表示還沒有初始化隊(duì)列,需要初始化隊(duì)列
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋
for (;;) {
Node t = tail;
if (t == null) { // 只會(huì)有一個(gè)線程設(shè)置頭節(jié)點(diǎn)成功
if (compareAndSetHead(new Node()))
tail = head;
} else { // 其它設(shè)置頭節(jié)點(diǎn)失敗的都會(huì)自旋設(shè)置尾節(jié)點(diǎn)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這里首先傳入了一個(gè)獨(dú)占模式的空節(jié)點(diǎn),并根據(jù)該節(jié)點(diǎn)和當(dāng)前線程創(chuàng)建了一個(gè)Node,然后判斷是否已經(jīng)存在隊(duì)列,若存在則直接入隊(duì),否則調(diào)用enq方法初始化隊(duì)列,提高效率。
此處還有一個(gè)非常細(xì)節(jié)的地方,為什么設(shè)置尾節(jié)點(diǎn)時(shí)都要先將之前的尾節(jié)點(diǎn)設(shè)置為node.pre的值呢,而不是在CAS之后再設(shè)置?比如像下面這樣:
if (compareAndSetTail(pred, node)) {
node.prev = pred;
pred.next = node;
return node;
}
因?yàn)槿绻@樣做的話,在CAS設(shè)置完tail后會(huì)存在一瞬間的tail.pre=null的情況,而Doug Lea正是考慮到這種情況,不論何時(shí)獲取tail.pre都不會(huì)為null。
acquireQueued
接著看acquireQueued方法:
final boolean acquireQueued(final Node node, int arg) {
// 為true表示存在需要取消加鎖的節(jié)點(diǎn),僅從這段代碼可以看出,
// 除非發(fā)生異常,否則不會(huì)存在需要取消加鎖的節(jié)點(diǎn)。
boolean failed = true;
try {
// 打斷標(biāo)記,因?yàn)檎{(diào)用的是lock方法,所以是不可打斷的
// (但實(shí)際上是打斷了的,只不過這里采用了一種**靜默**處理方式,稍后分析)
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
這里就是隊(duì)列中線程加鎖/睡眠的核心邏輯,首先判斷剛剛調(diào)用addWaiter方法添加到隊(duì)列的節(jié)點(diǎn)是否是頭節(jié)點(diǎn),如果是則再次嘗試加鎖,這個(gè)剛剛分析過了,非公平鎖在這里就會(huì)再次搶一次鎖,搶鎖成功則設(shè)置為head節(jié)點(diǎn)并返回打斷標(biāo)記;否則則和公平鎖一樣調(diào)用shouldParkAfterFailedAcquire判斷是否應(yīng)該調(diào)用park方法進(jìn)入睡眠。
park細(xì)節(jié)
為什么在park前需要這么一個(gè)判斷呢?因?yàn)楫?dāng)前節(jié)點(diǎn)的線程進(jìn)入park后只能被前一個(gè)節(jié)點(diǎn)喚醒,那前一個(gè)節(jié)點(diǎn)怎么知道有沒有后繼節(jié)點(diǎn)需要喚醒呢?因此當(dāng)前節(jié)點(diǎn)在park前需要給前一個(gè)節(jié)點(diǎn)設(shè)置一個(gè)標(biāo)識(shí),即將waitStatus設(shè)置為Node.SIGNAL(-1),然后自旋一次再走一遍剛剛的流程,若還是沒有獲取到鎖,則調(diào)用parkAndCheckInterrupt進(jìn)入睡眠狀態(tài)。
打斷
讀者可能會(huì)比較好奇Thread.interrupted這個(gè)方法是做什么用的。
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
這個(gè)是用來判斷當(dāng)前線程是否被打斷過,并清除打斷標(biāo)記(若是被打斷過則會(huì)返回true,并將打斷標(biāo)記設(shè)置為false),所以調(diào)用lock方法時(shí),通過interrupt也是會(huì)打斷睡眠的線程的,只是Doug Lea做了一個(gè)假象,讓用戶無感知;但有些場景又需要知道該線程是否被打斷過,所以acquireQueued最終會(huì)返回interrupted打斷標(biāo)記,如果是被打斷過,則返回的true,并在acquire方法中調(diào)用selfInterrupt再次打斷當(dāng)前線程(將打斷標(biāo)記設(shè)置為true)。
這里我們對比看看lockInterruptibly的實(shí)現(xiàn):
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到區(qū)別就在于使用lockInterruptibly加鎖被打斷后,是直接拋出InterruptedException異常,我們可以捕獲這個(gè)異常進(jìn)行相應(yīng)的處理。
取消
最后來看看cancelAcquire是如何取消加鎖的,該情況比較特殊,簡單了解下即可:
private void cancelAcquire(Node node) {
if (node == null)
return;
// 首先將線程置空
node.thread = null;
// waitStatus > 0表示節(jié)點(diǎn)處于取消狀態(tài),則直接將當(dāng)前節(jié)點(diǎn)的pre指向在此之前的最后一個(gè)有效節(jié)點(diǎn)
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 保存前一個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),如果在此之前存在取消節(jié)點(diǎn),這里就是之前取消被取消節(jié)點(diǎn)的頭節(jié)點(diǎn)
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
// 當(dāng)前節(jié)點(diǎn)是tail節(jié)點(diǎn),則替換尾節(jié)點(diǎn),替換成功則將新的尾結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)設(shè)置為null;
// 否則需要判斷是將當(dāng)前節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)賦值給最后一個(gè)有效節(jié)點(diǎn),還是喚醒下一個(gè)節(jié)點(diǎn)。
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
解鎖
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 并發(fā)情況下,可能已經(jīng)被其它線程喚醒或已經(jīng)取消,則從后向前找到最后一個(gè)有效節(jié)點(diǎn)并喚醒
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
解鎖就比較簡單了,先調(diào)用tryRelease對state執(zhí)行減一操作,如果state==0,則表示完全釋放鎖;若果存在后繼節(jié)點(diǎn),則調(diào)用unparkSuccessor喚醒后繼節(jié)點(diǎn),喚醒后的節(jié)點(diǎn)的waitStatus會(huì)重新被設(shè)置為0.
只是這里有一個(gè)小細(xì)節(jié),為什么是從后向前找呢?因?yàn)槲覀冊陂_始說過,設(shè)置尾節(jié)點(diǎn)保證了node.pre不會(huì)為null,但pre.next仍有可能是null,所以這里只能從后向前找到最后一個(gè)有效節(jié)點(diǎn)。
小結(jié)
上面是ReentrantLock的加鎖流程,可以看到整個(gè)流程不算復(fù)雜,只是判斷和跳轉(zhuǎn)比較多,主要是Doug Lea將代碼和性能都優(yōu)化到了極致,代碼非常精簡,但細(xì)節(jié)卻非常多。另外通過上面的分析,我們也可以發(fā)現(xiàn),公平鎖和非公平鎖的區(qū)別就在于非公平鎖不管是否有線程在排隊(duì),先搶三次鎖,而公平鎖則會(huì)判斷是否存在隊(duì)列,有線程在排隊(duì)則直接進(jìn)入隊(duì)列排隊(duì);另外線程在park被喚醒后非公平鎖還會(huì)搶鎖,公平鎖仍然需要排隊(duì),所以非公平鎖的性能比公平鎖高很多,大部分情況下我們使用非公平鎖即可。
ReentrantReadWriteLock
ReentrantLock是一把獨(dú)占鎖,只支持重入,不支持共享,所以JUC包下還提供了讀寫鎖,這把鎖支持讀讀并發(fā),但讀寫、寫寫都是互斥的。
讀寫鎖也是基于AQS實(shí)現(xiàn)的,也包含了一個(gè)繼承自AQS的內(nèi)部類Sync,同樣也有公平和非公平兩種模式,下面主要討論非公平模式下的讀寫鎖實(shí)現(xiàn)。
讀寫鎖實(shí)現(xiàn)相對比較復(fù)雜,在ReentrantLock中就是使用的int型的state屬性來表示鎖被某個(gè)線程占有和重入次數(shù),而ReentrantReadWriteLock分為了讀和寫兩種鎖,要怎么用一個(gè)字段表示兩種鎖的狀態(tài)呢?Doug Lea大師將state字段分為了高二字節(jié)和低二字節(jié),即高16位用來表示讀鎖狀態(tài),低16位則用來表示寫鎖,如下圖:
因?yàn)樽x寫鎖狀態(tài)都只用了兩個(gè)字節(jié),所以可重入的次數(shù)最多是65535,當(dāng)然正常情況下重入是不可能達(dá)到這么多的。
那它是怎么實(shí)現(xiàn)的呢?還是先從構(gòu)造方法開始:
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
同樣默認(rèn)就是非公平鎖,同時(shí)還創(chuàng)建了readerLock和writerLock兩個(gè)對象,我們只需要像下面這樣就能獲取到讀寫鎖:
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static Lock r = lock.readLock();
private static Lock w = lock.writeLock();
寫鎖
由于寫鎖的加鎖過程相對更簡單,下面先從寫鎖加鎖開始分析,入口在ReentrantReadWriteLock#WriteLock.lock()方法,點(diǎn)進(jìn)去看,發(fā)現(xiàn)還是使用的AQS中的acquire方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
所以不同的地方也只有tryAcquire方法,我們重點(diǎn)分析這個(gè)方法就行:
static final int SHARED_SHIFT = 16;
// 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 低16位是1111....1111
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 得到c低16位的值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 獲取寫鎖加鎖和重入的次數(shù)
int w = exclusiveCount(c);
if (c != 0) { // 已經(jīng)有線程持有鎖
// 這里有兩種情況:1\. c!=0 && w==0表示有線程獲取了讀鎖,不論是否是當(dāng)前線程,直接返回false,
// 也就是說讀-寫鎖是不支持升級重入的(但支持寫-讀降級),原因后文會(huì)詳細(xì)分析;
// 2\. c!=0 && w!=0 && current != getExclusiveOwnerThread()表示有其它線程持有了寫鎖,寫寫互斥
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 超出65535,拋異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 否則寫鎖的次數(shù)直接加1
setState(c + acquires);
return true;
}
// c==0才會(huì)走到這,但這時(shí)存在兩種情況,有隊(duì)列和無隊(duì)列,所以公平鎖和非公平鎖處理不同,
// 前者需要判斷是否存在隊(duì)列,有則嘗試加鎖失敗,無則加鎖成功,而非公平鎖直接使用CAS加鎖即可
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
寫鎖嘗試加鎖的過程就分析完了,其余的部分上文已經(jīng)講過,這里不再贅述。
讀鎖
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
讀鎖在加鎖開始就和其它鎖不同,調(diào)用的是acquireShared方法,意為獲取共享鎖。
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 右移16位得到讀鎖狀態(tài)的值
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 為什么讀寫互斥?因?yàn)樽x鎖一上來就判斷了是否有其它線程持有了寫鎖(當(dāng)前線程持有寫鎖再獲取讀鎖是可以的)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 公平鎖判斷是否存在隊(duì)列,非公平鎖判斷第一個(gè)節(jié)點(diǎn)是不是EXCLUSIVE模式,是的話會(huì)返回true
// 返回false則需要判斷讀鎖加鎖次數(shù)是否超過65535,沒有則使用CAS給讀鎖+1
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 第一個(gè)讀鎖線程就是當(dāng)前線程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 記錄讀鎖的重入
firstReaderHoldCount++;
} else {
// 獲取最后一次加讀鎖的重入次數(shù)記錄器HoldCounter
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
// 當(dāng)前線程第一次重入需要初始化,以及當(dāng)前線程和緩存的最后一次記錄器的線程id不同,需要從ThreadLocalHoldCounter拿到對應(yīng)的記錄器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 緩存到ThreadLocal
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
這段代碼有點(diǎn)復(fù)雜,首先需要保證讀寫互斥,然后進(jìn)行初次加鎖,若加鎖失敗就會(huì)調(diào)用fullTryAcquireShared方法進(jìn)行兜底處理。在初次加鎖中與寫鎖不同的是,寫鎖的state可以直接用來記錄寫鎖的重入次數(shù),因?yàn)閷憣懟コ猓x鎖是共享的,state用來記錄讀鎖的加鎖次數(shù)了,重入次數(shù)該怎么記錄呢?重入是指同一線程,那么是不是可以使用ThreadLocl來保存呢?沒錯(cuò),Doug Lea就是這么處理的,新增了一個(gè)HoldCounter類,這個(gè)類只有線程id和重入次數(shù)兩個(gè)字段,當(dāng)線程重入的時(shí)候就會(huì)初始化這個(gè)類并保存在ThreadLocalHoldCounter類中,這個(gè)類就是繼承ThreadLocl的,用來初始化HoldCounter對象并保存。
這里還有個(gè)小細(xì)節(jié),為什么要使用cachedHoldCounter緩存最后一次加讀鎖的HoldCounter?因?yàn)榇蟛糠智闆r下,重入和釋放鎖的線程很有可能就是最后一次加鎖的線程,所以這樣做能夠提高加解鎖的效率,Doug Lea真是把性能優(yōu)化到了極致。
上面只是初次加鎖,有可能會(huì)加鎖失敗,就會(huì)進(jìn)入到fullTryAcquireShared方法:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
這個(gè)方法中代碼和tryAcquireShared基本上一致,只是采用了自旋的方式,處理初次加鎖中的漏網(wǎng)之魚,讀者們可自行閱讀分析。
上面兩個(gè)方法若返回大于0則表示加鎖成功,小于0則會(huì)調(diào)用doAcquireShared方法,這個(gè)就和之前分析的acquireQueued差不多了:
private void doAcquireShared(int arg) {
// 先添加一個(gè)SHARED類型的節(jié)點(diǎn)到隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次嘗試加讀鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
// 設(shè)置head節(jié)點(diǎn)以及傳播喚醒后面的讀線程
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 只有前一個(gè)節(jié)點(diǎn)的waitStatus=-1時(shí)才會(huì)park,=0或者-3(先不考慮-2和1的情況)都會(huì)設(shè)置為-1后再次自旋嘗試加鎖,若還是加鎖失敗就會(huì)park
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
// 設(shè)置頭節(jié)點(diǎn)
Node h = head; // Record old head for check below
setHead(node);
// propagate是tryAcquireShared的返回值,當(dāng)前線程加鎖成功還要去喚醒后繼的共享節(jié)點(diǎn)
// (其余的判斷比較復(fù)雜,筆者也還未想明白,知道的讀者可以指點(diǎn)一下)
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 判斷后繼節(jié)點(diǎn)是否是共享節(jié)點(diǎn)
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
// 存在后繼節(jié)點(diǎn)
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 當(dāng)前一個(gè)節(jié)點(diǎn)加鎖成功后自然需要將-1改回0,并喚醒后繼線程,同時(shí)自旋將0改為-2讓喚醒傳播下去
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
// 設(shè)置頭節(jié)點(diǎn)的waitStatus=-2,使得喚醒可以傳播下去
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
這里的邏輯也非常的繞,當(dāng)多個(gè)線程同時(shí)調(diào)用addWaiter添加到隊(duì)列中后,并且假設(shè)這些節(jié)點(diǎn)的第一個(gè)節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)就是head節(jié)點(diǎn),那么第一個(gè)節(jié)點(diǎn)就能加鎖成功(假設(shè)都是SHARED節(jié)點(diǎn)),其余的節(jié)點(diǎn)在第一個(gè)節(jié)點(diǎn)設(shè)置頭節(jié)點(diǎn)之前都會(huì)進(jìn)入shouldParkAfterFailedAcquire方法,這時(shí)候waitStatus都等于0,所以繼續(xù)自旋不會(huì)park,若再次加鎖還失敗就會(huì)park(因?yàn)檫@時(shí)候waitStatus=-1),但都是讀線程的情況下一般都不會(huì)出現(xiàn),因?yàn)?strong>setHeadAndPropagate第一步就是修改head,所以其余SHARED節(jié)點(diǎn)最終都能加鎖成功并一直將喚醒傳播下去。
以上就是讀寫鎖加鎖過程,解鎖比較簡單,這里就不詳細(xì)分析了。
小結(jié)
讀寫鎖將state分為了高二字節(jié)和低二字節(jié),分別存儲(chǔ)讀鎖和寫鎖的狀態(tài),實(shí)現(xiàn)更為的復(fù)雜,在使用上還有幾點(diǎn)需要注意:
- 讀讀共享,但是在讀中間穿插了寫的話,后面的讀都會(huì)被阻塞,直到前面的寫釋放鎖后,后面的讀才會(huì)共享,相關(guān)原理看完前文不難理解。
- 讀寫鎖只支持降級重入,不支持升級重入。因?yàn)槿绻С稚壷厝氲脑?,是?huì)出現(xiàn)死鎖的。如下面這段代碼:
private static void rw() {
r.lock();
try {
log.info("獲取到讀鎖");
w.lock();
try {
log.info("獲取到寫鎖");
} finally {
w.unlock();
}
} finally {
r.unlock();
}
}
多個(gè)線程訪問都能獲取到讀鎖,但讀寫互斥,彼此都要等待對方的讀鎖釋放才能獲取到寫鎖,這就造成了死鎖。
ReentrantReadWriteLock在某些場景下性能上不算高,因此Doug Lea在JDK1.8的時(shí)候又提供了一把高性能的讀寫鎖StampedLock,前者讀寫鎖都是悲觀鎖,而后者提供了新的模式——樂觀鎖,但它不是基于AQS實(shí)現(xiàn)的,本文不進(jìn)行分析。
Condition
Lock接口中還有一個(gè)方法newCondition,這個(gè)方法就是創(chuàng)建一個(gè)條件隊(duì)列:
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
所謂條件隊(duì)列就是創(chuàng)建一個(gè)新的ConditionObject對象,這個(gè)對象的數(shù)據(jù)結(jié)構(gòu)在開篇就看過了,包含首、尾兩個(gè)節(jié)點(diǎn)字段,每當(dāng)調(diào)用Condition#await方法時(shí)就會(huì)在對應(yīng)的Condition對象中排隊(duì)等待:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入條件隊(duì)列
Node node = addConditionWaiter();
// 因?yàn)镃ondition.await必須配合Lock.lock使用,所以await時(shí)就是將已獲得鎖的線程全部釋放掉
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判斷是在同步隊(duì)列還是條件隊(duì)列,后者則直接park
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 獲取打斷處理方式(拋出異?;蛑卦O(shè)標(biāo)記)
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 調(diào)用aqs的方法
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
// 清除掉已經(jīng)進(jìn)入同步隊(duì)列的節(jié)點(diǎn)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// 清除狀態(tài)為取消的節(jié)點(diǎn)
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 創(chuàng)建一個(gè)CONDITION狀態(tài)的節(jié)點(diǎn)并添加到隊(duì)列末尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
await方法實(shí)現(xiàn)比較簡單,大部分代碼都是上文分析過的,這里不再重復(fù)。接著來看signal方法:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 從條件隊(duì)列第一個(gè)節(jié)點(diǎn)開始喚醒
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 修改waitStatus狀態(tài),如果修改失敗,則說明該節(jié)點(diǎn)已經(jīng)從條件隊(duì)列轉(zhuǎn)移到了同步隊(duì)列
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 上面修改成功,則將該節(jié)點(diǎn)添加到同步隊(duì)列末尾,并返回之前的尾結(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// unpark當(dāng)前線程,結(jié)合await方法看
LockSupport.unpark(node.thread);
return true;
}
signal的邏輯也比較簡單,就是喚醒條件隊(duì)列中的第一個(gè)節(jié)點(diǎn),主要是要結(jié)合await的代碼一起理解。
其它組件
上文分析的鎖都是用來實(shí)現(xiàn)并發(fā)安全控制的,而對于多線程協(xié)作JUC又基于AQS提供了CountDownLatch、CyclicBarrier、Semaphore等組件,下面一一分析。
CountDownLatch
CountDownLatch在創(chuàng)建的時(shí)候就需要指定一個(gè)計(jì)數(shù):
CountDownLatch countDownLatch = new CountDownLatch(5);
然后在需要等待的地方調(diào)用countDownLatch.await()方法,然后在其它線程完成任務(wù)后調(diào)用countDownLatch.countDown()方法,每調(diào)用一次該計(jì)數(shù)就會(huì)減一,直到計(jì)數(shù)為0時(shí),await的地方就會(huì)自動(dòng)喚醒,繼續(xù)后面的工作,所以CountDownLatch適用于一個(gè)線程等待多個(gè)線程的場景,那它是怎么實(shí)現(xiàn)的呢?讀者們可以結(jié)合上文自己先思考下。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync(int count) {
setState(count);
}
與前面講的鎖一樣,也有一個(gè)內(nèi)部類Sync繼承自AQS,并且在構(gòu)造時(shí)就將傳入的計(jì)數(shù)設(shè)置到了state屬性,看到這里不難猜到CountDownLatch的實(shí)現(xiàn)原理了。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
在await方法中使用的是可打斷的方式獲取的共享鎖,同樣除了tryAcquireShared方法,其余的都是復(fù)用的之前分析過的代碼,而tryAcquireShared就是判斷state是否等于0,不等于就阻塞。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
而調(diào)用countDown就更簡單了,每次對state遞減,直到為0時(shí)才會(huì)調(diào)用doReleaseShared釋放阻塞的線程。
最后需要注意的是CountDownLatch的計(jì)數(shù)是不支持重置的,每次使用都要新建一個(gè)。
CyclicBarrier
CyclicBarrier和CountDownLatch使用差不多,不過它只有await方法。CyclicBarrier在創(chuàng)建時(shí)同樣需要指定一個(gè)計(jì)數(shù),當(dāng)調(diào)用await的次數(shù)達(dá)到計(jì)數(shù)時(shí),所有線程就會(huì)同時(shí)喚醒,相當(dāng)于設(shè)置了一個(gè)“起跑線”,需要等所有運(yùn)動(dòng)員都到達(dá)這個(gè)“起跑線”后才能一起開跑。另外它還支持重置計(jì)數(shù),提供了reset方法。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
CyclicBarrier提供了兩個(gè)構(gòu)造方法,我們可以傳入一個(gè)Runnable類型的回調(diào)函數(shù),當(dāng)達(dá)到計(jì)數(shù)時(shí),由最后一個(gè)調(diào)用await的線程觸發(fā)執(zhí)行。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
// 是否打斷,打斷會(huì)喚醒所有條件隊(duì)列中的線程
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
// 計(jì)數(shù)為0時(shí),喚醒條件隊(duì)列中的所有線程
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
for (;;) {
try {
// 不帶超時(shí)時(shí)間直接進(jìn)入條件隊(duì)列等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
這里邏輯比較清晰,就是使用了ReentrantLock以及Condition來實(shí)現(xiàn)。在構(gòu)造方法中我們可以看到保存了兩個(gè)變量count和parties,每次調(diào)用await都會(huì)對count變量遞減,count不為0時(shí)都會(huì)進(jìn)入到trip條件隊(duì)列中等待,否則就會(huì)通過signalAll方法喚醒所有的線程,并將parties重新賦值給count。
reset方法很簡單,這里不詳細(xì)分析了。
Semaphore
Semaphore是信號(hào)的意思,或者說許可,可以用來控制最大并發(fā)量。初始定義好有幾個(gè)信號(hào),然后在需要獲取信號(hào)的地方調(diào)用acquire方法,執(zhí)行完成后,需要調(diào)用release方法回收信號(hào)。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
它也有兩個(gè)構(gòu)造方法,可以指定公平或是非公平,而permits就是state的值。
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 非公平方式
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 公平方式
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
acquire方法和CountDownLatch是一樣的,只是tryAcquireShared區(qū)分了公平和非公平方式。獲取到信號(hào)相當(dāng)于加共享鎖成功,否則則進(jìn)入隊(duì)列阻塞等待;而release方法和讀鎖解鎖方式也是一樣的,只是每次release都會(huì)將state+1。
最后
感謝大家看到這里,如果本文有什么不足之處,歡迎多多指教;如果你覺得對你有幫助,請給我點(diǎn)個(gè)贊。
也歡迎大家關(guān)注我的公眾號(hào):程序員麥冬,每天更新行業(yè)資訊!