AbstractQueuedSynchronizer 獨占鎖分析

獨占鎖, 即一次允許一個線程進入臨界區(qū)。

加鎖流程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

這里首先調(diào)用tryAcquire直接去嘗試獲取鎖, 成功就返回了。
如果失敗, 這時候調(diào)用addWaiter將自己加入到等待隊列,然后調(diào)用acquireQueued開始自旋或者阻塞等待加鎖成功, acquireQueued返回false時表示加鎖成功直接返回, 返回true時表示被設(shè)置中斷標(biāo)記了, 那么調(diào)用selfInterrupt中斷自己。

tryAcquire

這是具體鎖需要實現(xiàn)的方法, 一般是使用CAS增加同步變量, 成功者表示獲得鎖, 返回true, 下面是ReentrantLock的實現(xiàn):

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }  else if (current == getExclusiveOwnerThread()) { 
            //這里處理重入的情況
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

加入同步等待隊列

同步等待隊列是一個CLH隊列的變體,所以加入隊列的方式很簡單,使用CAS設(shè)置tail指向自己,代碼如下:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

首先根據(jù)模式和當(dāng)前線程創(chuàng)建一個Node對象,如果tail不為空,那么做一次快速設(shè)置的嘗試:將node.prev指向隊列尾, 然后使用CAS設(shè)置tail指向node,設(shè)置成功的話,將原隊列尾的節(jié)點的next指向node,插入成功直接返回。如果設(shè)置tail失敗,那么進入enq函數(shù)。

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

如果tail指向空,表示隊列是空的,這時候需要初始化下隊列(創(chuàng)建一個空的Node),然后把tail指向head。
如果tail不為空,和上面的樣嘗試插入,不成功循環(huán)繼續(xù), 直到插入成功為止。

自旋或者阻塞等待

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

加入同步等待隊列以后,線程就可以進入等待狀態(tài)了, 當(dāng)前節(jié)點是循環(huán)讀取前驅(qū)節(jié)點的狀態(tài)來處理的:

  1. 前驅(qū)節(jié)點是head, 那么就嘗試去獲取鎖, 獲取成功的話,就出隊列(將自己設(shè)置為head),返回中斷狀態(tài);
  2. 如果前驅(qū)節(jié)點不是head, 或者是加鎖失敗了, 調(diào)用shouldParkAfterFailedAcquire檢查是否應(yīng)該阻塞自己:
    2.1 如果需要阻塞,就調(diào)用parkAndCheckInterrupt阻塞自己,這時候線程暫停了,再次恢復(fù)時,是被其他線程喚醒了自己,這時候需要檢查下線程的中斷狀態(tài),如果是已經(jīng)中斷,那么設(shè)置返回結(jié)果為true, 在獲取到鎖返回后, 線程會調(diào)用selfInterrupt中斷自己。
    2.2 如果不需要阻塞,那么就繼續(xù)循環(huán)。
  3. 流程處理失?。ū热鐠伋霎惓#?需要調(diào)用cancelAcquire,取消節(jié)點加鎖請求。

檢查是否應(yīng)該阻塞自己

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

是否應(yīng)該阻塞自己是根據(jù)前驅(qū)節(jié)點的狀態(tài)判斷的:

  1. 前驅(qū)節(jié)點狀態(tài)是SIGNAL, 那么本節(jié)點需要阻塞, 返回true;
  2. 前驅(qū)節(jié)點狀態(tài)>0(只有CANCEL狀態(tài)),說明前驅(qū)節(jié)點被取消了, 這時候往前搜索,找到一個非取消節(jié)點,把當(dāng)前節(jié)點鏈接上去,返回false表明不需要阻塞自己;
  3. 前驅(qū)節(jié)點的狀態(tài)是0或者PROPAGATE,嘗試設(shè)置前驅(qū)狀態(tài)為SIGNAL, 返回false表明不需要阻塞自己;

阻塞自己

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

解鎖流程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

調(diào)用tryRelease嘗試釋放自己, 如果成功的話,判斷自己的狀態(tài),不為0(說明可能是CANCEL,SIGNAL或者PROPAGATE)時調(diào)用unparkSuccessor喚醒后繼線程。

tryRelease

這是具體獨占鎖需要實現(xiàn)的方法,一般而言是把加上去的同步變量值減回來, 釋放成功(同步變量值為0)時,返回true。 下面是ReentrantLock的實現(xiàn):

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

喚醒后繼者

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
  1. 如果節(jié)點的狀態(tài)是SIGNAL、PROPAGATE, 那么將自己設(shè)置為0 。
  2. 檢查后繼節(jié)占是否為null或者是取消狀態(tài),如果是的話, 找到非取消的后繼節(jié)點(因為node.next為null并不表示真的沒有后繼節(jié)點,需要使用tail從后往前遍歷一次)。
  3. 如果有非取消的后繼節(jié)點,對非取消狀態(tài)的后繼節(jié)點調(diào)用LockSupport.unpark喚醒線程, 后繼節(jié)點被喚醒后,會檢查前驅(qū)的狀態(tài)進行處理。

取消流程


   private void cancelAcquire(Node node) {
        if (node == null)
            return;
        node.thread = null;
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        Node predNext = pred.next;
        node.waitStatus = Node.CANCELLED;

        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }
            node.next = node; // help GC
        }
    }
  1. 清空節(jié)點的thread ;
  2. 循環(huán)往前找到非取消節(jié)點,pred指向非取消節(jié)點;
  3. 使predNext指向pred的下個節(jié)點(這個節(jié)點一定是取消節(jié)點);
  4. 設(shè)置當(dāng)前節(jié)點狀態(tài)為CANCELLED, 接下來其他線程的操作都會跳過本節(jié)點;
  5. 如果當(dāng)前節(jié)點是尾節(jié)點, 那么直接設(shè)置pred為尾節(jié)點;
  6. 如果pred是頭節(jié)點了, 那么直接調(diào)用unparkSuccessor喚醒本節(jié)點的后繼節(jié)點;
  7. 如果pred節(jié)點非頭節(jié)點,狀態(tài)為SIGNAL狀態(tài)(或者非取消狀態(tài)下設(shè)置狀態(tài)為SIGNAL成功),thread不為空,本節(jié)點的后繼是非取消狀態(tài),這些條件下使用CAS設(shè)置pred節(jié)點的后繼為當(dāng)前節(jié)點的后繼。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容