Java并發(fā)編程:用AQS寫一把可重入鎖

AQS簡介

AQS是J.U.C包下AbstractQueuedSynchronizer抽象的隊列式的同步器的簡稱,這是一個抽象類,它定義了一套多線程訪問共享資源的同步器框架,J.U.C包下的許多同步類實現(xiàn)都依賴于它,比如ReentrantLock/Semaphore/CountDownLatch,可以說這個抽象類是J.U.C并發(fā)包的基礎。

之所以把這一章節(jié)叫做AQS簡介而不是叫AQS詳解,是因為已經(jīng)有大神寫過詳解的文章Java并發(fā)之AQS詳解,這篇文章對AQS的源碼解析很透徹,博主讀了之后受益匪淺,鑒于對原作者的尊重,所以如上附上原文的鏈接。要想弄懂AQS還得從這一圖說起。

如上圖所述,AQS維護了一個state變量和一個FIFO先進先出隊列,這個state用來干嘛的可以參考我前一篇博客中的那個count計數(shù)器,就是用來計數(shù)線程的重入次數(shù)的。上一篇博客還用了一個變量currentThread來記錄已經(jīng)獲得這把鎖的線程。而我們的AQS用的是一個先進先出的等待隊列的完成這件事。當新的線程進來的時候,AQS調(diào)用tryAquice()方法試圖去獲得鎖,如果獲得的話,則調(diào)用interupt中斷方法;如果沒有獲得鎖,則把當前線程放入排隊的隊列,AQS隊列不斷的自旋嘗試去判斷已經(jīng)占用的線程是否已經(jīng)放開,如果鎖依然被線程繼續(xù)占用,則繼續(xù)添加進等待隊列。

源碼如下:

public final void acquire(int arg) {

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

}

那個addWaiter方法,此方法用于將當前線程加入到等待隊列的隊尾,并返回當前線程所在的結點。

private Node addWaiter(Node mode) {

//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)

Node node = new Node(Thread.currentThread(), mode);

//嘗試快速方式直接放到隊尾。

Node pred = tail;

if (pred != null) {

node.prev = pred;

if (compareAndSetTail(pred, node)) {

pred.next = node;

return node;

}

}

//上一步失敗則通過enq入隊。

enq(node);

return node;

}

我們以獨占式的同步幫助器為例來看一下AQS的執(zhí)行流程。

大致流程如下:

調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;

沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;

acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。

如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。

上述的流程和步驟已經(jīng)是AQS幫我們實現(xiàn)了的功能,估計我講的也不太清楚,這里再次推薦讀者閱讀這篇文章Java并發(fā)之AQS詳解,下面我們應該來看看如何使用AQS。

用AQS寫一把互斥鎖

互斥鎖是為了保證數(shù)據(jù)的安全,在任一時刻只能有一個線程訪問該對象。由上一個小節(jié)我們可知,AQS已經(jīng)為我們實現(xiàn)所有排隊和阻塞機制,我們只需要調(diào)用getState()、setState(int) 和 compareAndSetState(int, int) 方發(fā)來維護state變量的數(shù)值和調(diào)用setExclusiveOwnerThread/getExclusiveOwnerThread來維護當前占用的線程是誰就行了。翻越JDK提供的API,它建議我們:應該將子類定義為非公共內(nèi)部幫助器類,可用它們來實現(xiàn)其封閉類的同步屬性。類 AbstractQueuedSynchronizer 沒有實現(xiàn)任何同步接口。而是定義了諸如 acquireInterruptibly(int) 之類的一些方法,在適當?shù)臅r候可以通過具體的鎖和相關同步器來調(diào)用它們,以實現(xiàn)其公共方法。

什么意思呢?意思就是建議我們:如果你想要使用AQS實現(xiàn)一把互斥鎖Mutex,就必須先用一個類去繼承AbstractQueuedSynchronizer這個抽象類,然而這個實現(xiàn)的子類(暫取名叫Sync)應該是作為Mutex的內(nèi)部類來用的,提供給Mutex當作幫助器來使用。那么Lock接口,Mutex互斥鎖,AbstractQueuedSynchronizer抽象類和Sync幫助器這四者存在什么聯(lián)系呢?為了避免你聽糊涂了,下面我整理他們的UML類圖如下。

