ReentrantLock加鎖源碼淺析

加鎖邏輯將分成三個(gè)部分來看:

  • 競(jìng)爭(zhēng)鎖
  • 加入等待隊(duì)列
  • 阻塞等待

1.競(jìng)爭(zhēng)鎖
我們先從公平鎖入手

public void lock() {
    // sync的實(shí)例是new FairSync()
    sync.acquire(1);
}
// 加鎖的代碼就是這幾行
public final void acquire(int arg) {
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

上述代碼可以拆分成以下幾段:

// 競(jìng)爭(zhēng)鎖
tryAcquire(arg)
// 加入等待隊(duì)列
addWaiter(Node.EXCLUSIVE)
// 阻塞等待
acquireQueued(node, arg)
  • 競(jìng)爭(zhēng)鎖
protected final boolean tryAcquire(int acquires) {
    // 獲取當(dāng)前線程
    final Thread current = Thread.currentThread();
    // 獲取當(dāng)前state狀態(tài)
    int c = getState();
    // 如果當(dāng)前state是沒有任何線程搶占的話
    if (c == 0) {
        // 如果等待隊(duì)列中有任何一個(gè)等待的節(jié)點(diǎn),都不會(huì)搶占鎖
        if (!hasQueuedPredecessors() &&
            // CAS搶占鎖成功
            compareAndSetState(0, acquires)) {
            // 搶占成功后,標(biāo)記當(dāng)前線程已經(jīng)搶占到鎖了。
            setExclusiveOwnerThread(current);
            // 返回加鎖成功
            return true;
        }
    }
    // 如果是同一個(gè)線程重復(fù)加鎖的情況下
    else if (current == getExclusiveOwnerThread()) {
        // 在這種情況下,只是簡(jiǎn)單地操作state
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 因?yàn)楫?dāng)前線程已經(jīng)加鎖成功了,再次加鎖的話,直接在state上增加加鎖次數(shù)即可。
        setState(nextc);
        // 返回加鎖成功
        return true;
    }
    // 如果已經(jīng)有別的線程加鎖了,或者還有很多線程在排隊(duì)等待,那么返回false加鎖失敗。
    return false;
}

上述代碼分幾部分:

  1. 如果當(dāng)前state=0,也就是沒有任何線程搶占鎖的情況下
    1.1: 沒有等待隊(duì)列的情況下,可以CAS搶占鎖
    1.2: 有等待隊(duì)列的話,該隊(duì)列中第一個(gè)等待節(jié)點(diǎn)不是當(dāng)前線程,不可以搶占鎖,因?yàn)檫@是公平鎖。


    image.png

    如果當(dāng)前等待隊(duì)列中還有任意節(jié)點(diǎn),并且當(dāng)前節(jié)點(diǎn)中的線程不是當(dāng)前線程,說明有其他線程處于等待過程中,那么當(dāng)前線程就應(yīng)該乖乖排隊(duì)去。

1.3: 有等待隊(duì)列,并且當(dāng)前第一個(gè)等待節(jié)點(diǎn)就是當(dāng)前線程,可以搶占鎖。這種情況會(huì)出現(xiàn)在線程剛從阻塞中被喚醒的時(shí)候。
image.png

假如當(dāng)前線程是被剛剛喚醒的,并且它處于等待隊(duì)列中的第一個(gè)等待的位置,那么這個(gè)時(shí)候是可以去搶占鎖的。

  1. 如果已經(jīng)搶占了鎖的線程就是當(dāng)前線程。這種情況我們叫做重入。
    示例如下:
ReentrantLock lock = new ReentrantLock();
try {
    // 加鎖
    lock.lock();
    // 執(zhí)行業(yè)務(wù)邏輯
    System.out.println("獲取的鎖");
    try {
        // 再次獲取鎖
        lock.lock();
        // 執(zhí)行業(yè)務(wù)邏輯
        System.out.println("再次獲取的鎖");
    } finally {
        // 解鎖
        lock.unlock();
    }
} finally {
    // 解鎖
    lock.unlock();
}

