Android中的并發(fā)編程第一篇

1 volatile 的工作原理


眾所周知,在如今的計算機時代,CPU的運算處理速度與內存讀寫速度的差異非常巨大,為了解決這種差異充分利用CPU的使用效率,于是在CPU 和內存之間增加 CPU Cache(高速緩存),將運算需要的數據先復制到CPU Cache中,在進行運算時CPU不再和內存打交道,而是直接對CPU Cache 進行讀寫。除此之外還可以減少 CPU 與 I/O 設備爭搶訪存,由于 CPU 和 I/O 設備會競爭同一條內存總線,有可能出現 CPU 等待 I/O 設備訪存的情況。而如果 CPU 能直接從緩存中獲取數據,就可以減少競爭,提高 CPU 的使用率。

1.1 緩存一致性問題

CPU Cache 會引起緩存一致性問題, 在分析緩存一致性問題時,忽略 L1 / L2 / L3 的多級緩存結構,提出緩存一致性抽象模型:



在單核 CPU 中,只需要考慮 Cache 與內存的一致性。但是在多核 CPU 中,由于每個核心都有一份獨占的 Cache,就會存在一個核心修改Cache 數據后,兩個核心 Cache 數據不一致的問題。因此CPU 的緩存一致性問題應該從 2 個維度考慮:

  • Cache 與內存的一致性問題: 在修改 Cache 數據后,如何同步回內存?

  • 多核心 Cache 的一致性問題: 在一個核心修改 Cache 數據后,如何同步給其他核心 Cache?

1.1.1 Cache 與內存的一致性問題

1> 寫直達策略(Write-Through)

寫直達策略的讀取過程:

  • 1、CPU 在訪問內存地址時,會先檢查該地址的數據是否已經加載到 Cache 中;

  • 2、如果數據在 Cache 中,則直接讀取 Cache 上的數據到 CPU 中;

  • 3、如果數據不在 Cache 中:

    • 如果 Cache 已裝滿或者 Cache Line 被占用,先執(zhí)行替換策略,騰出空閑位置;

    • 訪問內存地址,并將內存地址所處的整個Cache LIne 寫入到映射的 Cache Line 中;

    • 讀取 Cache Line上的數據到 CPU 中。

寫直達策略的寫入過程:

  • 1、如果數據不在 Cache 中,則直接將數據寫入內存;

  • 2、如果數據已經加載到 Cache 中,則不僅要將數據寫入 Cache,還要將數據寫入內存。

寫直達的優(yōu)點和缺點:

  • 優(yōu)點: 每次讀取操作就是純粹的讀取,不涉及對內存的寫入操作,讀取速度更快;

  • 缺點: 每次寫入操作都需要同時寫入 Cache 和寫入內存,在寫入操作上失去了 CPU 高速緩存的價值,需要花費更多時間。


2> 寫回策略(Write Back)

既然寫直達策略在每次寫入操作都會寫內存,那么有沒有什么辦法可以減少寫回內存的次數呢?這就是寫回策略:

  • 1、寫回策略會在每個 Cache Line上增加一個 “臟(Dirty)” 標記位 ,當一個 Cache Line 被標記為臟時,說明它的數據與內存數據是不一致的;

  • 2、在寫入操作時,我們只需要修改 Cache Line 并將其標記為臟,而不需要寫入內存;

  • 3、那么,什么時候才將臟數據寫回內存呢?—— 就發(fā)生在 Cache 塊被替換出去的時候:

    • 在寫入操作中,如果目標內存塊不在 Cache 中,需要先將內存塊數據讀取到 Cache 中。如果替換策略換出的舊 Cache Line 是臟的,就會觸發(fā)一次寫回內存操作;

    • 在讀取操作中,如果目標內存塊不在 Cache 中,且替換策略換出的舊 Cache 塊是臟的,就會觸發(fā)一次寫回內存操作;

可以看到,寫回策略只有當一個 Cache 數據將被替換出去時判斷數據的狀態(tài),清(未修改過,數據與內存一致) 的 Cache 塊不需要寫回內存, 的 Cache 塊才需要寫回內存。這個策略能夠減少寫回內存的次數,性能會比寫直達更高。

當然,寫回策略在讀取的時候,有可能不是純粹的讀取了,因為還可能會觸發(fā)一次臟 Cache Line 的寫入。


通過寫直達或寫回策略,我們已經能夠解決 在修改 Cache 數據后,如何同步回內存 的問題

1.1.2 多核心 Cache 的一致性問題

在單核 CPU 中,我們通過寫直達策略或寫回策略保持了Cache 與內存的一致性。但是在多核 CPU 中,由于每個核心都有一份獨占的 Cache,就會存在一個核心修改數據后,兩個核心 Cache 不一致的問題。

舉個例子:

  • 1、Core 1 和 Core 2 讀取了同一個內存塊的數據,在兩個 Core 都緩存了一份內存塊的副本。此時Cache 和內存塊是一致的;

  • 2、Core 1 執(zhí)行內存寫入操作:

    • 在寫直達策略中,新數據會直接寫回內存,此時,Cache 和內存塊一致。但由于之前 Core 2 已經讀過這塊數據,所以 Core 2 緩存的數據還是舊的。此時,Core 1 和 Core 2 不一致;

    • 在寫回策略中,新數據會延遲寫回內存,此時 Cache 和內存塊不一致。不管 Core 2 之前有沒有讀過這塊數據,Core 2 的數據都是舊的。此時,Core 1 和 Core 2 不一致。

  • 3、由于 Core 2 無法感知到 Core 1 的寫入操作,如果繼續(xù)使用過時的數據,就會出現邏輯問題。



    可以看到:由于兩個核心的工作是獨立的,在一個核心上的修改行為不會被其它核心感知到,所以不管 CPU 使用寫直達策略還是寫回策略,都會出現緩存不一致問題。 所以,我們需要一種機制,將多個核心的工作聯(lián)合起來,共同保證多個核心下的 Cache 一致性,這就是緩存一致性機制。

