AQS和ReentrantLock

1 概述

哪里使用AQS?我們最常用的ReentrantLock類其實就是使用CAS和AQS來實現(xiàn)的。

ReentrantLock的構造方法中,sync對象其實就是繼承了AbstractQueuedSynchronizer(AQS)。

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
  ....
}
public ReentrantLock() {
    sync = new NonfairSync();
}

AQS翻譯過來是,抽象隊列同步器,其定義了一套多線程訪問共享變量的同步器框架。

內(nèi)部類Sync就是自定義同步器,采用內(nèi)部類的方式也是作者建議的。

AbstractQueuedSynchronizer的源碼里作者還給了一個圖。

  * <pre>
  *      +------+  prev +-----+       +-----+
  * head |      | <---- |     | <---- |     |  tail
  *      +------+       +-----+       +-----+
  * </pre>

畫詳細點

FIFO

2 state

state我們可以理解成為表示當前同步情況的一個狀態(tài)變量。state是使用volatile修飾的一個變量,它的值代表鎖的狀態(tài),state > 0表示鎖被使用,state < 0表示鎖被釋放。

private volatile int state;
protected final int getState() {
    return state;
}
protected final void setState(int newState) {
    state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

getState用于獲取state的值,比如一個線程調(diào)用該方法查看state的值是不是0,來判斷當前是不是鎖定的狀態(tài)。

setState用來直接將state的值修改成其他,比如重入鎖重入的時候將state的值+1。

compareAndSetState使用CAS的方式修改state的值,比如線程獲取到鎖嘗試修改state的值為1。

state有兩種共享方式:Exclusive(獨占,只能有一個線程使用state)和 Share(共享,多個線程可以同時使用state)。

3 CLH同步隊列

這里的CLH同步隊列是一個FIFO(先進先出的)雙向隊列,元素的節(jié)點類型為Node,并且通過head和tail來記錄隊首和隊尾的元素。Node節(jié)點對象用來維護需要獲取鎖的線程,當前一個節(jié)點釋放鎖的時候,該節(jié)點的線程就會被喚醒。

private transient volatile Node head;
private transient volatile Node tail;
static final class Node {
  static final Node SHARED = new Node();
  static final Node EXCLUSIVE = null;

  static final int CANCELLED =  1;
  static final int SIGNAL    = -1;
  static final int CONDITION = -2;
  static final int PROPAGATE = -3;

  volatile int waitStatus;

  volatile Node prev;

  volatile Node next;

  volatile Thread thread;

  Node nextWaiter;

  final boolean isShared() {
    return nextWaiter == SHARED;
  }

  final Node predecessor() throws NullPointerException {
    Node p = prev;
    if (p == null)
      throw new NullPointerException();
    else
      return p;
  }

  Node() {    // Used to establish initial head or SHARED marker
  }

  Node(Thread thread, Node mode) {     // Used by addWaiter
    this.nextWaiter = mode;
    this.thread = thread;
  }

  Node(Thread thread, int waitStatus) { // Used by Condition
    this.waitStatus = waitStatus;
    this.thread = thread;
  }
}

說明一下幾個重要的常量和變量:

  1. SHARED:表示被標記的線程是因為以共享的方式獲取state失敗而進入阻塞隊列的;

  2. EXCLUSIV:表示被標記的線程是因為以獨占的方式獲取state失敗而進入阻塞隊列的;

  3. waitStatus:表示線程等待的狀態(tài)

    static final int CANCELLED = 1; 表示線程因為中斷或者等待超時,需要從等待隊列中取消等待;
    static final int SIGNAL = -1; 鎖被占用,隊列中的head(僅僅代表頭結點,里面沒有存放線程引用)的后繼結點node1處于等待狀態(tài),如果已占有鎖的線程釋放鎖或被CANCEL之后就會通知這個結點node1去獲取鎖執(zhí)行。
    static final int CONDITION = -2; 表示結點在等待隊列中(這里指的是等待在某個lock的condition上,關于Condition的原理下面會寫到),當持有鎖的線程調(diào)用了Condition的signal()方法之后,結點會從該condition的等待隊列轉移到該lock的同步隊列上,去競爭lock。(注意:這里的同步隊列就是我們說的AQS維護的FIFO隊列,等待隊列則是每個condition關聯(lián)的隊列)
    static final int PROPAGATE = -3; 表示下一次共享狀態(tài)獲取將會傳遞給后繼結點獲取這個共享同步狀態(tài)。

我們可以通過繼承AbstractQueuedSynchronizer類來自己實現(xiàn)一個鎖,使用的時候需要重寫一些指定的方法(模板方法模式)。

//獨占式的獲取同步狀態(tài),實現(xiàn)該方法需要查詢當前狀態(tài)并判斷同步狀態(tài)是否符合預期,然后再進行CAS設置同步狀態(tài)
protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
//獨占式的釋放同步狀態(tài),等待獲取同步狀態(tài)的線程可以有機會獲取同步狀態(tài)
protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
//共享式的獲取同步狀態(tài)
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
//嘗試將狀態(tài)設置為以共享模式釋放同步狀態(tài)。 該方法總是由執(zhí)行釋放的線程調(diào)用。 
protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
//當前同步器是否在獨占模式下被線程占用,一般該方法表示是否被當前線程所獨占
protected int isHeldExclusively(int arg) {  throw new UnsupportedOperationException();}

4 使用ReentrantLock分析

我們通過ReentrantLock來幫助了解一下加鎖和釋放鎖時state和CLH隊列的具體操作情況。

4.1 先來看lock方法

final void lock() {
  if (compareAndSetState(0, 1))
    setExclusiveOwnerThread(Thread.currentThread());
  else
    acquire(1);
}

很好理解,state沒有被占用的時候,當前線程使用鎖,并將獨占鎖的線程持有者設置為自己。

否則,調(diào)用acquire方法。

public final void acquire(int arg) {
  if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}

第一個條件

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    //(1)獲取當前線程
    final Thread current = Thread.currentThread();
    //(2)獲得當前同步狀態(tài)state
    int c = getState();
    //(3)如果state==0,表示沒有線程獲取
    if (c == 0) {
        //(3-1)那么就嘗試以CAS的方式更新state的值
        if (compareAndSetState(0, acquires)) {
            //(3-2)如果更新成功,就設置當前獨占模式下同步狀態(tài)的持有者為當前線程
            setExclusiveOwnerThread(current);
            //(3-3)獲得成功之后,返回true
            return true;
        }
    }
    //(4)這里是重入鎖的邏輯
    else if (current == getExclusiveOwnerThread()) {
        //(4-1)判斷當前占有state的線程就是當前來再次獲取state的線程之后,就計算重入后的state
        int nextc = c + acquires;
        //(4-2)這里是風險處理
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        //(4-3)通過setState無條件的設置state的值,(因為這里也只有一個線程操作state的值,即
        //已經(jīng)獲取到的線程,所以沒有進行CAS操作)
        setState(nextc);
        return true;
    }
    //(5)沒有獲得state,也不是重入,就返回false
    return false;
}

