ReentrantLock實(shí)現(xiàn)機(jī)制(CLH隊(duì)列鎖)

如何實(shí)現(xiàn)一個(gè)鎖

實(shí)現(xiàn)一個(gè)鎖,主要需要考慮2個(gè)問(wèn)題

  1. 如何線程安全的修改鎖狀態(tài)位?
  2. 得不到鎖的線程,如何排隊(duì)?

帶著這2個(gè)問(wèn)題,我們看一下JUC中的ReentrantLock是如何做的?

ReentrantLock鎖實(shí)現(xiàn)

ReentrantLock類的大部分邏輯,都是其均繼承自AQS的內(nèi)部類Sync實(shí)現(xiàn)的

如何線程安全的修改鎖狀態(tài)位?

鎖狀態(tài)位的修改主要通過(guò),內(nèi)部類Sync實(shí)現(xiàn)的

    public abstract class AbstractQueuedSynchronizer{
         //鎖狀態(tài)標(biāo)志位:volatile變量(多線程間通過(guò)此變量判斷鎖的狀態(tài))
          private volatile int state;
         
          protected final int getState() {
                 return state;
          }

 
         protected final void setState(int newState) {
              state = newState;
         }

    }
  
    abstract static  Sync extends AbstractQueuedSynchronizer {
    
        
         final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //volatile讀,確保了鎖狀態(tài)位的內(nèi)存可見(jiàn)性
            int c = getState();
            //鎖還沒(méi)有被其他線程占用
            if (c == 0) {
                //此時(shí),如果多個(gè)線程同時(shí)進(jìn)入,CAS操作會(huì)確保,只有一個(gè)線程修改成功
                if (compareAndSetState(0, acquires)) {
                    //設(shè)置當(dāng)前線程擁有獨(dú)占訪問(wèn)權(quán)
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //當(dāng)前線程就是擁有獨(dú)占訪問(wèn)權(quán)的線程,即鎖重入
            else if (current == getExclusiveOwnerThread()) {
                //重入鎖計(jì)數(shù)+1
                int nextc = c + acquires;
                if (nextc < 0) //溢出
                    throw new Error("Maximum lock count exceeded");
                //只有獲取鎖的線程,才能進(jìn)入此段代碼,因此只需要一個(gè)volatile寫(xiě)操作,確保其內(nèi)存可見(jiàn)性即可
                setState(nextc);
                return true;
            }
            return false;
        }
        
        //只有獲取鎖的線程才會(huì)執(zhí)行此方法,因此只需要volatile讀寫(xiě)確保內(nèi)存可見(jiàn)性即可
        protected final boolean tryRelease(int releases) {
            //鎖計(jì)數(shù)器-1
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //鎖計(jì)數(shù)器為0,說(shuō)明鎖被釋放
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }
   

從代碼中,我們可以發(fā)現(xiàn)線程安全的關(guān)鍵在于:<font color="#dd0000">volatile變量</font>和<font color="#dd0000">CAS原語(yǔ)</font>的配合使用

得不到鎖的線程,如何排隊(duì)?

JUC中鎖的排隊(duì)策略,是基于CLH隊(duì)列的變種實(shí)現(xiàn)的。因此,我們先看看啥是CLH隊(duì)列

CLH隊(duì)列

image.png

CLH 隊(duì)列鎖的特點(diǎn):
? 基于鏈表隊(duì)列的 FIFO 排隊(duì)機(jī)制。
? 獲取不到鎖的線程會(huì)進(jìn)入隊(duì)列尾部
? 每個(gè)線程僅自旋檢測(cè)自己的 前驅(qū)節(jié)點(diǎn) 狀態(tài),而不是直接檢查鎖狀態(tài)。
? 線程獲取鎖時(shí)不會(huì)競(jìng)爭(zhēng) CPU,但也不會(huì)掛起線程,而是通過(guò)自旋監(jiān)控前驅(qū)節(jié)點(diǎn)。

CLH 隊(duì)列鎖的優(yōu)點(diǎn):
1. 每個(gè)線程只需輪詢自己前驅(qū)節(jié)點(diǎn)的狀態(tài),降低了對(duì)共享變量的頻繁訪問(wèn)。
2. 減少了鎖競(jìng)爭(zhēng)的 “假喚醒” 現(xiàn)象,提升了并發(fā)性能。
3. FIFO 排隊(duì)保證鎖獲取的公平性。

CLH 隊(duì)列鎖的缺點(diǎn):
? 線程在等待時(shí)仍需要 “自旋”,會(huì)占用 CPU 資源。
? 在高并發(fā)和長(zhǎng)時(shí)間阻塞時(shí),效率可能不如阻塞鎖(如 ReentrantLock)。

JUC中的CLH隊(duì)列實(shí)現(xiàn)

  • ReentrantLock 并不是傳統(tǒng)意義上的 CLH 自旋鎖,而是基于 AQS 框架實(shí)現(xiàn)的阻塞鎖,但它借鑒了 CLH 鎖的 FIFO 排隊(duì)機(jī)制來(lái)管理線程的等待順序
  • 相較于 CLH 隊(duì)列鎖,ReentrantLock 在實(shí)現(xiàn)上支持更豐富的特性,如 可重入、超時(shí)等待、公平/非公平控制、阻塞機(jī)制等
  • CLH 是自旋鎖,而 AQS 通過(guò) park/unpark 實(shí)現(xiàn)了高效的線程阻塞與喚醒,避免了高 CPU 資源消耗
image.png

我們來(lái)看看AbstractQueuedSynchronizer類中的acquire方法實(shí)現(xiàn)

    
    public final void acquire(int arg) {
        //嘗試獲取鎖
        if (!tryAcquire(arg) &&
            //獲取不到,則進(jìn)入等待隊(duì)列,返回是否中斷
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //如果acquireQueued返回中斷,則調(diào)用當(dāng)前線程的interrupt()方法
            selfInterrupt();
    }

入隊(duì)

如果線程調(diào)用tryAcquire(其最終實(shí)現(xiàn)是調(diào)用上面分析過(guò)的nonfairTryAcquire方法)獲取鎖失敗。則首先調(diào)用addWaiter(Node.EXCLUSIVE)方法,將自己加入CLH隊(duì)列的尾部。

    private Node addWaiter(Node mode) {
        //線程對(duì)應(yīng)的Node
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        //尾節(jié)點(diǎn)不為空
        if (pred != null) {
            //當(dāng)前node的前驅(qū)指向尾節(jié)點(diǎn)
            node.prev = pred;
            //將當(dāng)前node設(shè)置為新的尾節(jié)點(diǎn)
            //如果cas操作失敗,說(shuō)明線程競(jìng)爭(zhēng)
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //lockfree的方式插入隊(duì)尾
        enq(node);
        return node;
    }
    private Node enq(final Node node) {
        //經(jīng)典的lockfree算法:循環(huán)+CAS
        for (;;) {
            Node t = tail;
            //尾節(jié)點(diǎn)為空
            if (t == null) { // Must initialize
                //初始化頭節(jié)點(diǎn)
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

入隊(duì)過(guò)程,入下圖所示

1 T0持有鎖時(shí),其CLH隊(duì)列的頭尾指針為NULL

image.png

2 線程T1,此時(shí)請(qǐng)求鎖,由于鎖被T0占有。因此加入隊(duì)列尾部。具體過(guò)程如下所示:

(1) 初始化頭節(jié)點(diǎn)

(2) 初始化T1節(jié)點(diǎn),入隊(duì),尾指針指向T1

image.png

3 此時(shí)如果有一個(gè)T10線程先于T1入隊(duì),則T1執(zhí)行compareAndSetTail(t, node)會(huì)失敗,然后回到for循環(huán)開(kāi)始處,重新入隊(duì)。


image.png

由自旋到阻塞

入隊(duì)后,調(diào)用acquireQueued方法,進(jìn)入阻塞狀態(tài),直到被前驅(qū)節(jié)點(diǎn)喚醒

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //其前驅(qū)是頭結(jié)點(diǎn),并且再次調(diào)用tryAcquire成功獲取鎖
                if (p == head && tryAcquire(arg)) {
                    //將自己設(shè)為頭結(jié)點(diǎn)
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    //成功獲取鎖,返回
                    return interrupted;
                }
                //沒(méi)有得到鎖時(shí):
                //shouldParkAfterFailedAcquire方法:返回是否需要阻塞當(dāng)前線程
                //parkAndCheckInterrupt方法:阻塞當(dāng)前線程,當(dāng)線程再次喚醒時(shí),返回是否被中斷
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //修改中斷標(biāo)志位
                    interrupted = true;
            }
        } finally {
            if (failed)
                //獲取鎖失敗,則將此線程對(duì)應(yīng)的node的waitStatus改為CANCEL
                cancelAcquire(node);
        }
    }
    
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }
    
      /**
     * 獲取鎖失敗時(shí),檢查并更新node的waitStatus。
     * 如果線程需要阻塞,返回true。
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        
        //前驅(qū)節(jié)點(diǎn)的waitStatus是SIGNAL。
        if (ws == Node.SIGNAL)
            /* 
             * SIGNAL狀態(tài)的節(jié)點(diǎn),釋放鎖后,會(huì)喚醒其后繼節(jié)點(diǎn)。
             * 因此,此線程可以安全的阻塞(前驅(qū)節(jié)點(diǎn)釋放鎖時(shí),會(huì)喚醒此線程)。
             */
            return true;
            
        //前驅(qū)節(jié)點(diǎn)對(duì)應(yīng)的線程被取消
        if (ws > 0) {
            do {
                //跳過(guò)此前驅(qū)節(jié)點(diǎn)
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
               此時(shí),需要將前驅(qū)節(jié)點(diǎn)的狀態(tài)設(shè)置為SIGNAL。
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
     //當(dāng)shouldParkAfterFailedAcquire方法返回true,則調(diào)用parkAndCheckInterrupt方法阻塞當(dāng)前線程
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

自旋過(guò)程,入下圖所示

image.png
image.png
image.png
image.png
image.png



然后線程T2,加入了請(qǐng)求鎖的隊(duì)列,尾指針后移。

image.png
image.png
image.png

終上所述,每個(gè)得不到鎖的線程,都會(huì)講自己封裝成Node,加入隊(duì)尾,或自旋或阻塞,直到獲取鎖(為簡(jiǎn)化問(wèn)題,不考慮取消的情況)

鎖的釋放

前文提到,T1,T2在阻塞之前,都回去修改其前驅(qū)節(jié)點(diǎn)的waitStatus=-1。這是為什么?
我們看下鎖釋放的代碼,便一目了然

    public final boolean release(int arg) {
        //修改鎖計(jì)數(shù)器,如果計(jì)數(shù)器為0,說(shuō)明鎖被釋放
        if (tryRelease(arg)) {
            Node h = head;
            //head節(jié)點(diǎn)的waitStatus不等于0,說(shuō)明head節(jié)點(diǎn)的后繼節(jié)點(diǎn)對(duì)應(yīng)的線程,正在阻塞,等待被喚醒
            if (h != null && h.waitStatus != 0)
                //喚醒后繼節(jié)點(diǎn)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    
   
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

      
        //后繼節(jié)點(diǎn)
        Node s = node.next;
        //如果s被取消,跳過(guò)被取消節(jié)點(diǎn)
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            //喚醒后繼節(jié)點(diǎn)
            LockSupport.unpark(s.thread);
    }

如代碼所示,waitStatus=-1的作用,主要是告訴釋放鎖的線程:后面還有排隊(duì)等待獲取鎖的線程,請(qǐng)喚醒他!

釋放鎖的過(guò)程,如圖所示:

image.png
image.png

總結(jié)

實(shí)現(xiàn)鎖的關(guān)鍵在于:

  1. 通過(guò)CAS操作與volatile變量互相配合,線程安全的修改鎖標(biāo)志位
  2. 基于CLH隊(duì)列,實(shí)現(xiàn)鎖的排隊(duì)策略

問(wèn)題討論

1 unparkSuccessor時(shí)為什么會(huì)出現(xiàn)s==null || s.waitStatus>0的情況
2 這種情況下,為什么要通過(guò)prev指針?lè)聪虿檎襍uccessor節(jié)點(diǎn)

//unparkSuccessor方法的相關(guān)代碼
//釋放鎖時(shí),s就是head.next
 if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }

原因如下:
當(dāng)釋放鎖的線程在執(zhí)行unparkSuccessor(head)時(shí),分情況進(jìn)行分析:
(1)s==0的情況:
觸發(fā)條件:head的successor節(jié)點(diǎn)獲取鎖成功后,執(zhí)行了head.next=null的操作后,解鎖線程讀取了head.next,因此s==null
反向遍歷的原因:next指針在head.next處斷開(kāi)了,只能通過(guò)prev指針遍歷
關(guān)鍵代碼,如圖所示:


image.png

(2)s.waitStatus > 0的情況
觸發(fā)條件:head的successor節(jié)點(diǎn)被取消(cancelAcquire)時(shí),執(zhí)行了如下操作:
successor.waitStatus=1 ;
successor.next = successor;
反向遍歷的原因:next指針在head節(jié)點(diǎn)的后繼節(jié)點(diǎn)處斷開(kāi)(head.next.next),因此只能通過(guò)prev指針遍歷
關(guān)鍵代碼,如圖所示:


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容