1.1.3 寫傳播 & 事務串行化

緩存一致性機制必須解決的如下兩個問題:

  • 寫傳播(Write Propagation): 一個CPU核心對Cache中的值進行了修改,需要傳播其他CPU核心,也就是需要用到寫更新或者寫無效策略。

    當某個 CPU 核心的Cache中執(zhí)行寫操作時,其他 CPU 核心中對應的Cache Line 的更新策略也有兩種

    • 寫更新(Write Update):一個CPU核心對Cache中的值進行修改時,該 CPU 核心都必須先發(fā)起一次總線請求,通知其他 CPU 核心將它們的CPU Cache 值更新為剛寫入的值,所以寫更新會很占用總線帶寬。

    • 寫無效(Write Invalidate) : 一個CPU核心對Cache中的值進行修改時,該 CPU 核心都必須先發(fā)起一次總線請求,通知其他 CPU 核心將它們的CPU Cache 設置為無效。MESI協(xié)議用的就是這個策略,也是絕大多數 CPU 都會采用緩存一致性協(xié)議。這是因為多次寫操作只需要發(fā)起一次總線事件即可,第一次寫已經將其他緩存的值置為無效,之后的寫不必再更新狀態(tài),這樣可以有效地節(jié)省 CPU 核間總線帶寬。

  • 事務串行化(Transaction Serialization): 各個 CPU 核心所有寫入操作的順序,在所有 CPU 核心看起來是一致。

寫傳播解決了 感知 問題,如果一個核心修改了數據,就需要同步給其它核心。但只做到同步還不夠,如果各個核心收到的同步信號順序不一致,那最終的同步結果也會不一致。
舉個例子:假如 CPU 有 4 個核心,Core 2 將共享數據修改為 1000,隨后 Core 1 將共享數據修改為 2000。在寫傳播下,“修改為 1000” 和 “修改為 2000” 兩個事務會同步到 Core 3 和 Core 4。但是如果沒有事務串行化,不同核心收到的事務順序可能是不同的,最終數據還是不一致。

1.1.4 總線嗅探 & 總線仲裁

寫傳播和事務串行化在 CPU 中是如何實現的呢?

  • 寫傳播 - 總線嗅探(bus snooping): 本質上就是進行Cache讀寫操作時發(fā)送到總線請求,然后讓各個核心去嗅探這些請求,再根據本地的情況進行響應;

  • 事務串行化 - 總線仲裁: 總線的獨占性要求同一時刻最多只有一個模塊占用總線,天然地會將所有核心對內存的讀寫操作串行化。如果多個核心同時發(fā)起總線事務,此時總線仲裁單元會對競爭做出仲裁,未獲勝的事務只能等待獲勝的事務處理完成后才能執(zhí)行。

基于總線嗅探和總線仲裁,現代 CPU 逐漸形成了各種緩存一致性協(xié)議,例如 MESI 協(xié)議。

MESI 協(xié)議使用的是寫回策略和寫無效策略,MESI 協(xié)議通過Cache Line 的四種狀態(tài)非常有效地降低了 CPU 核間帶寬

1.1.5 MESI 協(xié)議


CPU Cache 是由很多個 Cache Line 組成的,CPU Line 是 CPU 從內存讀取數據的基本單位,Cache Line 大小通常是64字節(jié)。

CPU對Cache的請求:

  1. PrRd: CPU核心請求一個 Cache Line

  2. PrWr: CPU核心請求一個 Cache Line

總線對Cache的請求:

  1. BusRd: 其他CPU核心請求一個 Cache Line

  2. BusRdX: 其他CPU核心請求一個該CPU核心不擁有的緩存塊

  3. BusUpgr: 其他CPU核心請求一個該CPU核心擁有的緩存塊

  4. Flush: 請求回寫整個Cache Line到內存

  5. FlushOpt: 整個Cache Line被發(fā)到總線以發(fā)送給另外一個CPU核心(緩存到緩存的復制)

初始狀態(tài) 操作 響應
Shared(S) PrRd - 無總線事務生成 - 狀態(tài)保持不變 - 讀操作為緩存命中
Shared(S) PrWr - 發(fā)出總線事務BusUpgr信號 - 狀態(tài)轉換為(M)Modified - 其他緩存看到BusUpgr總線信號,標記其副本為(I)Invalid.
Modified(M) PrRd - 無總線事務生成 - 狀態(tài)保持不變 - 讀操作為緩存命中
Modified(M) PrWr - 無總線事務生成 - 狀態(tài)保持不變 - 寫操作為緩存命中
Invalid(I) PrRd - 發(fā)出總線事務BusRd信號 - 其他CPU核心看到BusRd,檢查自己是否有有效的數據副本,通知發(fā)出請求的緩存 - 狀態(tài)轉換為(S)Shared, 如果其他緩存有有效的副本 - 狀態(tài)轉換為(E)Exclusive, 如果其他緩存都沒有有效的副本 - 如果其他緩存有有效的副本, 其中一個緩存發(fā)出數據(FlushOpt);否則從主存獲得數據
Invalid(I) PrWr - 發(fā)出總線事務BusRdX信號 - 狀態(tài)轉換為(M)Modified - 如果其他緩存有有效的副本, 其中一個緩存發(fā)出數據(FlushOpt);否則從主存獲得數據 - 如果其他緩存有有效的副本, 見到BusRdX信號后無效其副本 - 向Cache Line中寫入修改后的值
Exclusive(E) PrRd - 無總線事務生成 - 狀態(tài)保持不變 - 讀操作為緩存命中
Exclusive(E) PrWr - 無總線事務生成 - 狀態(tài)轉換為(M)Modified - 向Cache Line中寫入修改后的值

