ReentrantLock是如何基于AQS實(shí)現(xiàn)的

ReentrantLock是一個(gè)可重入的互斥鎖,基于AQS實(shí)現(xiàn),它具有與使用 synchronized 方法和語(yǔ)句相同的一些基本行為和語(yǔ)義,但功能更強(qiáng)大。

lock和unlock

ReentrantLock?中進(jìn)行同步操作都是從lock方法開(kāi)始。lock獲取鎖,進(jìn)行一系列的業(yè)務(wù)操作,結(jié)束后使用unlock釋放鎖。

private final ReentrantLock lock = new ReentrantLock();

? ? public void sync(){

? ? ? ? lock.lock();

? ? ? ? try {

? ? ? ? ? ? // ... method body

? ? ? ? } finally {

? ? ? ? ? lock.unlock()

? ? ? ? }

? ? }

lock

ReentrantLock?中l(wèi)ock的實(shí)現(xiàn)是通過(guò)調(diào)用AQS的?AbstractQueuedSynchronizer#acquire?方法實(shí)現(xiàn)。

public final void acquire(int arg) {

? ? ? ? //嘗試獲取鎖

? ? ? ? if (!tryAcquire(arg) &&

? ? ? ? ? ? acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

? ? ? ? ? ? selfInterrupt();

? ? }

根據(jù)之前介紹的模板方法模式,對(duì)于鎖的獲取tryAcquire是在ReentrantLock中實(shí)現(xiàn)的。而非公平鎖中的實(shí)際實(shí)現(xiàn)方法為nonfairTryAcquire。

ReentrantLock#nonfairTryAcquire

protected final boolean tryAcquire(int acquires) {

? ? return nonfairTryAcquire(acquires);

}

final boolean nonfairTryAcquire(int acquires) {

? ? final Thread current = Thread.currentThread();

? ? int c = getState();

? ? if (c == 0) {

? ? ? ? if (compareAndSetState(0, acquires)) {

? ? ? ? ? ? setExclusiveOwnerThread(current);

? ? ? ? ? ? return true;

? ? ? ? }

? ? }

? ? else if (current == getExclusiveOwnerThread()) {

? ? ? ? int nextc = c + acquires;

? ? ? ? if (nextc < 0) // overflow

? ? ? ? ? ? throw new Error("Maximum lock count exceeded");

? ? ? ? setState(nextc);

? ? ? ? return true;

? ? }

? ? return false;

}

在獲取鎖的邏輯中首先是嘗試以cas方式獲取鎖,如果獲取失敗則表示鎖已經(jīng)被線程持有。

再判斷持有該鎖的線程是否為當(dāng)前線程,如果是當(dāng)前線程就將state的值加1,在釋放鎖是也需要釋放多次。這就是可重入鎖的實(shí)現(xiàn)。

如果持有鎖的線程并非當(dāng)前線程則這次加鎖失敗,返回false。加鎖失敗后將調(diào)用?AbstractQueuedSynchronizer#acquireQueued(addWaiter(Node.EXCLUSIVE), arg)?。

首先會(huì)調(diào)用addWaiter方法將該線程入隊(duì)。

private Node addWaiter(Node mode) {

? ? Node node = new Node(Thread.currentThread(), mode);

? ? Node pred = tail;

? ? if (pred != null) {

? ? ? ? node.prev = pred;

? ? ? ? if (compareAndSetTail(pred, node)) {

? ? ? ? ? ? pred.next = node;

? ? ? ? ? ? return node;

? ? ? ? }

? ? }

? ? enq(node);

? ? return node;

}

mode是指以何種模式的節(jié)點(diǎn)入隊(duì),這里傳入的是Node.EXCLUSIVE(獨(dú)占鎖)。首先將當(dāng)前線程包裝為node節(jié)點(diǎn)。然后判斷等待隊(duì)列的尾節(jié)點(diǎn)是否為空,如果不為空則通過(guò)cas的方式將當(dāng)前節(jié)點(diǎn)接在隊(duì)尾。如果tail為空則執(zhí)行enq方法。

AbstractQueuedSynchronizer#enq

private Node enq(final Node node) {

? ? for (;;) {

? ? ? ? Node t = tail;

? ? ? ? if (t == null) { // Must initialize

? ? ? ? ? ? if (compareAndSetHead(new Node()))

? ? ? ? ? ? ? ? tail = head;

? ? ? ? } else {

? ? ? ? ? ? node.prev = t;

? ? ? ? ? ? if (compareAndSetTail(t, node)) {

? ? ? ? ? ? ? ? t.next = node;

? ? ? ? ? ? ? ? return t;

? ? ? ? ? ? }

? ? ? ? }

? ? }

}

enq方法通過(guò)for(;;)無(wú)限循環(huán)的方式將node節(jié)點(diǎn)設(shè)置到等待隊(duì)列的隊(duì)尾(隊(duì)列為空時(shí)head和tail都指向當(dāng)前節(jié)點(diǎn))。

綜上可知addWaiter方法的作用是將競(jìng)爭(zhēng)鎖失敗的節(jié)點(diǎn)放到等待隊(duì)列的隊(duì)尾。

等待隊(duì)列中的節(jié)點(diǎn)也并不是什么都不做,這些節(jié)點(diǎn)也會(huì)不斷的嘗試獲取鎖,邏輯在acquireQueued中實(shí)現(xiàn)。

AbstractQueuedSynchronizer#acquireQueued

final boolean acquireQueued(final Node node, int arg) {

? ? boolean failed = true;

? ? try {

? ? ? ? boolean interrupted = false;

? ? ? ? for (;;) {

? ? ? ? ? ? final Node p = node.predecessor();

? ? ? ? ? ? if (p == head && tryAcquire(arg)) {

? ? ? ? ? ? ? ? setHead(node);

? ? ? ? ? ? ? ? p.next = null; // help GC

? ? ? ? ? ? ? ? failed = false;

? ? ? ? ? ? ? ? return interrupted;

? ? ? ? ? ? }

? ? ? ? ? ? if (shouldParkAfterFailedAcquire(p, node) &&

? ? ? ? ? ? ? ? parkAndCheckInterrupt())

? ? ? ? ? ? ? ? interrupted = true;

? ? ? ? }

? ? } finally {

? ? ? ? if (failed)

? ? ? ? ? ? cancelAcquire(node);

? ? }

}

可以看到該方法也是使用for(;;)無(wú)限循環(huán)的方式來(lái)嘗試獲取鎖。首先判斷當(dāng)前節(jié)點(diǎn)是否為頭結(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn),如果是則再次調(diào)用tryAcquire嘗試獲取鎖。當(dāng)然這個(gè)過(guò)程并不是一定不停進(jìn)行的,這樣的話(huà)多線程競(jìng)爭(zhēng)下cpu切換也極耗費(fèi)資源。

shouldParkAfterFailedAcquire會(huì)判斷是否對(duì)當(dāng)前節(jié)點(diǎn)進(jìn)行阻塞,阻塞之后只有當(dāng)unpark后節(jié)點(diǎn)才會(huì)繼續(xù)假如爭(zhēng)奪鎖的行列。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

? ? int ws = pred.waitStatus;

? ? if (ws == Node.SIGNAL)

? ? ? ? return true;

? ? if (ws > 0) {

? ? ? ? do {

? ? ? ? ? ? node.prev = pred = pred.prev;

? ? ? ? } while (pred.waitStatus > 0);

? ? ? ? pred.next = node;

? ? } else {

? ? ? ? compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

? ? }

? ? return false;

}

