Java并發(fā)源碼剖析(一)——AbstractQueuedSynchronizer獨(dú)占模式

圖片來(lái)源網(wǎng)絡(luò).png

作為Java核心內(nèi)容之一【并發(fā)】,該部分的源碼基本在java.util.concurrent這個(gè)包下面。本文的內(nèi)容的源碼版本是jdk1.8_11。

作為AQS的前言,可以看看我之前的文章:《Java同步框架AQS原文分析》。

1、認(rèn)識(shí)AQS

使用過(guò)Java中的鎖對(duì)象,一定會(huì)對(duì)一個(gè)鎖很熟悉—ReentrantLock。這是一個(gè)可重入的鎖。大部分情況是作為一些情況替換synchronized這個(gè)關(guān)鍵字的方案。synchronized這個(gè)關(guān)鍵字是Java內(nèi)部實(shí)現(xiàn)同步機(jī)制的,那么ReentrantLock實(shí)現(xiàn)的方式是什么?

它背后的大佬就是大名鼎鼎的AQS(AbstractQueuedSynchronizer)

AQS——簡(jiǎn)單來(lái)說(shuō),就是提供一個(gè)實(shí)現(xiàn)阻塞式鎖和相關(guān)同步器的框架。它的內(nèi)部是依賴一個(gè)FIFO阻塞隊(duì)列實(shí)現(xiàn)以上功能的。

1.1 、AQS的兩種模式

AQS中有兩種獲取和釋放資源的模式,兩個(gè)方式都很容易理解。

  • 獨(dú)占式:只允許同時(shí)一個(gè)線程獲取同一個(gè)許可??梢詫?duì)應(yīng)為一些鎖ReetrantLock
  • 共享式:允許同時(shí)多個(gè)線程獲取同一個(gè)許可??梢詫?duì)應(yīng)一些同步器類(lèi)似Semaphore

Exclusive vs Shared 模式

在此隊(duì)列中,通過(guò)一個(gè)非常簡(jiǎn)單的方式區(qū)分兩種模式。就是以下兩個(gè)靜態(tài)常量。

    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

1.2、AQS的子類(lèi)

除了ReentrantLock之外還有以下的類(lèi)使用到了AQS。這個(gè)版本是jdk1.8

基于AQS的相關(guān)類(lèi).png

2 、AQS內(nèi)部結(jié)構(gòu)

AQS內(nèi)部的結(jié)構(gòu)不算太復(fù)雜,這里先將其中重點(diǎn)的幾個(gè)字段列出來(lái)

public abstract class AbstractQueuedSynchronizer{
    private volatile int state;
    private volatile Node head;    // 隊(duì)列的頭節(jié)點(diǎn)
    private volatile Node tail;    // 隊(duì)列的尾節(jié)點(diǎn)
    ...
}

state字段代表的同步的狀態(tài),它有什么用?我們暫時(shí)留個(gè)疑問(wèn),只需要知道它是用來(lái)決定同步的一個(gè)狀態(tài)。后面分析流程時(shí),再來(lái)分析這個(gè)字段。

這里我們先重點(diǎn)分析AQS的內(nèi)部類(lèi)Node

2.1、鎖隊(duì)列節(jié)點(diǎn)Node

AQS的鎖隊(duì)列就是依靠此內(nèi)部類(lèi)實(shí)現(xiàn)的。它是一個(gè)雙向隊(duì)列,內(nèi)部存儲(chǔ)了線程的信息。

static final class Node {
    volatile Node prev;        // 前驅(qū)節(jié)點(diǎn)
    volatile Node next;        // 后繼節(jié)點(diǎn)
    volatile Thread thread;    // 節(jié)點(diǎn)的線程信息
    volatile int waitStatus;   // 線程的等待狀態(tài)
    Node nextWaiter;           // 用于條件隊(duì)列使用
    ...
}

前三個(gè)字段其實(shí)很好理解,既然是雙向隊(duì)列,那就會(huì)有前驅(qū)和后繼節(jié)點(diǎn),而線程信息也是需要保存的。waitStatus這個(gè)字段是什么作用的?

waitStatus

