ReentrantLock

以下分析均基于jdk1.8

AQS 是一個(gè)用于實(shí)現(xiàn)阻塞鎖和相關(guān)同步器的框架,它提供了一些基本的原子操作(如 CAS,自旋等待)以及一個(gè)等待隊(duì)列來協(xié)調(diào)多個(gè)線程之間的互斥和共享訪問。ReentrantLock 實(shí)現(xiàn)了 AQS 的 tryAcquire 和 release 方法來獲取和釋放鎖,以及 Condition 來支持鎖的條件等待。
ReentrantLock 在基于 AQS 實(shí)現(xiàn)的同時(shí),也可以通過重入鎖的方式實(shí)現(xiàn)線程的可重入性,使得同一個(gè)線程可以多次獲取同一個(gè)鎖而不會死鎖。此外,ReentrantLock 還提供了公平鎖和非公平鎖兩種模式,以適應(yīng)不同的應(yīng)用場景。

AQS

 public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這段代碼是 AQS(AbstractQueuedSynchronizer)類中的 acquire(int arg) 方法實(shí)現(xiàn)。該方法用于獲取獨(dú)占式鎖(exclusive lock)。以下是方法的含義和執(zhí)行流程:

  1. tryAcquire(arg) 嘗試直接獲取獨(dú)占鎖,如果成功返回 true,否則執(zhí)行第二步。
  2. addWaiter(Node.EXCLUSIVE) 創(chuàng)建一個(gè)獨(dú)占節(jié)點(diǎn) Node.EXCLUSIVE,將其加入到等待隊(duì)列中,并返回該節(jié)點(diǎn)。
  3. acquireQueued(node, arg) 將節(jié)點(diǎn) node 加入到等待隊(duì)列中,并阻塞線程,直到獲取獨(dú)占鎖成功為止。
  4. selfInterrupt() 如果線程在等待過程中被中斷,調(diào)用 selfInterrupt() 方法將線程的中斷狀態(tài)重新設(shè)置。

因此,該方法的作用是獲取獨(dú)占式鎖,并在獲取不到鎖時(shí)將線程加入到等待隊(duì)列中,直到鎖被釋放或者線程被中斷才返回。

acquireQueued
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 循環(huán)執(zhí)行步驟 1 到步驟 4,直到成功獲取到鎖或者線程被中斷。
            for (;;) {
                // 1. 返回等待隊(duì)列中當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)。
                final Node p = node.predecessor();
                // 2. 如果前一個(gè)節(jié)點(diǎn)是 head(即等待隊(duì)列的首節(jié)點(diǎn))并且能夠通過 tryAcquire(arg) 方法獲取到鎖,就將當(dāng)前節(jié)點(diǎn)設(shè)為新的 head,將前一個(gè)節(jié)點(diǎn)的 next 引用設(shè)置為 null(以便垃圾回收),然后返回中斷狀態(tài)。
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 3. 如果無法獲取到鎖,調(diào)用 shouldParkAfterFailedAcquire(p, node) 判斷當(dāng)前線程是否應(yīng)該掛起等待。
                // 如果當(dāng)前線程應(yīng)該掛起等待,則調(diào)用 parkAndCheckInterrupt() 方法將線程掛起并檢查中斷狀態(tài),如果線程在等待期間被中斷,將中斷狀態(tài)設(shè)置為 true。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果等待過程中獲取鎖失敗,調(diào)用 cancelAcquire(node) 方法取消該節(jié)點(diǎn)的等待狀態(tài),并從等待隊(duì)列中移除。
            if (failed)
                cancelAcquire(node);
        }
    }

這段代碼是 AQS(AbstractQueuedSynchronizer)類中的 acquireQueued(Node node, int arg) 方法實(shí)現(xiàn)。該方法用于將節(jié)點(diǎn)加入到等待隊(duì)列中,并且在隊(duì)列中自旋等待直到獲取鎖為止。以下是該方法的主要步驟:

  1. node.predecessor() 返回等待隊(duì)列中當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)。
  2. 如果前一個(gè)節(jié)點(diǎn)是 head(即等待隊(duì)列的首節(jié)點(diǎn))并且能夠通過 tryAcquire(arg) 方法獲取到鎖,就將當(dāng)前節(jié)點(diǎn)設(shè)為新的 head,將前一個(gè)節(jié)點(diǎn)的 next 引用設(shè)置為 null(以便垃圾回收),然后返回中斷狀態(tài)。
  3. 如果無法獲取到鎖,調(diào)用 shouldParkAfterFailedAcquire(p, node) 判斷當(dāng)前線程是否應(yīng)該掛起等待。
  4. 如果當(dāng)前線程應(yīng)該掛起等待,則調(diào)用 parkAndCheckInterrupt() 方法將線程掛起并檢查中斷狀態(tài),如果線程在等待期間被中斷,將中斷狀態(tài)設(shè)置為 true。
  5. 循環(huán)執(zhí)行步驟 1 到步驟 4,直到成功獲取到鎖或者線程被中斷。
  6. 如果等待過程中獲取鎖失敗,調(diào)用 cancelAcquire(node) 方法取消該節(jié)點(diǎn)的等待狀態(tài),并從等待隊(duì)列中移除。