緩存一致性協(xié)議定義了Cache Line的4個狀態(tài):獨占(exclusive)、共享(share)、修改(modified)、失效(invalid)。

  • M(Modified,修改): 表明 Cache Line 被修改過,與內存中不一致,只有本地一個拷貝(專有),并且其它核心的同一個 Cache Line 會失效;

  • E(Exclusive,獨占): 表明 Cache LIne 只有本地一個拷貝(專有);

  • S(Share,共享): 表明 Cache Line 不僅有本地一個拷貝并且其它核心也存在其拷貝;

  • I(Invalid,失效): 未從內存加載數據或者已失效;

獨占共享 狀態(tài)下,Cache Line的數據是干凈的,任何讀取操作可以直接使用該Cache Line的數據;

失效修改 狀態(tài)下,Cache Line 的數據是臟的,其數據和內存中的可能不一致,在讀取或寫入 失效 Cache Line 時,需要先將其它核心 修改 的Cache Line寫回內存,再從內存讀取;

共享失效 狀態(tài),核心沒有獲得 Cache Line 的獨占權(鎖)。在修改數據時不能直接修改,而是要先向總線發(fā)起 RFO(Request For Ownership)請求 ,將其它核心的 Cache 置為 失效,等到獲得回應 ACK 后才算獲得 Cache Line 的獨占權。

這個獨占權這有點類似于開發(fā)語言層面的鎖概念,在修改資源之前,需要先獲取資源的鎖;

修改獨占 狀態(tài)下,核心已經獲得了 Cache Line 的獨占權(鎖)。在修改數據時不需要向總線發(fā)送RFO 請求,能夠減輕總線的通信壓力。

MESI 協(xié)議有一個非常 nice 的在線體驗網站,你可以對照文章內容,在網站上操作指令區(qū),并觀察內存和緩存的數據和狀態(tài)變化。網站地址:(VivioJS MESI)

1.1.6 寫緩沖區(qū) & 失效隊列

MESI 協(xié)議保證了 CPU Cache 的一致性,但完全地遵循協(xié)議會影響性能。 因此,現代的 CPU 會增加寫緩沖區(qū)和失效隊列將 MESI 協(xié)議的請求異步化,以提高效率:

  • 寫緩沖區(qū)(Store Buffer)

由于在寫入操作之前,CPU Core 需要先發(fā)起 RFO 請求獲得獨占權,在其它 CPU Core 回應 ACK 之前,當前 CPU Core 只能空等待,這對 CPU 資源是一種浪費。因此現代 CPU 會采用 寫緩沖區(qū) 機制,寫入的數據放到寫緩沖區(qū)后并發(fā)送 RFO 請求后,CPU 就可以去執(zhí)行其它任務,等收到 ACK 后再將寫入數據寫到 Cache 上。

當前CPU核心如果要讀Cache Line中的數據,需要先掃描Store Buffer之后再讀取Cache Line(Store-Buffer Forwarding)。

  • 失效隊列(Invalidation Queue)

Store Buffer 容量是有限的,當Store Buffer滿了之后CPU核心還是要卡住等待ACK。所以其他核心在收到 RFO 請求時,需要及時回應 ACK。如果核心很忙不能及時回復,就會造成發(fā)送 RFO 請求的核心在等待 ACK。因此現代 CPU 會采用 失效隊列 機制,先把其它核心發(fā)過來的 RFO 請求放到失效隊列,然后直接返回 ACK,等當前核心處理完任務后再去處理失效隊列中的失效請求。

因此核心可能并不知道在它Cache里的某個Cache Line是Invalid狀態(tài)的,因為失效隊列包含有收到但還沒有處理的Invalidation消息。CPU在讀取數據的時候,并不像Store buffer那樣讀取Invalidate queue。

1.2 內存屏障

CPU 已經實現了 MESI 協(xié)議,已經在硬件層面實現了寫傳播和事務串行化,為什么 Java 語言層面還需要定義 volatile 關鍵字呢?豈不是多此一舉?

MESI 協(xié)議解決了緩存一致性,一致性有強弱之分:

  • 強一致性: 保證在任意時刻任意副本上的同一份數據都是相同的,或者允許不同,但是每次使用前都要刷新確保數據一致,所以最終還是一致。

  • 弱一致性: 不保證在任意時刻任意副本上的同一份數據都是相同的,也不要求使用前刷新,但是隨著時間的遷移,不同副本上的同一份數據總是向趨同的方向變化,最終還是趨向一致。

例如,MESI 協(xié)議就是強一致性的,但引入寫緩沖區(qū)或失效隊列后就變成弱一致性,隨著寫緩沖區(qū)和失效隊列被消費,各個核心 Cache 最終還是會趨向一致狀態(tài)。

引入寫緩沖區(qū)或失效隊列后會導致內存一致性的問題(出現內存重排 Memory Reordering),舉個例子:初始狀態(tài)變量 a 和變量 b 都是 0,現在 Core1 和 Core2 分別執(zhí)行這兩段指令,最終 x 和 y 的結果是什么?

Core1

a = 1; // A1
x = b; // A2


Core2

b = 2; // B1
y = a; // B2

寫緩存區(qū)造成內存重排:


可以看到:從內存的視角看,直到 Core1 執(zhí)行 A3 來刷新寫緩沖區(qū),寫操作 A1 才算真正執(zhí)行了。雖然 Core 的執(zhí)行順序是 A1 → A2 → B1 → B2,但內存看到的順序卻是 A2 → B1 → B2 → A1,變量 a 寫入沒有同步給對變量 a 的讀取,即發(fā)生了內存重排。內存重排是硬件上無法解決的問題,這個時候就必須加入內存屏障來解決這個問題。

內存屏障 (Memory Barrier)分為寫屏障(Store Barrier)、讀屏障(Load Barrier)和全屏障(Full Barrier),寫屏障會阻塞等待Store Buffer中的數據同步刷到Cache后再執(zhí)行屏障后面的讀寫操作;讀屏障會阻塞Invalid Queue中的消息理完成后再執(zhí)行屏障后面的讀寫操作。

