加鎖邏輯將分成三個(gè)部分來看:
- 競(jìng)爭(zhēng)鎖
- 加入等待隊(duì)列
- 阻塞等待
1.競(jìng)爭(zhēng)鎖
我們先從公平鎖入手
public void lock() {
// sync的實(shí)例是new FairSync()
sync.acquire(1);
}
// 加鎖的代碼就是這幾行
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上述代碼可以拆分成以下幾段:
// 競(jìng)爭(zhēng)鎖
tryAcquire(arg)
// 加入等待隊(duì)列
addWaiter(Node.EXCLUSIVE)
// 阻塞等待
acquireQueued(node, arg)
- 競(jìng)爭(zhēng)鎖
protected final boolean tryAcquire(int acquires) {
// 獲取當(dāng)前線程
final Thread current = Thread.currentThread();
// 獲取當(dāng)前state狀態(tài)
int c = getState();
// 如果當(dāng)前state是沒有任何線程搶占的話
if (c == 0) {
// 如果等待隊(duì)列中有任何一個(gè)等待的節(jié)點(diǎn),都不會(huì)搶占鎖
if (!hasQueuedPredecessors() &&
// CAS搶占鎖成功
compareAndSetState(0, acquires)) {
// 搶占成功后,標(biāo)記當(dāng)前線程已經(jīng)搶占到鎖了。
setExclusiveOwnerThread(current);
// 返回加鎖成功
return true;
}
}
// 如果是同一個(gè)線程重復(fù)加鎖的情況下
else if (current == getExclusiveOwnerThread()) {
// 在這種情況下,只是簡(jiǎn)單地操作state
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 因?yàn)楫?dāng)前線程已經(jīng)加鎖成功了,再次加鎖的話,直接在state上增加加鎖次數(shù)即可。
setState(nextc);
// 返回加鎖成功
return true;
}
// 如果已經(jīng)有別的線程加鎖了,或者還有很多線程在排隊(duì)等待,那么返回false加鎖失敗。
return false;
}
上述代碼分幾部分:
-
如果當(dāng)前state=0,也就是沒有任何線程搶占鎖的情況下
1.1: 沒有等待隊(duì)列的情況下,可以CAS搶占鎖
1.2: 有等待隊(duì)列的話,該隊(duì)列中第一個(gè)等待節(jié)點(diǎn)不是當(dāng)前線程,不可以搶占鎖,因?yàn)檫@是公平鎖。
image.png
如果當(dāng)前等待隊(duì)列中還有任意節(jié)點(diǎn),并且當(dāng)前節(jié)點(diǎn)中的線程不是當(dāng)前線程,說明有其他線程處于等待過程中,那么當(dāng)前線程就應(yīng)該乖乖排隊(duì)去。