因此,acquireQueued(Node node, int arg) 方法實(shí)現(xiàn)了等待隊(duì)列中的節(jié)點(diǎn)自旋等待獲取鎖,直到獲取到鎖或者線程被中斷。在等待期間,會根據(jù)節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)和鎖狀態(tài)來判斷是否需要掛起等待,并且通過循環(huán)不斷重試獲取鎖的操作。

公平鎖

在 JDK 1.8 中,ReentrantLock 在公平模式下鎖的搶占規(guī)則如下:

  1. 當(dāng)線程嘗試獲取鎖時(shí),如果鎖當(dāng)前沒有被其他線程持有,那么該線程將立即獲得鎖并成為鎖的持有者。
  2. 當(dāng)線程嘗試獲取鎖時(shí),如果鎖當(dāng)前被其他線程持有,那么該線程將進(jìn)入等待隊(duì)列,以 FIFO(先進(jìn)先出)的順序排隊(duì)等待獲取鎖。
  3. 當(dāng)鎖的持有者釋放鎖時(shí),如果等待隊(duì)列中存在線程,則會選擇隊(duì)列中的第一個(gè)線程作為下一個(gè)持有者,并將其從等待隊(duì)列中移除,然后喚醒該線程,使其嘗試獲取鎖。
  4. 如果有多個(gè)線程在等待隊(duì)列中等待獲取鎖,那么它們將按照先進(jìn)先出的順序進(jìn)行競爭,直到其中一個(gè)線程成功獲取鎖為止。

在公平模式下,所有線程將按照其請求鎖的順序進(jìn)行競爭,因此不會出現(xiàn)饑餓現(xiàn)象,也就是說,沒有任何線程會無限期地等待獲取鎖。但是,由于需要維護(hù)等待隊(duì)列,因此在高并發(fā)場景下,公平模式可能會導(dǎo)致性能下降。

lock
final void lock() {
            acquire(1);
        }
