ReentrantLock源碼分析及AQS原理

ReentrantLock源碼分析及AQS原理

ReentrantLock源碼分析

ReentrantLock(可重入互斥鎖)??芍厝腈i又名遞歸鎖,是指在同一個(gè)線程在外層方法獲取鎖的時(shí)候,再進(jìn)入該線程的內(nèi)層方法會(huì)自動(dòng)獲取鎖(前提鎖對(duì)象得是同一個(gè)對(duì)象或者class),不會(huì)因?yàn)橹耙呀?jīng)獲取過還沒釋放而阻塞。

我們從構(gòu)造函數(shù)開始逐步分析。

ReentrantLock的兩個(gè)構(gòu)造函數(shù),默認(rèn)使用的是非公平sync對(duì)象

public ReentrantLock() {
        sync = new NonfairSync();
}
    
public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
}

看下繼承自Sync的非公平同步類NonfairSync

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * 非公平鎖:線程加鎖時(shí)直接嘗試獲取鎖,獲取不到才會(huì)到等待隊(duì)列的隊(duì)尾排隊(duì)等待
         */
        final void lock() {
            if (compareAndSetState(0, 1))//CAS
                setExclusiveOwnerThread(Thread.currentThread());//設(shè)置所有者線程為當(dāng)前線程
            else
                acquire(1);//失敗后嘗試獲取鎖。
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

compareAndSetState(0, 1)。0是期望值,1是更新值。只要期待值與當(dāng)前同步狀態(tài)state相等,則把state更新為update(更新值)。

這里state就是當(dāng)前線程獲得鎖的狀態(tài)。先看下state的說明。

private volatile int state;//state是Volatile修飾的,用于保證一定的可見性和有序性。
  1. State初始化的時(shí)候?yàn)?,表示沒有任何線程持有鎖。
  2. 當(dāng)有線程持有該鎖時(shí),值就會(huì)在原來的基礎(chǔ)上+1,同一個(gè)線程多次獲得鎖時(shí),就會(huì)多次+1,這里就是可重入的概念。
  3. 解鎖也是對(duì)這個(gè)字段-1,一直到0,此線程對(duì)鎖釋放。

所以此處的compareAndSetState(0, 1)就是為了判斷當(dāng)前線程state是否為0,是0則獲取鎖成功,并更新state為1.

protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

如果獲取鎖失敗呢?也就是當(dāng)前線程的state不為0,那就會(huì)執(zhí)行acquire(1);