小結(jié)一下:

  1. 如果當(dāng)前鎖未被搶占,并且沒有其他線程等待,那么直接搶占鎖
  2. 如果當(dāng)前鎖未被搶占,有其他線程等待,不可用搶占鎖
  3. 如果當(dāng)前鎖被當(dāng)前線程搶占了,那么直接重入即可
  4. 不符合上述情況,直接加鎖失敗。也就是鎖被其他線程搶占了,或者目前還有其他線程處于等待中,都會(huì)導(dǎo)致公平鎖加鎖失敗。
// 判斷等待隊(duì)列中是否有其他線程等待
public final boolean hasQueuedPredecessors() {
    Node h, s;
    // 如果等待隊(duì)列頭節(jié)點(diǎn)不為空,說明等待隊(duì)列已經(jīng)創(chuàng)建出來了。否則直接返回false。
    if ((h = head) != null) {
        // 如果頭節(jié)點(diǎn)后面的節(jié)點(diǎn)為空,或者該節(jié)點(diǎn)的狀態(tài)是取消狀態(tài)
        if ((s = h.next) == null || s.waitStatus > 0) {
            s = null; // traverse in case of concurrent cancellation
            // 從后往前遍歷,直至最后一個(gè)狀態(tài)小于等于0的節(jié)點(diǎn)。只有小于等于0的節(jié)點(diǎn)才是正常的可以競(jìng)爭(zhēng)鎖的節(jié)點(diǎn)。
            for (Node p = tail; p != h && p != null; p = p.prev) {
                // 發(fā)現(xiàn)小于等于0的節(jié)點(diǎn),就賦值給s
                if (p.waitStatus <= 0)
                    s = p;
            }
        }
        // 如果最終得到的節(jié)點(diǎn)不為空。有可能當(dāng)前沒有任何等待的節(jié)點(diǎn),s=null。
        // 并且這個(gè)不為空的等待線程不是當(dāng)前線程。其實(shí)就是說明前面還有其他線程排隊(duì)。
        if (s != null && s.thread != Thread.currentThread())
            // 返回true,說明有其他線程在排隊(duì)。
            return true;
    }
    // 1.如果等待隊(duì)列不存在,直接返回false
    // 2.如果當(dāng)前等待隊(duì)列中,沒有任何其他節(jié)點(diǎn)的waitStatus<=0
    return false;
}

至此,線程競(jìng)爭(zhēng)鎖的邏輯就完畢了。

  • 加入等待隊(duì)列
private Node addWaiter(Node mode) {
    // 創(chuàng)建一個(gè)節(jié)點(diǎn),該節(jié)點(diǎn)默認(rèn)
    // waitStatus=0, thread=currentThread
    Node node = new Node(mode);
    // 開啟自旋
    for (;;) {
        // 取出尾節(jié)點(diǎn)
        Node oldTail = tail;
        // 如果尾節(jié)點(diǎn)不為空
        if (oldTail != null) {
            // 設(shè)置node的前一個(gè)節(jié)點(diǎn)為尾節(jié)點(diǎn)
            node.setPrevRelaxed(oldTail);
            // CAS把尾節(jié)點(diǎn)設(shè)置為node
            if (compareAndSetTail(oldTail, node)){
                // 如果CAS設(shè)置成功,那么就把oldTail的next引用設(shè)置成node
                oldTail.next = node;
                // 返回node節(jié)點(diǎn)
                return node;
            }
        } else {
            // 如果尾節(jié)點(diǎn)為null,說明等待隊(duì)列還不存在,這個(gè)時(shí)候就要準(zhǔn)備初始化等待隊(duì)列。
            // 初始化完畢后繼續(xù)自旋,最終把新創(chuàng)建的節(jié)點(diǎn)添加進(jìn)等待隊(duì)列
            initializeSyncQueue();
        }
    }
}
// 初始化等待隊(duì)列。其實(shí)是一個(gè)雙向鏈表,所以只要初始化head、tail節(jié)點(diǎn)即可。
private final void initializeSyncQueue() {
    Node h;
    // CAS設(shè)置head節(jié)點(diǎn)。如果head節(jié)點(diǎn)為null,就設(shè)置為new Node()。該node節(jié)點(diǎn)waitStatus=0,thread=null。
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        // 頭節(jié)點(diǎn)設(shè)置成功后,尾節(jié)點(diǎn)初始化為同一個(gè)節(jié)點(diǎn)。
        tail = h;
}
  1. 初始化等待隊(duì)列
