Java并發(fā)開篇--ReentrantLock公平鎖的可重入性

Java并發(fā)編程--ReentrantLock可重入性探索

我們直接先看其公平鎖情況下的可重入性到底是怎么回事,由于我們討論的是公平鎖的情況,而相關(guān)的代碼在ReentrantLock的內(nèi)部類FairSync中。

1. lock()

public void lock() {
    sync.lock();
}  

由于是公平鎖,所以我們需要重FairSync中查看lock方法:

final void lock() {
    acquire(1);
}  

而這里的acquire方法繼承自AbstractQueuedSynchronizer

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}  

首先我們先提前說一下tryAcquire返回值是一個boolean,為true說明當(dāng)前線程成功獲取了ReentrantLock的鎖,并且ReentrantLock鎖是一個獨占鎖,而這個if條件,如果成功獲取了鎖,那么acquire方法就直接返回了。

AQS已經(jīng)為該方法做了方法的實現(xiàn),在FairSync中我們只要實現(xiàn)tryAcquire方法即可:

protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {  // 說明當(dāng)前鎖還未被持有
            // hasQueuedPredecessors返回false的情況為:當(dāng)前線程在等待隊列的頭部或者等待隊列為空
            // 這就說明了:只有等待隊列的頭結(jié)點可以獲取鎖  
            // Q:什么情況下當(dāng)前線程已經(jīng)在頭結(jié)點了,但是還沒有獲取鎖?
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {   // 這里說明當(dāng)前線程就是獨占的線程
            int nextc = c + acquires;       // 持有鎖的線程獲取鎖的次數(shù),這也表明了可重入性
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);           // 原來可重入性是這么個意思
            return true;         // 如果current再次使用該所對象加鎖,那么會直接返回true,可重入就是這么個意思
        }
        return false;
    }

如注釋中所說,當(dāng)前線程能夠成功獲得鎖有兩種情況,分別代表首次加鎖和重入鎖

  • 如果c為0的話,說明當(dāng)前鎖還沒有被任何線程獨占,這時候會對隊列進(jìn)行判斷
    • hasQueuedPredecessors方法返回false有兩種情況:當(dāng)前等待隊列為空或者當(dāng)前線程就是隊列頭部;此時才可以嘗試加鎖
    • 一個CAS操作將ReentrantLock的state屬性設(shè)置為acquire值(調(diào)用的時候傳遞的為1)
    • setExclusiveOwnerThread方法將當(dāng)前線程設(shè)置為獨占鎖的線程
  • 如果當(dāng)前線程為獨占線程,則保證可重入性:
    • 將state進(jìn)行加一操作

從這一個成功加鎖的過程我們可以產(chǎn)生一些大膽的推斷:

  1. 獨占該鎖的線程位于等待隊列的頭部
  2. state屬性表示獨占的線程加鎖的次數(shù),在之后解鎖的時候可能也要這么多次數(shù)的unlock才可以釋放鎖
  3. 可重入性的保證就是一句current == getExclusiveOwnerThread()

注意:有沒有發(fā)現(xiàn),對于第一個加鎖的線程,它會加鎖成功,但是這個第一次加鎖的線程,沒有被封裝成一個Node節(jié)點放到隊列中;所以說,持有鎖的線程是隊列的頭結(jié)點這句話有問題:因為持有鎖的線程根本不在隊列中,何來頭結(jié)點一說。在下文分析加鎖失敗的情況會證明這一論斷。

關(guān)于hasQueuedPredecessors方法:

public final boolean hasQueuedPredecessors() {
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}  

首先對于第一次加鎖的線程,此時由于h == t == null所以返回false,而對于后面的返回false的情況大都是h != t但是s != null、s.thread == Thread.currentThread()返回的false,也就是只有頭結(jié)點的后繼節(jié)點調(diào)用該方法時,才會返回false表示可以嘗試加鎖。同時這也是可重入鎖中公平鎖的來源,對于之前已經(jīng)在隊列中的節(jié)點,那么新來的節(jié)點想要加鎖,該方法會返回true說明隊列中在你之前還有人在等待,得前面沒人等待了你才能返回false,才能嘗試去加鎖,保證了先來后到的公平性。

注意:對于第二個嘗試加鎖的線程,由于此時前面有一個人持有鎖,所以它在調(diào)用lock-acquire-tryAcquire方法時由于判斷state!=0且當(dāng)前線程不是獨占鎖的線程直接判斷了加鎖失敗,從而被添加到了隊列中,這條路線中不涉及到hasQueuedPredecessors方法的調(diào)用(顯然此時它是滿足h==t這個判斷的)

加鎖失敗情況

還記得acquire方法嗎:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}  

如果tryAcquire方法返回false的話,說明加鎖失敗,同時通過上面的代碼我們知道,如果加鎖失敗的話,當(dāng)前線程沒有被執(zhí)行各種處理,所以我們在分析acquireQueued方法的時候沒有任何后顧之憂,它的代碼沒有收到tryAcquire的影響:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {   // 注意,這里的意思是頭結(jié)點的后繼節(jié)點tryAcquire成功,也就是獲取了鎖
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;             // 這里說明了,如果頭結(jié)點的后繼節(jié)點成功獲得了鎖,直接返回false
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())          // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}  

對于這個入隊操作,有幾點需要說明:

  1. 只有頭結(jié)點的next節(jié)點才會主動調(diào)用tryAcquire方法取申請獲取鎖
    1. 當(dāng)頭結(jié)點的next節(jié)點成功的得到了鎖之后,通過setHead方法會將自己設(shè)為頭結(jié)點
    2. 移除原來的頭結(jié)點之后return false

