ReentrantLock源碼分析
相關(guān)知識(shí)
1.ReentrantLock概念
ReentrantLock是實(shí)現(xiàn)了Lock接口的類,是一個(gè)可重入鎖,用來(lái)解決多線程應(yīng)用中的資源的同步性問(wèn)題。它是JUC(java.util.concurrent)中最核心的類。
2.相關(guān)類圖

簡(jiǎn)單使用
public class ReentrantLockDemo {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
lock.lock();
try {
//do somethings
} finally {
lock.unlock();
}
}
}
分析: 使用lock.lock()方法,會(huì)獲取當(dāng)前鎖的對(duì)象,lock.unlock()會(huì)釋放鎖對(duì)象。
底層源碼分析
ReentrantLock實(shí)現(xiàn)互斥鎖主要是依賴內(nèi)部的AbstractQueueSynchronized工具類來(lái)完成線程同步的功能。
具體源碼
爭(zhēng)搶鎖的過(guò)程
以NonFairSync非公平鎖為例。
final void lock() {
//這里首先根據(jù)CAS設(shè)置當(dāng)前的state,期望是0,設(shè)置為1
//state保存的是當(dāng)前鎖的狀態(tài)0表示沒(méi)有線程獲取鎖,1表示有線程獲
//取當(dāng)前鎖,大于1表示重入
if(compareAndSetState(0,1))
//如果設(shè)置成功,則把當(dāng)前線程設(shè)置為獨(dú)占鎖
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
public final void acquire(int arg){
if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這里的代碼表示如果tryAcquire方法嘗試獲取鎖失敗,那么就調(diào)用acquireQueued方法將當(dāng)前線程加入到同步隊(duì)列中。其中tryAcquire方法如下。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//這里表示當(dāng)前state=0,沒(méi)有線程獲取鎖,當(dāng)前線程可以直接獲取鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//這里表示如果獨(dú)占鎖與當(dāng)前線程一樣,則state會(huì)+1,即重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再看addWaiter方法之前,先要介紹一下,AQS中維護(hù)的雙向鏈表隊(duì)列每一個(gè)元素都是一個(gè)Node節(jié)點(diǎn),節(jié)點(diǎn)中保存了當(dāng)前線程thread下一個(gè)節(jié)點(diǎn)next,前一個(gè)節(jié)點(diǎn)prev以及當(dāng)前的等待狀態(tài)waitStatus,waitStatus有以下幾種狀態(tài)
- CANCELLED = 1 表示取消
- SIGNAL = -1 標(biāo)記當(dāng)前的節(jié)點(diǎn)保存的線程需要
unpark - CONDITION = -2 表示當(dāng)前節(jié)點(diǎn)所在線程在
Condition隊(duì)列中 - PROPAGATE = -3共享模式下,該狀態(tài)的線程處于可運(yùn)行狀態(tài)
addWaiter(Node.EXCLUSIVE)的具體實(shí)現(xiàn)
//當(dāng)前方法會(huì)創(chuàng)建一個(gè)節(jié)點(diǎn)并加入到AQS隊(duì)列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//tail指向同步隊(duì)列的最后一個(gè)node,這里第一次為空,會(huì)走enq方法,當(dāng)有其他線程再次競(jìng)爭(zhēng)鎖的時(shí)候,會(huì)通過(guò)下面的代碼添加到隊(duì)列中,并且更新tail為新的node
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
//死循環(huán),會(huì)遍歷2次,一次初始化,一次設(shè)置第一個(gè)node的next,prev屬性
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//第一次t為空,將head設(shè)置為一個(gè)新的node,不包含thread的node
if (compareAndSetHead(new Node()))
//把隊(duì)列的tail也設(shè)置成node
tail = head;
} else {
//此時(shí)隊(duì)列為空,將新加入的node的prev設(shè)置為tail
node.prev = t;
//通過(guò)CAS設(shè)置隊(duì)列的tail為當(dāng)前node
if (compareAndSetTail(t, node)) {
//設(shè)置tail的下一個(gè)節(jié)點(diǎn)也為node
t.next = node;
return t;
}
}
}
}
addWaiter方法執(zhí)行完成后,再調(diào)用acquireQueued方法去競(jìng)爭(zhēng)鎖
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//死循環(huán),只有當(dāng)當(dāng)前線程的prev是head,也就是自己是隊(duì)列中的第一個(gè)node,且爭(zhēng)搶到鎖之后才會(huì)退出
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//shouldParkAfterFailedAcquire
//判斷是否要在獲取鎖失敗之后掛起鎖,主要是通過(guò)
//CAS將當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)的wait設(shè)置為SIGNAL
//parkAndCheckInterrupt方法調(diào)用
//LockSupport.park()方法將當(dāng)前線程掛起,
//并且返回當(dāng)前線程是否被中斷的標(biāo)記,如果被中斷
//了,則會(huì)在上面的acquire方法中調(diào)用
//selfInterrupt()方法,重新將線程設(shè)置為可中
//斷的狀態(tài),這里主要是因?yàn)楸粧炱鸬木€程不能響應(yīng)中
//斷,所以要記錄下來(lái),并且在被喚醒的時(shí)候重新設(shè)
//置
if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面來(lái)看一下釋放鎖的過(guò)程
lock.unlock方法內(nèi)部調(diào)ReentrantLock中AQS的release()方法
public final boolean release(int arg) {
//如果當(dāng)前線程只重入一次,tryRelease會(huì)返回true
if (tryRelease(arg)) {
Node h = head;
//如果head不為空,且head的waitStatus=SIGNAL(在前面
//shouldParkAfterFailedAcquire方法中設(shè)置了)
//unparkSuccessor解鎖繼任者(head指向的node)
//內(nèi)部主要調(diào)用LockSupport.unpark(s.thread)方法
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//將當(dāng)前的獨(dú)占鎖置為空
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor()方法把當(dāng)前處在隊(duì)列的第一個(gè)節(jié)點(diǎn)所包含的thread喚醒,那么這個(gè)線程會(huì)繼續(xù)執(zhí)行acquireQueued中的死循環(huán),通過(guò)CAS獲取鎖。
到此整個(gè)流程就結(jié)束了。