Java并發(fā)編程專題之AQS

主要參考了博客JUC框架 源碼解析系列文章目錄 JDK8

AbstractQueuedSynchronizer

概述

實現(xiàn)大量依賴樂觀鎖的方式(即CAS+自旋)。它實現(xiàn)了一個FIFO的等待隊列用于等待獲取同步狀態(tài),而獲取/釋放同步器狀態(tài)的函數(shù)則依靠子類來實現(xiàn)。

雖然AQS是一個抽象類,但卻沒有任何抽象方法。如果定義為抽象方法確實不合適,因為繼承使用AQS并不一定需要使用到AQS提供的所有功能(獨占鎖和共享鎖),這樣子類反而需要實現(xiàn)所有抽象方法。如果定義為空實現(xiàn)的普通方法,雖然不需要子類實現(xiàn)所有空方法了,但這樣還是不夠明確。現(xiàn)在AQS將這些方法的實現(xiàn)為拋出UnsupportedOperationException異常,那么如果是子類需要使用的方法,就覆蓋掉它;如果是子類不需要使用的方法,一旦調(diào)用就會拋出異常。

AQS定義兩種資源共享方式:Exclusive(獨占,只有一個線程能執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore/CountDownLatch)。

簡單使用案例(實現(xiàn)一個共享鎖):

自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:

  • isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。
  • tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
  • tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失敗;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true,否則返回false。
