java鎖(4)隊(duì)列同步器AQS詳解

1、AQS簡介

AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)是用來構(gòu)建鎖或者其他同步組件(信號量、事件等)的基礎(chǔ)框架類。JDK中許多并發(fā)工具類的內(nèi)部實(shí)現(xiàn)都依賴于AQS,如ReentrantLock, Semaphore, CountDownLatch等等。

AQS的主要使用方式是繼承它作為一個內(nèi)部輔助類實(shí)現(xiàn)同步原語,它可以簡化你的并發(fā)工具的內(nèi)部實(shí)現(xiàn),屏蔽同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作。

在基于AQS構(gòu)建的同步器類中,最基本的操作包括各種形式的獲取操作和釋放操作。獲取操作是一種依賴狀態(tài)的操作,并且通常會阻塞。當(dāng)使用鎖或信號量時,“獲取”操作的含義就很直觀,即獲取的是鎖或者許可,并且調(diào)用者可能會一直等待直到同步器類處于可被獲取的狀態(tài)。如果一個類想成為狀態(tài)依賴的類,那么它必須擁有一些狀態(tài)。AQS負(fù)責(zé)管理同步器類中的狀態(tài),它管理了一個整數(shù)狀態(tài)信息,可以通過getstate,setState以及compareAndSetState等 protected類型方法來進(jìn)行操作。這個整數(shù)可以用于表示任意狀態(tài)。

AQS主要做三件事:

  • 同步狀態(tài)的管理
  • 線程的阻塞和喚醒
  • 同步隊(duì)列的維護(hù)

狀態(tài)管理API:

  • int getState(): 獲取同步狀態(tài)
  • void setState(): 設(shè)置同步狀態(tài)
  • boolean compareAndSetState(int expect, int update):基于CAS,原子設(shè)置狀態(tài)

AQS模板方法:

方法 描述
void acquire(int arg) 獨(dú)占式獲取同步狀態(tài),如果當(dāng)前線程獲取同步狀態(tài)成功,則由該方法返回,否則,將會進(jìn)入同步隊(duì)列等待,該方法將會調(diào)用重寫的tryAcquire(intarg)方法
void acquireInterruptibly(int arg) 與acquire(int arg)相同,但是該方法響應(yīng)中斷,當(dāng)前線程未獲取到同步狀態(tài)而進(jìn)入同步隊(duì)列中,如果當(dāng)前線程被中斷,則該方法會拋出InterruptedException并返回
boolean tryAcquireNanos(int arg,long nanos) 在acquireInterruptibly(int arg)基礎(chǔ)上增加了超時限制,如果當(dāng)前線程在超時時間內(nèi)沒有獲取到同步狀態(tài),那么就會返回false,如果獲取到就返回true
void acquireShared(int arg) 共享式的獲取同步狀態(tài),如果當(dāng)前線程未獲取到同步狀態(tài),將會進(jìn)入同步隊(duì)列等待,與獨(dú)占式獲取的主要區(qū)別是在同一時刻可以有多個線程獲取到同步狀態(tài)。
void acquireSharedInterruptibly(int arg) 與acquireShared(int arg)相同,該方法響應(yīng)中斷。
boolean tryAcquireSharedNanos(int arg,long nanos) 在acquireSharedInterruptibly(int arg)基礎(chǔ)上增加了超時限制。
boolean release(int arg) 獨(dú)占式的釋放同步狀態(tài),該方法會在釋放同步狀態(tài)之后,將同步隊(duì)列中第一個節(jié)點(diǎn)包含的線程喚醒。
boolean releaseShared(int arg) 共享式的釋放同步狀態(tài)
Collection getQueuedThreads() 獲取等待在同步隊(duì)列上的線程集合

同步器可重寫的方法:

方法 描述
boolean tryAcquire(int arg) 獨(dú)占獲取同步狀態(tài),實(shí)現(xiàn)該方法需要查詢當(dāng)前狀態(tài),并判斷同步狀態(tài)是否符合預(yù)期狀態(tài),然后再進(jìn)行CAS設(shè)置同步狀態(tài)。
boolean tryRelease(int arg) 獨(dú)占式釋放同步狀態(tài),等待獲取同步狀態(tài)的線程將有機(jī)會獲取同步狀態(tài)
int tryAcquireShared(int arg) 共享式獲取同步狀態(tài),返回大于等于0的值,表示獲取成功,反之失敗
boolean tryReleaseShared(int arg) 共享式釋放同步狀態(tài)
boolean isHeldExclusively() 當(dāng)前同步器是否在獨(dú)占模式下被線程占用,一般該方法表示是否被當(dāng)前線程所獨(dú)占