// 初始化等待隊(duì)列。其實(shí)是一個(gè)雙向鏈表,所以只要初始化head、tail節(jié)點(diǎn)即可。
private final void initializeSyncQueue() {
    Node h;
    // CAS設(shè)置head節(jié)點(diǎn)。如果head節(jié)點(diǎn)為null,就設(shè)置為new Node()。該node節(jié)點(diǎn)waitStatus=0,thread=null。
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        // 頭節(jié)點(diǎn)設(shè)置成功后,尾節(jié)點(diǎn)初始化為同一個(gè)節(jié)點(diǎn)。
        tail = h;
}
image.png
  1. 添加新的節(jié)點(diǎn)
// 創(chuàng)建新節(jié)點(diǎn)
Node node = new Node(mode);
// 取出尾節(jié)點(diǎn)
Node oldTail = tail;
image.png
// 設(shè)置node的前一個(gè)節(jié)點(diǎn)為尾節(jié)點(diǎn)
node.setPrevRelaxed(oldTail);
// CAS把尾節(jié)點(diǎn)設(shè)置為node
if (compareAndSetTail(oldTail, node)){
// 如果CAS設(shè)置成功,那么就把oldTail的next引用設(shè)置成node
oldTail.next = node;
image.png

經(jīng)過上面幾步,新的節(jié)點(diǎn)就被添加到等待隊(duì)列中了。
有一個(gè)注意點(diǎn)需要提的是:

為什么判斷等待隊(duì)列是否存在,使用的是if(tail!=null),而不是if(head!=null)?

這個(gè)問題其實(shí)跟初始化等待隊(duì)列有關(guān)系,初始化的時(shí)候是使用CAS設(shè)置head節(jié)點(diǎn),成功后再設(shè)置tail節(jié)點(diǎn)。也就是說,隊(duì)列初始化完畢的標(biāo)識(shí)是tail!=null。
如果使用if(head!=null)來判斷隊(duì)列已經(jīng)存在,那么有可能此時(shí)tail還沒有初始化完畢。就會(huì)導(dǎo)致使用tail節(jié)點(diǎn)的時(shí)候空指針異常。

  • 阻塞等待
final boolean acquireQueued(final Node node, int arg) {
    // 默認(rèn)線程未被打斷
    boolean interrupted = false;
    try {
        // 開啟自旋
        for (;;) {
            // 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
            final Node p = node.predecessor();
            // 如果前一個(gè)節(jié)點(diǎn)是head節(jié)點(diǎn),那么就嘗試競(jìng)爭(zhēng)鎖
            if (p == head && tryAcquire(arg)) {
                // 競(jìng)爭(zhēng)鎖成功,把當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn)
                setHead(node);
                // 把前一個(gè)節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)斷開
                // 因?yàn)楫?dāng)前節(jié)點(diǎn)已經(jīng)設(shè)置為head節(jié)點(diǎn)了,之前的head就可以GC了
                p.next = null; // help GC
                // 返回是否當(dāng)前線程被打斷。
                // 這個(gè)返回結(jié)果的作用會(huì)被用在lockInterruptibly()這個(gè)方法上。
                // lock()方法可忽略。
                return interrupted;
            }
            // 判斷當(dāng)前節(jié)點(diǎn)是否應(yīng)該阻塞。
            if (shouldParkAfterFailedAcquire(p, node))
                // 下面這個(gè)代碼可以翻譯成:
                // if(parkAndCheckInterrupt()){
                //     interrupted = true;
                // }
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        // 拋出任何異常,都直接取消當(dāng)前節(jié)點(diǎn)正在競(jìng)爭(zhēng)鎖的操作
        // 如果在等待隊(duì)列中,就從等待隊(duì)列中移除。
        // 如果當(dāng)前線程已經(jīng)搶占到鎖了,那么就解鎖。
        cancelAcquire(node);
        // 如果當(dāng)前線程已經(jīng)被中斷
        if (interrupted)
            // 重新設(shè)置中斷信號(hào)
            selfInterrupt();
        // 拋出當(dāng)前異常
        throw t;
    }
}
  1. 獲取當(dāng)前節(jié)點(diǎn)的上一個(gè)節(jié)點(diǎn)
// 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();

final Node predecessor() {
    // 上一個(gè)節(jié)點(diǎn)
    Node p = prev;
    // 如果為null,直接拋異常
    if (p == null)
        throw new NullPointerException();
    else
        // 返回上一個(gè)節(jié)點(diǎn)
        return p;
}
  1. 如果上一個(gè)節(jié)點(diǎn)為head節(jié)點(diǎn)
// 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
// 如果前一個(gè)節(jié)點(diǎn)是head節(jié)點(diǎn),那么就嘗試競(jìng)爭(zhēng)鎖
if (p == head && tryAcquire(arg))
image.png
  1. 搶占成功鎖后
// 競(jìng)爭(zhēng)鎖成功,把當(dāng)前節(jié)點(diǎn)設(shè)置為head節(jié)點(diǎn)
setHead(node);
// 把前一個(gè)節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn)斷開
p.next = null;
image.png
  1. 判斷當(dāng)前節(jié)點(diǎn)的狀態(tài)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前一個(gè)節(jié)點(diǎn)的狀態(tài)
    int ws = pred.waitStatus;
    // 如果狀態(tài)等于-1。Node.SIGNAL的值就是-1
    if (ws == Node.SIGNAL)
        // 直接返回true,這個(gè)時(shí)候就要準(zhǔn)備阻塞。
        return true;
    // 如果狀態(tài)值大于0,說明是要取消的節(jié)點(diǎn)。
    if (ws > 0) {
        // 跳過“取消”狀態(tài)節(jié)點(diǎn)
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        
        pred.next = node;
    } else {
        // ws小于等于0的話,直接把前一個(gè)節(jié)點(diǎn)的狀態(tài)置為-1
        // 因?yàn)樾聞?chuàng)建的節(jié)點(diǎn)初始化狀態(tài)是0,
        // 那么意味著執(zhí)行到這里后,還要返回去重新自旋一次才能返回true。
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    // 返回false
    return false;
}
  1. 當(dāng)前線程阻塞
private final boolean parkAndCheckInterrupt() {
    // 阻塞當(dāng)前線程。
    // 1. 調(diào)用LockSupport.unpark()才能重新喚醒被阻塞的線程。
    // 2.調(diào)用thread.interrupt()也可以喚醒阻塞線程。
    LockSupport.park(this);
    // 判斷當(dāng)前線程是否被打斷。
    // 如果當(dāng)前線程是被打斷的,那么返回true,否則返回false。
    return Thread.interrupted();
}

小結(jié)一下:

  1. 先獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn),如果是head節(jié)點(diǎn),那么嘗試競(jìng)爭(zhēng)鎖
    1. 競(jìng)爭(zhēng)鎖成功后,重置head節(jié)點(diǎn),返回false(代表沒有被打斷)。
  2. 如果前一個(gè)節(jié)點(diǎn)狀態(tài)小于等于0,那么置為-1。
    1. 重新自旋一次,從第一步開始
    2. 如果前一個(gè)節(jié)點(diǎn)狀態(tài)等于-1,返回true,準(zhǔn)備阻塞。
  3. 調(diào)用LockSupport.park()阻塞當(dāng)前線程,直至unpark()或者interrupt()喚醒當(dāng)前線程。
    1. 通過unpark()喚醒,沒有被打斷,返回false
    2. 通過interrupt()喚醒,被打斷,返回true。
  4. 被喚醒的線程又開始自旋,直至獲取到鎖后返回是否被打斷的結(jié)果。
    1. 如果是被打斷后獲取鎖返回,那么返回true。
    2. 否則返回false。
public final void acquire(int arg) {
    // 嘗試獲取鎖
    if (!tryAcquire(arg) &&
    // addWaiter(Node.EXCLUSIVE):競(jìng)爭(zhēng)鎖失敗后,添加到等待隊(duì)列
    // acquireQueued(node, arg):阻塞等待,自旋獲取鎖后,返回判斷是否被打斷
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果被打斷,需要恢復(fù)中斷信號(hào)
        selfInterrupt();
}

// 其實(shí)就是重新中斷一次。
// 因?yàn)閳?zhí)行過Thread.interrupted()方法后,會(huì)讓中斷信號(hào)重置為false。
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

以上就是我對(duì)于公平鎖-加鎖實(shí)現(xiàn)的淺析。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容