并發(fā)系列四:基于兩種案例來認識ReentrantLock源碼加鎖過程(公平鎖)

前言

  • 上篇文章咱們證明了synchronized關鍵字的特性:無鎖、偏向鎖、輕量鎖、重(chong)偏向、重(chong)輕量、重量鎖??梢哉fsynchronized是jvm層面實現(xiàn)同步的方式。在jdk中,存在一個叫java.util.concurrent的包,簡稱JUC,它是一個jdk層面的并發(fā)包,里面存在了大量與并發(fā)編程相關的api,其中最代表意義的就是atomic和lock兩種類別,前者是基于樂觀鎖CAS(Compare And Swap)的實現(xiàn),后者是基于AQS(Abstract Queued Synchronizer)實現(xiàn)。本文將詳細講解下AQS原理以及根據(jù)兩個案例來解讀ReentrantLock源碼。
  • 兩個案例:

    1.線程A單獨加鎖
    2.線程A正在持有鎖的過程中,線程t1來加鎖

一、AQS原理

  • AQS簡稱Abstract Queued Synchronizer,它的核心是基于一個雙向鏈表組成的隊列(CLH隊列) + volatile關鍵字修飾的int類型變量實現(xiàn)的。(關于volatile關鍵字可以參考其他博主的一些總結(jié): 傳送門),大致核心可以以如下圖來呈現(xiàn):

    在這里插入圖片描述

    簡單總結(jié)就是:內(nèi)部使用雙向鏈表維護了一個隊列,其中Node數(shù)據(jù)結(jié)構(gòu)為此隊列的基石,內(nèi)部維護了prev(指向上一個節(jié)點)、next(指向下一個節(jié)點)、waitStatus(當前node的狀態(tài))、thread(當前維護的線程)四個重要的屬性。其中waitStatus分別有如下取值:

    Node中waitStatus具體取值 含義
    CANCELLED(1) 中斷或取消,此狀態(tài)下的節(jié)點會從隊列中移除
    SIGNAL(-1) 此狀態(tài)下的節(jié)點一定是在隊列排隊中
    CONDITION(-2) 條件阻塞,比如說內(nèi)部因Condition而阻塞的節(jié)點
    PROPAGATE(-3) 表示下一個acquireShared應該無條件傳播
    0 默認狀態(tài)

    除此之外,隊列中還維護了三個屬性,head(指向隊列中的頭節(jié)點)、state(鎖的狀態(tài))、tail(指向隊列中的尾節(jié)點)。其中,state的取值有兩種情況,將以如下表展示出來:

    AQS中state具體取值 含義
    0 表示當前鎖沒有被線程持有
    1 表示當前鎖正在被線程持有
    大于1 表示當前鎖被線程重入了(重入鎖),這里要注意:ReentrantLock重入了幾次,就要釋放幾次鎖