public class MyLock implements Lock {
    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
            if (count <= 0) throw new IllegalArgumentException();
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int acquireCount) {
            while (true) {
                int cur = getState();
                int newCount = cur - acquireCount;
                if (newCount < 0 || compareAndSetState(cur, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releaseCount) {
            while (true) {
                int cur = getState();
                int newCount = cur + releaseCount;
                if (compareAndSetState(cur, newCount)) {
                    return true;
                }
            }
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }
}

重要屬性

state

  • state用volatile修飾,保證了它的可見性。
  • 如果是多線程并發(fā)修改的話,采用compareAndSetState來操作state
  • 如果是在沒有線程安全的環(huán)境下對state操作(例如ReentrantLock釋放鎖,因為它之前已經(jīng)獲取到獨占鎖,所以沒必要用CAS),采用setState方法
private volatile int state;

protected final int getState() {
    return state;
}

protected final void setState(int newState) {
    state = newState;
}

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

等待隊列

AQS中已經(jīng)為我們實現(xiàn)了一個FIFO的等待隊列,它是一個雙向鏈表。由于同步器的state一般不能讓所有線程同時獲得,所以將這些需要暫時等待的線程包裝成一個節(jié)點放到隊列中去,當(dāng)獲取state的條件滿足時,會將這個節(jié)點內(nèi)的線程喚醒,以便它接下來去嘗試獲取state。

static final class Node {
    static final Node SHARED = new Node(); 
    static final Node EXCLUSIVE = null; 

    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3; 

    volatile int waitStatus; // 表明node代表線程的狀態(tài)
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;

    Node nextWaiter; // 表明當(dāng)前node的線程是想要獲取共享鎖還是獨占鎖

    final boolean isShared() {
        return nextWaiter == SHARED;
    }
}
private transient volatile Node head; // 固定是一個dummy node,因為它的thread成員固定為null
private transient volatile Node tail; // 請求鎖失敗的線程,會包裝成node,放到隊尾
  • head節(jié)點中thread成員為null,可以理解為將它的thread成員放到AQS的exclusiveOwnerThread屬性上
  • 即使等待線程只有一個,等待隊列中的節(jié)點個數(shù)也肯定是2個,因為第一個節(jié)點總是dummy node。

acquire(int arg)

流程

20216141
  • 首先調(diào)用子類的tryAcquire嘗試獲取獨占鎖一次,try的意思就是只試一次,要么成功,要么失敗。
  • 獲取不到則調(diào)用addWaiter(Node.EXCLUSIVE)將該線程加入等待隊列的尾部,并標(biāo)記為獨占模式
  • acquireQueued使線程在等待隊列中獲取資源,中途可能不斷經(jīng)歷阻塞/喚醒狀態(tài),一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
  • 如果線程在等待過程中被中斷過,它是不響應(yīng)的。但是當(dāng)acquireQueued返回真時,代表這期間函數(shù)曾經(jīng)檢測到過中斷狀態(tài),并且將中斷狀態(tài)消耗掉了(Thread.interrupted()),所以需要在退出acquire之前,將中斷狀態(tài)重新設(shè)置上。
/**
Acquires in exclusive mode, ignoring interrupts.
Implemented by invoking at least once tryAcquire, returning on success.
Otherwise the thread is queued, 
possibly repeatedly blocking and unblocking, invoking tryAcquire until success.
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
該方法的默認(rèn)實現(xiàn)是拋出UnsupportedOperationException,具體實現(xiàn)由自定義的擴(kuò)展了AQS的同步類來實現(xiàn)。
AQS在這里只負(fù)責(zé)定義了一個公共的方法框架。
沒有定義成abstract,是因為獨占模式下只用實現(xiàn)tryAcquire-tryRelease,
而共享模式下只用實現(xiàn)tryAcquireShared-tryReleaseShared。
如果都定義成abstract,那么每個模式也要去實現(xiàn)另一模式下的接口。
*/
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

addWaiter(Node)

202161413
  • 將當(dāng)前線程封裝成一個節(jié)點(Node.EXCLUSIVE互斥模式、Node.SHARED共享模式)
  • 嘗試快速入隊:通過一次CAS加入到等待隊列的隊尾。
  • 如果CAS失敗或者隊列為空,則通過enq(node)方法初始化一個等待隊列
  • 在enq(node)中,如果隊列為空,則會給頭部設(shè)置一個空節(jié)點:compareAndSetHead(new Node()),隨后不斷自旋直到把node加入到等待隊列隊尾。這個循環(huán)只有在compareAndSetTail(t, node)成功時才會退出循環(huán),這就保證了enq最終肯定能將參數(shù)node放到隊尾。就算只有一個線程入隊,入隊完畢后隊列將有兩個node,第一個node稱為dummy node,因為它的thread成員為null;第二個node才算是實際意義的隊頭,它的thread成員不為null。新建的是空node,它的所有成員都是默認(rèn)值。thread成員為null,waitStatus為0。之后你會發(fā)現(xiàn),隊尾node的waitStatus總是0,因為默認(rèn)初始化。
  • 返回當(dāng)前線程所在的結(jié)點

注意點

如果是多線程執(zhí)行,可能導(dǎo)致多個node.prev鏈接到了tail,但是通過CAS保證tail.next只會鏈接到其中一個Node,并且其他的Node在不斷的自旋中最終還是會加入到等待隊列中

prev的有效性:有可能產(chǎn)生這樣一種中間狀態(tài),即node.prev指向了原先的tail,但是tail.next還沒來得及指向node。這時如果另一個線程通過next指針遍歷隊列,就會漏掉最后一個node。但是如果是通過tail.prev來遍歷等待隊列,就不會漏掉節(jié)點

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { 
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

acquireQueued

202161412
  • 每次循環(huán)都會判斷是否可以嘗試獲取鎖(判斷前驅(qū)節(jié)點p是否為head),如果可以,那么嘗試tryAcquire(arg)
  • 如果不可以嘗試,或者獲取鎖失敗,則通過parkAndCheckInterrupt阻塞線程并檢查線程中斷狀態(tài)
  • 如果線程被unpark/interrupt,則會從park中返回,接著從parkAndCheckInterrupt()返回,繼續(xù)往下執(zhí)行
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true; 
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; 
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node); // 該方法不會被執(zhí)行
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

shouldParkAfterFailedAcquire

Node的狀態(tài)

static final int CANCELLED =  1; 
static final int SIGNAL    = -1; 
static final int CONDITION = -2;
static final int PROPAGATE = -3;
  • CANCELLED代表線程已經(jīng)取消等待

  • SIGNAL說明這個node的后繼node的代表線程已經(jīng)阻塞或馬上阻塞。當(dāng)前node成為head并釋放鎖時,會根據(jù)SIGNAL來喚醒后繼node。即SIGNAL是喚醒后繼節(jié)點的標(biāo)志。

  • 一個node新建的時候,它的waitStatus是默認(rèn)初始化為0

說明

獲取鎖失敗了才會執(zhí)行該函數(shù):

  1. p == head為false,即當(dāng)前線程的node的前驅(qū)不是head
  2. 雖然 p == head為true,雖然當(dāng)前線程雖然已經(jīng)排到等待隊列的最前面,但獲取鎖還是失敗了

只有當(dāng)該函數(shù)返回true時,才會去執(zhí)行parkAndCheckInterrupt

作用:跳過無效前驅(qū),把node的有效前驅(qū)(有效是指node不是CANCELLED的)找到,并且將有效前驅(qū)的狀態(tài)設(shè)置為SIGNAL,之后便返回true代表馬上可以阻塞了。給前一個節(jié)點設(shè)置SIGNAL,相當(dāng)于一個鬧鐘,當(dāng)前一個節(jié)點釋放鎖時,喚醒當(dāng)前節(jié)點

執(zhí)行兩次
如果剛開始前驅(qū)的狀態(tài)為0,那么需要第一次執(zhí)行compareAndSetWaitStatus(pred, ws, Node.SIGNAL)返回false并進(jìn)入下一次循環(huán),第二次才能進(jìn)入if (ws == Node.SIGNAL)分支,所以說至少執(zhí)行兩次。死循環(huán)保證了最終一定能設(shè)置前驅(qū)為SIGNAL成功的。(考慮當(dāng)前線程一直不能獲取到鎖)

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) return true;
    if (ws > 0) {
        do {
            // 是CANCELLED,說明前驅(qū)節(jié)點已經(jīng)因為超時或響應(yīng)了中斷,而取消了自己,需要跳過他們
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL); // 把一個node的狀態(tài)變成SIGNAL
    }
    return false;
}