判斷一個(gè)節(jié)點(diǎn)是否需要被阻塞是通過(guò)該節(jié)點(diǎn)的前繼節(jié)點(diǎn)的狀態(tài)判斷的。

如果前繼節(jié)點(diǎn)狀態(tài)為?singal?,則表示前繼節(jié)點(diǎn)還在等待,當(dāng)前節(jié)點(diǎn)需要繼續(xù)被阻塞。返回true。

如果前繼節(jié)點(diǎn)大于0,則表示前繼節(jié)點(diǎn)為取消狀態(tài)。取消狀態(tài)的節(jié)點(diǎn)不參與鎖的競(jìng)爭(zhēng),直接跳過(guò)。返回false。

如果前繼節(jié)點(diǎn)時(shí)其他狀態(tài)(0,PROPAGATE),不進(jìn)行阻塞,表示當(dāng)前節(jié)點(diǎn)需要重試嘗試獲取鎖。返回false。

shouldParkAfterFailedAcquire方法如果返回true,表示需要將當(dāng)前節(jié)點(diǎn)阻塞,阻塞方法為parkAndCheckInterrupt。

AbstractQueuedSynchronizer#parkAndCheckInterrupt

private final boolean parkAndCheckInterrupt() {

? ? LockSupport.park(this);

? ? return Thread.interrupted();

}

阻塞是通過(guò)LockSupport進(jìn)行阻塞,被阻塞的節(jié)點(diǎn)不參與鎖的競(jìng)爭(zhēng)(不在進(jìn)行循環(huán)獲取鎖),只能被unpark后才繼續(xù)競(jìng)爭(zhēng)鎖。

而被阻塞的節(jié)點(diǎn)要被釋放則依賴(lài)于unlock方法。

unlock

ReentrantLock?中unlock的實(shí)現(xiàn)是通過(guò)調(diào)用AQS的?AbstractQueuedSynchronizer#release?方法實(shí)現(xiàn)。

public final boolean release(int arg) {

? ? ? ? if (tryRelease(arg)) {

? ? ? ? ? ? Node h = head;

? ? ? ? ? ? if (h != null && h.waitStatus != 0)

? ? ? ? ? ? ? ? unparkSuccessor(h);

? ? ? ? ? ? return true;

? ? ? ? }

? ? ? ? return false;

? ? }