首先我們看看這個(gè)狀態(tài)的值有什么意義。它在內(nèi)部中定義了waitStatus只能有以下四個(gè)值。

  • CANCELLED:值為1。代表此節(jié)點(diǎn)中的線程因?yàn)槌瑫r(shí)或者中斷而被取消了。這種線程不會(huì)被再次阻塞。
  • SIGNAL:值為-1。代表此節(jié)點(diǎn)的后繼結(jié)點(diǎn)馬上要或者已經(jīng)阻塞了。所以,當(dāng)前節(jié)點(diǎn)必須在它釋放或者取消的時(shí)候,取消它后繼結(jié)點(diǎn)的阻塞狀態(tài)。為了避免出現(xiàn)競(jìng)爭(zhēng),獲取許可的時(shí)候先要表明需要一個(gè)信號(hào)signal。然后開(kāi)始原子式的獲取,失敗后就阻塞。
  • CONDITION:值為-2。隊(duì)列在Condition Queue中,等待某一個(gè)條件。
  • PROPAGATE:值為-3。代表后繼節(jié)點(diǎn)會(huì)傳播喚醒的操作,此值在共享模式下啟用。

對(duì)于一個(gè)線程來(lái)說(shuō),判斷它能否獲得許可,就需要根據(jù)waitStatus的值。

2.2、條件隊(duì)列ConditionObject

除了鎖隊(duì)列之外,AQS還有一個(gè)條件隊(duì)列,本文暫不分析。

3 、獨(dú)占式AQS解析

介紹上述內(nèi)容后,依然對(duì)上面提到的變量還有模式很模糊,后面我們開(kāi)始分析具體的AQS流程,在流程中,我們將逐一講解上述的內(nèi)容的實(shí)際意義。

3.1、acquire操作

在AQS中的入口即是acquire。AQS 獲取獨(dú)占式的許可,需要調(diào)用acquire方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

這里方法傳進(jìn)來(lái)了一個(gè)arg參數(shù),arg的意義AQS不做具體定義和處理。簡(jiǎn)單來(lái)說(shuō),它是可以跟AQS 中的state字段進(jìn)行聯(lián)合使用的,用來(lái)判斷現(xiàn)在是否能完成同步操作,而怎么使用state這個(gè)字段應(yīng)該是交給子類(lèi)去實(shí)現(xiàn)的,

3.1.1、tryAcquire

tryAcquire方法是AQS交給子類(lèi)處理state與傳入的arg的方法,子類(lèi)需要判斷是否允許當(dāng)前線程在獨(dú)占模式下進(jìn)行acquire。

tryAcquire是一個(gè)空方法,具體實(shí)現(xiàn)方式在子類(lèi)中。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

這里我們以ReentrantLock內(nèi)的公平鎖作為例子,分析具體情況下,子類(lèi)的tryAcquire應(yīng)該做什么工作。

protected final boolean tryAcquire(int acquires) {
    ...
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        ...
        setState(nextc);
        return true;
    }
    return false;
}

這里我們不展開(kāi)分析方法的具體意義。在ReentrantLock的公平鎖中,state代表的是可重入的次數(shù),簡(jiǎn)單來(lái)說(shuō)就是多少對(duì)lock和unlock方法。

如果state等于0代表沒(méi)有使用過(guò)lock方法,那就會(huì)進(jìn)行兩個(gè)條件判斷。

  1. AQS隊(duì)列中是不是沒(méi)有其他還在等待的線程;
  2. CAS更新State字段成功與否。
    如果上面兩個(gè)條件都是true,那么就認(rèn)為當(dāng)前線程可以持有這個(gè)鎖。

如果state不等于0,這就代表此次調(diào)用的lock方法不是第一對(duì)。那么只要當(dāng)前線程與存儲(chǔ)的當(dāng)前持有鎖的線程為同一個(gè)線程,就會(huì)state+1。表示當(dāng)前l(fā)ock的對(duì)數(shù)為2對(duì)。以此來(lái)實(shí)現(xiàn)可重入的鎖!


3.1.2、addWaiter

