AQS是JDK并發(fā)包下的一個(gè)基礎(chǔ)類,或者模板類,JUC下的很多工具類都是基于它實(shí)現(xiàn)的。其內(nèi)部:
- 依靠?jī)?nèi)部維護(hù)一個(gè)FIFO的等待隊(duì)列
- 對(duì)于只依賴一個(gè)原子值來(lái)表示狀態(tài)的同步機(jī)制來(lái)說(shuō)很有用
- 子類繼承AQS來(lái)修改原子狀態(tài),從而實(shí)現(xiàn)相關(guān)同步操作。
- getState()
- setState()
- compareAndSetState()
AQS支持排它模式和共享模式,等待在不同線程的共享同一個(gè)FIFO隊(duì)列,目前大部分都會(huì)只實(shí)現(xiàn)一種機(jī)制,排它或者共享,JDK中有一個(gè)類同時(shí)實(shí)現(xiàn)了兩種機(jī)制:ReadWriteLock
想要使用當(dāng)前的類,重定義其幾個(gè)方法:
- tryAcquire 嘗試獲取獨(dú)占鎖,
- tryRelease 嘗試釋放排它鎖
- tryAcquireShared 嘗試獲取共享鎖,返回負(fù)數(shù)代表失敗,返回0代表當(dāng)前的鎖獲取成功,但是后續(xù)無(wú)法獲取,返回正數(shù),代表后續(xù)的節(jié)點(diǎn)仍然可以繼續(xù)獲取
- tryReleaseShared 嘗試釋放共享鎖
- isHeldExclusively 是否排它狀態(tài)
獲取和釋放操作本質(zhì)即變更state。
再真正調(diào)用的時(shí)候則使用,AQS內(nèi)部的方法,其會(huì)調(diào)用我們定義的幾個(gè)方法,使用時(shí):
- acquire\acquireInterruptibly\tryAcquireNanos 獲取排它鎖
- acquireShared\acquireSharedInterruptibly\tryAcquireSharedNanos 獲取共享鎖
- release\releaseShared 釋放鎖
例如排它鎖的實(shí)現(xiàn)形式:
Acquire:
while (!tryAcquire(arg)) {
// enqueue thread if it is not already queued;
如果還沒(méi)有加入隊(duì)列,那么將當(dāng)前線程加入隊(duì)列
// possibly block current thread;
可能block當(dāng)前的線程
}
Release:
if (tryRelease(arg))
unblock the first queued thread;
// 解鎖第一個(gè)入隊(duì)列的線程
JUC的CountdownLatch和ReentrantLock給了我們一個(gè)很經(jīng)典的實(shí)現(xiàn)方式,后面我們?cè)俜治銎渚唧w實(shí)現(xiàn)。當(dāng)前看下CountdownLatch代碼:
public class CountDownLatch {
// 實(shí)現(xiàn)同步機(jī)制
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
}
下次再分析下AQS的具體方法含義。