假如當(dāng)前線程是被剛剛喚醒的,并且它處于等待隊(duì)列中的第一個(gè)等待的位置,那么這個(gè)時(shí)候是可以去搶占鎖的。
- 如果已經(jīng)搶占了鎖的線程就是當(dāng)前線程。這種情況我們叫做重入。
示例如下:
ReentrantLock lock = new ReentrantLock();
try {
// 加鎖
lock.lock();
// 執(zhí)行業(yè)務(wù)邏輯
System.out.println("獲取的鎖");
try {
// 再次獲取鎖
lock.lock();
// 執(zhí)行業(yè)務(wù)邏輯
System.out.println("再次獲取的鎖");
} finally {
// 解鎖
lock.unlock();
}
} finally {
// 解鎖
lock.unlock();
}
小結(jié)一下:
- 如果當(dāng)前鎖未被搶占,并且沒有其他線程等待,那么直接搶占鎖
- 如果當(dāng)前鎖未被搶占,有其他線程等待,不可用搶占鎖
- 如果當(dāng)前鎖被當(dāng)前線程搶占了,那么直接重入即可
- 不符合上述情況,直接加鎖失敗。也就是鎖被其他線程搶占了,或者目前還有其他線程處于等待中,都會(huì)導(dǎo)致公平鎖加鎖失敗。
// 判斷等待隊(duì)列中是否有其他線程等待
public final boolean hasQueuedPredecessors() {
Node h, s;
// 如果等待隊(duì)列頭節(jié)點(diǎn)不為空,說明等待隊(duì)列已經(jīng)創(chuàng)建出來了。否則直接返回false。
if ((h = head) != null) {
// 如果頭節(jié)點(diǎn)后面的節(jié)點(diǎn)為空,或者該節(jié)點(diǎn)的狀態(tài)是取消狀態(tài)
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
// 從后往前遍歷,直至最后一個(gè)狀態(tài)小于等于0的節(jié)點(diǎn)。只有小于等于0的節(jié)點(diǎn)才是正常的可以競(jìng)爭(zhēng)鎖的節(jié)點(diǎn)。
for (Node p = tail; p != h && p != null; p = p.prev) {
// 發(fā)現(xiàn)小于等于0的節(jié)點(diǎn),就賦值給s
if (p.waitStatus <= 0)
s = p;
}
}
// 如果最終得到的節(jié)點(diǎn)不為空。有可能當(dāng)前沒有任何等待的節(jié)點(diǎn),s=null。
// 并且這個(gè)不為空的等待線程不是當(dāng)前線程。其實(shí)就是說明前面還有其他線程排隊(duì)。
if (s != null && s.thread != Thread.currentThread())
// 返回true,說明有其他線程在排隊(duì)。
return true;
}
// 1.如果等待隊(duì)列不存在,直接返回false
// 2.如果當(dāng)前等待隊(duì)列中,沒有任何其他節(jié)點(diǎn)的waitStatus<=0
return false;
}
至此,線程競(jìng)爭(zhēng)鎖的邏輯就完畢了。
- 加入等待隊(duì)列
private Node addWaiter(Node mode) {
// 創(chuàng)建一個(gè)節(jié)點(diǎn),該節(jié)點(diǎn)默認(rèn)
// waitStatus=0, thread=currentThread
Node node = new Node(mode);
// 開啟自旋
for (;;) {
// 取出尾節(jié)點(diǎn)
Node oldTail = tail;
// 如果尾節(jié)點(diǎn)不為空
if (oldTail != null) {
// 設(shè)置node的前一個(gè)節(jié)點(diǎn)為尾節(jié)點(diǎn)
node.setPrevRelaxed(oldTail);
// CAS把尾節(jié)點(diǎn)設(shè)置為node
if (compareAndSetTail(oldTail, node)){
// 如果CAS設(shè)置成功,那么就把oldTail的next引用設(shè)置成node
oldTail.next = node;
// 返回node節(jié)點(diǎn)
return node;
}
} else {
// 如果尾節(jié)點(diǎn)為null,說明等待隊(duì)列還不存在,這個(gè)時(shí)候就要準(zhǔn)備初始化等待隊(duì)列。
// 初始化完畢后繼續(xù)自旋,最終把新創(chuàng)建的節(jié)點(diǎn)添加進(jìn)等待隊(duì)列
initializeSyncQueue();
}
}
}
// 初始化等待隊(duì)列。其實(shí)是一個(gè)雙向鏈表,所以只要初始化head、tail節(jié)點(diǎn)即可。
private final void initializeSyncQueue() {
Node h;
// CAS設(shè)置head節(jié)點(diǎn)。如果head節(jié)點(diǎn)為null,就設(shè)置為new Node()。該node節(jié)點(diǎn)waitStatus=0,thread=null。
if (HEAD.compareAndSet(this, null, (h = new Node())))
// 頭節(jié)點(diǎn)設(shè)置成功后,尾節(jié)點(diǎn)初始化為同一個(gè)節(jié)點(diǎn)。
tail = h;
}
- 初始化等待隊(duì)列
// 初始化等待隊(duì)列。其實(shí)是一個(gè)雙向鏈表,所以只要初始化head、tail節(jié)點(diǎn)即可。
private final void initializeSyncQueue() {
Node h;
// CAS設(shè)置head節(jié)點(diǎn)。如果head節(jié)點(diǎn)為null,就設(shè)置為new Node()。該node節(jié)點(diǎn)waitStatus=0,thread=null。
if (HEAD.compareAndSet(this, null, (h = new Node())))
// 頭節(jié)點(diǎn)設(shè)置成功后,尾節(jié)點(diǎn)初始化為同一個(gè)節(jié)點(diǎn)。
tail = h;
}