在tryAcquire獲取許可權(quán)失敗后,addWaiter就是用來(lái)將當(dāng)前失敗的線程,添加進(jìn)等待隊(duì)列。傳入的參數(shù)也就是AQS的模式,在1.1小節(jié)中提到的final兩個(gè)字段。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;

    // 當(dāng)?shù)却?duì)列尾部不為空時(shí),有在等待的線程
    if (pred != null) {
        // 將新增的節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)鏈接到尾部節(jié)點(diǎn)。
        node.prev = pred;
        // 采用CAS的方式進(jìn)行數(shù)據(jù)的更新,更新成功后,則將原尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)鏈接到新增的節(jié)點(diǎn)
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果隊(duì)列為空或者當(dāng)前的尾節(jié)點(diǎn)已經(jīng)變化了,則直接向隊(duì)列中添加這個(gè)新的節(jié)點(diǎn)
    enq(node);
    return node;
}

addWaiter的大致流程在代碼中注釋中已經(jīng)說(shuō)明了,需要解釋的一點(diǎn)是為什么采用CAS更新并連接線程節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)。Doug Lea在AQS的文章中提到過(guò),目前并沒(méi)有很好的原子方式去更新雙向隊(duì)列。所以采用了上述的擇中方案。

Q1:為什么采用CAS更新前驅(qū)節(jié)點(diǎn)保證其原子性,而對(duì)后繼節(jié)點(diǎn)采用普通的賦值方式?

AQS采用的CLH鎖的變體.
CLH鎖中每個(gè)節(jié)點(diǎn)都回去對(duì)自身的前驅(qū)節(jié)點(diǎn)判斷是否可以訪問(wèn)然后進(jìn)行自旋,而AQS將自旋改為了LockSupport的park阻塞過(guò)程(這個(gè)過(guò)程在后文會(huì)解釋?zhuān)?/em>)。因此需要保證前驅(qū)節(jié)點(diǎn)一定是原子性,在不斷的訪問(wèn)前驅(qū)節(jié)點(diǎn)過(guò)程中一定是最新的。這樣即使next域不是最新的,也并不影響在隊(duì)列中阻塞的過(guò)程,因?yàn)樗麄兌际窃L問(wèn)前驅(qū)節(jié)點(diǎn)。


然后,我們重點(diǎn)關(guān)注的是enq過(guò)程。

// 將新的節(jié)點(diǎn)插入到等待隊(duì)列的尾部。如果等待隊(duì)列不存在,一定要對(duì)其初始化
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            // 隊(duì)列為空情況下,CAS操作將頭節(jié)點(diǎn)進(jìn)行更新,將頭節(jié)點(diǎn)設(shè)置為空節(jié)點(diǎn)。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 尾節(jié)點(diǎn)存在的情況下,代表AQS等待隊(duì)列已經(jīng)初始化完成。
            // 與addWaiter一樣,先將新增節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)鏈接到尾節(jié)點(diǎn)
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                // 當(dāng)更新尾節(jié)點(diǎn)成功后,就將原尾節(jié)點(diǎn)的后繼節(jié)點(diǎn)連接為新增的節(jié)點(diǎn)
                t.next = node;
                return t;
            }
        }
    }
}

AQS中對(duì)鎖隊(duì)列的初始化過(guò)程并不是在類(lèi)建立的時(shí)候初始化,而是第一次有線程開(kāi)始獲取同步資源時(shí)。它初始化時(shí)將頭節(jié)點(diǎn)設(shè)定為一個(gè)空的Node節(jié)點(diǎn),新增的節(jié)點(diǎn)都添加在頭節(jié)點(diǎn)后的后繼節(jié)點(diǎn)中。

Q2:為什么要初始化成一個(gè)空節(jié)點(diǎn)作為頭節(jié)點(diǎn)?

這個(gè)與后面文章的內(nèi)容acquireQueue方法中入隊(duì)操作有關(guān)。Q1問(wèn)題提到了AQS借用了CLH的思想,也就是不斷對(duì)前驅(qū)節(jié)點(diǎn)的狀態(tài)進(jìn)行訪問(wèn)。那么在判斷是否可以獲取資源時(shí),也需要再次訪問(wèn)前驅(qū)節(jié)點(diǎn)。如果初始化的時(shí)候不將頭節(jié)點(diǎn)賦值為空節(jié)點(diǎn),那么前驅(qū)節(jié)點(diǎn)為null,這個(gè)與不是初始化階段的隊(duì)列情況不同,勢(shì)必會(huì)復(fù)雜后面的流程。這也是AQS的簡(jiǎn)單優(yōu)化之一。

