以下分析均基于jdk1.8
AQS 是一個(gè)用于實(shí)現(xiàn)阻塞鎖和相關(guān)同步器的框架,它提供了一些基本的原子操作(如 CAS,自旋等待)以及一個(gè)等待隊(duì)列來協(xié)調(diào)多個(gè)線程之間的互斥和共享訪問。ReentrantLock 實(shí)現(xiàn)了 AQS 的 tryAcquire 和 release 方法來獲取和釋放鎖,以及 Condition 來支持鎖的條件等待。
ReentrantLock 在基于 AQS 實(shí)現(xiàn)的同時(shí),也可以通過重入鎖的方式實(shí)現(xiàn)線程的可重入性,使得同一個(gè)線程可以多次獲取同一個(gè)鎖而不會死鎖。此外,ReentrantLock 還提供了公平鎖和非公平鎖兩種模式,以適應(yīng)不同的應(yīng)用場景。
AQS
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這段代碼是 AQS(AbstractQueuedSynchronizer)類中的 acquire(int arg) 方法實(shí)現(xiàn)。該方法用于獲取獨(dú)占式鎖(exclusive lock)。以下是方法的含義和執(zhí)行流程:
- tryAcquire(arg) 嘗試直接獲取獨(dú)占鎖,如果成功返回 true,否則執(zhí)行第二步。
- addWaiter(Node.EXCLUSIVE) 創(chuàng)建一個(gè)獨(dú)占節(jié)點(diǎn) Node.EXCLUSIVE,將其加入到等待隊(duì)列中,并返回該節(jié)點(diǎn)。
- acquireQueued(node, arg) 將節(jié)點(diǎn) node 加入到等待隊(duì)列中,并阻塞線程,直到獲取獨(dú)占鎖成功為止。
- selfInterrupt() 如果線程在等待過程中被中斷,調(diào)用 selfInterrupt() 方法將線程的中斷狀態(tài)重新設(shè)置。
因此,該方法的作用是獲取獨(dú)占式鎖,并在獲取不到鎖時(shí)將線程加入到等待隊(duì)列中,直到鎖被釋放或者線程被中斷才返回。
acquireQueued
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 循環(huán)執(zhí)行步驟 1 到步驟 4,直到成功獲取到鎖或者線程被中斷。
for (;;) {
// 1. 返回等待隊(duì)列中當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)。
final Node p = node.predecessor();
// 2. 如果前一個(gè)節(jié)點(diǎn)是 head(即等待隊(duì)列的首節(jié)點(diǎn))并且能夠通過 tryAcquire(arg) 方法獲取到鎖,就將當(dāng)前節(jié)點(diǎn)設(shè)為新的 head,將前一個(gè)節(jié)點(diǎn)的 next 引用設(shè)置為 null(以便垃圾回收),然后返回中斷狀態(tài)。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 3. 如果無法獲取到鎖,調(diào)用 shouldParkAfterFailedAcquire(p, node) 判斷當(dāng)前線程是否應(yīng)該掛起等待。
// 如果當(dāng)前線程應(yīng)該掛起等待,則調(diào)用 parkAndCheckInterrupt() 方法將線程掛起并檢查中斷狀態(tài),如果線程在等待期間被中斷,將中斷狀態(tài)設(shè)置為 true。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果等待過程中獲取鎖失敗,調(diào)用 cancelAcquire(node) 方法取消該節(jié)點(diǎn)的等待狀態(tài),并從等待隊(duì)列中移除。
if (failed)
cancelAcquire(node);
}
}
這段代碼是 AQS(AbstractQueuedSynchronizer)類中的 acquireQueued(Node node, int arg) 方法實(shí)現(xiàn)。該方法用于將節(jié)點(diǎn)加入到等待隊(duì)列中,并且在隊(duì)列中自旋等待直到獲取鎖為止。以下是該方法的主要步驟:
- node.predecessor() 返回等待隊(duì)列中當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)。
- 如果前一個(gè)節(jié)點(diǎn)是 head(即等待隊(duì)列的首節(jié)點(diǎn))并且能夠通過 tryAcquire(arg) 方法獲取到鎖,就將當(dāng)前節(jié)點(diǎn)設(shè)為新的 head,將前一個(gè)節(jié)點(diǎn)的 next 引用設(shè)置為 null(以便垃圾回收),然后返回中斷狀態(tài)。
- 如果無法獲取到鎖,調(diào)用 shouldParkAfterFailedAcquire(p, node) 判斷當(dāng)前線程是否應(yīng)該掛起等待。
- 如果當(dāng)前線程應(yīng)該掛起等待,則調(diào)用 parkAndCheckInterrupt() 方法將線程掛起并檢查中斷狀態(tài),如果線程在等待期間被中斷,將中斷狀態(tài)設(shè)置為 true。
- 循環(huán)執(zhí)行步驟 1 到步驟 4,直到成功獲取到鎖或者線程被中斷。
- 如果等待過程中獲取鎖失敗,調(diào)用 cancelAcquire(node) 方法取消該節(jié)點(diǎn)的等待狀態(tài),并從等待隊(duì)列中移除。
因此,acquireQueued(Node node, int arg) 方法實(shí)現(xiàn)了等待隊(duì)列中的節(jié)點(diǎn)自旋等待獲取鎖,直到獲取到鎖或者線程被中斷。在等待期間,會根據(jù)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)和鎖狀態(tài)來判斷是否需要掛起等待,并且通過循環(huán)不斷重試獲取鎖的操作。
公平鎖
在 JDK 1.8 中,ReentrantLock 在公平模式下鎖的搶占規(guī)則如下:
- 當(dāng)線程嘗試獲取鎖時(shí),如果鎖當(dāng)前沒有被其他線程持有,那么該線程將立即獲得鎖并成為鎖的持有者。
- 當(dāng)線程嘗試獲取鎖時(shí),如果鎖當(dāng)前被其他線程持有,那么該線程將進(jìn)入等待隊(duì)列,以 FIFO(先進(jìn)先出)的順序排隊(duì)等待獲取鎖。
- 當(dāng)鎖的持有者釋放鎖時(shí),如果等待隊(duì)列中存在線程,則會選擇隊(duì)列中的第一個(gè)線程作為下一個(gè)持有者,并將其從等待隊(duì)列中移除,然后喚醒該線程,使其嘗試獲取鎖。
- 如果有多個(gè)線程在等待隊(duì)列中等待獲取鎖,那么它們將按照先進(jìn)先出的順序進(jìn)行競爭,直到其中一個(gè)線程成功獲取鎖為止。
在公平模式下,所有線程將按照其請求鎖的順序進(jìn)行競爭,因此不會出現(xiàn)饑餓現(xiàn)象,也就是說,沒有任何線程會無限期地等待獲取鎖。但是,由于需要維護(hù)等待隊(duì)列,因此在高并發(fā)場景下,公平模式可能會導(dǎo)致性能下降。
lock
final void lock() {
acquire(1);
}
tryAcquire
protected final boolean tryAcquire(int acquires) {
// 1. 獲取當(dāng)前線程
final Thread current = Thread.currentThread();
// 2. 獲取當(dāng)前鎖的狀態(tài)值
int c = getState();
// 3. 判斷鎖是否可用
if (c == 0) {
// 3.1 如果當(dāng)前鎖的狀態(tài)值為0,說明鎖是可用的。如果沒有排隊(duì)的線程且能夠通過CAS(compareAndSetState)操作將鎖的狀態(tài)值從0改為acquires,則獲取鎖成功,將當(dāng)前線程設(shè)置為獨(dú)占鎖的線程并返回true。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 4. 判斷是否是獨(dú)占鎖的線程,如果當(dāng)前線程是獨(dú)占鎖的線程,說明已經(jīng)擁有了鎖,此時(shí)通過增加鎖的狀態(tài)值來實(shí)現(xiàn)可重入,并返回true。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 5. 如果無法獲取鎖,則返回false。
return false;
}
hasQueuedPredecessors
其中對于hasQueuedPredecessors方法,官方是這樣描述的:
- 如果當(dāng)前線程前面有一個(gè)排隊(duì)的線程,則為
true - 如果當(dāng)前線程位于隊(duì)列的頭部或隊(duì)列為空,則返回
false
getExclusiveOwnerThread
getExclusiveOwnerThread()是Java中AbstractOwnableSynchronizer(AOS)類的方法,用于獲取當(dāng)前鎖的獨(dú)占線程。該方法通常與可重入鎖(ReentrantLock)結(jié)合使用。
在可重入鎖中,同一個(gè)線程可以多次獲取鎖,每次獲取鎖時(shí),都會將鎖的狀態(tài)值加1。如果當(dāng)前線程已經(jīng)獲取了鎖,則它就是鎖的獨(dú)占線程。getExclusiveOwnerThread()方法可以獲取當(dāng)前鎖的獨(dú)占線程,如果當(dāng)前鎖沒有被線程占用,則返回null。
在tryAcquire()方法中,如果當(dāng)前線程已經(jīng)獲取了鎖,則可以直接對鎖的狀態(tài)值進(jìn)行修改,而無需再次獲取鎖。因此,通過getExclusiveOwnerThread()方法來判斷當(dāng)前線程是否已經(jīng)獲取了鎖。
總的來說,getExclusiveOwnerThread()方法的作用是獲取當(dāng)前鎖的獨(dú)占線程,可以用于判斷當(dāng)前線程是否已經(jīng)獲取了鎖,從而進(jìn)行相應(yīng)的處理。
非公平鎖
lock
final void lock() {
// 1. 嘗試獲取鎖,如果成功,設(shè)置當(dāng)前線程為獨(dú)占線程
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 2. 否則調(diào)用AQS的acquire方法,子類實(shí)現(xiàn)nonfairTryAcquire方法
acquire(1);
}
nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {
// 1. 首先,獲取當(dāng)前線程對象并獲取當(dāng)前鎖的狀態(tài)值。
final Thread current = Thread.currentThread();
int c = getState();
// 2. 如果當(dāng)前鎖的狀態(tài)值為0,表示當(dāng)前鎖沒有被其他線程占用,則當(dāng)前線程可以直接獲取鎖。
if (c == 0) {
// 3. cas嘗試獲取鎖,如果成功,設(shè)置當(dāng)前線程為獨(dú)占鎖,并且返回true
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 4. 如果當(dāng)前鎖的狀態(tài)值不為0,但是當(dāng)前線程已經(jīng)獲取了鎖,則可以直接對鎖的狀態(tài)值進(jìn)行修改,而無需再次獲取鎖。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
condition
ReentrantLock是一個(gè)可重入的互斥鎖,它提供了比內(nèi)置鎖更高級的同步功能。在使用ReentrantLock時(shí),我們可以通過調(diào)用它的newCondition()方法創(chuàng)建一個(gè)Condition對象,來實(shí)現(xiàn)更加靈活的線程同步。
Condition是在Java 5中引入的一種新的線程同步機(jī)制,它提供了await()和signal()等方法,可以用于線程之間的通信和協(xié)調(diào)。
ReentrantLock的newCondition()方法可以創(chuàng)建一個(gè)與當(dāng)前鎖關(guān)聯(lián)的Condition對象。調(diào)用該Condition對象的await()方法可以使當(dāng)前線程等待,直到另一個(gè)線程調(diào)用該Condition對象的signal()方法或signalAll()方法喚醒它。
舉個(gè)例子,假設(shè)我們有一個(gè)任務(wù)隊(duì)列,多個(gè)線程需要從隊(duì)列中獲取任務(wù)并執(zhí)行。我們可以使用ReentrantLock來實(shí)現(xiàn)對隊(duì)列的同步,并且為每個(gè)線程分配一個(gè)Condition對象,以便在隊(duì)列為空時(shí)等待任務(wù)的到來。具體實(shí)現(xiàn)可以參考下面的代碼:
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class TaskQueue {
private Queue<String> queue = new LinkedList<>();
private ReentrantLock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
public void addTask(String task) {
lock.lock();
try {
queue.add(task);
notEmpty.signal(); // 通知等待的線程
} finally {
lock.unlock();
}
}
public String getTask() throws InterruptedException {
lock.lock();
try {
while (queue.isEmpty()) {
notEmpty.await(); // 等待任務(wù)的到來
}
return queue.remove();
} finally {
lock.unlock();
}
}
}
在這個(gè)示例中,我們使用ReentrantLock和Condition實(shí)現(xiàn)了一個(gè)線程安全的任務(wù)隊(duì)列。當(dāng)任務(wù)隊(duì)列為空時(shí),調(diào)用getTask()方法的線程會等待,直到其他線程調(diào)用addTask()方法向隊(duì)列中添加任務(wù)并通過notEmpty.signal()通知它們。
注意,在使用Condition對象時(shí),一定要在調(diào)用await()方法前獲得鎖,并在finally塊中釋放鎖。否則,如果線程在等待時(shí)被中斷,它會持有鎖而無法釋放,導(dǎo)致其他線程無法獲取鎖。