JVM 并不直接顯露內存屏障。(Memory barriers are not directly exposed by the JVM. )反之,為了保證語言級的并發(fā)原語語義,它們被 JVM 插入到指令序列中。(Instead they are inserted into the instruction sequence by the JVM in order to uphold the semantics of language level concurrency primitives. )

還是以上面的例子說明下:

Core1

// volatile 類型
a = 1; // A1
// 插入寫內存屏障
// 插入讀內存屏障
x = b; // A2

Core2

// volatile 類型
b = 2; // B1
// 插入寫內存屏障
// 插入讀內存屏障
y = a; // B2

初始狀態(tài)變量 a 和變量 b 都是 0 并且設置為volatile類型,現在 Core1 和 Core2 分別執(zhí)行這兩段指令

由于在 A1和B1之后插入一個寫內存屏障,所以在執(zhí)行A2和B2之前會將a=1和b=2刷到Cache

由于在 A2前插入一個讀內存屏障,所以在執(zhí)行A2之前會將b=1同步到Core1的Cache

由于在 B2前插入一個讀內存屏障,所以在執(zhí)行B2之前會將a=1同步到Core2的Cache

內存屏障解決了內存重排的問題,除此意外內存屏障可以阻止屏障兩側的指令重排序(即編譯器重排)

需要注意的是,內存屏障并不是Java源代碼中的一部分,它們是在編譯到機器指令時由Java內存模型隱式插入的,寫Java代碼時是看不到的。

x86架構CPU提供了比較強的緩存一致性支持,但有的ARM構的CPU指供的緩存一致性支持就很弱,大部分Android手機采用的CPU是ARM架構,所以需要在正確的位置插入內存屏障來保證一致性。

對于app開發(fā)者來說理解到這里就可以了,下面從源碼角度分析,有興趣的同學可以自己看看Hotspot虛擬機arm架構下的模板解釋器中的putfield和getfield方法。

2 synchronized 的工作原理

首先舉個例子:

public class UnSafeTest {
    @SneakyThrows
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        CustomService customService = new CustomService();
        for (int i = 0; i < 10; i++) {
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    customService.add();
                }
                countDownLatch.countDown();
            }).start();
        }
        // 等待其他線程執(zhí)行完畢
        countDownLatch.await();
        System.out.println("num:" + customService.getNum());
    }

    static class CustomService {
        private int num = 0;

        public void add() {
            num++;
        }

        public int getNum() {
            return num;
        }
    }
}

上述代碼啟動了10個線程,每個線程使num累加100次,期望結果是1000,但打印通常小于1000,這是一個典型的線程安全問題。 我們在多線程環(huán)境下調用了CustomService.add(),因而導致線程不安全,那么為什么會有線程安全問題呢?我們來看關鍵代碼:

public void add() {
    num++;
}

num++ 實際上并不是原子操作,這需要看其對應的字節(jié)碼信息(可以使用javap -v xx.class進行查看):

  public void add();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=1, args_size=1
         0: aload_0
         1: dup
         2: getfield      #2                  // Field num:I
         5: iconst_1
         6: iadd
         7: putfield      #2                  // Field num:I
        10: return

可以發(fā)現num++實際上由3個指令組成getfield、iadd、putfield組成。
getfield:獲取變量值
iadd:執(zhí)行+1
putfield:設置變量值
既然num++實際上由3步組成,那么在多線程環(huán)境中就無法保證其執(zhí)行過程的原子性,通過下圖可以更加清晰的說明這一點:

上面這種情況是原子性問題導致線程不安全,synchronized是一個同步鎖,在同一時刻被修飾的方法或代碼塊只有一個線程能執(zhí)行,以保證線程安全。很多人都稱之為重量級鎖(也叫做悲觀鎖),但是隨著JDK1.6對synchronized進行了鎖升級的優(yōu)化后,在線程競爭不激烈的情況下性能還是不錯的,接下來看看synchronized怎么解決線程安全問題:

public synchronized void add() {
    num++;
}

或者

public void add() {
    synchronized (CustomService.class) {
        num++;
    }
}

接下來看一下字節(jié)碼

public synchronized void add();
    descriptor: ()V
    flags: ACC_PUBLIC, ACC_SYNCHRONIZED
    Code:
      stack=3, locals=1, args_size=1
         0: aload_0
         1: dup
         2: getield      #2                  // Field num:I
         5: iconst_1
         6: iadd
         7: putfield      #2                  // Field num:I
        10: return

或者

  public void add();
    descriptor: ()V
    flags: ACC_PUBLIC
    Code:
      stack=3, locals=3, args_size=1
         0: ldc           #3                  // class com/cytmxk/trainproject/UnSafeTest$CustomService
         2: dup
         3: astore_1
         4: monitorenter
         5: aload_0
         6: dup
         7: getfield      #2                  // Field num:I
        10: iconst_1
        11: iadd
        12: putfield      #2                  // Field num:I
        15: aload_1
        16: monitorexit
        17: goto          25
        20: astore_2
        21: aload_1
        22: monitorexit
        23: aload_2
        24: athrow
        25: return

synchronized修飾方法時,會在訪問標識符(flags)中加入ACC_SYNCHRONIZED標識
方法級同步是隱式執(zhí)行的。當調用這些方法時,如果發(fā)現會ACC_SYNCHRONIZED標識,則會進入一個monitor,執(zhí)行方法,然后退出monitor。這時如果其他線程來請求執(zhí)行方法,會因為無法獲得監(jiān)視器鎖而被阻斷住。
無論方法調用正常還是發(fā)生異常,都會自動退出monitor,也就是釋放鎖。

synchronized修飾代碼塊時,會增加monitorenter和monitorexit指令
每個對象都與一個監(jiān)視器monitor關聯(lián),執(zhí)行monitorenter指令的線程嘗試獲取鎖對象關聯(lián)的監(jiān)視器monitor的所有權,此時其他線程將阻塞等待,直到執(zhí)行monitorexit退出monitor時,
其他線程才有機會來獲取監(jiān)視器monitor的所有權。
synchronized保證了有且僅有一個線程獲取執(zhí)行權(即保證了原子性)。