AQS的初始化結(jié)構(gòu).png
新增線程節(jié)點(diǎn)的AQS結(jié)構(gòu)圖.png

3.1.3、acquireQueued

在將新的線程節(jié)點(diǎn)入隊(duì)后,這個(gè)方法將會(huì)執(zhí)行阻塞,并根據(jù)當(dāng)前線程信息更新隊(duì)列。它采用一個(gè)循環(huán)去處理隊(duì)列中的信息,只有隊(duì)列中獲取節(jié)點(diǎn)成功才能從循環(huán)中彈出。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 獲取節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
            final Node p = node.predecessor();
            // 該節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),就嘗試去獲取一下許可
            // 當(dāng)獲取許可成功后,就可以將頭節(jié)點(diǎn)設(shè)置為當(dāng)前的節(jié)點(diǎn)
            // 并將后繼節(jié)點(diǎn)置空,重置狀態(tài)
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 當(dāng)獲取許可失敗后,會(huì)判斷需不需要對(duì)線程進(jìn)行阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                  parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 如果執(zhí)行過(guò)程中出現(xiàn)了了錯(cuò)誤,或者中斷后,就會(huì)對(duì)數(shù)據(jù)進(jìn)行處理,完成銷(xiāo)毀的過(guò)程。
        if (failed)
            cancelAcquire(node);
    }
}

這里的核心在于兩個(gè)if語(yǔ)句的內(nèi)容:

  1. 第一個(gè)if語(yǔ)句主要實(shí)現(xiàn)對(duì)前驅(qū)節(jié)點(diǎn)的判定,只有是頭節(jié)點(diǎn)才會(huì)再次嘗試能否拿到資源的許可訪問(wèn)權(quán)。當(dāng)拿到了許可權(quán)后,也就是持有獨(dú)占權(quán),那么就會(huì)將頭節(jié)點(diǎn)更新為當(dāng)前的線程節(jié)點(diǎn)。返回的狀態(tài)為interrupted狀態(tài)。
  2. 第二個(gè)if語(yǔ)句是前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn)或者在許可獲取失敗后,對(duì)線程節(jié)點(diǎn)的狀態(tài)進(jìn)行更新,并對(duì)合適的線程進(jìn)行阻塞操作。

3.1.4、shouldParkAfterFailedAcquire

在線程不是頭節(jié)點(diǎn)或者許可獲取失敗后,會(huì)調(diào)用此方法。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 只有SIGNAL狀態(tài)才能阻塞線程
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            // 向隊(duì)列的前驅(qū)節(jié)點(diǎn)找,一直到找到一個(gè)狀態(tài)碼小于0的,也就是非CANCEL狀態(tài)的
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
            // 把找到節(jié)點(diǎn)后繼節(jié)點(diǎn)鏈接到當(dāng)前節(jié)點(diǎn)
            pred.next = node;
    } else {
        // 如果前驅(qū)節(jié)點(diǎn)的狀態(tài)是小于0的,CAS方式將前驅(qū)節(jié)點(diǎn)的狀態(tài)更新為SIGNAL。同時(shí)不對(duì)線程阻塞
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

這個(gè)函數(shù)是整個(gè)循環(huán)中處理信號(hào)狀態(tài)的唯一方法。信號(hào)是什么?也就是我們?cè)贜ode章節(jié)提到的waitStatus狀態(tài)。

waitStatus狀態(tài)初始化的值是0。如代碼中描述的,waitStatus只有前驅(qū)節(jié)點(diǎn)是SIGNAL狀態(tài)才會(huì)將當(dāng)前的線程阻塞住。其他情況下,waitStatus為非正數(shù)時(shí),就會(huì)將前驅(qū)節(jié)點(diǎn)的狀態(tài)更新為SIGNAL。而狀態(tài)為正數(shù)時(shí),代表節(jié)點(diǎn)被取消了,就會(huì)不斷向前尋找,直到找到未被取消的節(jié)點(diǎn)。取消的節(jié)點(diǎn)通通會(huì)被垃圾回收機(jī)制處理掉。