好了,接下來我們就來說一說為啥上面說持有鎖的線程不在隊列中,如果說上文對于首次加鎖的線程沒有加入隊列產(chǎn)生懷疑的話,那么這里的setHead方法會使你幡然醒悟:

private void setHead(Node node) {     
    head = node;
    node.thread = null;
    node.prev = null;
}  

看到了沒:node.thread = null把頭結(jié)點的線程置為空了?。?!所以,對于獨占到鎖的線程來說,它此時已經(jīng)不再隊列中了??!所以說,只有頭結(jié)點時持有鎖的節(jié)點這句話不準(zhǔn)確??!

那么問題來了,這個頭結(jié)點時怎么初始化的呢?

頭結(jié)點的初始化

答案就在調(diào)用acquireQueue方法時的addWaiter方法:

private Node addWaiter(Node mode) {  
    // 思路:新鍵當(dāng)前線程的Node節(jié)點;如果tail被初始化過,則直接添加到尾部;否則執(zhí)行enq操作先初始化tail再入隊
    // 根據(jù)注釋:mode傳遞的值要么是Node.SHARED要么是Node.EXCLUSIVE  

    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}  

這里的代碼思路很簡單,就是構(gòu)建Node節(jié)點,然后插入到尾部,對于這個if判斷語句,為false的情況會執(zhí)行enq方法,在enq方法里面會有頭結(jié)點的初始化:

private Node enq(final Node node) {
    // 思路:獲取尾部,將參數(shù)中的node節(jié)點直接加入尾部;然后CAS更新tail引用 

    for (;;) {

        // 獲取尾部
        Node t = tail;

        // 如果tail為空,說明是第一次入隊操作,通過CAS初始化節(jié)點  
        // 這里初始化只是調(diào)用了默認(rèn)構(gòu)造器,目的是為了第二次for循環(huán)時tail不為null
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            // 因為tail是AQS在并發(fā)環(huán)境下的共享資源,所以修改tail變量要使用CAS操作
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}  

有沒有發(fā)現(xiàn),這個頭結(jié)點的thread沒有傳遞是參數(shù),是一個null,這也證明了我們之前說的是正確的。

所以我們就知道了,在第一次有線程加鎖的時候,它會成功得到鎖,那么當(dāng)?shù)诙€線程需要等待這個鎖的時候,調(diào)用addWaiter的時候會初始化鏈表的頭結(jié)點。

阻塞線程的掛起

可能有的人會問,每一個阻塞的節(jié)點都有一個無限for循環(huán)自旋,那么線程沒有被掛起的話豈不是很浪費cpu資源?掛起的操作就在for循環(huán)的后面

if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())          // parkAndCheckInterrupt方法最后一句return Thread.currentThread().interrupted()
                interrupted = true;  

其中shouldParkAfterFailedAcquire(p, node)方法用來判斷是否需要掛起獲取鎖失敗的線程:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)

        // 判斷是否可以掛起的思想是:如果該節(jié)點的pred節(jié)點的waitStatus已經(jīng)被設(shè)置為了SIGNAL  
        // 那么說明該節(jié)點已經(jīng)料理好后事了,可以在某個時刻被喚醒,所以可以安全的掛起
        return true;
    if (ws > 0) {    // 說明pred已經(jīng)被cancelled了,直接移除
     
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {

        // 說明pred的waitStatue是0或者PROPAGATE,此時設(shè)置為SINGAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}  

看到了嗎,只有參數(shù)中的pred,也就是需要被掛起的節(jié)點node的前一個節(jié)點的waitStatus為SINGLE的時候,才會返回true。而我們調(diào)用addWaiter方法創(chuàng)建Node節(jié)點的時候,waitStatue都是默認(rèn)值0,所以在該方法的后面else語句中有一個CAS操作將其設(shè)置為SINGLE,這樣的話,在acquireQueued方法的for自旋中,需要被掛起的線程經(jīng)歷兩個for循環(huán)就可以使得shouldParkAfterFailedAcquire方法返回true。

之后,真正為node執(zhí)行掛起的操作位于parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}  

這樣的話,沒有獲取到鎖的線程就真的被掛起了。

2. unlock()

public void unlock() {
    sync.release(1);
}  

release方法同acquire方法一樣,都是在AQS中又方法實現(xiàn)的:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}  

AQS的實現(xiàn)類只需要重寫tryRelease方法即可。

protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }  

tryRelease方法很簡單,就是將ReentrantLock的state值減去一,然后如果此時state為0說明獨占的線程已經(jīng)完全釋放了鎖,此時可以解除綁定,否則返回false。

而真正實現(xiàn)釋放鎖后喚醒其他線程的方法位于release中的

if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);  

我們上面在分析線程的掛起的時候說到了要想掛起,那么node的前一個節(jié)點的waitStatus必須為SINGLE,而SINGLE在Node這個類中的值為-1.是小于0的,所以一定會執(zhí)行到unparkSuccessor方法。

private void unparkSuccessor(Node node) {   // 通過名稱,該方法是喚醒后繼節(jié)點
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;  
      
    // 根據(jù)注釋和代碼:這里是處理node參數(shù)的next節(jié)點為null或者已經(jīng)取消的情況
    // 此時從尾部開始遍歷,找沒有被canclled的節(jié)點喚醒  
    // PS:為何不從node開始往后找,而是從尾部開始往前找?
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }

    // 代表參數(shù)node的next節(jié)點不為空,則喚醒其中的線程
    if (s != null)
        LockSupport.unpark(s.thread);
}  

這里的LockSupport.unpark方法就是喚起其他線程的地方,且一次只會喚醒一個線程,大部分情況就是頭結(jié)點的后繼節(jié)點。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容