synchronized也是基于MESI協(xié)議和內存屏障實現的緩存一致性
在monitorenter指令之后有一個Load內存屏障將CPU Cache更新到最新值
在monitorexit指令之后會有一個Store內存屏障將Store Buffer更新到CPU Cache

3 通過ReentrantLock解決線程安全問題

3.1 LockSupport 的工作原理

AQS使用LockSupport來控制線程的阻塞和喚醒,我們更加熟悉的阻塞喚醒操作是wait/notify方式,它是以Object的角度來設計,而LockSupport提供的park/unpark則是以線程的角度來設計。LockSupport主要提供兩類操作:

1 park 操作提供了4個方法
/**
 * 許可證可用則消耗許可證并且調用立即返回;否則當前線程會進入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
 * 1> 其他線程以當前線程為目標調用 unpark
 * 2> 其他線程中斷當前線程
 * 3> 調用錯誤地(沒有原因地)返回
 * 
 * 此方法不報告導致方法返回的原因,調用方應首先重新檢查導致線程停止的條件,調用方還可以在返回時確定線程的中斷狀態(tài)。
 */
public static void park() {
    U.park(false, 0L);
}
/**
 * 同上
 */
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    U.park(false, 0L);
    setBlocker(t, null);
}
/**
 * 許可證可用則消耗許可證并且調用立即返回;否則當前線程會進入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
 * 1> 其他線程以當前線程為目標調用 unpark
 * 2> 其他線程中斷當前線程
*  3> 指定的等待時間已到
 * 4> 調用錯誤地(沒有原因地)返回
 * 
 * 此方法不報告導致方法返回的原因,調用方應首先重新檢查導致線程停止的條件,調用方還可以在返回時確定線程的中斷狀態(tài)或者調用花費的時間。
 */
public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        U.park(false, nanos);
        setBlocker(t, null);
    }
}
/**
 * 許可證可用則消耗許可證并且調用立即返回;否則當前線程會進入WAITING或者TIMED_WAITING狀態(tài),直到以下三種情況之一發(fā)生:
 * 1> 其他線程以當前線程為目標調用 unpark
 * 2> 其他線程中斷當前線程
*  3> 到達指定的時間線
 * 4> 調用錯誤地(沒有原因地)返回
 * 
 * 此方法不報告導致方法返回的原因,調用方應首先重新檢查導致線程停止的條件,調用方還可以在返回時確定線程的中斷狀態(tài)或者調用返回時的時間。
 */
public static void parkUntil(Object blocker, long deadline) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    U.park(true, deadline);
    setBlocker(t, null);
}
上面三種形式的park都有blocker參數,此參數在線程進入WAITING或者TIMED_WAITING狀態(tài)時被記錄,以幫助監(jiān)視工具和診斷工具確定線程受阻塞的原因(通過getBlocker方法獲取blocker)。
看下線程dump的結果來理解blocker的作用。

2 unpark 操作
/**
 * 使給定線程的許可證可用(如果尚未可用),如果線程在 park上被阻塞那么它將解除阻塞,否則保證下一次調用park不會被阻塞。如果指定的線 
 * 程尚未啟動,則不能保證此操作有任何效果。
 */
public static void unpark(Thread thread) {
    if (thread != null)
        U.unpark(thread);
}

歸根結底,LockSupport調用的是Unsafa的native代碼:

// park方法
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
  ...
  // 調用parker的park方法
  thread->parker()->park(isAbsolute != 0, time);
  ...
UNSAFE_END

// unpark方法
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
  ...
  if (p != NULL) {
    HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
    // 調用parker的unpark方法
    p->unpark();
  }
UNSAFE_END

每個JAVA線程都有一個Parker成員變量

  // JSR166 per-thread parker
private:
  Parker*    _parker;
public:
  Parker*     parker() { return _parker; }

park函數使當前線程進入WAITING或者TIMED_WAITING狀態(tài),而其unpark函數則是將指定線程喚醒,看一下Parker的park和unpark方法:

class Parker : public os::PlatformParker {
private:
  volatile int _counter ;
  ...
public:
  Parker() : PlatformParker() {
    _counter       = 0 ; //初始化為0
    ...
  }
...
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();
  ...
};

os::PlatformParker是對底層的抽象,根據不同的操作系統(tǒng)有不同的實現,在Linux系統(tǒng)下,是用Posix線程庫pthreads中的mutex(互斥量)和condition(條件變量)實現的,mutex和condition保護了一個counter的變量,當park時counter被設置為0,當unpark時_counter被設置為1。接下來看一下Linux系統(tǒng)中的實現:

class PlatformParker : public CHeapObj<mtInternal> {
  protected:
    enum {
        REL_INDEX = 0,
        ABS_INDEX = 1
    };
    int _cur_index;  // which cond is in use: -1, 0, 1
    pthread_mutex_t _mutex [1] ; // pthread互斥鎖
    pthread_cond_t  _cond  [2] ; // pthread條件變量數組,一個用于相對時間,一個用于絕對時間

  public:       // TODO-FIXME: make dtor private
    ~PlatformParker() { guarantee (0, "invariant") ; }

  public:
    PlatformParker() {
      int status;
      status = pthread_cond_init (&_cond[REL_INDEX], os::Linux::condAttr());
      assert_status(status == 0, status, "cond_init rel");
      status = pthread_cond_init (&_cond[ABS_INDEX], NULL);
      assert_status(status == 0, status, "cond_init abs");
      status = pthread_mutex_init (_mutex, NULL);
      assert_status(status == 0, status, "mutex_init");
      _cur_index = -1; // mark as unused
    }
};

下面是park和unpark方法的具體實現