二、案例1:線程A單獨加鎖

  • 代碼如下:

    public class SimpleThreadLock {
    
        static ReentrantLock lock = new ReentrantLock(true);
    
        public static void main(String[] args) throws InterruptedException {
            Thread a = new Thread(() -> {
                try {
                    lock.lock();
                    System.out.println("Get lock");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "線程a");
    
            a.start();
            a.join();
            System.out.println("end");
        }
    }
    

    代碼也比較簡單,就是在主線程中創(chuàng)建了一個線程,并且內(nèi)部去使用ReentrantLock加鎖,獲取到鎖后就打印出Get lock這句話,當t1線程執(zhí)行完后再繼續(xù)執(zhí)行主線程的邏輯。這里就不一步步演示斷點了,直接上源碼。

  • 這里先說明下ReentrantLock重載的兩個構(gòu)造方法

    // 默認非公平鎖
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    
    // 若傳入true則是公平鎖
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    

    因為咱們傳入了true進去,所以此時,它是一把公平鎖。

  • lock.lock()方法,因為咱們指定了使用公平鎖,所以最終會進入ReentrantLock內(nèi)部維護的FairSync類的lock方法

    // FairSync類下的lock方法
    final void lock() {
        acquire(1);
    }
    

    于是,我們需要找到acquire方法,此方法為AQS(父類AbstractQueuedSynchronizer)的方法,所以最終會進入如下這么一段代碼:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    這段代碼,看似很精簡,但是它做的事真的太多了。濃縮的才是精華呀!好了,咱們不偏題,繼續(xù)按照咱們的主題:線程A單獨加鎖。不過要繼續(xù)往下看,還是要加深下acquire方法的含義,我們必須要tryAcquire方法返回false,才能繼續(xù)走if條件中后面的邏輯,以及if條件內(nèi)部的邏輯。于是,我們直接看tryAcquire方法源碼:

  • tryAcquire方法

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    tryAcquire方法是一個protected方法,內(nèi)部直接拋出了一個異常,還記得咱們是從哪個類掉用到父類AbstractQueuedSynchronizer的acquire方法的?沒錯,就是FairSync類。那么咱們就直接定位到FairSync類的tryAcquire方法唄。

    protected final boolean tryAcquire(int acquires) {
        // 拿到當前線程,也就是線程A
        final Thread current = Thread.currentThread();
    
        // 拿到當前aqs的state變量,我們沒有修改過它,
        // 默認為0
        int c = getState();
        if (c == 0) {
            // 進入此邏輯,此邏輯跟acquire方法有點類似
            // 必須要hasQueuedPredecessors()方法返回false
            // 才能繼續(xù)往下執(zhí)行,于是我們把hasQueuedPredecessors的源碼也貼出來
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    
  • hasQueuedPredecessors方法源碼

    public final boolean hasQueuedPredecessors() {
        // 拿到aqs中的tail
        Node t = tail; 
        // 拿到aqs中的head
        Node h = head;
        Node s;
    
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    此方法涵蓋的情景比較多,但是就當前情景而言,它很容易理解,在當前情形中,我們壓根沒操作過tail和head那么h 肯定等于 t,所以此方法返回false,返回false后,我們回到FairSync類的tryAcquire方法,

    protected final boolean tryAcquire(int acquires) {
        // .... 上半部分代碼省略
        if (c == 0) {
            // 在當前情景下,hasQueuedPredecessors返回的是false
            // 也就是說會繼續(xù)走if后面的邏輯,
            // if后面的邏輯就是執(zhí)行CAS操作,
            // 將state屬性從0設置成1
            // 由于此時只有一個線程在執(zhí)行,
            // 這個cas操作一定是成功的
            // cas成功后就會執(zhí)行setExclusiveOwnerThread代碼,這段代碼很有用
            // 它是一個賦值的操作,也就是記錄
            // 當前擁有鎖的線程
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // .... 下半部分else if邏輯也省略了
        return false;
    }
    

    通過上述代碼中的注釋,我們可以發(fā)現(xiàn),線程A加鎖成功后會返回true,至此,tryAcquire的返回值為true。還記的我們是從哪個方法進來的嗎?是的,是從父類AbstractQueuedSynchronizer的acquire方法進來的,上面總結(jié)到了,只有當tryAcquire返回false,才會繼續(xù)往下執(zhí)行。至此,線程A單獨加鎖的案例就結(jié)束了。通過這么一個單線程加鎖的案例,如果你認為AQS很簡單的話,那就大錯特錯了,單線程加鎖的案例中,我們僅使用到了AQS中的state變量,CLH隊列卻始終沒有涉及到,而且從加鎖到加鎖結(jié)束的整個過程,我們連一個Node類型的數(shù)據(jù)結(jié)構(gòu)都沒有看到過。那Node類型的數(shù)據(jù)結(jié)構(gòu)什么時候會被用到呢?我們來看下一個案例線程A正在持有鎖的過程中,線程t1來加鎖

三、案例2:線程A正在持有鎖的過程中,線程t1來加鎖

  • 同樣的,咱們改造下代碼:

    public class TwoThreadLock {
    
        static ReentrantLock lock = new ReentrantLock(true);
    
        public static void main(String[] args) throws InterruptedException {
            new Thread(() -> {
                try {
                    lock.lock();
                    System.out.println("Thread a get lock");
                    TimeUnit.SECONDS.sleep(60);
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "線程a").start();
    
            Thread t1 = new Thread(() -> {
                try {
                    lock.lock();
                    System.out.println("Thread t1 get lock");
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }, "線程t1");
    
            t1.start();
            t1.join();
    
            System.out.println("end");
        }
    }
    

    上段代碼,毫無疑問,線程t1在調(diào)用lock.lock()方法時,就阻塞到那里了,要等線程a睡60s后才會繼續(xù)執(zhí)行,那么這里面到底做了哪些事呢?我們來一起研究下。

  • 同案例1,使用的是公平鎖,最終肯定會調(diào)用到tryAcquire方法去,咱們這次就一次性的把tryAcquire方法給講清楚

    protected final boolean tryAcquire(int acquires) {
        // 拿到當前線程,也就是線程t1
        final Thread current = Thread.currentThread();
    
        // 拿到當前aqs的state變量,此時的c是多少呢?
        // 沒錯,是1,因為鎖已經(jīng)被線程A占有了,此時的
        // state為1。于是它會走else if邏輯
        int c = getState();
        if (c == 0) {
            // 進入此邏輯,此邏輯跟acquire方法有點類似
            // 必須要hasQueuedPredecessors()方法返回false
            // 才能繼續(xù)往下執(zhí)行,于是我們把hasQueuedPredecessors的源碼也貼出來
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 走了else if邏輯,它也發(fā)現(xiàn)當前持有鎖的線程不是自己呀,于是直接return false
        // 這里順帶解釋下這個else if的邏輯,這個else if
        // 就是判斷當前調(diào)用lock方法的線程是不是和當前持有
        // 鎖的線程一樣,如果是一樣的,則將state + 1并賦值給nextc屬性
        // 這就表示了ReentrantLock支持重入性
        // 那么什么時候會出現(xiàn)nextc屬性小于0的情況呢?
        // nextc是一個int類型,當超過了它的存儲返回后
        // 會出現(xiàn)小于0的情況 ===> 也就是說ReentrantLock
        // 的重入次數(shù)最大為支持int類型最大值
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    

    通過上述代碼塊中的注釋可知,線程t1的加鎖流程并沒有這么順利,在tryAcquire方法中返回了false,那這代表了什么呢?是的,它代表著線程t1可以繼續(xù)走acquire后面的邏輯了,咱們繼續(xù)把acquire方法貼出來:

    public final void acquire(int arg) {
        // 在案例2的情況下,tryAcquire方法返回了false
        // 于是會執(zhí)行后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
        // 當acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回了true才會執(zhí)行內(nèi)部的selfInterrupt()方法
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    于是,咱們先了解下addWaiter(Node.EXCLUSIVE)方法,它的源碼如下:

    private Node addWaiter(Node mode) {
        // 此時的mode是由上述代碼塊傳入的,
        // 它的值為Node.EXCLUSIVE ===> 這是一個空節(jié)點,
        // 值為null,
        // 創(chuàng)建了一個node節(jié)點, 內(nèi)部維護了當前線程(線程t1),并且它的next節(jié)點為null(有Node的構(gòu)造方法可知)
        Node node = new Node(Thread.currentThread(), mode);
        // 拿到aqs隊列中的tail屬性,
        // 此時肯定為null啊(aqs隊列都沒初始化,哪來的隊尾節(jié)點)
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        
        // 此時pred為null,即不會走上面的if邏輯,于是執(zhí)行enq方法,記?。捍藭r傳入enq方法時的形參為新new出來的Node
        // 內(nèi)部維護的是當前線程(線程t1)
        enq(node);
        return node;
    }
    

    上面代碼塊的注釋也說了,最終會執(zhí)行到enq方法,enq方干啥的呢?猜一下?是的,它就是初始aqs隊列的。我們來看一下它的源碼:

    /**
     形參node內(nèi)部維護的線程為t2, 并且它的next屬性指向為null
     */
    private Node enq(final Node node) {
        // 此處寫了一個死循環(huán),也就是常說的自旋鎖
        for (;;) {
            // 自旋的過程中
            // 第一次自旋:
            //  拿到隊尾元素, 此時隊列都沒有,肯定為null
            //  發(fā)現(xiàn)隊列中的tail指向的是null,于是初始化tail節(jié)點,并讓aqs中的head指向了tail,
            //  至此,aqs簡易版本的隊列就出來啦,
            //  head和tail指向同一個node,并且此node內(nèi)部
            //  維護的thread、prev、next、waitStatus全是默認值
            // 由于是if else邏輯,所以初始化tail屬性后,就會進行第二次自旋
            // 第二次自旋:
            //  再次拿到tail, 由于第一次自旋把tail給初始化了,所以此時拿到的tail不為null, 于是走了else邏輯
            //  在else中,主要操作的是形參node, 還記得形參node是什么嗎? ==> 維護當前線程(線程t1)的node節(jié)點,
            //  此時會將node的上一個節(jié)點指向t節(jié)點
            //  同時進行cas操作,將node節(jié)點變成tail
            //  當cas成功后,再設置t的next指向node
            //  最終返回這個t.
            //  此時此刻這個t是什么樣的數(shù)據(jù)結(jié)構(gòu)呢?
            //  此時的這個t就是隊列中的head節(jié)點了,
            //  并且它的next為node(維護線程t1)
            //  所以此時此刻隊列中現(xiàn)在有兩個元素了
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    代碼中的注釋描述了enq的過程,我專門畫了一個圖來描述aqs隊列產(chǎn)生的過程,幫助理解:


    在這里插入圖片描述

    enq初始化aqs隊列的過程后,就執(zhí)行到了addWaiter方法的出口了

    private Node addWaiter(Node mode) {
        // ....上述代碼省略
        // enq初始化隊列后,會將node進行返回
        // 這個node就是維護線程t1的node,它已經(jīng)是
        // 隊列中的隊列了
        enq(node);
        return node;
    }
    

    addWaiter方法執(zhí)行完了之后,將繼續(xù)執(zhí)行acquire方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    此時應該接著執(zhí)行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)了,由于addWaiter方法已經(jīng)執(zhí)行完成,返回的是擁有當前線程的node,同時它也是當前隊列中的隊尾。我們來查看下acquireQueued的源碼:

    /**
     node形參為維護當前線程(t1)的節(jié)點,
     同時arg為1
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 此處又自旋了
            for (;;) {
                // 獲取到當前節(jié)點的上一個節(jié)點,在
                // 當前案例下,它是head節(jié)點
                final Node p = node.predecessor();
                // 第一次自旋:
                //   做判斷,發(fā)現(xiàn)上一個節(jié)點是head節(jié)點
                //   于是繼續(xù)執(zhí)行加鎖方法tryAcquire
                //   因為在當前案例下,線程a睡眠了60s
                //   肯定還是加鎖失敗的,加鎖失敗后,
                //   則走下面的邏輯,這里就是為了當前
                //   節(jié)點繼續(xù)上鎖、因為有可能前面的
                //   節(jié)點已經(jīng)釋放鎖了,或者說被park
                //   的線程被unpark了,要繼續(xù)自旋,
                //   嘗試獲取鎖
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                
                // 判斷當前這個節(jié)點是否需要park
                // 什么是park?就是使用unsafe類來阻塞指定的線程,
                // 在shouldParkAfterFailedAcquire方法中
                // 傳入的是當前節(jié)點和上一個節(jié)點,
                // 大致邏輯為:
                //   1. 判斷當前節(jié)點的上一個節(jié)點(即p)的waitStatus是不是SIGNAL(-1)狀態(tài),如果是則返回true
                //     SIGNAL代表什么呢?上面的表格中有說到
                //     SIGNAL代表這個Node是處于排隊狀態(tài)
                //     因此可以得出一個結(jié)論:如果上一個節(jié)點也處于排隊狀態(tài)
                //     那么我就返回true,進而執(zhí)行parkAndCheckInterrupt方法,parkAndCheckInterrupt方法就是讓park當前線程,讓當前線程進入阻塞狀態(tài),自旋再此暫停
                //   2. 如果p節(jié)點的waitStatus為負數(shù),即不是中斷或者取消狀態(tài)
                //      那么它會將p的waitStatus置為-1.并返回false
                //      進而進入第二次自旋,當進入第二次自旋時,若上面還未獲取鎖成功,那么當前線程就會被park
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    所以,當線程t2在執(zhí)行到此方法時,發(fā)現(xiàn)head即線程a對應的node的waitStatus為0,于是會自旋一次將head的waitStatus置為-1,然后再繼續(xù)自旋,此時自己嘗試加鎖又失敗了,此時就會進入park狀態(tài)。所以就在acquireQueued方法處阻塞了,等待線程a釋放鎖后喚醒線程t1。至此案例2的加鎖過程也結(jié)束了

四、總結(jié)

  • 本次只是基于兩個簡單的案例來認識ReentrantLock加鎖流程的源碼,其中還有很多其他的case沒有涉及到。這兩種案例算是認識ReentrantLock加鎖源碼的入門吧。下篇博客將介紹下基于這兩種案例的解鎖過程。
  • ReentrantLock加鎖流程涉及到每個方法的詳細步驟可查看在github中的總結(jié):傳送門
  • I am a slow walker, but I never walk backwards.
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容