parkAndCheckInterrupt

調(diào)用完LockSupport.park(this),當(dāng)前線程就阻塞在這里,直到有別的線程unpark了當(dāng)前線程,或者中斷了當(dāng)前線程。而返回的Thread.interrupted()代表當(dāng)前線程在阻塞的過程中,有沒有被別的線程中斷過,如果有,則返回true。注意,Thread.interrupted()會消耗掉中斷的狀態(tài),即第一次執(zhí)行能返回true,但緊接著第二次執(zhí)行就只會返回false了。

如果是別的線程unpark了當(dāng)前線程,那么調(diào)用Thread.interrupted()返回false。
如果是別的線程中斷了當(dāng)前線程,那么調(diào)用Thread.interrupted()返回true。

回到acquireQueued的邏輯中,發(fā)現(xiàn)一旦當(dāng)前線程被中斷過一次,那么parkAndCheckInterrupt就返回了true,那么執(zhí)行interrupted = true,interrupted局部變量就一定是true的了。(該中斷狀態(tài)會永久保留,用于最外層acquire中恢復(fù)用戶中斷)

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

注意點

忽略中斷

整個過程忽略用戶發(fā)出的中斷信號(也就是由于線程獲取同步狀態(tài)失敗后進(jìn)入同步隊列中,后續(xù)對線程進(jìn)行中斷操作時,線程不會從同步隊列中移出),直到acquireQueued執(zhí)行結(jié)束后,才通過selfInterrupt恢復(fù)用戶的中斷

為什么tryAcquire(arg)的前提是p == head?