- 添加新的節(jié)點(diǎn)
// 創(chuàng)建新節(jié)點(diǎn)
Node node = new Node(mode);
// 取出尾節(jié)點(diǎn)
Node oldTail = tail;

// 設(shè)置node的前一個(gè)節(jié)點(diǎn)為尾節(jié)點(diǎn)
node.setPrevRelaxed(oldTail);
// CAS把尾節(jié)點(diǎn)設(shè)置為node
if (compareAndSetTail(oldTail, node)){
// 如果CAS設(shè)置成功,那么就把oldTail的next引用設(shè)置成node
oldTail.next = node;

經(jīng)過上面幾步,新的節(jié)點(diǎn)就被添加到等待隊(duì)列中了。
有一個(gè)注意點(diǎn)需要提的是:
為什么判斷等待隊(duì)列是否存在,使用的是if(tail!=null),而不是if(head!=null)?
這個(gè)問題其實(shí)跟初始化等待隊(duì)列有關(guān)系,初始化的時(shí)候是使用CAS設(shè)置head節(jié)點(diǎn),成功后再設(shè)置tail節(jié)點(diǎn)。也就是說,隊(duì)列初始化完畢的標(biāo)識(shí)是tail!=null。
如果使用if(head!=null)來判斷隊(duì)列已經(jīng)存在,那么有可能此時(shí)tail還沒有初始化完畢。就會(huì)導(dǎo)致使用tail節(jié)點(diǎn)的時(shí)候空指針異常。
- 阻塞等待
final boolean acquireQueued(final Node node, int arg) {
// 默認(rèn)線程未被打斷
boolean interrupted = false;
try {
// 開啟自旋
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前一個(gè)節(jié)點(diǎn)是head節(jié)點(diǎn),那么就嘗試競(jìng)爭(zhēng)鎖
if (p == head && tryAcquire(arg)) {
// 競(jìng)爭(zhēng)鎖成功,把當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn)
setHead(node);
// 把前一個(gè)節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)斷開
// 因?yàn)楫?dāng)前節(jié)點(diǎn)已經(jīng)設(shè)置為head節(jié)點(diǎn)了,之前的head就可以GC了
p.next = null; // help GC
// 返回是否當(dāng)前線程被打斷。
// 這個(gè)返回結(jié)果的作用會(huì)被用在lockInterruptibly()這個(gè)方法上。
// lock()方法可忽略。
return interrupted;
}
// 判斷當(dāng)前節(jié)點(diǎn)是否應(yīng)該阻塞。
if (shouldParkAfterFailedAcquire(p, node))
// 下面這個(gè)代碼可以翻譯成:
// if(parkAndCheckInterrupt()){
// interrupted = true;
// }
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
// 拋出任何異常,都直接取消當(dāng)前節(jié)點(diǎn)正在競(jìng)爭(zhēng)鎖的操作
// 如果在等待隊(duì)列中,就從等待隊(duì)列中移除。
// 如果當(dāng)前線程已經(jīng)搶占到鎖了,那么就解鎖。
cancelAcquire(node);
// 如果當(dāng)前線程已經(jīng)被中斷
if (interrupted)
// 重新設(shè)置中斷信號(hào)
selfInterrupt();
// 拋出當(dāng)前異常
throw t;
}
}
- 獲取當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)
// 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
final Node predecessor() {
// 上一個(gè)節(jié)點(diǎn)
Node p = prev;
// 如果為null,直接拋異常
if (p == null)
throw new NullPointerException();
else
// 返回上一個(gè)節(jié)點(diǎn)
return p;
}
- 如果上一個(gè)節(jié)點(diǎn)為head節(jié)點(diǎn)
// 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前一個(gè)節(jié)點(diǎn)是head節(jié)點(diǎn),那么就嘗試競(jìng)爭(zhēng)鎖
if (p == head && tryAcquire(arg))

