AQS簡介
AQS是J.U.C包下AbstractQueuedSynchronizer抽象的隊列式的同步器的簡稱,這是一個抽象類,它定義了一套多線程訪問共享資源的同步器框架,J.U.C包下的許多同步類實現(xiàn)都依賴于它,比如ReentrantLock/Semaphore/CountDownLatch,可以說這個抽象類是J.U.C并發(fā)包的基礎。
之所以把這一章節(jié)叫做AQS簡介而不是叫AQS詳解,是因為已經(jīng)有大神寫過詳解的文章Java并發(fā)之AQS詳解,這篇文章對AQS的源碼解析很透徹,博主讀了之后受益匪淺,鑒于對原作者的尊重,所以如上附上原文的鏈接。要想弄懂AQS還得從這一圖說起。
如上圖所述,AQS維護了一個state變量和一個FIFO先進先出隊列,這個state用來干嘛的可以參考我前一篇博客中的那個count計數(shù)器,就是用來計數(shù)線程的重入次數(shù)的。上一篇博客還用了一個變量currentThread來記錄已經(jīng)獲得這把鎖的線程。而我們的AQS用的是一個先進先出的等待隊列的完成這件事。當新的線程進來的時候,AQS調(diào)用tryAquice()方法試圖去獲得鎖,如果獲得的話,則調(diào)用interupt中斷方法;如果沒有獲得鎖,則把當前線程放入排隊的隊列,AQS隊列不斷的自旋嘗試去判斷已經(jīng)占用的線程是否已經(jīng)放開,如果鎖依然被線程繼續(xù)占用,則繼續(xù)添加進等待隊列。
源碼如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那個addWaiter方法,此方法用于將當前線程加入到等待隊列的隊尾,并返回當前線程所在的結點。
private Node addWaiter(Node mode) {
//以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享)
Node node = new Node(Thread.currentThread(), mode);
//嘗試快速方式直接放到隊尾。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//上一步失敗則通過enq入隊。
enq(node);
return node;
}
我們以獨占式的同步幫助器為例來看一下AQS的執(zhí)行流程。
大致流程如下:
調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式;
acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
上述的流程和步驟已經(jīng)是AQS幫我們實現(xiàn)了的功能,估計我講的也不太清楚,這里再次推薦讀者閱讀這篇文章Java并發(fā)之AQS詳解,下面我們應該來看看如何使用AQS。
用AQS寫一把互斥鎖
互斥鎖是為了保證數(shù)據(jù)的安全,在任一時刻只能有一個線程訪問該對象。由上一個小節(jié)我們可知,AQS已經(jīng)為我們實現(xiàn)所有排隊和阻塞機制,我們只需要調(diào)用getState()、setState(int) 和 compareAndSetState(int, int) 方發(fā)來維護state變量的數(shù)值和調(diào)用setExclusiveOwnerThread/getExclusiveOwnerThread來維護當前占用的線程是誰就行了。翻越JDK提供的API,它建議我們:應該將子類定義為非公共內(nèi)部幫助器類,可用它們來實現(xiàn)其封閉類的同步屬性。類 AbstractQueuedSynchronizer 沒有實現(xiàn)任何同步接口。而是定義了諸如 acquireInterruptibly(int) 之類的一些方法,在適當?shù)臅r候可以通過具體的鎖和相關同步器來調(diào)用它們,以實現(xiàn)其公共方法。
什么意思呢?意思就是建議我們:如果你想要使用AQS實現(xiàn)一把互斥鎖Mutex,就必須先用一個類去繼承AbstractQueuedSynchronizer這個抽象類,然而這個實現(xiàn)的子類(暫取名叫Sync)應該是作為Mutex的內(nèi)部類來用的,提供給Mutex當作幫助器來使用。那么Lock接口,Mutex互斥鎖,AbstractQueuedSynchronizer抽象類和Sync幫助器這四者存在什么聯(lián)系呢?為了避免你聽糊涂了,下面我整理他們的UML類圖如下。
由上圖可知:Mutex互斥鎖繼承了Lock鎖的接口,具有鎖的屬性,可以提供上鎖和釋放鎖的方法,他是對外提供服務的服務者,而Mutex類有個Sync類型的私有對象sync,這個私有對象繼承了AbstractQueuedSynchronizer抽象類,是Mutex鎖和AQS的橋梁,是加鎖和釋放鎖真正的服務者。如果你看明白了上面的UML類圖,那么我們的Mutex互斥鎖的定義應該如下:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class Mutex implements Lock {
private Sync sync = new Sync();
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
// TODO Auto-generated method stub
return super.tryAcquire(arg);
}
@Override
protected boolean tryRelease(int arg) {
// TODO Auto-generated method stub
return super.tryRelease(arg);
}
}
@Override
public void lock() {
// TODO Auto-generated method stub
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public void unlock() {
// TODO Auto-generated method stub
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
?這里我們實現(xiàn)的是獨占式的鎖,Sync幫助器只需要覆蓋父類的tryAcquire(),tryRelease()方法就行了,其他方法可以暫時刪掉,如共享式的tryAcquireShared(),tryReleaseShared(),已經(jīng)Condition用到的isHeldExclusively()和toString()方法都可以暫時不用實現(xiàn),因為我們只是想先用AQS來做一把可以保證數(shù)據(jù)安全的鎖,考慮的問題暫時沒有那么多。
/**
* 互斥鎖
* @author 張仕宗
* @date 2018.11.9
*/
public class Mutex implements Lock{
//AQS子類的對象,Mutex互斥鎖用它來工作
private Sync sync = new Sync();
//Sync同步器類作為公共內(nèi)部幫助器,可用它來實現(xiàn)其封閉類的同步屬性
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1; //這里用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等于1,則直接中斷
if(this.compareAndSetState(0, 1)) { //這里做一下判斷,如果state的值為等于0,立馬將state設置為1
//返回true,告訴acqure方法,獲取鎖成功
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//釋放鎖,由于這是一把互斥鎖,state不是0就是1,所以你需要做兩步:
//1.直接將state置為0
this.setState(0);
//返回true,告訴aqs的release方法釋放鎖成功
return true;
}
}
/**
* 上鎖的方法
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 釋放鎖的方法
*/
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// TODO Auto-generated method stub
}
@Override
public boolean tryLock() {
// TODO Auto-generated method stub
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// TODO Auto-generated method stub
return false;
}
@Override
public Condition newCondition() {
// TODO Auto-generated method stub
return null;
}
}
?上訴代碼實現(xiàn)了一把最簡單的鎖,我們只實現(xiàn)其lock()和unlock()方法,其他方法請暫時忽略,而lock()方法和unlock()方法是如何實現(xiàn)的呢?lock()方法調(diào)用了Sync幫助器對象的sync.acquire(1)方法,由于我們的幫助器Sync并沒有實現(xiàn)這個方法,所以實際調(diào)用的是AQS的acquire()方法,而AQS這時候做了什么時呢?再來一次該方法的源碼:
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
?次方法干的第一件事就是去調(diào)用tryAcquire()方法,這個方法需要Sync來實現(xiàn),如果自己的Sync沒有實現(xiàn)這個方法的話,父類會直接拋出UnsupportedOperationException這個異常。
@Override
protected boolean tryAcquire(int arg) {
assert arg == 1; //這里用到了斷言,互斥鎖,鎖只能被獲取一次,如果arg不等于1,則直接中斷
if(this.compareAndSetState(0, 1)) { //這里做一下判斷,如果state的值為等于0,立馬將state設置為1
//返回true,告訴acqure方法,獲取鎖成功
return true;
}
return false;
}
由于這是一把互斥鎖,所以只能有同一時刻只能獲得一次鎖。代碼中用到了assert斷言,如果預獲得鎖的次數(shù)不是1,則中斷。接下來if中判斷state狀態(tài)是否為0,如果state狀態(tài)為0,則說明鎖還沒有被占用,那么我立刻占用這把鎖,判斷state當前值和設置state為1這兩步用原子性操作的代碼語句是this.compareAndSetState(0, 1),并立馬放回true,這時候AQS獲得返回值,獲得鎖成功。如果是第二個線程進來,if語句判斷得到的值非0,則直接返回false,這時候AQS將新進來的線程放進FIFO隊列排隊。
接下來看看Mutex的unlock()方法,該方法調(diào)用了sync.release(1),看看AQS這時候做了什么!
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
?此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。同樣的,我們的同步器Sync需要去實現(xiàn)這個tryRelease方法,不然同樣會拋出UnsupportedOperationException異常。Sync的tryRelease方法比較簡單:
@Override
protected boolean tryRelease(int arg) {
//釋放鎖,由于這是一把互斥鎖,state不是0就是1,所以你需要做兩步:
//1.直接將state置為0
this.setState(0);
//返回true,告訴aqs的release方法釋放鎖成功
return true;
}
?只需要設置state為0即可,由于這是一把互斥鎖,state不是0就是1所以直接調(diào)用this.setSate(0)。
用AQS寫一把重入鎖
上訴的Mutex并非一把可重入鎖,為了實現(xiàn)這把鎖能夠讓同一線程多次進來,回憶一下上一篇博客中怎么實現(xiàn)的?當時的做法是在鎖的lock()自旋方法中判斷新進來的是不是正在運行的線程,如果新進來的線程就是正在運行的線程,則獲取鎖成功,并讓計數(shù)器+1。而在釋放鎖的時候,如果釋放鎖的線程等于當前線程,讓計數(shù)器-1,只有當計數(shù)器count歸零的時候才真正的釋放鎖。同樣的,用AQS實現(xiàn)的鎖也是這個思路,那么我們的tryAcquice方法如下:
@Override
protected boolean tryAcquire(int arg) {
//如果第一個線程進來,直接獲得鎖,并設置當前獨占的線程為當前線程
int state = this.getState();
if(state == 0) { //state為0,說明當前沒有線程占用該線程
if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個線程進來,立刻設置state為arg
this.setExclusiveOwnerThread(Thread.currentThread()); //設置當前獨占線程為當前線程
return true; //告訴頂級aqs獲取鎖成功
}
} else { //如果是第二個線程進來
Thread currentThread = Thread.currentThread();//當前進來的線程
Thread ownerThread = this.getExclusiveOwnerThread();//已經(jīng)保存進去的獨占式線程
if(currentThread == ownerThread) { //判斷一下進來的線程和保存進去的線程是同一線程么?如果是,則獲取鎖成功,如果不是則獲取鎖失敗
this.setState(state+arg); //設置state狀態(tài)
return true;
}
}
return false;
}
?tryAcquice()方法代碼含義如注釋所示,與Mutex互斥鎖不同的是當state狀態(tài)不為0時我們的邏輯處理,如果第二次進來的線程currentThread和正在獨占的線程ownerThread為統(tǒng)一線程,第一步設置state增加1,第二步返回true給AQS。
tryRelease()方法代碼如下:
@Override
protected boolean tryRelease(int arg) {
//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖
if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
//如果釋放鎖的不是當前線程,則拋出異常
throw new RuntimeException();
}
int state = this.getState()-arg;
//接下來判斷state是否已經(jīng)歸零,只有state歸零的時候才真正的釋放鎖
if(state == 0) {
//state已經(jīng)歸零,做掃尾工作
this.setState(0);
this.setExclusiveOwnerThread(null);
return true;
}
this.setState(state);
return false;
}
?tryRelease()首先是獲取當前state的值,并對這個值進行欲判:如果當前值state減去sync.release()傳來的參數(shù)歸零,則真正的釋放鎖,那么我們要做的第一步是設置state為0,接著設置當前獨占的線程為null,再然后返回true告訴AQS釋放鎖成功。如果如果當前值state減去sync.release()傳來的參數(shù)歸零,如果讓state的值為state-arg相減之后的值。
目前為此,我們以來了AQS框架來改寫的重入鎖代碼如下:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 用AQS實現(xiàn)的重入鎖
* @author 張仕宗
* @date 2018.11.9
*/
public class MyAqsLock implements Lock{
//AQS子類的對象,用它來輔助MyAqsLock工作
private Sync sync = new Sync();
private class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
//如果第一個線程進來,直接獲得鎖,并設置當前獨占的線程為當前線程
int state = this.getState();
if(state == 0) { //state為0,說明當前沒有線程占用該線程
if(this.compareAndSetState(0, arg)) { //判斷當前state值,第一個線程進來,立刻設置state為arg
this.setExclusiveOwnerThread(Thread.currentThread()); //設置當前獨占線程為當前線程
return true; //告訴頂級aqs獲取鎖成功
}
} else { //如果是第二個線程進來
Thread currentThread = Thread.currentThread();//當前進來的線程
Thread ownerThread = this.getExclusiveOwnerThread();//已經(jīng)保存進去的獨占式線程
if(currentThread == ownerThread) { //判斷一下進來的線程和保存進去的線程是同一線程么?如果是,則獲取鎖成功,如果不是則獲取鎖失敗
this.setState(state+arg); //設置state狀態(tài)
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
//鎖的獲取和鎖的釋放是一一對應的,獲取過多少次鎖就釋放多少次鎖
if(Thread.currentThread() != this.getExclusiveOwnerThread()) {
//如果釋放鎖的不是當前線程,則拋出異常
throw new RuntimeException();
}
int state = this.getState()-arg;
//接下來判斷state是否已經(jīng)歸零,只有state歸零的時候才真正的釋放鎖
if(state == 0) {
//state已經(jīng)歸零,做掃尾工作
this.setState(0);
this.setExclusiveOwnerThread(null);
return true;
}
this.setState(state);
return false;
}
public Condition newCondition() {
return new ConditionObject();
}
}
/**
* 上鎖的方法
*/
@Override
public void lock() {
sync.acquire(1);
}
/**
* 釋放鎖的方法
*/
@Override
public void unlock() {
sync.release(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
//調(diào)用幫助器的tryAcquire方法,測試獲取鎖一次,不會自旋
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//調(diào)用幫助器的tryRelease方法,測試釋放鎖一次,不會子旋
return sync.tryRelease(1);
}
@Override
public Condition newCondition() {
//調(diào)用幫助類獲取Condition對象
return sync.newCondition();
}
}
以下大綱所有的內(nèi)容群內(nèi)已經(jīng)將知識體系整理好(源碼,筆記,PPT,學習視頻)進群免費領取。
加QQ群:897889510,免費領取資料