從enq的邏輯可知,head只會是一個dummy node,實際意義的node只會在head的后面。而node的前驅(qū)是head(final Node p = node.predecessor()),則代表node已經(jīng)是隊列中的第一個實際node了,排在最前面的node自然可以嘗試去獲取鎖了。

回想整個調(diào)用過程,是最開始在acquire里調(diào)用tryAcquire就已經(jīng)失敗了,然而此時第一次循環(huán)時,又可能馬上去調(diào)tryAcquire(說可能,是因為需要p == head成立),這會不會是一次肯定失敗的tryAcquire?
考慮這種場景,線程1獲取了鎖正在使用還沒釋放,此時隊列為空,線程2此時也來獲取鎖,自然最開始在acquire里調(diào)用tryAcquire會失敗,假設(shè)線程2剛開始執(zhí)行acquireQueued,此時線程1釋放了鎖,此時線程2肯定排在head后面,那么線程2馬上tryAcquire,然后就可以獲取成功。

執(zhí)行acquireQueued的線程是誰?
一定是node參數(shù)的thread成員,雖然執(zhí)行過程中,可能會經(jīng)歷不斷阻塞和被喚醒的過程。

為什么剛執(zhí)行完addWaiter方法時,才把代表當(dāng)前線程的node放到隊尾,怎么之后一判斷就會發(fā)現(xiàn)自己處于head的后繼了?

考慮addWaiter時,隊列中有許多node。這說明從head到當(dāng)前方法棧中的node之間的那些node,它們自己也會在執(zhí)行acquireQueued,它們依次執(zhí)行成功(指p == head && tryAcquire(arg)成功),每次執(zhí)行成功相當(dāng)于把head成員從隊列上后移一個node,當(dāng)它們都執(zhí)行完畢,當(dāng)前方法棧中的node自然也就是head的后繼了。
“之間的那些node”的最后一個node執(zhí)行acquireQueued成功后(代表 最后一個node的代表線程獲得鎖成功,它自己成為了head),當(dāng)前方法還在阻塞之中,只有當(dāng)這“最后一個node”釋放獨占鎖時,才會執(zhí)行unparkSuccessor(head),當(dāng)前線程才會被喚醒。

finally塊是否會執(zhí)行cancelAcquire(node)?

雖然號稱此函數(shù)是不響應(yīng)中斷的函數(shù),但不響應(yīng)中斷只是對于AQS的使用者來說,如果一個線程阻塞在parkAndCheckInterrupt這里,別的線程來中斷它,它是會馬上喚醒的,然后繼續(xù)這個循環(huán)。不過想要退出這個函數(shù),只有通過return interrupted,而前一句就是failed = false,所以finally塊里,是永遠(yuǎn)不會去執(zhí)行cancelAcquire(node)的。

release(int arg)

獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。

釋放鎖的過程,根本不會區(qū)分公平或不公平、響應(yīng)中斷或不響應(yīng)中斷、超時或不超時。這是因為,這些區(qū)別都只是存在于嘗試獲取鎖的方式上而已,既然已經(jīng)獲得了鎖,也就不需要有這些區(qū)別。

細(xì)節(jié)

  • 如果遇到s == null,說明我們遇到一種中間狀態(tài),next指針還沒有指好。如果遇到s.waitStatus > 0,說明head后繼剛?cè)∠?。這兩種情況,都需要從隊尾的prev往前找。
  • 注意循環(huán)條件t != null && t != node,它會從隊尾一直往前找,直到t是null或t已經(jīng)到達(dá)了node。一般情況下,不會出現(xiàn)t != null,所以,這樣循環(huán)肯定能找到node之后第一個不是取消狀態(tài)的節(jié)點。
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果從頭到尾都只有一個線程在使用鎖,那么隊列也不會初始化,head肯定為null。
        // 當(dāng)隊列只有一個dummy node時,它的狀態(tài)為0,也就不會執(zhí)行unparkSuccessor(h)了
        // 當(dāng)head的狀態(tài)為SIGNAL時,說明head后繼已經(jīng)設(shè)置了鬧鐘,會執(zhí)行unparkSuccessor(h)。
        if (h != null && h.waitStatus != 0) unparkSuccessor(h);
        return true;
    }
    return false;
}

protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) compareAndSetWaitStatus(node, ws, 0);
    Node s = node.next;
    /*
    * head后繼一般能直接通過next找到,但只有prev是肯定有效的。
    * 所以遇到next為null,肯定需要從隊尾的prev往前找。
    * 遇到next的狀態(tài)為取消,也需要從隊尾的prev往前找。
    */
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0) s = t;
    }
    if (s != null) LockSupport.unpark(s.thread);
}

acquireInterruptibly(int arg)

進(jìn)入這個方法后,會第一次進(jìn)行tryAcquire嘗試。但不同的,此acquireInterruptibly函數(shù)中,會去檢測Thread.interrupted(),并拋出異常。

對于acquireInterruptibly這個方法而言,既可以是公平的,也可以是不公平的,這完全取決于tryAcquire的實現(xiàn)(即取決于ReentrantLock當(dāng)初是怎么構(gòu)造的)。

如果檢測到中斷信號,首先線程會從LockSupport.park()中返回,并且拋出InterruptedException異常,執(zhí)行cancelAcquire方法,將該線程代表的節(jié)點從等待隊列中移除,并根據(jù)情況選擇是否unparkSuccessor后續(xù)節(jié)點

doAcquireInterruptibly不需要返回值,因為該函數(shù)中如果檢測到了中斷狀態(tài),就直接拋出異常就好了。

doAcquireInterruptibly方法的finally塊是可能會執(zhí)行到cancelAcquire(node)的,而acquireQueued方法不可能去執(zhí)行cancelAcquire(node)的。在doAcquireInterruptibly方法中,如果線程阻塞在parkAndCheckInterrupt這里后,別的線程來中斷阻塞線程,阻塞線程會被喚醒,然后拋出異常。本來拋出異常該函數(shù)就馬上結(jié)束掉的,但由于有finally塊,所以在結(jié)束掉之前會去執(zhí)行finally塊,并且由于failed為true,則會執(zhí)行cancelAcquire(node)。

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    if (!tryAcquire(arg)) doAcquireInterruptibly(arg);
}

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}
202162111
private void cancelAcquire(Node node) {
    if (node == null) return;
    node.thread = null;
    Node pred = node.prev;
    while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 跳過CANCELLED的節(jié)點

    // 執(zhí)行完循環(huán),pred會指向node的有效前驅(qū)
    Node predNext = pred.next;

    // 如果別的線程在執(zhí)行這步之后,別的線程將會跳過這個node。
    // 如果別的線程在執(zhí)行這步之前,別的線程還是會將這個node當(dāng)作有效節(jié)點。
    node.waitStatus = Node.CANCELLED;

    // 如果node是隊尾,直接設(shè)置pred為隊尾,然后設(shè)置pred的后繼為null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

tryAcquireNanos(int arg, long nanosTimeout)

tryAcquireNanos這個方法與不響應(yīng)中斷的acquire方法對應(yīng)。同樣的,進(jìn)入這個方法后,會第一次進(jìn)行tryAcquire嘗試。但不同的,此tryAcquireNanos函數(shù)中,會先去檢測Thread.interrupted(),并拋出異常。

但注意,對于tryAcquireNanos這個方法而言,既可以是公平的,也可以是不公平的,這完全取決于tryAcquire的實現(xiàn)(即取決于ReentrantLock當(dāng)初是怎么構(gòu)造的)。

差別

每次循環(huán)都會檢查時間是否到達(dá)deadline。
當(dāng)剩余時間小于spinForTimeoutThreshold時,則不能調(diào)用LockSupport.parkNanos,因為時間太短,反而無法精確控制阻塞時間,所以不如在剩余的時間里一直循環(huán)。
LockSupport.parkNanos除了會因為別人的park而喚醒,也會因為別人的中斷而喚醒,當(dāng)然最重要的,時間到了,它自己會喚醒。
不管哪種情況,被喚醒后,都會檢查中斷狀態(tài)。每個循環(huán)都會檢查一次。

如果中斷,也同樣進(jìn)入cancelAcquire方法

final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted()) throw new InterruptedException();
    return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout);
}

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L) return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted()) throw new InterruptedException();
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