【舉個(gè)栗子!就是一個(gè)栗子】這樣理解起來(lái)可能還是比較困難,我們可以試想一個(gè)情形,兩個(gè)線程依次爭(zhēng)奪獨(dú)占權(quán),線程A先獲得了獨(dú)占權(quán),獲得后的AQS等待隊(duì)列的狀態(tài)是如下圖所示

線程A獲得了獨(dú)占權(quán)后的等待隊(duì)列結(jié)構(gòu).png

在線程A釋放獨(dú)占權(quán)之前,線程B也發(fā)起了獨(dú)占權(quán)的爭(zhēng)奪。于是,按照之前的過(guò)程分析,在獲取占有權(quán)失敗后,會(huì)調(diào)用addWaiter方法,線程B的節(jié)點(diǎn)會(huì)被添加到等待隊(duì)列中。添加后的結(jié)構(gòu)就如下圖所示

addWaiter后的等待隊(duì)列結(jié)構(gòu).png

由于線程A一直沒(méi)釋放占有權(quán),那么線程B就會(huì)進(jìn)入到shouldParkAfterFailedAcquire方法內(nèi),而線程B的前驅(qū)節(jié)點(diǎn)就是線程A,而線程A節(jié)點(diǎn)的waitStatus是0,那就將它的狀態(tài)更新為SIGNAL。它會(huì)返回到acquireQueue的死循環(huán)中再次進(jìn)行占有權(quán)的獲取,當(dāng)然如果再次沒(méi)有獲取成功,也會(huì)進(jìn)入此方法,此次它的前驅(qū)節(jié)點(diǎn)線程A的waitStatus已經(jīng)更新為SIGNAL。返回true。

shouldParkAfterFailedAcquire后的AQS等待隊(duì)列結(jié)構(gòu).png
Q3:為什么需要SIGNAL信號(hào)才阻塞線程呢?

其實(shí)只是為了在線程park阻塞和解除阻塞的次數(shù)不那么頻繁,減少了不必要的線程阻塞切換時(shí)間。SIGNAL設(shè)置后,只有確定SIGNAL信號(hào)的線程才會(huì)阻塞。

3.1.5、parkAndCheckInterrupt

這個(gè)方法很簡(jiǎn)單,就是使用LockSupport.park對(duì)線程進(jìn)行了阻塞,返回線程是否被中斷的狀態(tài)。LockSupport的細(xì)節(jié)不做解釋?zhuān)?jiǎn)單來(lái)說(shuō),它主要是用來(lái)解決Thread.suspend這種方法會(huì)產(chǎn)生死鎖的問(wèn)題,同時(shí)能夠?qū)€程進(jìn)行阻塞,核心是由native方法進(jìn)行處理。有想深究的可以自行Google哈。

3.2、release操作

與acquire相對(duì)的方法也就是釋放占有權(quán)的release方法。

3.2.1、release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

相比之下,release方法涉及的步驟相對(duì)來(lái)說(shuō)簡(jiǎn)單一些。與acquire方法類(lèi)似,release也有一個(gè)tryRelease方法,是子類(lèi)去實(shí)現(xiàn)具體功能。

具體步驟:

  1. 調(diào)用子類(lèi)的tryRelease方法。
  2. 成功返回后,判斷頭節(jié)點(diǎn)是否存在和頭節(jié)點(diǎn)的狀態(tài)
  3. 若頭節(jié)點(diǎn)存在,且頭節(jié)點(diǎn)的waitStatus狀態(tài)是非0,也就說(shuō)明AQS不是初始化階段
  4. 將后繼節(jié)點(diǎn)的線程喚醒

這里重點(diǎn)注意一下】:waitStatus狀態(tài)是0,代表這個(gè)節(jié)點(diǎn)的狀態(tài)還未更新,因此AQS沒(méi)有將后繼節(jié)點(diǎn)進(jìn)行阻塞,這個(gè)時(shí)候就不能去做unpark的操作。