第一個條件總結來說就是再次嘗試占用state,如果成功當前線程使用鎖,如果失敗并且當前線程和正在使用鎖的線程是同一個,那么重入該鎖。如果都不是,返回false。

第二個條件又分為幾個方法,我們都看一下

private Node addWaiter(Node mode) {
    //(1)將當前線程以及阻塞原因(是因為SHARED模式獲取state失敗還是EXCLUSIVE獲取失敗)構造為Node結點
    Node node = new Node(Thread.currentThread(), mode);
    //(2)這一步是快速將當前線程插入隊列尾部
    Node pred = tail;
    if (pred != null) {
        //(2-1)將構造后的node結點的前驅結點設置為tail
        node.prev = pred;
        //(2-2)以CAS的方式設置當前的node結點為tail結點
        if (compareAndSetTail(pred, node)) {
            //(2-3)CAS設置成功,就將原來的tail的next結點設置為當前的node結點。這樣這個雙向隊
            //列就更新完成了
            pred.next = node;
            return node;
        }
    }
    //(3)執(zhí)行到這里,說明要么當前隊列為null,要么存在多個線程競爭失敗都去將自己設置為tail結點,
    //那么就會有線程在上面(2-2)的CAS設置中失敗,就會到這里調(diào)用enq方法
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        //(4)還是先獲取當前隊列的tail結點
        Node t = tail;
        //(5)如果tail為null,表示當前同步隊列為null,就必須初始化這個同步隊列的head和tail(建
        //立一個哨兵結點)
        if (t == null) { 
            //(5-1)初始情況下,多個線程競爭失敗,在檢查的時候都發(fā)現(xiàn)沒有哨兵結點,所以需要CAS的
            //設置哨兵結點
            if (compareAndSetHead(new Node()))
                tail = head;
        } 
        //(6)tail不為null
        else {
            //(6-1)直接將當前結點的前驅結點設置為tail結點
            node.prev = t;
            //(6-2)前驅結點設置完畢之后,還需要以CAS的方式將自己設置為tail結點,如果設置失敗,
            //就會重新進入循環(huán)判斷一遍
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //在這樣一個循環(huán)中嘗試tryAcquire同步狀態(tài)
        for (;;) {
            //獲取前驅結點
            final Node p = node.predecessor();
            //(1)如果前驅結點是頭節(jié)點,就嘗試取獲取同步狀態(tài),這里的tryAcquire方法相當于還是調(diào)
            //用NofairSync的tryAcquire方法,在上面已經(jīng)說過
            if (p == head && tryAcquire(arg)) {
                //如果前驅結點是頭節(jié)點并且tryAcquire返回true,那么就重新設置頭節(jié)點為node
                setHead(node);
                p.next = null; //將原來的頭節(jié)點的next設置為null,交由GC去回收它
                failed = false;
                return interrupted;
            }
            //(2)如果不是頭節(jié)點,或者雖然前驅結點是頭節(jié)點但是嘗試獲取同步狀態(tài)失敗就會將node結點
            //的waitStatus設置為-1(SIGNAL),并且park自己,等待前驅結點的喚醒。至于喚醒的細節(jié)
            //下面會說到
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //(1)獲取前驅結點的waitStatus
    int ws = pred.waitStatus;
    //(2)如果前驅結點的waitStatus為SINGNAL,就直接返回true
    if (ws == Node.SIGNAL)
        //前驅結點的狀態(tài)為SIGNAL,那么該結點就能夠安全的調(diào)用park方法阻塞自己了。
        return true;
    if (ws > 0) {
        //(3)這里就是將所有的前驅結點狀態(tài)為CANCELLED的都移除
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //CAS操作將這個前驅節(jié)點設置成SIGHNAL。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

模擬一下,

  1. 假設線程A已經(jīng)持有鎖,并將state修改成1;
  2. 線程B通過lock方法進入到addWaiter,此時還沒有維護隊列,tail=nill;
  3. 進入enq方法的自旋中;
  4. 第一次循環(huán) t = tail = null,進入if條件,head = tail = new Node();
  5. 第二次循環(huán) t = tail = new Node(),進入else,將node(線程B)的上一個節(jié)點設置為t,將node設置為新的tail,將t的下一個節(jié)點設置為node,退出循環(huán);
  6. 進入到acquireQueued方法,還是自旋,先獲取node的上一個結點;
  7. 第一次循環(huán),判斷上一個節(jié)點是head并且當前線程B可以獲取到state;
  8. 第7步通過,將head設置為node,返回false;
  9. 顯然第7步不通過,進入下面的條件,先進入shouldParkAfterFailedAcquire方法;
  10. 判斷上一個節(jié)點(pred)的waitStatus不是 -1,進入else,將上一個節(jié)點(pred)的waitStatus設置為 -1,返回false;
  11. 第二次循環(huán),進入shouldParkAfterFailedAcquire方法,由于上一次循環(huán)修改waitStatus為-1,直接返回true;
  12. 再看parkAndCheckInterrupt方法,將線程B pack阻塞;
  13. 自旋并沒有結束,只是被掛起了,這個自旋在后面還有用。

這時,隊列是這樣的

thread B

再來一個線程C,變成這樣

thread C

4.2 再來看unlock方法

public void unlock() {
    sync.release(1); //這里ReentrantLock的unlock方法調(diào)用了AQS的release方法
}
public final boolean release(int arg) {
    //這里調(diào)用了子類的tryRelease方法,即ReentrantLock的內(nèi)部類Sync的tryRelease方法
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先我們能看到,重入鎖在獲取鎖和釋放鎖的時候對state的操作參數(shù)都是1,這也就是為什么lock了幾次就要unlock幾次的原因。

protected final boolean tryRelease(int releases) {
    //(1)獲取當前的state,然后減1,得到要更新的state
    int c = getState() - releases;
    //(2)判斷當前調(diào)用的線程是不是持有鎖的線程,如果不是拋出IllegalMonitorStateException
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //(3)判斷更新后的state是不是0
    if (c == 0) {
        free = true;
        //(3-1)將當前鎖持者設為null
        setExclusiveOwnerThread(null);
    }
    //(4)設置當前state=c=getState()-releases
    setState(c);
    //(5)只有state==0,才會返回true
    return free;
}
private void unparkSuccessor(Node node) {
    //(1)獲得node的waitStatus
    int ws = node.waitStatus;
    //(2)判斷waitStatus是否小于0
    if (ws < 0)
        //(2-1)如果waitStatus小于0需要將其以CAS的方式設置為0
        compareAndSetWaitStatus(node, ws, 0);

    //(2)獲得s的后繼結點,這里即head的后繼結點
    Node s = node.next;
    //(3)判斷后繼結點是否已經(jīng)被移除,或者其waitStatus==CANCELLED
    if (s == null || s.waitStatus > 0) {
        //(3-1)如果s!=null,但是其waitStatus=CANCELLED需要將其設置為null
        s = null;
        //(3-2)會從尾部結點開始尋找,找到離head最近的不為null并且node.waitStatus的結點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //(4)node.next!=null或者找到的一個離head最近的結點不為null
    if (s != null)
        //(4-1)喚醒這個結點中的線程
        LockSupport.unpark(s.thread);
}

我們還是分析一下代碼

  1. 進入tryRelease方法,將state的值 -1 ,如果state = 0 就返回true,否則返回 false;
  2. 進入unparkSuccessor方法,先將head的waitStatus修改成0,在判斷head的后續(xù)節(jié)點的waitStatus是不是 < 0 ;
  3. 小于 0 就喚醒,否則一直向下查找直到出現(xiàn)waitStatus小于 0 的節(jié)點,并且將它喚醒;

后面程序是怎么執(zhí)行的呢?

  1. 回憶一下上面的lock邏輯,這時候線程B被喚醒了,繼續(xù)自旋;
  2. 線程B獲取state成功,就會從acquireQueued方法中退出,最后執(zhí)行自己鎖住的代碼塊中的程序。
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容