由上圖可知:Mutex互斥鎖繼承了Lock鎖的接口,具有鎖的屬性,可以提供上鎖和釋放鎖的方法,他是對外提供服務的服務者,而Mutex類有個Sync類型的私有對象sync,這個私有對象繼承了AbstractQueuedSynchronizer抽象類,是Mutex鎖和AQS的橋梁,是加鎖和釋放鎖真正的服務者。如果你看明白了上面的UML類圖,那么我們的Mutex互斥鎖的定義應該如下:

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {

private Sync sync = new Sync();

private class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

// TODO Auto-generated method stub

return super.tryAcquire(arg);

}

@Override

protected boolean tryRelease(int arg) {

// TODO Auto-generated method stub

return super.tryRelease(arg);

}

}

@Override

public void lock() {

// TODO Auto-generated method stub

}

@Override

public void lockInterruptibly() throws InterruptedException {

// TODO Auto-generated method stub

}

@Override

public boolean tryLock() {

// TODO Auto-generated method stub

return false;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

// TODO Auto-generated method stub

return false;

}

@Override

public void unlock() {

// TODO Auto-generated method stub

}

@Override

public Condition newCondition() {

// TODO Auto-generated method stub

return null;

}

}

?這里我們實現(xiàn)的是獨占式的鎖,Sync幫助器只需要覆蓋父類的tryAcquire(),tryRelease()方法就行了,其他方法可以暫時刪掉,如共享式的tryAcquireShared(),tryReleaseShared(),已經(jīng)Condition用到的isHeldExclusively()和toString()方法都可以暫時不用實現(xiàn),因為我們只是想先用AQS來做一把可以保證數(shù)據(jù)安全的鎖,考慮的問題暫時沒有那么多。

/**

* 互斥鎖

* @author 張仕宗

* @date 2018.11.9

*/

public class Mutex implements Lock{

//AQS子類的對象,Mutex互斥鎖用它來工作

private Sync sync = new Sync();

//Sync同步器類作為公共內(nèi)部幫助器,可用它來實現(xiàn)其封閉類的同步屬性

private class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

assert arg == 1; //這里用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等于1,則直接中斷

if(this.compareAndSetState(0, 1)) { //這里做一下判斷,如果state的值為等于0,立馬將state設置為1

//返回true,告訴acqure方法,獲取鎖成功

return true;

}

return false;

}

@Override

protected boolean tryRelease(int arg) {

//釋放鎖,由于這是一把互斥鎖,state不是0就是1,所以你需要做兩步:

//1.直接將state置為0

this.setState(0);

//返回true,告訴aqs的release方法釋放鎖成功

return true;

}

}

/**

* 上鎖的方法

*/

@Override

public void lock() {

sync.acquire(1);

}

/**

* 釋放鎖的方法

*/

@Override

public void unlock() {

sync.release(1);

}

@Override

public void lockInterruptibly() throws InterruptedException {

// TODO Auto-generated method stub

}

@Override

public boolean tryLock() {

// TODO Auto-generated method stub

return false;

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

// TODO Auto-generated method stub

return false;

}

@Override

public Condition newCondition() {

// TODO Auto-generated method stub

return null;

}

}

?上訴代碼實現(xiàn)了一把最簡單的鎖,我們只實現(xiàn)其lock()和unlock()方法,其他方法請暫時忽略,而lock()方法和unlock()方法是如何實現(xiàn)的呢?lock()方法調(diào)用了Sync幫助器對象的sync.acquire(1)方法,由于我們的幫助器Sync并沒有實現(xiàn)這個方法,所以實際調(diào)用的是AQS的acquire()方法,而AQS這時候做了什么時呢?再來一次該方法的源碼:

if (!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

selfInterrupt();

?次方法干的第一件事就是去調(diào)用tryAcquire()方法,這個方法需要Sync來實現(xiàn),如果自己的Sync沒有實現(xiàn)這個方法的話,父類會直接拋出UnsupportedOperationException這個異常。

@Override

protected boolean tryAcquire(int arg) {

assert arg == 1; //這里用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等于1,則直接中斷

if(this.compareAndSetState(0, 1)) { //這里做一下判斷,如果state的值為等于0,立馬將state設置為1

//返回true,告訴acqure方法,獲取鎖成功

return true;

}

return false;

}

由于這是一把互斥鎖,所以只能有同一時刻只能獲得一次鎖。代碼中用到了assert斷言,如果預獲得鎖的次數(shù)不是1,則中斷。接下來if中判斷state狀態(tài)是否為0,如果state狀態(tài)為0,則說明鎖還沒有被占用,那么我立刻占用這把鎖,判斷state當前值和設置state為1這兩步用原子性操作的代碼語句是this.compareAndSetState(0, 1),并立馬放回true,這時候AQS獲得返回值,獲得鎖成功。如果是第二個線程進來,if語句判斷得到的值非0,則直接返回false,這時候AQS將新進來的線程放進FIFO隊列排隊。

接下來看看Mutex的unlock()方法,該方法調(diào)用了sync.release(1),看看AQS這時候做了什么!

public final boolean release(int arg) {

if (tryRelease(arg)) {

Node h = head;

if (h != null && h.waitStatus != 0)

unparkSuccessor(h);

return true;

}

return false;

}

?此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。同樣的,我們的同步器Sync需要去實現(xiàn)這個tryRelease方法,不然同樣會拋出UnsupportedOperationException異常。Sync的tryRelease方法比較簡單:

@Override

protected boolean tryRelease(int arg) {

//釋放鎖,由于這是一把互斥鎖,state不是0就是1,所以你需要做兩步:

//1.直接將state置為0

this.setState(0);

//返回true,告訴aqs的release方法釋放鎖成功

return true;

}

?只需要設置state為0即可,由于這是一把互斥鎖,state不是0就是1所以直接調(diào)用this.setSate(0)。

用AQS寫一把重入鎖

上訴的Mutex并非一把可重入鎖,為了實現(xiàn)這把鎖能夠讓同一線程多次進來,回憶一下上一篇博客中怎么實現(xiàn)的?當時的做法是在鎖的lock()自旋方法中判斷新進來的是不是正在運行的線程,如果新進來的線程就是正在運行的線程,則獲取鎖成功,并讓計數(shù)器+1。而在釋放鎖的時候,如果釋放鎖的線程等于當前線程,讓計數(shù)器-1,只有當計數(shù)器count歸零的時候才真正的釋放鎖。同樣的,用AQS實現(xiàn)的鎖也是這個思路,那么我們的tryAcquice方法如下:

@Override

protected boolean tryAcquire(int arg) {

//如果第一個線程進來,直接獲得鎖,并設置當前獨占的線程為當前線程

int state = this.getState();

if(state == 0) { //state為0,說明當前沒有線程占用該線程

if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個線程進來,立刻設置state為arg

this.setExclusiveOwnerThread(Thread.currentThread()); //設置當前獨占線程為當前線程

return true; //告訴頂級aqs獲取鎖成功

}

} else { //如果是第二個線程進來

Thread currentThread = Thread.currentThread();//當前進來的線程

Thread ownerThread = this.getExclusiveOwnerThread();//已經(jīng)保存進去的獨占式線程

if(currentThread == ownerThread) { //判斷一下進來的線程和保存進去的線程是同一線程么?如果是,則獲取鎖成功,如果不是則獲取鎖失敗

this.setState(state+arg); //設置state狀態(tài)

return true;

}

}

return false;

}

?tryAcquice()方法代碼含義如注釋所示,與Mutex互斥鎖不同的是當state狀態(tài)不為0時我們的邏輯處理,如果第二次進來的線程currentThread和正在獨占的線程ownerThread為統(tǒng)一線程,第一步設置state增加1,第二步返回true給AQS。

tryRelease()方法代碼如下:

@Override

protected boolean tryRelease(int arg) {

//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖

if(Thread.currentThread() != this.getExclusiveOwnerThread()) {

//如果釋放鎖的不是當前線程,則拋出異常

throw new RuntimeException();

}

int state = this.getState()-arg;

//接下來判斷state是否已經(jīng)歸零,只有state歸零的時候才真正的釋放鎖

if(state == 0) {

//state已經(jīng)歸零,做掃尾工作

this.setState(0);

this.setExclusiveOwnerThread(null);

return true;

}

this.setState(state);

return false;

}