3.2.2、unparkSuccessor

在成功獲取到子類(lèi)的release認(rèn)可后, 會(huì)進(jìn)行喚醒過(guò)程,對(duì)該節(jié)點(diǎn)的后繼節(jié)點(diǎn)進(jìn)行喚醒。
unparkSuccessor方法不僅僅要處理release的解除阻塞操作,同時(shí)需要對(duì)cancel的進(jìn)行區(qū)分。核心內(nèi)容在于對(duì)Cancel狀態(tài)的處理。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    // 如果當(dāng)前節(jié)點(diǎn)的狀態(tài)小于0,將狀態(tài)更新為0,重置一下
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 后繼節(jié)點(diǎn)不存在或者后繼節(jié)點(diǎn)的waitStatus大于0(也就是被取消的狀態(tài))
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // 置后繼節(jié)點(diǎn)為null,
        // 若當(dāng)前節(jié)點(diǎn)發(fā)生了取消操作,重新從尾部開(kāi)始遍歷,找到?jīng)]有標(biāo)記為取消的節(jié)點(diǎn)進(jìn)行阻塞操作
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

3.3、cancelAcquire

在整個(gè)acquire的過(guò)程中,如果發(fā)生超時(shí),線程發(fā)起了中斷操作,那么就會(huì)發(fā)起cancel方法,將線程的信息情況。具體見(jiàn)代碼

private void cancelAcquire(Node node) {
    ...
    node.thread = null;

    // 跳過(guò)所有取消了的前驅(qū)節(jié)點(diǎn)
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    Node predNext = pred.next;
    // 不采用 CAS方式進(jìn)行寫(xiě)操作
    node.waitStatus = Node.CANCELLED;

    // 如果此節(jié)點(diǎn)已經(jīng)是隊(duì)尾,就將自己CAS刪除。
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 如果后繼節(jié)點(diǎn)需要信號(hào)SIGNAL,就自己前驅(qū)結(jié)點(diǎn)重新鏈接。
        // 否則將線程喚醒,進(jìn)行propagate過(guò)程
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }
        node.next = node; 
    }
}

cancelAcquire的方法這里進(jìn)行了一些不同情況下的取消操作。

  1. 取消的節(jié)點(diǎn)是隊(duì)尾節(jié)點(diǎn)tail。那么直接將pred的next和當(dāng)前節(jié)點(diǎn)的prev全部置null。重新設(shè)置一下tail節(jié)點(diǎn)。
  2. 取消的節(jié)點(diǎn)是隊(duì)列中的一個(gè)節(jié)點(diǎn),并且不是head頭節(jié)點(diǎn)的直接后繼節(jié)點(diǎn)。那么也很容易理解,就是將pred的next和當(dāng)前節(jié)點(diǎn)的next進(jìn)行一個(gè)鏈接。那么有一個(gè)疑問(wèn),代碼中只將next節(jié)點(diǎn)鏈接了,prev的節(jié)點(diǎn)如何連接。這里的prev是由其它線程做的,由于當(dāng)前節(jié)點(diǎn)已經(jīng)是CANCEL狀態(tài),其他線程在進(jìn)行競(jìng)爭(zhēng)時(shí),都會(huì)遍歷前驅(qū)節(jié)點(diǎn),重新鏈接前驅(qū)節(jié)點(diǎn)完成prev的鏈接。
  3. 取消的節(jié)點(diǎn)是隊(duì)列頭節(jié)點(diǎn)head的直接后繼節(jié)點(diǎn)。那么如果取消的節(jié)點(diǎn)后繼節(jié)點(diǎn)是存在的,可以直接喚醒后繼節(jié)點(diǎn)。

小結(jié)

本文只是分析了獨(dú)占式模式,第二篇將分析共享式的AQS的情況。

參考文章

  1. JAVA并發(fā)編程學(xué)習(xí)筆記之CLH隊(duì)列鎖
  2. A Hierarchical CLH Queue Lock(CLH原理原文)

如有問(wèn)題,歡迎批評(píng)指正!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容