AQS研究系列(三)--AbstractQueuedSynchronizer源碼分析

目標

分析aqs中頂層方法實現,主要包括方法:

  1. acquire 外層獲取鎖方法
  2. acquireQueued 節(jié)點以自旋的方式獲取同步狀態(tài),如果獲取同步狀態(tài)失敗,要掛起線程
  3. addWaiter 將當前線程包裝成node節(jié)點添加到等待隊列
  4. shouldParkAfterFailedAcquire 處理當前線程是否應該掛起
  5. cancelAcquire 對于最終未成功獲取到同步鎖狀態(tài)的情況進行移除等待隊列處理

主流程時序圖

aqs1.jpg

分析下

aqs頂層方法的入口為acquire,看下源碼:

   public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

尋著流程圖,看到首先調用的tryAcquire方法(但此方法是其具體鎖實現類實現的,如我們常用的ReentrantLock類,但此方法我們先略過不管),此方法含義為當前線程去嘗試獲取資源,當獲取失敗時調用acquireQueued方法進入等待隊列,相當于進入鎖池,看下acquireQueued方法:

  final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
          //1.不斷自旋重試或線程等待,直到結果出來
            for (;;) {
                final Node p = node.predecessor();
          //2. 當前節(jié)點為頭節(jié)點,調用tryAcquire嘗試獲取資源,如果成功直接返回,并把當前節(jié)點置為頭節(jié)點
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
            //3. 上一步未成功,判斷當前階段是否應該進入線程等待狀態(tài),如果線程進入等待,進入等待池.
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
            //4. 等待被喚醒后標注此線程被中斷過
                    interrupted = true;
            }
        } finally {
            if (failed)
              //5. 不斷試驗后獲取鎖失敗,進行放棄等待邏輯,將此節(jié)點從隊列中移除
                cancelAcquire(node);
        }
    }

看下shouldParkAfterFailedAcquire方法,什么情況下滿足線程等待:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //1. 前節(jié)點狀態(tài)為Node.SIGNAL,代表此節(jié)點已經標記,當他資源釋放后,喚起后面的節(jié)點,所以本節(jié)點可以安全park
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
    //2. 當狀態(tài)>0,表述前節(jié)點已經放棄等待,所以遞歸前移,找到可用節(jié)點
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        //3. 其他情況,cas嘗試將前節(jié)點狀態(tài)設置為Node.SIGNAL狀態(tài),表示前節(jié)點結束后喚醒當前節(jié)點,但本次并不阻塞.
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

上面就是嘗試獲取隊列,失敗進行阻塞的邏輯.其中還有個addWaiter(Node.EXCLUSIVE)方法,此方法用于將當前線程包裝成一個Node,并放入隊列中:

   private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
          // 1. 嘗試將當前節(jié)點設為尾節(jié)點,成功后,返回此節(jié)點
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
      //2. 如果設置失敗,此方法進行不斷自旋(不斷循環(huán)到原有邏輯進行重試),嘗試加入尾幾點
        enq(node);
        return node;
    }

對應自旋邏輯 enq(node);

 private Node enq(final Node node) {
  //不斷循環(huán)自旋重試
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
              //當隊列為空時,必須有個首節(jié)點,進行初始化
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
              //嘗試再次設置尾節(jié)點,失敗后將會循環(huán)重試,成功將返回此節(jié)點
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

好了,上面就是頂層aqs抽象類定義的方法.

引申出tryAcquire方法:

tryAcquire方法在頂層并未實現,具體實現是各個具體鎖實現的,如ReentrantLock中通過兩個內部類NonfairSync(非公平鎖)和FairSync(公平鎖)來實現,下面對比下這兩個類的此方法:

1. 公平鎖獲取方法
 protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            //1. 獲取當前資源狀態(tài)
            int c = getState();
          
            if (c == 0) {
            //2.通過hasQueuedPredecessors方法判斷是否有其他線程排在前邊等待隊列 
                if (!hasQueuedPredecessors() &&
            //3. 如果當前此線程排在最前邊,則將當前隊列狀態(tài)值賦值, acquires在獨占鎖中一直是1,所以就是0到1點變化
                    compareAndSetState(0, acquires)) {
          //4.如果狀態(tài)修改成功,則將當前資源擁有者改成當前線程,返回true
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
          //5. 上一步判斷c!=0代表已有線程擁有資源,判斷當前線程是否為此線程
            else if (current == getExclusiveOwnerThread()) {
          //6.如果是當前線程獲取的資源,此處重新獲取鎖,因為為可重入鎖,所以鎖次數加1
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

上面的分析,看出了公平鎖的實現,其中還有個重點,hasQueuedPredecessors方法是怎么判斷是否有其他隊列排在前邊:

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
      //1. 此方法需要返回false才代表沒有排在前邊的線程等待
      //2. 如果h!=t表示等待隊列有其他節(jié)點存在,否則h==t表示只有一個節(jié)點,直接返回false,表示當前線程可以獲取資源
      //3. 如果h!=t,則表示隊列中有線程在使用資源,繼續(xù)判斷(s = h.next) == null表示第二個節(jié)點為null,說明h的下一個節(jié)點異常,可能是在其他線程處理線程隊列過程中,直接返回true
      //4.最后一個判斷s.thread != Thread.currentThread(),在上面所有可能后,判斷s線程不是當前線程,則說明已有其他線程索取到鎖了
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

2. 好了,公平鎖完了,下面看下非公平鎖怎么實現:

 final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
        //.非公平鎖,不用判斷隊列中是否有其他節(jié)點,直接嘗試修改資源狀態(tài),如果成功,代表獲取到資源,直接返回
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

我們現在非公平鎖實現和公平鎖邏輯類似,只是少了一個hasQueuedPredecessors方法,不在去判斷是否前面有線程排隊而已.

引申出參數(int acquires)

在獨占鎖中, acquires一直為1,表示占有鎖到永遠只有一個,看下ReentrantLock的寫法:

  public boolean tryLock() {
        return sync.nonfairTryAcquire(1);
    }

而在CountDownLatch等共享鎖到實現中,此值也是1,但其實現類中,通過構造器添加共享鎖中的資源可用數.


   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

                ...........................................
        //初始設值,選擇共享鎖容量
   private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

好了,上面就分析完了aqs實現的重點邏輯.

AQS研究系列(一)--Unsafe使用
AQS研究系列(二)--線程狀態(tài)和interrupt()、interrupted()、isInterrupted等方法學習

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容