void Parker::park(bool isAbsolute, jlong time) {
  // Ideally we'd do something useful while spinning, such
  // as calling unpackTime().

  // Optional fast-path check:
  // Return immediately if a permit is available.
  // We depend on Atomic::xchg() having full barrier semantics
  // since we are doing a lock-free update to _counter.
  if (Atomic::xchg(0, &_counter) > 0) return; // 使用 xchgl 指令將_counter的值設置為0,如果_counter原來的值大于0則返回

  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // Optional optimization -- avoid state transitions if there's an interrupt pending.
  // Check interrupt before trying to wait
  if (Thread::is_interrupted(thread, false)) { // 如果線程處于中斷狀態(tài),直接返回
    return;
  }

  // Next, demultiplex/decode time arguments
  timespec absTime;
 // 如果time小于0,或者isAbsolute是true并且time等于0則直接返回
  if (time < 0 || (isAbsolute && time == 0) ) { // don't wait at all
    return;
  }
  if (time > 0) {
    unpackTime(&absTime, isAbsolute, time);
  }


  // Enter safepoint region
  // Beware of deadlocks such as 6317397.
  // The per-thread Parker:: mutex is a classic leaf-lock.
  // In particular a thread must never block on the Threads_lock while
  // holding the Parker:: mutex.  If safepoints are pending both the
  // the ThreadBlockInVM() CTOR and DTOR may grab Threads_lock.
  ThreadBlockInVM tbivm(jt); // 構造當前線程的 ThreadBlockInVM

  // 如果當前線程設置了中斷標志,或者獲取mutex互斥鎖失敗則直接返回
  // 由于Parker是每個線程都有的,所以_counter cond mutex都是每個線程都有的,不是所有線程共享的,所以加鎖失敗只有兩種情況,
  // 1> unpark已經加鎖這時只需要返回即可,對應unpark先調用的情況
  // 2> 調用pthread_mutex_trylock出錯
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) { // 嘗試獲取鎖,失敗則返回
    return;
  }

  int status ;
  // 如果當前線程持有許可(即_counter大于0),說明之前已經調用unpark方法將_counter置為了1
  if (_counter > 0)  {
    _counter = 0; // 將許可消耗掉
    status = pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
    // Paranoia to ensure our locked and lock-free paths interact
    // correctly with each other and Java-level accesses.
    OrderAccess::fence();
    return;
  }

#ifdef ASSERT
  // Don't catch signals while blocked; let the running threads have the signals.
  // (This allows a debugger to break into the running thread.)
  sigset_t oldsigs;
  sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();
  pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif

  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  jt->set_suspend_equivalent();
  // cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()

  assert(_cur_index == -1, "invariant");
  // 如果time等于0,說明是相對時間也就是isAbsolute是fasle(否則前面就直接返回了)
  if (time == 0) {
    _cur_index = REL_INDEX; // arbitrary choice when not timed
    // 當前線程進入WAITING狀態(tài)
    status = pthread_cond_wait (&_cond[_cur_index], _mutex) ;
  } else {
    // 判斷isAbsolute是false還是true,false的話使用_cond[0],否則用_cond[1]
    _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
    // 使用條件變量當前線程進入TIMED_WAITING狀態(tài)
    status = os::Linux::safe_cond_timedwait (&_cond[_cur_index], _mutex, &absTime) ;
    if (status != 0 && WorkAroundNPTLTimedWaitHang) {
      pthread_cond_destroy (&_cond[_cur_index]) ;
      pthread_cond_init    (&_cond[_cur_index], isAbsolute ? NULL : os::Linux::condAttr());
    }
  }

  // 如果當前線程被喚醒則繼續(xù)向下執(zhí)行
  _cur_index = -1;
  assert_status(status == 0 || status == EINTR ||
                status == ETIME || status == ETIMEDOUT,
                status, "cond_timedwait");

#ifdef ASSERT
  pthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif

  _counter = 0 ; // 返回后 _counter 狀態(tài)位重置
  status = pthread_mutex_unlock(_mutex) ;
  assert_status(status == 0, status, "invariant") ;
  // 使用內存屏障使_counter對其它線程可見
  OrderAccess::fence();

  // If externally suspended while waiting, re-suspend
  if (jt->handle_special_suspend_equivalent_condition()) {
    jt->java_suspend_self();
  }
}

void Parker::unpark() {
  int s, status ;
  status = pthread_mutex_lock(_mutex);
  assert (status == 0, "invariant") ;
  s = _counter;
  _counter = 1;
  if (s < 1) {
    // 說明當前parker對應的線程掛起了,因為_cur_index初始是-1,并且等待條件變量的線程被喚醒
    // 后也會將_cur_index重置-1
    if (_cur_index != -1) {
      // 如果設置了WorkAroundNPTLTimedWaitHang先調用signal再調用unlock,在hotspot在Linux下默認使用這種方式
      // 即先調用signal再調用unlock
      if (WorkAroundNPTLTimedWaitHang) {
        status = pthread_cond_signal (&_cond[_cur_index]);
        assert (status == 0, "invariant");
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
      } else {
        // must capture correct index before unlocking
        int index = _cur_index;
        status = pthread_mutex_unlock(_mutex);
        assert (status == 0, "invariant");
        status = pthread_cond_signal (&_cond[index]);
        assert (status == 0, "invariant");
      }
    } else { // 如果_cur_index == -1說明線程沒在等待條件變量,則直接解鎖
      pthread_mutex_unlock(_mutex);
      assert (status == 0, "invariant") ;
    }
  } else { // 如果_counter == 1,說明線程調用了一次或多次unpark但是沒調用park,則直接解鎖
    pthread_mutex_unlock(_mutex);
    assert (status == 0, "invariant") ;
  }
}

Atomic::xchg 源碼

inline jint     Atomic::xchg    (jint     exchange_value, volatile jint*     dest) {
  __asm__ volatile (  "xchgl (%2),%0"
                    : "=r" (exchange_value)
                    : "0" (exchange_value), "r" (dest)
                    : "memory");
  return exchange_value;
}

__asm__ 代表后面的xchgl是匯編指令,xchgl是一個原子操作,交換exchange_value和dest的值,并且將exchange_value的值返回。

首先說明一下上面用到的三個方法:

int pthread_mutex_lock(pthread_mutex_t* mutex);    // 阻塞式的加鎖
int pthread_mutex_trylock(pthread_mutex_t* mutex); // 非阻塞式的加鎖,加不上鎖就返回
int pthread_mutex_unlock(pthread_mutex_t* mutex);  // 解鎖
// 以上函數成功返回0;失敗返回錯誤碼

接下來總結一下park方法的邏輯:
1> 使用 xchgl 指令將counter修改為 0 返回,如果counter原來的值大于0則返回,即有許可直接消費掉
2> pthread_mutex_trylock 嘗試獲取鎖,失敗則返回非0的錯誤碼(其他線程unpark該線程時持有mutex互斥量或者pthread_mutex_trylock調用出錯) 3> 如果持有許可(即counter >0成立),則消費掉許可(即執(zhí)行counter = 0),釋放鎖(執(zhí)行pthread_mutex_unlock)然后返回
4> 如果 3 不成立,根據時間的不同執(zhí)行不同的等待函數,如果等待正確返回,則消費掉許可(即執(zhí)行
counter = 0),釋放鎖(執(zhí)行pthread_mutex_unlock)然后返回

unpark的邏輯為:
1> pthread_mutex_lock 獲取鎖,可能會阻塞線程
2> 將參數線程設置為持有許可(即_counter 設置為 1)
3> 判斷 _counter 的舊值:
小于 1 時,調用 pthread_cond_signal 喚醒在 park 阻塞的線程;
等于 1 時,直接返回

3.2 CAS 的工作原理

說到CAS,第一個想到的就是原子類型,其實4.1中的問題就可以通過AtomicInteger解決:

static class CustomService {
    private final AtomicInteger num = new AtomicInteger();

    public void add() {
        num.getAndIncrement();
    }

    public int getNum() {
        return num.get();
    }
}

為什么上面的代碼可以解決線程安全的問題呢?先看下AtomicInteger的部分源碼:

// Unsafe 對象,可以直接根據內存地址操作數據,可以突破JVM的現在直接操作內存,所以是不安全的
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
// 存儲value屬性在AtomicInteger類實例內部的偏移地址
private static final long VALUE;

static {
    try {
        // 在類初始化的時候就獲取就獲取到了value屬性在對象內部的偏移地址
        VALUE = U.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }
}
// 存儲實際的值
private volatile int value;
......
/**
 * Atomically increments by one the current value.
 *
 * @return the previous value
 */
public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

1> 首先內部持有一個Unsafe對象,原子類底層的操作都是基于Unsafe進行的。
2> volatile int value 是AtomicInteger實例的實際數值,volatile保證并發(fā)中的可見性和有序性。
3> VALUE 是 value屬性在AtomicInteger對象內部的偏移地址,通過Unsafe類是可以在內存級別給變量賦值的,要操作AtomicInteger實例的數據,首先要知道AtomicInteger實例的內存地址,其次是要知道value屬性在對象內部的偏移量VALUE,接著直接給這塊內存賦值就行了:


4> AtomicInteger的getAndIncrement()方法是基于Unsafe的getAndAddInt的,Unsafe的getAndAddInt方法源碼

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!compareAndSwapInt(o, offset, v, v + delta));
    return v;
}

1> o + offset 得到value屬性在內存中的地址,然后根據地址從內存中獲取value的值,保存在v中(即期望值)。
2> compareAndSwapInt : v的值跟當前內存的值(o + offsetSet 地址對應的值)進行對比,如果相等則將內存的值修改為 v + delta(即CAS操作成功),如果值不相等,則進入下一次循環(huán),
   直到CAS操作成功為止。
3> CAS操作是怎么保證原子性的呢(比較和修改這是兩個動作,如果比較的時候是一樣的,修改的時候被別的線程修改了)?

接下來hotspot中compareAndSwapInt的實現

UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

Atomic::cmpxchg 源碼

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}

__asm_ "cmpxchg面的xchgl是匯編指令,LOCK_IF_MP代表如果設計多核心則添加lock,使cmpxchgl指令原子性

CAS保證了加1操作的原子性,volatile確保了可見性、有序性,所以可以解決上面的問題


3.3 ReentrantLock的工作原理

LockSupport 底層使用互斥量mutex和condition,互斥量mutex和condition是內核提供的能力,所以性能消耗是非常大的。

首先通過下圖整體看一下ReentrantLock的類結構


首先ReentrantLock繼承自父類Lock,然后有3個內部類,其中Sync內部類繼承自AQS,另外的兩個內部類繼承自Sync,這兩個類分別是用來實現公平鎖和非公平鎖的,首先看一下非公平鎖的流程圖:

接下來就是對應的源碼:

// ReentrantLock.java

/** Synchronizer providing all implementation mechanics */
private final Sync sync;

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

public void lock() {
    sync.lock();
}

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        // 通過CAS的方式先判斷state == 0是否成立,如果成立則將當前state設置為1
        if (compareAndSetState(0, 1))
            // state成功設置為1,說明當前線程獲取到資源,則將當前線程保存下來
            setExclusiveOwnerThread(Thread.currentThread());
        else
            // 失敗的話將當前線程放到等待隊列
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    abstract void lock();

    // 非公平的嘗試獲取鎖
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0 表示沒有線程占有鎖
        if (c == 0) {
            // 通過CAS的方式先判斷state == 0是否成立,如果成立則將當前state設置為1
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果當前線程已經占有鎖,則state加1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
    ...
}
// AbstractQueuedSynchronizer.java

private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long HEAD;
private static final long TAIL;

static {
    try {
        STATE = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        HEAD = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        TAIL = U.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
    } catch (ReflectiveOperationException e) {
        throw new Error(e);
    }

    // Reduce the risk of rare disastrous classloading in first call to
    // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
    Class<?> ensureLoaded = LockSupport.class;
}

/**
 * The synchronization state.
 */
private volatile int state;

protected final boolean compareAndSetState(int expect, int update) {
    return U.compareAndSwapInt(this, STATE, expect, update);
}

