ReentrantLock源碼分析及AQS原理
ReentrantLock源碼分析
ReentrantLock(可重入互斥鎖)??芍厝腈i又名遞歸鎖,是指在同一個(gè)線程在外層方法獲取鎖的時(shí)候,再進(jìn)入該線程的內(nèi)層方法會(huì)自動(dòng)獲取鎖(前提鎖對(duì)象得是同一個(gè)對(duì)象或者class),不會(huì)因?yàn)橹耙呀?jīng)獲取過還沒釋放而阻塞。
我們從構(gòu)造函數(shù)開始逐步分析。
ReentrantLock的兩個(gè)構(gòu)造函數(shù),默認(rèn)使用的是非公平sync對(duì)象
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
看下繼承自Sync的非公平同步類NonfairSync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 非公平鎖:線程加鎖時(shí)直接嘗試獲取鎖,獲取不到才會(huì)到等待隊(duì)列的隊(duì)尾排隊(duì)等待
*/
final void lock() {
if (compareAndSetState(0, 1))//CAS
setExclusiveOwnerThread(Thread.currentThread());//設(shè)置所有者線程為當(dāng)前線程
else
acquire(1);//失敗后嘗試獲取鎖。
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
compareAndSetState(0, 1)。0是期望值,1是更新值。只要期待值與當(dāng)前同步狀態(tài)state相等,則把state更新為update(更新值)。
這里state就是當(dāng)前線程獲得鎖的狀態(tài)。先看下state的說明。
private volatile int state;//state是Volatile修飾的,用于保證一定的可見性和有序性。
- State初始化的時(shí)候?yàn)?,表示沒有任何線程持有鎖。
- 當(dāng)有線程持有該鎖時(shí),值就會(huì)在原來的基礎(chǔ)上+1,同一個(gè)線程多次獲得鎖時(shí),就會(huì)多次+1,這里就是可重入的概念。
- 解鎖也是對(duì)這個(gè)字段-1,一直到0,此線程對(duì)鎖釋放。
所以此處的compareAndSetState(0, 1)就是為了判斷當(dāng)前線程state是否為0,是0則獲取鎖成功,并更新state為1.
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
如果獲取鎖失敗呢?也就是當(dāng)前線程的state不為0,那就會(huì)執(zhí)行acquire(1);
//arg代表獲取鎖的次數(shù)
public final void acquire(int arg) {
//tryAcquire(arg)為true,代表獲取鎖成功(當(dāng)前線程為獨(dú)占線程)。
//獲取鎖失敗后,再執(zhí)行acquireQueued將線程排隊(duì)。
if (!tryAcquire(arg) &&
//addWaiter方法其實(shí)就是把對(duì)應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊(duì)列里,返回的是一個(gè)包含該線程的Node。而這個(gè)Node會(huì)作為參數(shù),進(jìn)入到acquireQueued方法中。acquireQueued方法可以對(duì)排隊(duì)中的線程進(jìn)行“獲鎖”操作。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中斷當(dāng)前線程
selfInterrupt();
}
這幾個(gè)方法都是由ReentrantLock中的(非)公平同步類調(diào)用,而(Non)fairSync繼承自Sync類,Sync類繼承自AQS,所以這些方法都在AQS中定義。例如上面的tryAcquire方法就被ReentrantLock重寫了,直接調(diào)用AQS的tryAcquire會(huì)拋出UnsupportedOperationException異常。
可以看到ReentrantLock中的公平同步類和非公平同步類都重寫了tryAcquire。
所以acquire調(diào)用的就是NonfairSync中已經(jīng)重寫了的tryAcquire
//NonfairSync類里最后一個(gè)方法,當(dāng)前acquires=1
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//當(dāng)前線程
final Thread current = Thread.currentThread();
//當(dāng)前state值
int c = getState();
//如果沒有線程獲取鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//獲取鎖成功,并將當(dāng)前線程設(shè)置為獨(dú)占線程
setExclusiveOwnerThread(current);
return true;
}
}
//如果當(dāng)前線程為獨(dú)占線程,則將state加1。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
查看addWaiter方法前,要先來看下Node節(jié)點(diǎn),也就是未獲取到鎖的線程加入到了哪種隊(duì)列中去。
這就涉及到AQS核心思想:如果被請(qǐng)求的共享資源空閑,那么就將當(dāng)前請(qǐng)求資源的線程設(shè)置為有效的工作線程,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用,就需要一定的阻塞等待喚醒機(jī)制來保證鎖分配。這個(gè)機(jī)制主要用的是CLH隊(duì)列的變體---虛擬雙向隊(duì)列(FIFO)---實(shí)現(xiàn)的,將暫時(shí)獲取不到鎖的線程加入到隊(duì)列中。
AQS使用一個(gè)Volatile的int類型的state來表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取的排隊(duì)工作,通過CAS完成對(duì)State值的修改。
AQS中基本的數(shù)據(jù)結(jié)構(gòu)——Node,虛擬雙向隊(duì)列(FIFO)中的節(jié)點(diǎn)。
static final class Node {
/** 標(biāo)志線程以共享/獨(dú)占方式等待鎖 */
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
/** 表示線程獲取鎖的請(qǐng)求已經(jīng)取消 */
static final int CANCELLED = 1;
/** 表示線程已經(jīng)準(zhǔn)備好,就等資源釋放了(被喚醒了) */
static final int SIGNAL = -1;
/** 表示節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)線程等待喚醒 */
static final int CONDITION = -2;
/** 當(dāng)前線程處在SHARED情況下,該字段才會(huì)使用(其他操作介入,也要確保傳播繼續(xù)) */
static final int PROPAGATE = -3;
//當(dāng)前節(jié)點(diǎn)在隊(duì)列中的狀態(tài)
volatile int waitStatus;
//前驅(qū)指針
volatile Node prev;
//后繼指針
volatile Node next;
//表示處于該節(jié)點(diǎn)的線程
volatile Thread thread;
//指向下一個(gè)處于CONDITION狀態(tài)的節(jié)點(diǎn)
Node nextWaiter;
/** 返回前驅(qū)節(jié)點(diǎn),沒有則拋出空指針異常 */
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
/** 幾個(gè)Node構(gòu)造函數(shù) */
//用來初始化頭節(jié)點(diǎn)或SHARED標(biāo)志
Node() {
}
//用于addWaiter方法
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
//用于Condition
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
//隊(duì)列頭節(jié)點(diǎn)
private transient volatile Node head;
//隊(duì)列尾節(jié)點(diǎn)
private transient volatile Node tail;
現(xiàn)在再來看addWaiter方法是如何把線程加入到雙端隊(duì)列的。
//這里的mode就是Node.EXCLUSIVE,表示獨(dú)占模式
private Node addWaiter(Node mode) {
//通過當(dāng)前的線程和鎖模式新建一個(gè)節(jié)點(diǎn)。
Node node = new Node(Thread.currentThread(), mode);
//pred指向尾節(jié)點(diǎn)tail
Node pred = tail;
//如果尾節(jié)點(diǎn)不為空,則將當(dāng)前節(jié)點(diǎn)插入到尾節(jié)點(diǎn)后面(設(shè)為尾節(jié)點(diǎn)),并返回node
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果Pred指針是Null(說明等待隊(duì)列中沒有元素),或者當(dāng)前Pred指針和Tail指向的位置不同(說明已經(jīng)被別的線程修改),則進(jìn)入enq初始化head節(jié)點(diǎn),并將node設(shè)為尾節(jié)點(diǎn)。
enq(node);
return node;
}
注意雙向鏈表中,第一個(gè)節(jié)點(diǎn)(頭節(jié)點(diǎn))為虛節(jié)點(diǎn),其實(shí)并不存儲(chǔ)任何信息,只是占位。真正的第一個(gè)有數(shù)據(jù)的節(jié)點(diǎn),是在第二個(gè)節(jié)點(diǎn)開始的。
回到上面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,acquireQueued會(huì)把放入隊(duì)列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲?。ㄖ袛啵?。
那么隊(duì)列中的線程什么時(shí)候可以獲取到鎖?一直獲取不到又會(huì)怎樣呢?
final boolean acquireQueued(final Node node, int arg) {
// 標(biāo)記是否成功拿到資源
boolean failed = true;
try {
// 標(biāo)記等待過程中是否中斷過
boolean interrupted = false;
// 開始自旋,要么獲取鎖,要么中斷
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果p是頭結(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)在真實(shí)數(shù)據(jù)隊(duì)列的首部,就嘗試獲取鎖(別忘了頭結(jié)點(diǎn)是虛節(jié)點(diǎn))
if (p == head && tryAcquire(arg)) {
// 獲取鎖成功,頭指針移動(dòng)到當(dāng)前node
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 說明p為頭節(jié)點(diǎn)且當(dāng)前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結(jié)點(diǎn),這個(gè)時(shí)候就要判斷當(dāng)前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點(diǎn)的waitStatus為-1),防止無限循環(huán)浪費(fèi)資源。
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)//未成功拿到資源,設(shè)置當(dāng)前節(jié)點(diǎn)狀態(tài)為CANCELLED
cancelAcquire(node);
}
}
// 靠前驅(qū)節(jié)點(diǎn)判斷當(dāng)前線程是否應(yīng)該被阻塞,true阻塞,false不阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前驅(qū)結(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
int ws = pred.waitStatus;
// 說明前驅(qū)結(jié)點(diǎn)處于喚醒狀態(tài),此時(shí)需要阻塞當(dāng)前節(jié)點(diǎn),返回true
if (ws == Node.SIGNAL)
return true;
// 通過枚舉值我們知道waitStatus>0是取消狀態(tài)
if (ws > 0) {
do {
// 循環(huán)向前查找取消節(jié)點(diǎn),把取消節(jié)點(diǎn)從隊(duì)列中剔除
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//直到前面某一個(gè)節(jié)點(diǎn)非取消節(jié)點(diǎn),將非取消節(jié)點(diǎn)連接當(dāng)前節(jié)點(diǎn)
pred.next = node;
} else {//前驅(qū)節(jié)點(diǎn)waitStatus為0或-3。
// 設(shè)置前驅(qū)節(jié)點(diǎn)等待狀態(tài)為SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//parkAndCheckInterrupt主要用于掛起當(dāng)前線程,阻塞調(diào)用棧,返回當(dāng)前線程的中斷狀態(tài)。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
前面的兩個(gè)問題得到解決:只要前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),就嘗試獲取鎖。前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),或獲取鎖失敗,則判斷是否需要阻塞當(dāng)前線程。如果前驅(qū)節(jié)點(diǎn)狀態(tài)是SIGNAL,表明前驅(qū)節(jié)點(diǎn)準(zhǔn)備獲取資源,所以當(dāng)前節(jié)點(diǎn)阻塞;如果前驅(qū)節(jié)點(diǎn)是取消狀態(tài)CANCELLED(不再獲取資源),移除所有取消狀態(tài)的前驅(qū)節(jié)點(diǎn),繼續(xù)自旋嘗試獲取鎖;如果前驅(qū)節(jié)點(diǎn)waitStatus為0(默認(rèn)值)或-3,則更改前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL,繼續(xù)自旋。
參考鏈接 : 美團(tuán)技術(shù)團(tuán)隊(duì):從ReentrantLock的實(shí)現(xiàn)看AQS的原理及應(yīng)用