- 搶占成功鎖后
// 競(jìng)爭(zhēng)鎖成功,把當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn)
setHead(node);
// 把前一個(gè)節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)斷開
p.next = null;

- 判斷當(dāng)前節(jié)點(diǎn)的狀態(tài)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 獲取前一個(gè)節(jié)點(diǎn)的狀態(tài)
int ws = pred.waitStatus;
// 如果狀態(tài)等于-1。Node.SIGNAL的值就是-1
if (ws == Node.SIGNAL)
// 直接返回true,這個(gè)時(shí)候就要準(zhǔn)備阻塞。
return true;
// 如果狀態(tài)值大于0,說明是要取消的節(jié)點(diǎn)。
if (ws > 0) {
// 跳過“取消”狀態(tài)節(jié)點(diǎn)
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// ws小于等于0的話,直接把前一個(gè)節(jié)點(diǎn)的狀態(tài)置為-1
// 因?yàn)樾聞?chuàng)建的節(jié)點(diǎn)初始化狀態(tài)是0,
// 那么意味著執(zhí)行到這里后,還要返回去重新自旋一次才能返回true。
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
// 返回false
return false;
}
- 當(dāng)前線程阻塞
private final boolean parkAndCheckInterrupt() {
// 阻塞當(dāng)前線程。
// 1. 調(diào)用LockSupport.unpark()才能重新喚醒被阻塞的線程。
// 2.調(diào)用thread.interrupt()也可以喚醒阻塞線程。
LockSupport.park(this);
// 判斷當(dāng)前線程是否被打斷。
// 如果當(dāng)前線程是被打斷的,那么返回true,否則返回false。
return Thread.interrupted();
}
小結(jié)一下:
- 先獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn),如果是head節(jié)點(diǎn),那么嘗試競(jìng)爭(zhēng)鎖
- 競(jìng)爭(zhēng)鎖成功后,重置head節(jié)點(diǎn),返回false(代表沒有被打斷)。
- 如果前一個(gè)節(jié)點(diǎn)狀態(tài)小于等于0,那么置為-1。
- 重新自旋一次,從第一步開始
- 如果前一個(gè)節(jié)點(diǎn)狀態(tài)等于-1,返回true,準(zhǔn)備阻塞。
- 調(diào)用LockSupport.park()阻塞當(dāng)前線程,直至unpark()或者interrupt()喚醒當(dāng)前線程。
- 通過unpark()喚醒,沒有被打斷,返回false
- 通過interrupt()喚醒,被打斷,返回true。
- 被喚醒的線程又開始自旋,直至獲取到鎖后返回是否被打斷的結(jié)果。
- 如果是被打斷后獲取鎖返回,那么返回true。
- 否則返回false。
public final void acquire(int arg) {
// 嘗試獲取鎖
if (!tryAcquire(arg) &&
// addWaiter(Node.EXCLUSIVE):競(jìng)爭(zhēng)鎖失敗后,添加到等待隊(duì)列
// acquireQueued(node, arg):阻塞等待,自旋獲取鎖后,返回判斷是否被打斷
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果被打斷,需要恢復(fù)中斷信號(hào)
selfInterrupt();
}
// 其實(shí)就是重新中斷一次。
// 因?yàn)閳?zhí)行過Thread.interrupted()方法后,會(huì)讓中斷信號(hào)重置為false。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
以上就是我對(duì)于公平鎖-加鎖實(shí)現(xiàn)的淺析。