tryAcquire
   protected final boolean tryAcquire(int acquires) {
            // 1. 獲取當(dāng)前線程
            final Thread current = Thread.currentThread();
            // 2. 獲取當(dāng)前鎖的狀態(tài)值
            int c = getState();
            // 3. 判斷鎖是否可用
            if (c == 0) {
            // 3.1 如果當(dāng)前鎖的狀態(tài)值為0,說明鎖是可用的。如果沒有排隊(duì)的線程且能夠通過CAS(compareAndSetState)操作將鎖的狀態(tài)值從0改為acquires,則獲取鎖成功,將當(dāng)前線程設(shè)置為獨(dú)占鎖的線程并返回true。
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 4. 判斷是否是獨(dú)占鎖的線程,如果當(dāng)前線程是獨(dú)占鎖的線程,說明已經(jīng)擁有了鎖,此時(shí)通過增加鎖的狀態(tài)值來實(shí)現(xiàn)可重入,并返回true。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            // 5. 如果無法獲取鎖,則返回false。
            return false;
        }
hasQueuedPredecessors

其中對于hasQueuedPredecessors方法,官方是這樣描述的:

  1. 如果當(dāng)前線程前面有一個(gè)排隊(duì)的線程,則為 true
  2. 如果當(dāng)前線程位于隊(duì)列的頭部或隊(duì)列為空,則返回 false
getExclusiveOwnerThread

getExclusiveOwnerThread()是Java中AbstractOwnableSynchronizer(AOS)類的方法,用于獲取當(dāng)前鎖的獨(dú)占線程。該方法通常與可重入鎖(ReentrantLock)結(jié)合使用。
在可重入鎖中,同一個(gè)線程可以多次獲取鎖,每次獲取鎖時(shí),都會將鎖的狀態(tài)值加1。如果當(dāng)前線程已經(jīng)獲取了鎖,則它就是鎖的獨(dú)占線程。getExclusiveOwnerThread()方法可以獲取當(dāng)前鎖的獨(dú)占線程,如果當(dāng)前鎖沒有被線程占用,則返回null。
在tryAcquire()方法中,如果當(dāng)前線程已經(jīng)獲取了鎖,則可以直接對鎖的狀態(tài)值進(jìn)行修改,而無需再次獲取鎖。因此,通過getExclusiveOwnerThread()方法來判斷當(dāng)前線程是否已經(jīng)獲取了鎖。
總的來說,getExclusiveOwnerThread()方法的作用是獲取當(dāng)前鎖的獨(dú)占線程,可以用于判斷當(dāng)前線程是否已經(jīng)獲取了鎖,從而進(jìn)行相應(yīng)的處理。

非公平鎖

lock
 final void lock() {
             // 1. 嘗試獲取鎖,如果成功,設(shè)置當(dāng)前線程為獨(dú)占線程
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
            // 2. 否則調(diào)用AQS的acquire方法,子類實(shí)現(xiàn)nonfairTryAcquire方法
                acquire(1);
        }
nonfairTryAcquire
  final boolean nonfairTryAcquire(int acquires) {
              // 1. 首先,獲取當(dāng)前線程對象并獲取當(dāng)前鎖的狀態(tài)值。
            final Thread current = Thread.currentThread();
            int c = getState();
            // 2. 如果當(dāng)前鎖的狀態(tài)值為0,表示當(dāng)前鎖沒有被其他線程占用,則當(dāng)前線程可以直接獲取鎖。
            if (c == 0) {
                // 3. cas嘗試獲取鎖,如果成功,設(shè)置當(dāng)前線程為獨(dú)占鎖,并且返回true
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            // 4. 如果當(dāng)前鎖的狀態(tài)值不為0,但是當(dāng)前線程已經(jīng)獲取了鎖,則可以直接對鎖的狀態(tài)值進(jìn)行修改,而無需再次獲取鎖。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

condition

ReentrantLock是一個(gè)可重入的互斥鎖,它提供了比內(nèi)置鎖更高級的同步功能。在使用ReentrantLock時(shí),我們可以通過調(diào)用它的newCondition()方法創(chuàng)建一個(gè)Condition對象,來實(shí)現(xiàn)更加靈活的線程同步。
Condition是在Java 5中引入的一種新的線程同步機(jī)制,它提供了await()和signal()等方法,可以用于線程之間的通信和協(xié)調(diào)。
ReentrantLock的newCondition()方法可以創(chuàng)建一個(gè)與當(dāng)前鎖關(guān)聯(lián)的Condition對象。調(diào)用該Condition對象的await()方法可以使當(dāng)前線程等待,直到另一個(gè)線程調(diào)用該Condition對象的signal()方法或signalAll()方法喚醒它。
舉個(gè)例子,假設(shè)我們有一個(gè)任務(wù)隊(duì)列,多個(gè)線程需要從隊(duì)列中獲取任務(wù)并執(zhí)行。我們可以使用ReentrantLock來實(shí)現(xiàn)對隊(duì)列的同步,并且為每個(gè)線程分配一個(gè)Condition對象,以便在隊(duì)列為空時(shí)等待任務(wù)的到來。具體實(shí)現(xiàn)可以參考下面的代碼:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class TaskQueue {
    private Queue<String> queue = new LinkedList<>();
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();

    public void addTask(String task) {
        lock.lock();
        try {
            queue.add(task);
            notEmpty.signal(); // 通知等待的線程
        } finally {
            lock.unlock();
        }
    }

    public String getTask() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // 等待任務(wù)的到來
            }
            return queue.remove();
        } finally {
            lock.unlock();
        }
    }
}

在這個(gè)示例中,我們使用ReentrantLock和Condition實(shí)現(xiàn)了一個(gè)線程安全的任務(wù)隊(duì)列。當(dāng)任務(wù)隊(duì)列為空時(shí),調(diào)用getTask()方法的線程會等待,直到其他線程調(diào)用addTask()方法向隊(duì)列中添加任務(wù)并通過notEmpty.signal()通知它們。
注意,在使用Condition對象時(shí),一定要在調(diào)用await()方法前獲得鎖,并在finally塊中釋放鎖。否則,如果線程在等待時(shí)被中斷,它會持有鎖而無法釋放,導(dǎo)致其他線程無法獲取鎖。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容