ReentrantLock

類信息

  • 內(nèi)部抽象類:Sync 繼承 AbstractQueuedSynchronizer
  • 內(nèi)部類:非公平鎖:NonfairSync 繼承 Sync

加鎖過程

lock():CAS修改同步狀態(tài) 成功則設(shè)置exclusiveOwnerThread為當(dāng)前線程,然后執(zhí)行業(yè)務(wù),失敗則調(diào)用acquire(1);

final void lock() {
        //設(shè)置AbstractQueuedSynchronizer的state為1
        if (compareAndSetState(0, 1))
            //成功后設(shè)置AbstractOwnableSynchronizer的thread為當(dāng)前線程
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //失敗時(shí)線程阻塞
            acquire(1);
    }

acquire(1): AbstractQueuedSynchronizer中的方法,執(zhí)行tryAcquire(1),acquireQueued(Node, 1),addWaiter(Node.EXCLUSIVE),接下來依次看這三個(gè)方法;

public final void acquire(int arg) {
    //嘗試獲取鎖,如果獲取成功則已,不成功則加入等待隊(duì)列
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire(1):執(zhí)行nonfairTryAcquire(acquires),即Sync類的方法nonfairTryAcquire

protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }

nonfairTryAcquire(acquires):非公平鎖獲取

final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        //再次獲取同步狀態(tài),如果是0則獲取鎖執(zhí)行
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        /**
         * 如果是當(dāng)前線程是已經(jīng)獲取鎖的線程
         * 讓某個(gè)線程可以多次調(diào)用同一個(gè)ReentrantLock,每調(diào)用一次給state+1,
         * 由于某個(gè)線程已經(jīng)持有了鎖,所以這里不會(huì)有競爭,
         * 因此不需要利用CAS設(shè)置state(相當(dāng)于一個(gè)偏向鎖)。從這段代碼可以看到,nextc每次加1,
         * 當(dāng)nextc<0的時(shí)候拋出error,那么同一個(gè)鎖最多能重入Integer.MAX_VALUE次,也就是2147483647。
         */
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }

addWaiter(Node.EXCLUSIVE):AbstractQueuedSynchronizer中的方法,為當(dāng)前線程和指定模式創(chuàng)建并擴(kuò)充一個(gè)等待隊(duì)列。Node.EXCLUSIVE標(biāo)記表示節(jié)點(diǎn)正在獨(dú)占模式下等待

private Node addWaiter(Node mode) {
    //為當(dāng)前線程創(chuàng)建一個(gè)等待節(jié)點(diǎn)
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    //將需要等待的線程加入到等待隊(duì)列,如果沒有隊(duì)列則新建執(zhí)行enq
    //查看等待隊(duì)列的尾部節(jié)點(diǎn)是否為空
    Node pred = tail;
    if (pred != null) {
        //設(shè)置當(dāng)前線程的節(jié)點(diǎn)為尾節(jié)點(diǎn),擴(kuò)充隊(duì)列,并返回當(dāng)前線程節(jié)點(diǎn)
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

enq(node):判斷尾節(jié)點(diǎn)是否為空,為空則初始化頭尾節(jié)點(diǎn)(如果尾節(jié)點(diǎn)為空,則頭節(jié)點(diǎn)一定為空,頭節(jié)點(diǎn)為懶加載,只有隊(duì)列中插入第一個(gè)節(jié)點(diǎn)時(shí)才初始化),否則插入新節(jié)點(diǎn)

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued(Node, 1):加入隊(duì)列的節(jié)點(diǎn)再次嘗試獲取鎖,否則進(jìn)入阻塞狀態(tài)

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //再次嘗試獲取鎖
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                //獲取鎖 --->  清除節(jié)點(diǎn)信息,設(shè)置為隊(duì)列頭結(jié)點(diǎn)
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果獲取不到鎖
            // 1、shouldParkAfterFailedAcquire:判斷是否進(jìn)入阻塞,進(jìn)入等待
            // 2、parkAndCheckInterrupt 進(jìn)入等待,中斷線程
            // 條件滿足時(shí),修改中斷狀態(tài),再次進(jìn)入循環(huán)獲取鎖
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire(p, node):判斷是否進(jìn)入阻塞或者進(jìn)入等待,通過判斷當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)狀態(tài),如果是SIGNAL表示自己被阻塞,返回true;如果小于0,則表示前驅(qū)節(jié)點(diǎn)為取消狀態(tài),跳過,直到鏈接到不是取消狀態(tài)的節(jié)點(diǎn),返回false;如果兩種都不符合,則通過CAS修改前驅(qū)節(jié)點(diǎn)狀態(tài)為Node.SIGNAL,返回false.

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前驅(qū)節(jié)點(diǎn)的等待狀態(tài)
    int ws = pred.waitStatus;
    //判斷前驅(qū)節(jié)點(diǎn)狀態(tài)
    if (ws == Node.SIGNAL)
        /*
         * 此節(jié)點(diǎn)已設(shè)置狀態(tài),要求釋放信號(hào),因此可以安全停放。
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            //跳過狀態(tài)值為1的Node節(jié)點(diǎn)
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         * 更新狀態(tài)
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt():

//禁用當(dāng)前線程進(jìn)入等待狀態(tài)并中斷線程本身
private final boolean parkAndCheckInterrupt() {
    //除非許可證可用,否則禁用當(dāng)前線程以進(jìn)行線程調(diào)度。
    LockSupport.park(this);
    return Thread.interrupted();
}

cancelAcquire(node):如果發(fā)生異常,取消正在進(jìn)行的Node獲取鎖的嘗試

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    //跳過取消的前驅(qū)節(jié)點(diǎn)
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    //設(shè)置此節(jié)點(diǎn)的狀態(tài)為CANCELLED,代表取消狀態(tài)
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    //如果node是tail,更新tail為pred,并使pred.next指向null
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        //如果node既不是tail,又不是head的后繼節(jié)點(diǎn)
        //則將node的前繼節(jié)點(diǎn)的waitStatus置為SIGNAL
        //并使node的前繼節(jié)點(diǎn)指向node的后繼節(jié)點(diǎn)(相當(dāng)于將node從隊(duì)列中刪掉了)
        int ws;
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            //如果node是head的后繼節(jié)點(diǎn),則直接喚醒node的后繼節(jié)點(diǎn)
            //喚醒后繼節(jié)點(diǎn)線程后,在做出隊(duì)操作時(shí)將node節(jié)點(diǎn)刪除
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容