release調(diào)用tryRelease方法,tryRelease是在?ReentrantLock?中實(shí)現(xiàn)。

ReentrantLock#tryRelease

protected final boolean tryRelease(int releases) {

? ? ? ? int c = getState() - releases;

? ? ? ? if (Thread.currentThread() != getExclusiveOwnerThread())

? ? ? ? ? ? throw new IllegalMonitorStateException();

? ? ? ? boolean free = false;

? ? ? ? if (c == 0) {

? ? ? ? ? ? free = true;

? ? ? ? ? ? setExclusiveOwnerThread(null);

? ? ? ? }

? ? ? ? setState(c);

? ? ? ? return free;

? ? }

tryRelease方法邏輯很簡(jiǎn)單,首先減去releases(一般為1)表示釋放一個(gè)鎖,如果釋放后state=0表示釋放鎖成功,后續(xù)等待的節(jié)點(diǎn)可以獲取該鎖了。如果state!=0則表示該鎖為重入鎖,需要多次釋放。

當(dāng)釋放鎖成功后(state=0),會(huì)對(duì)頭結(jié)點(diǎn)的后繼節(jié)點(diǎn)進(jìn)行unpark。

AbstractQueuedSynchronizer#unparkSuccessor

private void unparkSuccessor(Node node) {

? ? int ws = node.waitStatus;

? ? if (ws < 0)

? ? ? ? compareAndSetWaitStatus(node, ws, 0);

? ? Node s = node.next;

? ? if (s == null || s.waitStatus > 0) {

? ? ? ? s = null;

? ? ? ? for (Node t = tail; t != null && t != node; t = t.prev)

? ? ? ? ? ? if (t.waitStatus <= 0)

? ? ? ? ? ? ? ? s = t;

? ? }

? ? if (s != null)

? ? ? ? LockSupport.unpark(s.thread);

? ? }

unparkSuccessor見(jiàn)名知意適用于接觸后面節(jié)點(diǎn)的阻塞狀態(tài)。整個(gè)方法的邏輯就是找到傳入節(jié)點(diǎn)的后繼節(jié)點(diǎn),將其喚醒(排除掉狀態(tài)為cancel即waitStatus > 0的節(jié)點(diǎn))。

公平鎖和非公平鎖

ReentrantLock?的構(gòu)造方法接受一個(gè)可選的公平參數(shù)。當(dāng)設(shè)置為?true?時(shí),在多個(gè)線程的競(jìng)爭(zhēng)時(shí),傾向于將鎖分配給等待時(shí)間最長(zhǎng)的線程。

public ReentrantLock(boolean fair) {

? ? sync = fair ? new FairSync() : new NonfairSync();

}

在多個(gè)鎖競(jìng)爭(zhēng)統(tǒng)一資源的環(huán)境下,AQS維護(hù)了一個(gè)等待隊(duì)列,未能獲取到鎖的線程都會(huì)被掛到該隊(duì)列中。如果使用公平鎖則會(huì)從隊(duì)列的頭結(jié)點(diǎn)開(kāi)始獲取該資源。

而根據(jù)代碼在公平鎖和非公平鎖的實(shí)現(xiàn)的差別僅僅在于公平鎖多了一個(gè)檢測(cè)的方法。

公平鎖

protected final boolean tryAcquire(int acquires) {

? ? //...

? ? if (c == 0) {

? ? ? ? if (!hasQueuedPredecessors() //!hasQueuedPredecessors()便是比非公平鎖多出來(lái)的操作

? ? ? ? && compareAndSetState(0, acquires)) {

? ? ? ? ? ? setExclusiveOwnerThread(current);

? ? ? ? ? ? return true;

? ? ? ? }

? ? }

? ? //...

? ? return false;

}

hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {

? ? Node t = tail; // Read fields in reverse initialization order

? ? Node h = head;

? ? Node s;

? ? return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());

}

方法邏輯很簡(jiǎn)單,就是如果等待隊(duì)列還有節(jié)點(diǎn)并且排在首位的不是當(dāng)前線程所處的節(jié)點(diǎn)返回true表示還有等待更長(zhǎng)時(shí)間的節(jié)點(diǎn)。

歡迎工作一到五年的Java工程師朋友們加入Java程序員開(kāi)發(fā): 721575865

群內(nèi)提供免費(fèi)的Java架構(gòu)學(xué)習(xí)資料(里面有高可用、高并發(fā)、高性能及分布式、Jvm性能調(diào)優(yōu)、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個(gè)知識(shí)點(diǎn)的架構(gòu)資料)合理利用自己每一分每一秒的時(shí)間來(lái)學(xué)習(xí)提升自己,不要再用"沒(méi)有時(shí)間“來(lái)掩飾自己思想上的懶惰!趁年輕,使勁拼,給未來(lái)的自己一個(gè)交代!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容