AQS對外暴露的方法:

暴露的方法.png

2、AQS數(shù)據(jù)結(jié)構(gòu)

因?yàn)楂@取鎖是有條件的,沒有獲取鎖的線程就要阻塞等待,那么就要存儲這些等待的線程。在AQS中使用CLH隊(duì)列儲存這些等待的線程,但它并不是直接儲存線程,而是儲存擁有線程的node節(jié)點(diǎn)。

2.1、Node數(shù)據(jù)結(jié)構(gòu):

static final class Node {
    //共享模式的標(biāo)記,標(biāo)識一個節(jié)點(diǎn)在共享模式下等待
    static final Node SHARED = new Node();
    //獨(dú)占模式的標(biāo)記,標(biāo)識一個節(jié)點(diǎn)在獨(dú)占模式下等待
    static final Node EXCLUSIVE = null;

    // waitStatus變量的值,標(biāo)志著線程被取消,后續(xù)將不會獲取到鎖
    static final int CANCELLED = 1;
     // waitStatus變量的值,標(biāo)志著后繼線程(即隊(duì)列中此節(jié)點(diǎn)之后的節(jié)點(diǎn))需要被阻塞.(用于獨(dú)占鎖)
    static final int SIGNAL = -1;
    // waitStatus變量的值,標(biāo)志著線程在Condition條件上等待阻塞.(用于Condition的await等待)
    static final int CONDITION = -2;
    // waitStatus變量的值,標(biāo)志著下一個acquireShared方法線程應(yīng)該被無條件傳播。(用于共享鎖)
    static final int PROPAGATE = -3;

     // 標(biāo)記著當(dāng)前節(jié)點(diǎn)的狀態(tài),默認(rèn)狀態(tài)是0, 小于0的狀態(tài)都是有特殊作用,大于0的狀態(tài)表示已取消
     //SIGNAL:此節(jié)點(diǎn)的繼任節(jié)點(diǎn)被阻塞,故當(dāng)此節(jié)點(diǎn)釋放鎖或中斷時需要換繼任節(jié)點(diǎn)的線程。
     //CANCELLED:當(dāng)獲取鎖超時或被中斷時節(jié)點(diǎn)狀態(tài)會被設(shè)置成此狀態(tài),此狀態(tài)的節(jié)點(diǎn)不會被再次阻塞;
     //CONDITION:標(biāo)識此節(jié)點(diǎn)在條件隊(duì)列中,在同步隊(duì)列的節(jié)點(diǎn)不會出現(xiàn)此狀態(tài),
     //當(dāng)節(jié)點(diǎn)從條件隊(duì)列移到同步隊(duì)列時此狀態(tài)會被設(shè)置為0;
     //PROPAGATE:一個releaseShared應(yīng)該被傳播到其他節(jié)點(diǎn),此狀態(tài)在doReleaseShared()中調(diào)用,
     //以確保傳播傳播在其他插入時保持繼續(xù)。
     //總結(jié):狀態(tài)小于0表示節(jié)點(diǎn)無需被通知喚醒;狀態(tài)為0表示普通同步節(jié)點(diǎn);CONDITION表示節(jié)點(diǎn)在
     //等待隊(duì)列中,狀態(tài)通過CAS進(jìn)行原子更新。
    volatile int waitStatus;

    /**
     * 前驅(qū)節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除;
     */
    volatile Node prev;

    /**
     * 后繼節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除;
     * 入隊(duì)操作不會設(shè)置前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn),直到節(jié)點(diǎn)連接到隊(duì)列;
     * 故next節(jié)點(diǎn)為null不一定表示此節(jié)點(diǎn)為隊(duì)列尾部,當(dāng)next節(jié)點(diǎn)為null時,
     * 可遍歷prev節(jié)點(diǎn)進(jìn)行雙重檢查;已經(jīng)取消的節(jié)點(diǎn)的next指向自己而不是null
     */
    volatile Node next;