//arg代表獲取鎖的次數(shù)
public final void acquire(int arg) {
        //tryAcquire(arg)為true,代表獲取鎖成功(當(dāng)前線程為獨(dú)占線程)。
        //獲取鎖失敗后,再執(zhí)行acquireQueued將線程排隊(duì)。
        if (!tryAcquire(arg) &&
            //addWaiter方法其實(shí)就是把對(duì)應(yīng)的線程以Node的數(shù)據(jù)結(jié)構(gòu)形式加入到雙端隊(duì)列里,返回的是一個(gè)包含該線程的Node。而這個(gè)Node會(huì)作為參數(shù),進(jìn)入到acquireQueued方法中。acquireQueued方法可以對(duì)排隊(duì)中的線程進(jìn)行“獲鎖”操作。
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //中斷當(dāng)前線程
            selfInterrupt();
    }

這幾個(gè)方法都是由ReentrantLock中的(非)公平同步類調(diào)用,而(Non)fairSync繼承自Sync類,Sync類繼承自AQS,所以這些方法都在AQS中定義。例如上面的tryAcquire方法就被ReentrantLock重寫了,直接調(diào)用AQS的tryAcquire會(huì)拋出UnsupportedOperationException異常。

在這里插入圖片描述

可以看到ReentrantLock中的公平同步類和非公平同步類都重寫了tryAcquire。

所以acquire調(diào)用的就是NonfairSync中已經(jīng)重寫了的tryAcquire

//NonfairSync類里最后一個(gè)方法,當(dāng)前acquires=1
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
            //當(dāng)前線程
            final Thread current = Thread.currentThread();
            //當(dāng)前state值
            int c = getState();
            //如果沒有線程獲取鎖
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    //獲取鎖成功,并將當(dāng)前線程設(shè)置為獨(dú)占線程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //如果當(dāng)前線程為獨(dú)占線程,則將state加1。
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

查看addWaiter方法前,要先來看下Node節(jié)點(diǎn),也就是未獲取到鎖的線程加入到了哪種隊(duì)列中去。

這就涉及到AQS核心思想如果被請(qǐng)求的共享資源空閑,那么就將當(dāng)前請(qǐng)求資源的線程設(shè)置為有效的工作線程,將共享資源設(shè)置為鎖定狀態(tài);如果共享資源被占用,就需要一定的阻塞等待喚醒機(jī)制來保證鎖分配。這個(gè)機(jī)制主要用的是CLH隊(duì)列的變體---虛擬雙向隊(duì)列(FIFO)---實(shí)現(xiàn)的,將暫時(shí)獲取不到鎖的線程加入到隊(duì)列中。

AQS使用一個(gè)Volatile的int類型的state來表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取的排隊(duì)工作,通過CAS完成對(duì)State值的修改。

AQS中基本的數(shù)據(jù)結(jié)構(gòu)——Node,虛擬雙向隊(duì)列(FIFO)中的節(jié)點(diǎn)。

static final class Node {
    /** 標(biāo)志線程以共享/獨(dú)占方式等待鎖 */
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    /** 表示線程獲取鎖的請(qǐng)求已經(jīng)取消 */
    static final int CANCELLED =  1;
    /** 表示線程已經(jīng)準(zhǔn)備好,就等資源釋放了(被喚醒了) */
    static final int SIGNAL    = -1;
    /** 表示節(jié)點(diǎn)在等待隊(duì)列中,節(jié)點(diǎn)線程等待喚醒 */
    static final int CONDITION = -2;
    /** 當(dāng)前線程處在SHARED情況下,該字段才會(huì)使用(其他操作介入,也要確保傳播繼續(xù)) */
    static final int PROPAGATE = -3;
    
    //當(dāng)前節(jié)點(diǎn)在隊(duì)列中的狀態(tài)
    volatile int waitStatus;
    //前驅(qū)指針
    volatile Node prev;
    //后繼指針
    volatile Node next;
    //表示處于該節(jié)點(diǎn)的線程
    volatile Thread thread;
    //指向下一個(gè)處于CONDITION狀態(tài)的節(jié)點(diǎn)
    Node nextWaiter;
    
    /** 返回前驅(qū)節(jié)點(diǎn),沒有則拋出空指針異常 */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    
    /** 幾個(gè)Node構(gòu)造函數(shù) */
    //用來初始化頭節(jié)點(diǎn)或SHARED標(biāo)志
    Node() {    
    }

    //用于addWaiter方法
    Node(Thread thread, Node mode) {    
        this.nextWaiter = mode;
        this.thread = thread;
    }

    //用于Condition
    Node(Thread thread, int waitStatus) { 
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
//隊(duì)列頭節(jié)點(diǎn)
private transient volatile Node head;
//隊(duì)列尾節(jié)點(diǎn)
private transient volatile Node tail;

現(xiàn)在再來看addWaiter方法是如何把線程加入到雙端隊(duì)列的。

//這里的mode就是Node.EXCLUSIVE,表示獨(dú)占模式
private Node addWaiter(Node mode) {
    //通過當(dāng)前的線程和鎖模式新建一個(gè)節(jié)點(diǎn)。
    Node node = new Node(Thread.currentThread(), mode);
    //pred指向尾節(jié)點(diǎn)tail
    Node pred = tail;
    //如果尾節(jié)點(diǎn)不為空,則將當(dāng)前節(jié)點(diǎn)插入到尾節(jié)點(diǎn)后面(設(shè)為尾節(jié)點(diǎn)),并返回node
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果Pred指針是Null(說明等待隊(duì)列中沒有元素),或者當(dāng)前Pred指針和Tail指向的位置不同(說明已經(jīng)被別的線程修改),則進(jìn)入enq初始化head節(jié)點(diǎn),并將node設(shè)為尾節(jié)點(diǎn)。
    enq(node);
    return node;
}

注意雙向鏈表中,第一個(gè)節(jié)點(diǎn)(頭節(jié)點(diǎn))為虛節(jié)點(diǎn),其實(shí)并不存儲(chǔ)任何信息,只是占位。真正的第一個(gè)有數(shù)據(jù)的節(jié)點(diǎn),是在第二個(gè)節(jié)點(diǎn)開始的。

回到上面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法,acquireQueued會(huì)把放入隊(duì)列中的線程不斷去獲取鎖,直到獲取成功或者不再需要獲?。ㄖ袛啵?。

那么隊(duì)列中的線程什么時(shí)候可以獲取到鎖?一直獲取不到又會(huì)怎樣呢?

final boolean acquireQueued(final Node node, int arg) {
    // 標(biāo)記是否成功拿到資源
    boolean failed = true;
    try {
        // 標(biāo)記等待過程中是否中斷過
        boolean interrupted = false;
        // 開始自旋,要么獲取鎖,要么中斷
        for (;;) {
            // 獲取當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)
            final Node p = node.predecessor();
            // 如果p是頭結(jié)點(diǎn),說明當(dāng)前節(jié)點(diǎn)在真實(shí)數(shù)據(jù)隊(duì)列的首部,就嘗試獲取鎖(別忘了頭結(jié)點(diǎn)是虛節(jié)點(diǎn))
            if (p == head && tryAcquire(arg)) {
                // 獲取鎖成功,頭指針移動(dòng)到當(dāng)前node
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 說明p為頭節(jié)點(diǎn)且當(dāng)前沒有獲取到鎖(可能是非公平鎖被搶占了)或者是p不為頭結(jié)點(diǎn),這個(gè)時(shí)候就要判斷當(dāng)前node是否要被阻塞(被阻塞條件:前驅(qū)節(jié)點(diǎn)的waitStatus為-1),防止無限循環(huán)浪費(fèi)資源。
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)//未成功拿到資源,設(shè)置當(dāng)前節(jié)點(diǎn)狀態(tài)為CANCELLED
            cancelAcquire(node);
    }
}

// 靠前驅(qū)節(jié)點(diǎn)判斷當(dāng)前線程是否應(yīng)該被阻塞,true阻塞,false不阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 獲取前驅(qū)結(jié)點(diǎn)的節(jié)點(diǎn)狀態(tài)
    int ws = pred.waitStatus;
    // 說明前驅(qū)結(jié)點(diǎn)處于喚醒狀態(tài),此時(shí)需要阻塞當(dāng)前節(jié)點(diǎn),返回true
    if (ws == Node.SIGNAL)
        return true; 
    // 通過枚舉值我們知道waitStatus>0是取消狀態(tài)
    if (ws > 0) {
        do {
            // 循環(huán)向前查找取消節(jié)點(diǎn),把取消節(jié)點(diǎn)從隊(duì)列中剔除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        //直到前面某一個(gè)節(jié)點(diǎn)非取消節(jié)點(diǎn),將非取消節(jié)點(diǎn)連接當(dāng)前節(jié)點(diǎn)
        pred.next = node;
    } else {//前驅(qū)節(jié)點(diǎn)waitStatus為0或-3。
        // 設(shè)置前驅(qū)節(jié)點(diǎn)等待狀態(tài)為SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//parkAndCheckInterrupt主要用于掛起當(dāng)前線程,阻塞調(diào)用棧,返回當(dāng)前線程的中斷狀態(tài)。
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

前面的兩個(gè)問題得到解決:只要前驅(qū)節(jié)點(diǎn)是頭節(jié)點(diǎn),就嘗試獲取鎖。前驅(qū)節(jié)點(diǎn)不是頭節(jié)點(diǎn),或獲取鎖失敗,則判斷是否需要阻塞當(dāng)前線程。如果前驅(qū)節(jié)點(diǎn)狀態(tài)是SIGNAL,表明前驅(qū)節(jié)點(diǎn)準(zhǔn)備獲取資源,所以當(dāng)前節(jié)點(diǎn)阻塞;如果前驅(qū)節(jié)點(diǎn)是取消狀態(tài)CANCELLED(不再獲取資源),移除所有取消狀態(tài)的前驅(qū)節(jié)點(diǎn),繼續(xù)自旋嘗試獲取鎖;如果前驅(qū)節(jié)點(diǎn)waitStatus為0(默認(rèn)值)或-3,則更改前驅(qū)節(jié)點(diǎn)狀態(tài)為SIGNAL,繼續(xù)自旋。

參考鏈接 : 美團(tuán)技術(shù)團(tuán)隊(duì):從ReentrantLock的實(shí)現(xiàn)看AQS的原理及應(yīng)用

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容