acquireShared

共享鎖與獨占鎖的區(qū)別

  • 獨占鎖是線程獨占的,同一時刻只有一個線程能擁有獨占鎖,AQS里將這個線程放置到exclusiveOwnerThread成員上去。
  • 共享鎖是線程共享的,同一時刻能有多個線程擁有共享鎖,但AQS里并沒有用來存儲獲得共享鎖的多個線程的成員。
  • 如果一個線程剛獲取了共享鎖,那么在其之后等待的線程也很有可能能夠獲取到鎖。但獨占鎖不會這樣做,因為鎖是獨占的。
  • 當(dāng)然,如果一個線程剛釋放了鎖,不管是獨占鎖還是共享鎖,都需要喚醒在后面等待的線程

流程

  • 創(chuàng)建的節(jié)點不同。共享鎖使用addWaiter(Node.SHARED),所以會創(chuàng)建出想要獲取共享鎖的節(jié)點。而獨占鎖使用addWaiter(Node.EXCLUSIVE)。
  • 獲取鎖成功后的善后操作不同。共享鎖使用setHeadAndPropagate(node, r),因為剛獲取共享鎖成功后,后面的線程也有可能成功獲取,所以需要在一定條件喚醒head后繼。而獨占鎖使用setHead(node)。
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
}
protected int tryAcquireShared(int arg) {
    throw new UnsupportedOperationException();
}
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted) selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed) cancelAcquire(node);
    }
}

/**
setHead函數(shù)只是將剛成為將成為head的節(jié)點變成一個dummy node。
而setHeadAndPropagate里也會調(diào)用setHead函數(shù)。
但是它在一定條件下還可能會調(diào)用doReleaseShared
“如果一個線程剛獲取了共享鎖,那么在其之后等待的線程也很有可能能夠獲取到鎖”。
*/
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared()) doReleaseShared();
    }
}

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

/**
* 復(fù)原中斷狀態(tài),雖然這個版本的函數(shù)不用響應(yīng)中斷。
* 當(dāng)acquireQueued返回真時,代表這期間函數(shù)曾經(jīng)檢測到過中斷狀態(tài),并且將中斷狀態(tài)消耗掉了
* 所以需要在退出acquire之前,將中斷狀態(tài)重新設(shè)置上。
*/
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

releaseShared(int arg)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

protected boolean tryReleaseShared(int arg) {
    throw new UnsupportedOperationException();
}


// 在獲取共享鎖成功時,也可能會調(diào)用到doReleaseShared。
private void doReleaseShared() {
    for (; ; ) {
        Node h = head;
        // 如果隊列從來沒有初始化過(head為null),或者h(yuǎn)ead就是tail,則直接判斷head是否變化過。
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue;
                unparkSuccessor(h);
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) {
                continue;
            }
        }
        /*
        循環(huán)檢測到head沒有變化時就會退出循環(huán)
        head變化一定是因為acquire thread被喚醒,之后它成功獲取鎖,然后setHead設(shè)置了新head
        保證了只要在某個循環(huán)的過程中有線程剛獲取了鎖且設(shè)置了新head,就會再次循環(huán)
        目的當(dāng)然是為了再次執(zhí)行unparkSuccessor(h),即喚醒隊列中第一個等待的線程
        */
        if (h == head) break;
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容