    //該節(jié)點(diǎn)擁有的線程
    volatile Thread thread;

    /**
     * 1、值為null或非SHARED;為null時表示獨(dú)占模式;非SHARED時表示在Condition中等待的隊(duì)列;
     * 2、值為SHARED,表示共享模式;
     */
    Node nextWaiter;

    //是否為共享模式
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    //當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),如果前驅(qū)節(jié)點(diǎn)為null,則拋出NPE異常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    //用于在addWaiter()中使用,創(chuàng)建同步隊(duì)列中的節(jié)點(diǎn)
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //在Condition中使用,創(chuàng)建等待隊(duì)列的節(jié)點(diǎn)
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

說明:

  • waitStatus:表示當(dāng)前節(jié)點(diǎn)的狀態(tài),會有如下五中狀態(tài):

CANCELLED(1):當(dāng)獲取鎖超時或被中斷時節(jié)點(diǎn)狀態(tài)會被設(shè)置成此狀態(tài),此狀態(tài)的 節(jié)點(diǎn)會被unpark,不會參與鎖的獲取,不會被再次阻塞;

0:表示普通節(jié)點(diǎn),當(dāng)節(jié)點(diǎn)初始插入到同步隊(duì)列時的狀態(tài);

SIGNAL(-1):此節(jié)點(diǎn)的繼任節(jié)點(diǎn)被阻塞,故當(dāng)此節(jié)點(diǎn)釋放鎖或中斷時需要換繼任節(jié)點(diǎn)的線程。

CONDITION(-2):標(biāo)識此節(jié)點(diǎn)在條件隊(duì)列中,在同步隊(duì)列的節(jié)點(diǎn)不會出現(xiàn)此狀態(tài),當(dāng)節(jié)點(diǎn)從條件隊(duì)列移到同步隊(duì)列時此狀態(tài)會被設(shè)置為0;

PROPAGATE(-3):一個releaseShared應(yīng)該被傳播到其他節(jié)點(diǎn),此狀態(tài)在doReleaseShared()中調(diào)用,以確保傳播傳播在其他插入時保持繼續(xù)。

總結(jié):狀態(tài)小于0表示節(jié)點(diǎn)無需被通知喚醒;狀態(tài)為0表示普通同步節(jié)點(diǎn);CONDITION表示節(jié)點(diǎn)在等待隊(duì)列中,狀態(tài)通過CAS進(jìn)行原子更新。