// 請求鎖
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

Sync通過acquirerelease方法獲取個釋放鎖,所以 ReentrantLock實現的是AQS的獨占模式,也就是獨占鎖

上面說明了非公平鎖的獲取和釋放流程,接下來看一下公平鎖:

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        // 將當前線程放到等待隊列
        acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        // c == 0 表示沒有線程占有鎖
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                // 等待隊列中不存在有效節(jié)點并且CAS成功的情況下當前線程獲取到鎖
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果當前線程已經獲取到鎖,則state加1
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

釋放鎖的流程和非公平鎖一樣,不一樣的地方體現在獲取鎖的流程。

接下來看一下AQS中核心等待隊列的管理:

3.3.1 線程加入隊列的時機

當執(zhí)行Acquire(1)時,會通過tryAcquire獲取鎖。在這種情況下,如果獲取鎖失敗,就會調用addWaiter加入到等待隊列中去。

獲取鎖失敗后,會執(zhí)行 addWaiter(Node.EXCLUSIVE) 加入等待隊列,具體實現方法如下:

private Node addWaiter(Node mode) {
    // 創(chuàng)建于當前線程關聯(lián)的Node節(jié)點(即將thread字段設置為當前線程)
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        if (oldTail != null) {
            // 將node的prev設置為oldTail
            U.putObject(node, Node.PREV, oldTail);
            // 使用CAS的方式判斷oldTail地址和tail的地址是否相同,成立的話將tail設置為node
            if (compareAndSetTail(oldTail, node)) {
                // 最終完成將Node插入隊尾
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

private final void initializeSyncQueue() {
    Node h;
    // 創(chuàng)建第一個節(jié)點(虛節(jié)點,占位用),head和tail同時指向該節(jié)點
    if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
        tail = h;
}

回到公平鎖的代碼,hasQueuedPredecessors是公平鎖加鎖時判斷等待隊列中是否存在有效節(jié)點的方法。如果返回False,說明當前線程可以爭取共享資源;如果返回True,說明隊列中存在有效節(jié)點,當前線程必須加入到等待隊列中。

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // h != t 成立代表等待對列中存在等待的節(jié)點
    // (s = h.next) == null 成立代表其他線程正在添加到等待隊列,進行到了tail已經指向新Node,但是Head還沒有指向新Node,此時隊列中有元素,需要返回true
    // s.thread != Thread.currentThread() 成立代表等待隊列的第一個有效節(jié)點線程與當前線程不同,所以當前線程不可以獲取鎖,當前線程必須加入進等待隊列
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

回到最初的代碼:

// 請求鎖
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上文解釋了addWaiter方法,這個方法其實就是把對應的線程以Node的數據結構形式加入到雙端隊列里,返回的是一個包含該線程的Node。而這個Node會作為參數,進入到acquireQueued方法中。acquireQueued會把放入隊列中的線程不斷去獲取鎖,直到獲取成功或者被park掛起:

final boolean acquireQueued(final Node node, int arg) {
    try {
        // 標記等待過程中是否中斷過
        boolean interrupted = false;
        // 開始自旋,要么獲取鎖,要么中斷
        for (;;) {
            // 獲取當前節(jié)點的前驅節(jié)點
            final Node p = node.predecessor();
            // 如果p是首節(jié)點,說明當前節(jié)點在真實數據隊列的頭部,就嘗試獲取鎖(別忘了頭結點是虛節(jié)點)
            if (p == head && tryAcquire(arg)) {
                // 獲取鎖成功,head指針移動到當前node,node節(jié)點變成虛節(jié)點
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            // 說明p為首節(jié)點且當前沒有獲取到鎖(可能是鎖被其他線程占了)或者是p不是首節(jié)點,這個時候就要判斷當前node是否要被阻塞(被阻塞條件:前驅節(jié)點的waitStatus為SIGNAL),
            // 防止無限循環(huán)浪費資源。具體兩個方法下面細細分析
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

// 將指定的節(jié)點設置為首節(jié)點(即虛節(jié)點)
private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

// 靠前驅節(jié)點判斷當前線程是否應該被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅節(jié)點的節(jié)點狀態(tài)
    int ws = pred.waitStatus;
    // 說明前驅節(jié)點處于SIGNAL狀態(tài)(在獨占鎖機制中,waitStatus 只會使用到 CANCELLED 和 SIGNAL 兩個狀態(tài))
    if (ws == Node.SIGNAL)
        return true;
    // 通過枚舉值我們知道waitStatus>0是取消狀態(tài)
    if (ws > 0) {
        do {
            // 循環(huán)向前查找取消節(jié)點,把取消節(jié)點從隊列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 設置前驅節(jié)點等待狀態(tài)為SIGNAL
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt主要用于掛起當前線程,阻塞調用棧,返回當前線程的中斷狀態(tài)。

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

上述方法的流程圖如下:


從上圖可以看出,跳出當前循環(huán)的條件是當前置節(jié)點是頭結點,且當前線程獲取鎖成功。為了防止因死循環(huán)導致CPU資源被浪費,我們會判斷前置節(jié)點的狀態(tài)來決定是否要將當前線程掛起,具體掛起流程用流程圖表示如下(shouldParkAfterFailedAcquire流程):

3.3.2 等待隊列中出隊時機

// ReentrantLock.java

public void unlock() {
    sync.release(1);
}

abstract static class Sync extends AbstractQueuedSynchronizer {
    ...
    // 嘗試釋放鎖
    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        // 如果當前線程沒有占有鎖則拋出異常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // c == 0 代表當前線程要釋放鎖
        if (c == 0) {
            free = true;
            // 清空占有鎖的線程對象
            setExclusiveOwnerThread(null);
        }
        // 更新state的值
        setState(c);
        return free;
    }
}
// AbstractQueuedSynchronizer.java

// 釋放鎖
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 解鎖隊列總的第一個有效節(jié)點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    // 獲取當前節(jié)點的waitStatus
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容