前言
-
上篇文章咱們證明了synchronized關鍵字的特性:無鎖、偏向鎖、輕量鎖、重(chong)偏向、重(chong)輕量、重量鎖??梢哉fsynchronized是jvm層面實現(xiàn)同步的方式。在jdk中,存在一個叫
java.util.concurrent的包,簡稱JUC,它是一個jdk層面的并發(fā)包,里面存在了大量與并發(fā)編程相關的api,其中最代表意義的就是atomic和lock兩種類別,前者是基于樂觀鎖CAS(Compare And Swap)的實現(xiàn),后者是基于AQS(Abstract Queued Synchronizer)實現(xiàn)。本文將詳細講解下AQS原理以及根據(jù)兩個案例來解讀ReentrantLock源碼。 - 兩個案例:
1.線程A單獨加鎖
2.線程A正在持有鎖的過程中,線程t1來加鎖
一、AQS原理
-
AQS簡稱Abstract Queued Synchronizer,它的核心是基于一個雙向鏈表組成的隊列(CLH隊列) + volatile關鍵字修飾的int類型變量實現(xiàn)的。(關于volatile關鍵字可以參考其他博主的一些總結(jié): 傳送門),大致核心可以以如下圖來呈現(xiàn):
在這里插入圖片描述
簡單總結(jié)就是:內(nèi)部使用雙向鏈表維護了一個隊列,其中Node數(shù)據(jù)結(jié)構(gòu)為此隊列的基石,內(nèi)部維護了prev(指向上一個節(jié)點)、next(指向下一個節(jié)點)、waitStatus(當前node的狀態(tài))、thread(當前維護的線程)四個重要的屬性。其中waitStatus分別有如下取值:Node中waitStatus具體取值 含義 CANCELLED(1) 中斷或取消,此狀態(tài)下的節(jié)點會從隊列中移除 SIGNAL(-1) 此狀態(tài)下的節(jié)點一定是在隊列排隊中 CONDITION(-2) 條件阻塞,比如說內(nèi)部因Condition而阻塞的節(jié)點 PROPAGATE(-3) 表示下一個acquireShared應該無條件傳播 0 默認狀態(tài) 除此之外,隊列中還維護了三個屬性,head(指向隊列中的頭節(jié)點)、state(鎖的狀態(tài))、tail(指向隊列中的尾節(jié)點)。其中,state的取值有兩種情況,將以如下表展示出來:
AQS中state具體取值 含義 0 表示當前鎖沒有被線程持有 1 表示當前鎖正在被線程持有 大于1 表示當前鎖被線程重入了(重入鎖),這里要注意:ReentrantLock重入了幾次,就要釋放幾次鎖
二、案例1:線程A單獨加鎖
-
代碼如下:
public class SimpleThreadLock { static ReentrantLock lock = new ReentrantLock(true); public static void main(String[] args) throws InterruptedException { Thread a = new Thread(() -> { try { lock.lock(); System.out.println("Get lock"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "線程a"); a.start(); a.join(); System.out.println("end"); } }代碼也比較簡單,就是在主線程中創(chuàng)建了一個線程,并且內(nèi)部去使用ReentrantLock加鎖,獲取到鎖后就打印出Get lock這句話,當t1線程執(zhí)行完后再繼續(xù)執(zhí)行主線程的邏輯。這里就不一步步演示斷點了,直接上源碼。
-
這里先說明下ReentrantLock重載的兩個構(gòu)造方法
// 默認非公平鎖 public ReentrantLock() { sync = new NonfairSync(); } // 若傳入true則是公平鎖 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }因為咱們傳入了true進去,所以此時,它是一把公平鎖。
-
lock.lock()方法,因為咱們指定了使用公平鎖,所以最終會進入ReentrantLock內(nèi)部維護的
FairSync類的lock方法// FairSync類下的lock方法 final void lock() { acquire(1); }于是,我們需要找到acquire方法,此方法為AQS(父類AbstractQueuedSynchronizer)的方法,所以最終會進入如下這么一段代碼:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }這段代碼,看似很精簡,但是它做的事真的太多了。
濃縮的才是精華呀!好了,咱們不偏題,繼續(xù)按照咱們的主題:線程A單獨加鎖。不過要繼續(xù)往下看,還是要加深下acquire方法的含義,我們必須要tryAcquire方法返回false,才能繼續(xù)走if條件中后面的邏輯,以及if條件內(nèi)部的邏輯。于是,我們直接看tryAcquire方法源碼: -
tryAcquire方法
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }tryAcquire方法是一個protected方法,內(nèi)部直接拋出了一個異常,還記得咱們是從哪個類掉用到父類AbstractQueuedSynchronizer的acquire方法的?沒錯,就是
FairSync類。那么咱們就直接定位到FairSync類的tryAcquire方法唄。protected final boolean tryAcquire(int acquires) { // 拿到當前線程,也就是線程A final Thread current = Thread.currentThread(); // 拿到當前aqs的state變量,我們沒有修改過它, // 默認為0 int c = getState(); if (c == 0) { // 進入此邏輯,此邏輯跟acquire方法有點類似 // 必須要hasQueuedPredecessors()方法返回false // 才能繼續(xù)往下執(zhí)行,于是我們把hasQueuedPredecessors的源碼也貼出來 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } -
hasQueuedPredecessors方法源碼
public final boolean hasQueuedPredecessors() { // 拿到aqs中的tail Node t = tail; // 拿到aqs中的head Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }此方法涵蓋的情景比較多,但是就當前情景而言,它很容易理解,在當前情形中,我們壓根沒操作過tail和head那么h 肯定等于 t,所以此方法返回false,返回false后,我們回到
FairSync類的tryAcquire方法,protected final boolean tryAcquire(int acquires) { // .... 上半部分代碼省略 if (c == 0) { // 在當前情景下,hasQueuedPredecessors返回的是false // 也就是說會繼續(xù)走if后面的邏輯, // if后面的邏輯就是執(zhí)行CAS操作, // 將state屬性從0設置成1 // 由于此時只有一個線程在執(zhí)行, // 這個cas操作一定是成功的 // cas成功后就會執(zhí)行setExclusiveOwnerThread代碼,這段代碼很有用 // 它是一個賦值的操作,也就是記錄 // 當前擁有鎖的線程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // .... 下半部分else if邏輯也省略了 return false; }通過上述代碼中的注釋,我們可以發(fā)現(xiàn),線程A加鎖成功后會返回true,至此,tryAcquire的返回值為true。還記的我們是從哪個方法進來的嗎?是的,是從父類AbstractQueuedSynchronizer的acquire方法進來的,上面總結(jié)到了,只有當tryAcquire返回false,才會繼續(xù)往下執(zhí)行。至此,線程A單獨加鎖的案例就結(jié)束了。通過這么一個單線程加鎖的案例,如果你認為AQS很簡單的話,那就大錯特錯了,單線程加鎖的案例中,我們僅使用到了AQS中的
state變量,CLH隊列卻始終沒有涉及到,而且從加鎖到加鎖結(jié)束的整個過程,我們連一個Node類型的數(shù)據(jù)結(jié)構(gòu)都沒有看到過。那Node類型的數(shù)據(jù)結(jié)構(gòu)什么時候會被用到呢?我們來看下一個案例線程A正在持有鎖的過程中,線程t1來加鎖
三、案例2:線程A正在持有鎖的過程中,線程t1來加鎖
-
同樣的,咱們改造下代碼:
public class TwoThreadLock { static ReentrantLock lock = new ReentrantLock(true); public static void main(String[] args) throws InterruptedException { new Thread(() -> { try { lock.lock(); System.out.println("Thread a get lock"); TimeUnit.SECONDS.sleep(60); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "線程a").start(); Thread t1 = new Thread(() -> { try { lock.lock(); System.out.println("Thread t1 get lock"); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }, "線程t1"); t1.start(); t1.join(); System.out.println("end"); } }上段代碼,毫無疑問,線程t1在調(diào)用lock.lock()方法時,就阻塞到那里了,要等線程a睡60s后才會繼續(xù)執(zhí)行,那么這里面到底做了哪些事呢?我們來一起研究下。
-
同案例1,使用的是公平鎖,最終肯定會調(diào)用到tryAcquire方法去,咱們這次就一次性的把tryAcquire方法給講清楚
protected final boolean tryAcquire(int acquires) { // 拿到當前線程,也就是線程t1 final Thread current = Thread.currentThread(); // 拿到當前aqs的state變量,此時的c是多少呢? // 沒錯,是1,因為鎖已經(jīng)被線程A占有了,此時的 // state為1。于是它會走else if邏輯 int c = getState(); if (c == 0) { // 進入此邏輯,此邏輯跟acquire方法有點類似 // 必須要hasQueuedPredecessors()方法返回false // 才能繼續(xù)往下執(zhí)行,于是我們把hasQueuedPredecessors的源碼也貼出來 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 走了else if邏輯,它也發(fā)現(xiàn)當前持有鎖的線程不是自己呀,于是直接return false // 這里順帶解釋下這個else if的邏輯,這個else if // 就是判斷當前調(diào)用lock方法的線程是不是和當前持有 // 鎖的線程一樣,如果是一樣的,則將state + 1并賦值給nextc屬性 // 這就表示了ReentrantLock支持重入性 // 那么什么時候會出現(xiàn)nextc屬性小于0的情況呢? // nextc是一個int類型,當超過了它的存儲返回后 // 會出現(xiàn)小于0的情況 ===> 也就是說ReentrantLock // 的重入次數(shù)最大為支持int類型最大值 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }通過上述代碼塊中的注釋可知,線程t1的加鎖流程并沒有這么順利,在tryAcquire方法中返回了false,那這代表了什么呢?是的,它代表著線程t1可以繼續(xù)走acquire后面的邏輯了,咱們繼續(xù)把acquire方法貼出來:
public final void acquire(int arg) { // 在案例2的情況下,tryAcquire方法返回了false // 于是會執(zhí)行后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg) // 當acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回了true才會執(zhí)行內(nèi)部的selfInterrupt()方法 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }于是,咱們先了解下
addWaiter(Node.EXCLUSIVE)方法,它的源碼如下:private Node addWaiter(Node mode) { // 此時的mode是由上述代碼塊傳入的, // 它的值為Node.EXCLUSIVE ===> 這是一個空節(jié)點, // 值為null, // 創(chuàng)建了一個node節(jié)點, 內(nèi)部維護了當前線程(線程t1),并且它的next節(jié)點為null(有Node的構(gòu)造方法可知) Node node = new Node(Thread.currentThread(), mode); // 拿到aqs隊列中的tail屬性, // 此時肯定為null啊(aqs隊列都沒初始化,哪來的隊尾節(jié)點) Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 此時pred為null,即不會走上面的if邏輯,于是執(zhí)行enq方法,記?。捍藭r傳入enq方法時的形參為新new出來的Node // 內(nèi)部維護的是當前線程(線程t1) enq(node); return node; }上面代碼塊的注釋也說了,最終會執(zhí)行到enq方法,enq方干啥的呢?猜一下?是的,它就是初始aqs隊列的。我們來看一下它的源碼:
/** 形參node內(nèi)部維護的線程為t2, 并且它的next屬性指向為null */ private Node enq(final Node node) { // 此處寫了一個死循環(huán),也就是常說的自旋鎖 for (;;) { // 自旋的過程中 // 第一次自旋: // 拿到隊尾元素, 此時隊列都沒有,肯定為null // 發(fā)現(xiàn)隊列中的tail指向的是null,于是初始化tail節(jié)點,并讓aqs中的head指向了tail, // 至此,aqs簡易版本的隊列就出來啦, // head和tail指向同一個node,并且此node內(nèi)部 // 維護的thread、prev、next、waitStatus全是默認值 // 由于是if else邏輯,所以初始化tail屬性后,就會進行第二次自旋 // 第二次自旋: // 再次拿到tail, 由于第一次自旋把tail給初始化了,所以此時拿到的tail不為null, 于是走了else邏輯 // 在else中,主要操作的是形參node, 還記得形參node是什么嗎? ==> 維護當前線程(線程t1)的node節(jié)點, // 此時會將node的上一個節(jié)點指向t節(jié)點 // 同時進行cas操作,將node節(jié)點變成tail // 當cas成功后,再設置t的next指向node // 最終返回這個t. // 此時此刻這個t是什么樣的數(shù)據(jù)結(jié)構(gòu)呢? // 此時的這個t就是隊列中的head節(jié)點了, // 并且它的next為node(維護線程t1) // 所以此時此刻隊列中現(xiàn)在有兩個元素了 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }代碼中的注釋描述了enq的過程,我專門畫了一個圖來描述aqs隊列產(chǎn)生的過程,幫助理解:
在這里插入圖片描述
enq初始化aqs隊列的過程后,就執(zhí)行到了addWaiter方法的出口了
private Node addWaiter(Node mode) { // ....上述代碼省略 // enq初始化隊列后,會將node進行返回 // 這個node就是維護線程t1的node,它已經(jīng)是 // 隊列中的隊列了 enq(node); return node; }addWaiter方法執(zhí)行完了之后,將繼續(xù)執(zhí)行acquire方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }此時應該接著執(zhí)行
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)了,由于addWaiter方法已經(jīng)執(zhí)行完成,返回的是擁有當前線程的node,同時它也是當前隊列中的隊尾。我們來查看下acquireQueued的源碼:/** node形參為維護當前線程(t1)的節(jié)點, 同時arg為1 */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 此處又自旋了 for (;;) { // 獲取到當前節(jié)點的上一個節(jié)點,在 // 當前案例下,它是head節(jié)點 final Node p = node.predecessor(); // 第一次自旋: // 做判斷,發(fā)現(xiàn)上一個節(jié)點是head節(jié)點 // 于是繼續(xù)執(zhí)行加鎖方法tryAcquire // 因為在當前案例下,線程a睡眠了60s // 肯定還是加鎖失敗的,加鎖失敗后, // 則走下面的邏輯,這里就是為了當前 // 節(jié)點繼續(xù)上鎖、因為有可能前面的 // 節(jié)點已經(jīng)釋放鎖了,或者說被park // 的線程被unpark了,要繼續(xù)自旋, // 嘗試獲取鎖 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 判斷當前這個節(jié)點是否需要park // 什么是park?就是使用unsafe類來阻塞指定的線程, // 在shouldParkAfterFailedAcquire方法中 // 傳入的是當前節(jié)點和上一個節(jié)點, // 大致邏輯為: // 1. 判斷當前節(jié)點的上一個節(jié)點(即p)的waitStatus是不是SIGNAL(-1)狀態(tài),如果是則返回true // SIGNAL代表什么呢?上面的表格中有說到 // SIGNAL代表這個Node是處于排隊狀態(tài) // 因此可以得出一個結(jié)論:如果上一個節(jié)點也處于排隊狀態(tài) // 那么我就返回true,進而執(zhí)行parkAndCheckInterrupt方法,parkAndCheckInterrupt方法就是讓park當前線程,讓當前線程進入阻塞狀態(tài),自旋再此暫停 // 2. 如果p節(jié)點的waitStatus為負數(shù),即不是中斷或者取消狀態(tài) // 那么它會將p的waitStatus置為-1.并返回false // 進而進入第二次自旋,當進入第二次自旋時,若上面還未獲取鎖成功,那么當前線程就會被park if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }所以,當線程t2在執(zhí)行到此方法時,發(fā)現(xiàn)head即線程a對應的node的waitStatus為
0,于是會自旋一次將head的waitStatus置為-1,然后再繼續(xù)自旋,此時自己嘗試加鎖又失敗了,此時就會進入park狀態(tài)。所以就在acquireQueued方法處阻塞了,等待線程a釋放鎖后喚醒線程t1。至此案例2的加鎖過程也結(jié)束了
四、總結(jié)
- 本次只是基于兩個簡單的案例來認識ReentrantLock加鎖流程的源碼,其中還有很多其他的case沒有涉及到。這兩種案例算是認識ReentrantLock加鎖源碼的入門吧。下篇博客將介紹下基于這兩種案例的解鎖過程。
- ReentrantLock加鎖流程涉及到每個方法的詳細步驟可查看在github中的總結(jié):傳送門
- I am a slow walker, but I never walk backwards.