  • prev:前驅(qū)節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除;
  • next:后繼節(jié)點(diǎn),在enqueue時設(shè)置,在dequeue或前驅(qū)節(jié)點(diǎn)取消時清除; 入隊(duì)操作不會設(shè)置前驅(qū)節(jié)點(diǎn)的后繼節(jié)點(diǎn),直到節(jié)點(diǎn)連接到隊(duì)列; 故next節(jié)點(diǎn)為null不一定表示此節(jié)點(diǎn)為隊(duì)列尾部,當(dāng)next節(jié)點(diǎn)為null時, 可遍歷prev節(jié)點(diǎn)進(jìn)行雙重檢查;已經(jīng)取消的節(jié)點(diǎn)的next指向自己而不是null
  • **nextWaiter: **值為SHARED,表示共享模式;值為null或非SHARED,為null時表示獨(dú)占模式,非SHARED時表示在Condition隊(duì)列中等待的節(jié)點(diǎn);

waitStatus狀態(tài)狀態(tài):

狀態(tài)轉(zhuǎn)換.png

2.2、AQS數(shù)據(jù)結(jié)構(gòu)

/**
 * 等待隊(duì)列的頭節(jié)點(diǎn),通過setHead方法進(jìn)行更新;頭結(jié)點(diǎn)的狀態(tài)不可能是CANCELLED
 */
private transient volatile Node head;

/**
 * 等待隊(duì)列的尾節(jié)點(diǎn)
 */
private transient volatile Node tail;

/**
 * 同步狀態(tài)
 */
private volatile int state;

3、CLH隊(duì)列相關(guān)操作

3.1、相關(guān)屬性的CAS操作

/**
 * 通過CAS設(shè)置AQS的head值
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * 通過CAS設(shè)置AQS的tail值
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

/**
 * 通過CAS設(shè)置Node節(jié)點(diǎn)的waitStatue值
 */
private static final boolean compareAndSetWaitStatus(Node node,
                                                     int expect,
                                                     int update) {
    return unsafe.compareAndSwapInt(node, waitStatusOffset,
            expect, update);
}

/**
 * 通過CAS設(shè)置Node節(jié)點(diǎn)的next值
 */
private static final boolean compareAndSetNext(Node node,
                                               Node expect,
                                               Node update) {
    return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}

3.2、將節(jié)點(diǎn)添加到CLH隊(duì)尾

//通過空轉(zhuǎn)及CAS方式,將一個node插入同步隊(duì)列的隊(duì)尾
private Node enq(final Node node) {
    for (; ; ) {
        Node t = tail;
        
        //尾節(jié)點(diǎn)為空,則直接CAS初始化頭結(jié)點(diǎn),并將尾節(jié)點(diǎn)設(shè)置為頭節(jié)點(diǎn)
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            //尾節(jié)點(diǎn)不為空,則CAS設(shè)置當(dāng)前節(jié)點(diǎn)為尾節(jié)點(diǎn)
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

3.3、將當(dāng)前線程添加到CLH隊(duì)尾

//創(chuàng)建一個給定模式的節(jié)點(diǎn),此節(jié)點(diǎn)線程為當(dāng)前線程,并將節(jié)點(diǎn)加入隊(duì)尾
private Node addWaiter(Node mode) {
    //創(chuàng)建線程
    Node node = new Node(Thread.currentThread(), mode);
    //快速檢測尾節(jié)點(diǎn)不為空,則CAS將當(dāng)前節(jié)點(diǎn)替換為尾節(jié)點(diǎn)
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //快速替換失?。縿t通過enq()的空轉(zhuǎn)方式將當(dāng)前節(jié)點(diǎn)插入隊(duì)尾
    enq(node);
    return node;
}

4、獨(dú)占鎖

獨(dú)占鎖主要包括兩方面功能:

  • 獲取鎖的功能:既當(dāng)多個線程一起獲取鎖的時候,只有一個線程能獲取到鎖,其他線程必須在當(dāng)前位置阻塞等待。
  • 釋放鎖的功能:獲取鎖的線程釋放鎖資源,而且還必須能喚醒正在等待鎖資源的一個線程。

4.1、獨(dú)占鎖獲取流程

獲取獨(dú)占鎖流程.png

4.2、獲取獨(dú)占鎖相關(guān)方法

//獲取獨(dú)占鎖,忽略中斷;直到成功獲取鎖,此方法經(jīng)常被lock.lock調(diào)用
public final void acquire(int arg) {
    //tryAcquire:先CAS嘗試獲取鎖,當(dāng)返回true表示獲取成功,此方法由子類實(shí)現(xiàn);
    //當(dāng)返回false,則調(diào)用
    //acquireQueued進(jìn)行獲取及入隊(duì)阻塞處理,當(dāng)有其他線程釋放鎖會喚醒改線程。
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//調(diào)用tryAcquire嘗試獲取鎖,當(dāng)獲取失敗就將線程節(jié)點(diǎn)入隊(duì)并阻塞節(jié)點(diǎn)線程;
//直到線程被中斷或被喚醒,會再次嘗試獲取鎖;
final boolean acquireQueued(final Node node, int arg) {
    //標(biāo)識獲取鎖失敗
    boolean failed = true;
    try {
        //標(biāo)識鎖被中斷
        boolean interrupted = false;
        for (; ; ) {
            //獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),若前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),則嘗試獲取鎖;
            //若獲取成功則將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn),否則嘗試阻塞節(jié)點(diǎn)線程
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                //將當(dāng)前節(jié)點(diǎn)設(shè)為頭節(jié)點(diǎn)
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //判斷當(dāng)前是否滿足阻塞條件,滿足則阻塞當(dāng)前線程;
            //并等待被中斷或被喚醒
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //獲取鎖失敗,則進(jìn)行取消獲取操作
        if (failed)
            cancelAcquire(node);
    }
}
//判斷當(dāng)前節(jié)點(diǎn)線程是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驅(qū)節(jié)點(diǎn)的狀態(tài)
    int ws = pred.waitStatus;
    
    //前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL,表示前驅(qū)節(jié)點(diǎn)在等待獲取鎖的信號
    //故本節(jié)點(diǎn)可以安全的阻塞
    if (ws == Node.SIGNAL)
        return true;
    
    //前驅(qū)節(jié)點(diǎn)waitStatus>0,即為waitStatus=CANCELLED;
    //表示前驅(qū)節(jié)點(diǎn)已經(jīng)被取消,需需要前向遍歷前驅(qū)節(jié)點(diǎn),直到狀態(tài)
    //不為CANCELLED的節(jié)點(diǎn),并將此節(jié)點(diǎn)設(shè)為node節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn);
    //返回false,讓上層調(diào)用繼續(xù)嘗試獲取鎖
    if (ws > 0) {
        //循環(huán)遍歷前驅(qū)節(jié)點(diǎn),尋找狀態(tài)不為CANCELLED的節(jié)點(diǎn),并設(shè)為當(dāng)前節(jié)點(diǎn)的
        //前驅(qū)節(jié)點(diǎn)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //當(dāng)前驅(qū)節(jié)點(diǎn)狀態(tài)為0或PROPAGATE時,通過CAS設(shè)置前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL
        //并返回fase,等待下個循環(huán)阻塞當(dāng)前節(jié)點(diǎn)線程;
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//通過LockSupport.park()阻塞當(dāng)前線程,直到線程被unpark或被中斷
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

4.3、釋放獨(dú)占鎖相關(guān)方法

//釋放獨(dú)占鎖
public final boolean release(int arg) {
    //調(diào)用tryRelease方式通過CAS嘗試釋放鎖,tryRelease由子類實(shí)現(xiàn)
    if (tryRelease(arg)) {
        Node h = head;
        
        //頭結(jié)點(diǎn)不為空且頭節(jié)點(diǎn)狀態(tài)不為0,應(yīng)該為SIGNAL
        //表示隊(duì)列中有需要喚醒的節(jié)點(diǎn),調(diào)用unparkSuccessor進(jìn)行頭節(jié)點(diǎn)線程
        //喚醒操作
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
//喚醒節(jié)點(diǎn)線程處理
private void unparkSuccessor(Node node) {
    //獲取當(dāng)前節(jié)點(diǎn)狀態(tài)
    int ws = node.waitStatus;
    //狀態(tài)小于零,則將狀態(tài)重置為0,表示節(jié)點(diǎn)處理已經(jīng)完成
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //獲取后繼節(jié)點(diǎn),當(dāng)后繼節(jié)點(diǎn)為空或后繼節(jié)點(diǎn)狀態(tài)為CANCELLED時;
    //由tail前向遍歷隊(duì)列,找到當(dāng)前節(jié)點(diǎn)的下個有效節(jié)點(diǎn),即waitStatus <= 0
    //的節(jié)點(diǎn)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //下個節(jié)點(diǎn)非空,表示為等待信號的節(jié)點(diǎn),執(zhí)行unpark喚醒節(jié)點(diǎn)線程
    if (s != null)
        LockSupport.unpark(s.thread);
}

5、共享鎖

5.1、共享鎖獲取流程

共享鎖獲取流程.png

5.2、共享鎖獲取相關(guān)方法

//獲取共享鎖,忽略中斷;
public final void acquireShared(int arg) {
    //子類實(shí)現(xiàn),CAS方式獲取共享鎖,若獲取失敗,調(diào)用doAcquireShared繼續(xù)獲取共享鎖
    if (tryAcquireShared(arg) < 0)
        //嘗試獲取共享鎖,獲取失敗則將當(dāng)前線程節(jié)點(diǎn)入隊(duì),直到被通知或被中斷
        doAcquireShared(arg);
}
//獲取共享鎖,若CAS獲取失敗,則將當(dāng)前節(jié)點(diǎn)入隊(duì)并阻塞當(dāng)前線程,直到獲取鎖
private void doAcquireShared(int arg) {
    //將當(dāng)前節(jié)點(diǎn)插入隊(duì)尾
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (; ; ) {
            //獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn),若前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn),則嘗試獲取鎖;
            //若獲取失敗,則檢查節(jié)點(diǎn)狀態(tài);當(dāng)節(jié)點(diǎn)狀態(tài)為SIGNAL時將節(jié)點(diǎn)線程阻塞
            final Node p = node.predecessor();
            if (p == head) {
                //CAS獲取鎖
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    //成功則設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)并將其他節(jié)點(diǎn)狀態(tài)設(shè)為PROPAGAE
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //檢查當(dāng)前節(jié)點(diǎn)是否應(yīng)該阻塞,是則進(jìn)行阻塞處理,直到被中斷
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //若獲取鎖失敗,則進(jìn)行取消獲取處理
        if (failed)
            cancelAcquire(node);
    }
}
//設(shè)置當(dāng)前節(jié)點(diǎn)為頭節(jié)點(diǎn)并喚醒共享模式下的線程
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    //設(shè)為頭節(jié)點(diǎn)
    setHead(node);
    //若propagate > 0或頭結(jié)點(diǎn)為空且頭節(jié)點(diǎn)狀態(tài)為 < 0 
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        //若頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)為共享模式,則獲取頭結(jié)點(diǎn)
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

5.3、共享鎖釋放相關(guān)方法

//釋放共享鎖
public final boolean releaseShared(int arg) {
    //cas方式釋放鎖,若失敗則doReleaseShared釋放鎖
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
private void doReleaseShared() {
    for (; ; ) {
        //獲取頭結(jié)點(diǎn),頭結(jié)點(diǎn)不為空且狀態(tài)為SIGNAL時,CAS設(shè)置狀態(tài)為0并喚醒線程
        //否則將頭結(jié)點(diǎn)狀態(tài)設(shè)置為PROPAGATE,然后循環(huán)檢查頭結(jié)點(diǎn)狀態(tài)并試圖喚醒
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            } else if (ws == 0 &&
                    !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

6、condition條件

6.1、condition的實(shí)現(xiàn)

首先內(nèi)部存在一個Condition隊(duì)列,存儲著所有在此Condition條件等待的線程。

await系列方法:讓當(dāng)前持有鎖的線程釋放鎖,并喚醒一個在CLH隊(duì)列上等待鎖的線程,再為當(dāng)前線程創(chuàng)建一個node節(jié)點(diǎn),插入到Condition隊(duì)列(注意不是插入到CLH隊(duì)列中)

signal系列方法:其實(shí)這里沒有喚醒任何線程,而是將Condition隊(duì)列上的等待節(jié)點(diǎn)插入到CLH隊(duì)列中,所以當(dāng)持有鎖的線程執(zhí)行完畢釋放鎖時,就會喚醒CLH隊(duì)列中的一個線程,這個時候才會喚醒線程。

6.2、await及signal處理流程

await流程.png
signal處理流程

6.3、await相關(guān)方法

//讓當(dāng)前持有鎖的線程阻塞等待,并釋放鎖。如果有中斷請求,則拋出InterruptedException異常
public final void await() throws InterruptedException {
    //若當(dāng)前線程已被中斷,則拋出中斷異常
    if (Thread.interrupted())
        throw new InterruptedException();
   // 為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個節(jié)點(diǎn)插入到Condition隊(duì)列中
    Node node = addConditionWaiter();
    //釋放當(dāng)前線程持有的鎖,并喚醒同步隊(duì)列中的頭結(jié)點(diǎn)
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    //如果當(dāng)前節(jié)點(diǎn)補(bǔ)足同步隊(duì)列中;
    //阻塞當(dāng)前線程,當(dāng)前當(dāng)前線程被signal信號喚醒后,將當(dāng)前節(jié)點(diǎn)加入同步隊(duì)列中;
    //等待獲取獲取鎖
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //檢查是否被中斷并入隊(duì)等待鎖
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 如果節(jié)點(diǎn)node已經(jīng)在同步隊(duì)列中了,獲取同步鎖,只有得到鎖才能繼續(xù)執(zhí)行,否則線程繼續(xù)阻塞等待
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 清除Condition隊(duì)列中狀態(tài)不是Node.CONDITION的節(jié)點(diǎn)    
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    // 是否要拋出異常,或者發(fā)出中斷請求
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
//為當(dāng)前線程創(chuàng)建新的Node節(jié)點(diǎn),并且將這個節(jié)點(diǎn)插入到Condition隊(duì)列中
private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果等待隊(duì)列尾節(jié)點(diǎn)狀態(tài)不是CONDITION,則進(jìn)行清除操作;
    // 清除隊(duì)列中狀態(tài)不是CONDITION的節(jié)點(diǎn)
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //為當(dāng)前線程創(chuàng)建一個狀態(tài)為CONDITION的節(jié)點(diǎn),并將節(jié)點(diǎn)插入隊(duì)尾
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
//從頭到尾部遍歷等待隊(duì)列,去除狀態(tài)不是CONDITION的節(jié)點(diǎn)
private void unlinkCancelledWaiters() {
    //記錄下一個待處理的節(jié)點(diǎn)
    Node t = firstWaiter;
    //記錄上一個狀態(tài)為CONDITION的節(jié)點(diǎn)
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null)
                firstWaiter = next;
            else
                trail.nextWaiter = next;
            if (next == null)
                lastWaiter = trail;
        } else
            trail = t;
        t = next;
    }
}
//釋放當(dāng)前線程占有的鎖,并喚醒同步隊(duì)列一個等待線程
//如果失敗就拋出異常,設(shè)置node節(jié)點(diǎn)的狀態(tài)是Node.CANCELLED
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        //釋放當(dāng)前線程占有的鎖
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
//判斷節(jié)點(diǎn)釋放在同步隊(duì)列中
final boolean isOnSyncQueue(Node node) {
    // 如果node的狀態(tài)是Node.CONDITION,或者node沒有前一個節(jié)點(diǎn)prev,
    // 那么返回false,節(jié)點(diǎn)node不在同步隊(duì)列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果node有下個節(jié)點(diǎn),則其一定在同步隊(duì)列中    
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //從同步隊(duì)列中查找node節(jié)點(diǎn)
    return findNodeFromTail(node);
}

//根據(jù)當(dāng)前的模式,判斷是否拋出異常或重新中斷等
private void reportInterruptAfterWait(int interruptMode)
        throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

6.4、signal相關(guān)方法

//如果等待隊(duì)列不為空,則將隊(duì)列頭節(jié)點(diǎn)插入同步隊(duì)列中
public final void signal() {
    //如果當(dāng)前線程不是獨(dú)占鎖,則拋出異常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    //將等待隊(duì)列中的頭結(jié)點(diǎn)插入同步隊(duì)列中
    if (first != null)
        doSignal(first);
}
//將等待隊(duì)列總的頭結(jié)點(diǎn)插入同步隊(duì)列中
private void doSignal(Node first) {
    do {
        // 原先的Condition隊(duì)列頭節(jié)點(diǎn)取消,所以重新賦值Condition隊(duì)列頭節(jié)點(diǎn)
        // 如果新的Condition隊(duì)列頭節(jié)點(diǎn)為null,表示Condition隊(duì)列為空了
        // ,所以也要設(shè)置Condition隊(duì)列尾lastWaiter為null
        if ((firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
            (first = firstWaiter) != null);
}
// 返回true表示節(jié)點(diǎn)node插入到同步隊(duì)列中,返回false表示節(jié)點(diǎn)node沒有插入到同步隊(duì)列中
final boolean transferForSignal(Node node) {
    //如果無法將節(jié)點(diǎn)狀態(tài)由CONDITION修改為0,表示節(jié)點(diǎn)已在同步隊(duì)列中,直接返回false
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    //將節(jié)點(diǎn)node插入到同步隊(duì)列中,p是原先同步隊(duì)列尾節(jié)點(diǎn),也是node節(jié)點(diǎn)的前一個節(jié)點(diǎn)
    Node p = enq(node);
    int ws = p.waitStatus;
    // 如果前一個節(jié)點(diǎn)是已取消狀態(tài),或者不能將它設(shè)置成Node.SIGNAL狀態(tài)。
    // 就說明節(jié)點(diǎn)p之后也不會發(fā)起喚醒下一個node節(jié)點(diǎn)線程的操作,
    // 所以這里直接調(diào)用 LockSupport.unpark(node.thread)方法,喚醒節(jié)點(diǎn)node所在線程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
//喚醒所有節(jié)點(diǎn)
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}
//循環(huán)喚醒所有等待中的節(jié)點(diǎn)
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容