?tryRelease()首先是獲取當前state的值,并對這個值進行欲判:如果當前值state減去sync.release()傳來的參數(shù)歸零,則真正的釋放鎖,那么我們要做的第一步是設置state為0,接著設置當前獨占的線程為null,再然后返回true告訴AQS釋放鎖成功。如果如果當前值state減去sync.release()傳來的參數(shù)歸零,如果讓state的值為state-arg相減之后的值。

目前為此,我們以來了AQS框架來改寫的重入鎖代碼如下:

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

/**

* 用AQS實現(xiàn)的重入鎖

* @author 張仕宗

* @date 2018.11.9

*/

public class MyAqsLock implements Lock{

//AQS子類的對象,用它來輔助MyAqsLock工作

private Sync sync = new Sync();

private class Sync extends AbstractQueuedSynchronizer {

@Override

protected boolean tryAcquire(int arg) {

//如果第一個線程進來,直接獲得鎖,并設置當前獨占的線程為當前線程

int state = this.getState();

if(state == 0) { //state為0,說明當前沒有線程占用該線程

if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個線程進來,立刻設置state為arg

this.setExclusiveOwnerThread(Thread.currentThread()); //設置當前獨占線程為當前線程

return true; //告訴頂級aqs獲取鎖成功

}

} else { //如果是第二個線程進來

Thread currentThread = Thread.currentThread();//當前進來的線程

Thread ownerThread = this.getExclusiveOwnerThread();//已經(jīng)保存進去的獨占式線程

if(currentThread == ownerThread) { //判斷一下進來的線程和保存進去的線程是同一線程么?如果是,則獲取鎖成功,如果不是則獲取鎖失敗

this.setState(state+arg); //設置state狀態(tài)

return true;

}

}

return false;

}

@Override

protected boolean tryRelease(int arg) {

//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖

if(Thread.currentThread() != this.getExclusiveOwnerThread()) {

//如果釋放鎖的不是當前線程,則拋出異常

throw new RuntimeException();

}

int state = this.getState()-arg;

//接下來判斷state是否已經(jīng)歸零,只有state歸零的時候才真正的釋放鎖

if(state == 0) {

//state已經(jīng)歸零,做掃尾工作

this.setState(0);

this.setExclusiveOwnerThread(null);

return true;

}

this.setState(state);

return false;

}

public Condition newCondition() {

return new ConditionObject();

}

}

/**

* 上鎖的方法

*/

@Override

public void lock() {

sync.acquire(1);

}

/**

* 釋放鎖的方法

*/

@Override

public void unlock() {

sync.release(1);

}

@Override

public void lockInterruptibly() throws InterruptedException {

sync.acquireInterruptibly(1);

}

@Override

public boolean tryLock() {

//調(diào)用幫助器的tryAcquire方法,測試獲取鎖一次,不會自旋

return sync.tryAcquire(1);

}

@Override

public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

//調(diào)用幫助器的tryRelease方法,測試釋放鎖一次,不會子旋

return sync.tryRelease(1);

}

@Override

public Condition newCondition() {

//調(diào)用幫助類獲取Condition對象

return sync.newCondition();

}

}

以下大綱所有的內(nèi)容群內(nèi)已經(jīng)將知識體系整理好(源碼,筆記,PPT,學習視頻)進群免費領取。

加QQ群:897889510,免費領取資料

以上大綱所有的內(nèi)容群內(nèi)已經(jīng)將知識體系整理好(源碼,筆記,PPT,學習視頻)進群免費領取。加QQ群:897889510,免費領取資料

以上大綱所有的內(nèi)容群內(nèi)已經(jīng)將知識體系整理好(源碼,筆記,PPT,學習視頻)進群免費領取。加QQ群:897889510,免費領取資料

以上大綱所有的內(nèi)容群內(nèi)已經(jīng)將知識體系整理好(源碼,筆記,PPT,學習視頻)進群免費領取。加QQ群:897889510,免費領取資料

以上大綱所有的內(nèi)容群內(nèi)已經(jīng)將知識體系整理好(源碼,筆記,PPT,學習視頻)進群免費領取。加QQ群:897889510,免費領取資料

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容