前言
ReentrantReadWriteLock可以說(shuō)是最復(fù)雜的鎖實(shí)現(xiàn)類(lèi),這篇文章帶你弄懂ReentrantReadWriteLock實(shí)現(xiàn)讀寫(xiě)鎖的所有細(xì)節(jié),在閱讀本文之前,讀者需要了解一些前置的知識(shí)點(diǎn),比如AQS,公平鎖以及非公平鎖,CAS等等,了解了這些知識(shí)點(diǎn)后,可以更加輕松的理解ReentrantReadWriteLock,因?yàn)镽eentrantReadWriteLock是在這些的基礎(chǔ)上構(gòu)建的。
ReentrantReadWriteLock總體邏輯
AQS中維護(hù)這一個(gè)阻塞線程隊(duì)列,在該阻塞線程隊(duì)列為空時(shí),讀線程可以同時(shí)獲取讀鎖,在所有讀線程釋放讀鎖之前,如果有寫(xiě)線程想要獲取寫(xiě)鎖,則阻塞該線程,并加入阻塞隊(duì)列中。后續(xù)的讀寫(xiě)線程在第一個(gè)寫(xiě)線程之前的所有讀線程釋放鎖之前都會(huì)通過(guò)尾插的方式加入阻塞隊(duì)列。當(dāng)?shù)谝粋€(gè)寫(xiě)線程之前的所有讀線程釋放鎖后,才會(huì)陸續(xù)喚醒隊(duì)列中的線程嘗試加鎖。
ReentrantReadWriteLock的基本內(nèi)部單元
Sync繼承AQS,讀寫(xiě)鎖的核心業(yè)務(wù)邏輯都在AQS中,不管是公平鎖還是非公平鎖,讀鎖還是寫(xiě)鎖,他們都只是調(diào)用Sync中的方法實(shí)現(xiàn)公平競(jìng)爭(zhēng)和非公平競(jìng)爭(zhēng),加鎖和解鎖的業(yè)務(wù)邏輯。寫(xiě)鎖調(diào)用Sync#tryAcquire()方法加鎖,調(diào)用Sync#tryRelease()方法解鎖。讀鎖調(diào)用Sync#tryAcquireShared()加鎖,調(diào)用Sync#tryReleaseShared()加鎖。而這些方法的本質(zhì)都是通過(guò)改變AQS#state的值來(lái)改變鎖狀態(tài)。
// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫(xiě)鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 抽象靜態(tài)內(nèi)部類(lèi),繼承AQS,讀寫(xiě)鎖的核心的業(yè)務(wù)邏輯都在Sync類(lèi)中
final Sync sync;
// 非公平鎖,繼承Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
// 公平鎖,繼承Sync
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
// 讀鎖
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
// 構(gòu)造函數(shù)
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 讀鎖的加鎖方法
public void lock() {
sync.acquireShared(1);
}
// 讀鎖的解鎖方法
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
// 構(gòu)造函數(shù)
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 寫(xiě)鎖加鎖方法
public void lock() {
sync.acquire(1);
}
// 寫(xiě)鎖解鎖方法
public void unlock() {
sync.release(1);
}
}
下面來(lái)看下抽象內(nèi)部類(lèi)Sync的基本組成,Java中不管哪個(gè)并發(fā)類(lèi)都是通過(guò)定義AQS中int類(lèi)型的狀態(tài)碼state實(shí)現(xiàn)的。ReentrantReadWriteLock將狀態(tài)碼state分為高16位和低16位。狀態(tài)碼state的高16位用于存儲(chǔ)并發(fā)讀取的線程數(shù)(讀線程重入的次數(shù)也加入計(jì)數(shù)),即狀態(tài)碼state的高16位=(線程1的重入次數(shù)+線程2的重入次數(shù)+...+線程n的重入次數(shù))。狀態(tài)碼state的低16位用于存儲(chǔ)單個(gè)寫(xiě)線程的重入次數(shù)(寫(xiě)鎖為互斥鎖,只有一個(gè)線程能獲取寫(xiě)鎖)。
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
// 用于高16加一操作(高16位用于存儲(chǔ)并發(fā)讀取的線程數(shù))
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 最大并發(fā)讀取的線程數(shù)(讀線程的重入也加入計(jì)數(shù)) = 最大的單個(gè)寫(xiě)線程的重入次數(shù) = 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 16位的二進(jìn)制1,用于計(jì)算低16位的寫(xiě)單個(gè)寫(xiě)進(jìn)程沖入次數(shù)
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 返回高16位并發(fā)讀的線程數(shù)
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回低16位單個(gè)寫(xiě)進(jìn)程重入次數(shù)
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 用于保存線程id和對(duì)應(yīng)的重入次數(shù)
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
// 繼承自ThreadLocal,用于保存除了第一個(gè)讀線程外,其他讀取線程各自的重入次數(shù)
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 用于保存除了第一個(gè)讀線程外,其他讀取線程的重入次數(shù)
private transient ThreadLocalHoldCounter readHolds;
// 用于上一次讀讀線程的id和重入次數(shù)
private transient HoldCounter cachedHoldCounter;
// 用于報(bào)錯(cuò)第一個(gè)讀線程對(duì)象
private transient Thread firstReader = null;
// 用于保存第一個(gè)讀線程的重入次數(shù)
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
// 寫(xiě)線程解鎖方法
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// 寫(xiě)線程加鎖方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 讀線程解鎖方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 讀線程加鎖方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
}
ReentrantReadWriteLock初始化過(guò)程
下面先來(lái)看下ReentrantReadWriteLock的初始化過(guò)程的源碼
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
}
}
/*ReentrantReadWriteLock#ReentrantReadWriteLock()*/
public ReentrantReadWriteLock() {
this(false);
}
/*ReentrantReadWriteLock#ReentrantReadWriteLock(boolean fair)*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/*ReentrantReadWriteLock$ReadLock#ReadLock(ReentrantReadWriteLock lock)*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/*ReentrantReadWriteLock$WriteLock#WriteLock(ReentrantReadWriteLock lock)*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
我們可以看到,默認(rèn)初始化為非公平鎖。我們看讀寫(xiě)鎖的初始化邏輯,他們都傳入了當(dāng)前ReentrantReadWriteLock對(duì)lock,并將sync參數(shù)設(shè)置為lock.sync。讀寫(xiě)鎖的業(yè)務(wù)邏輯都是通過(guò)調(diào)用sync中的方法實(shí)現(xiàn),這里就可以看出sync才是實(shí)現(xiàn)讀寫(xiě)鎖的關(guān)鍵類(lèi)。
ReentrantReadWriteLock讀鎖加鎖過(guò)程
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
readLock.lock();
}
}
/*ReentrantReadWriteLock$ReadLock#lock()*/
public void lock() {
sync.acquireShared(1);
}
/*AbstractQueuedSynchronizer#acquireShared()*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/*ReentrantReadWriteLock$Sync#tryAcquireShared()*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 獲取AQS中的狀態(tài)碼state
int c = getState();
// 如果低16位的獨(dú)占寫(xiě)線程的重入次數(shù)不為0,并且獨(dú)占線程不等于當(dāng)前線程則返回-1。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 獲取高16位所有讀線程的重入次數(shù)之和
int r = sharedCount(c);
// 如果阻塞隊(duì)列為空或者阻塞隊(duì)列第一個(gè)線程獲取讀鎖&&高16位所有讀線程的重入次數(shù)之和小于65535則當(dāng)前線程允許獲取讀鎖&&使用CAS將高16位所有讀線程的重入次數(shù)之和加一成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第一個(gè)獲取讀鎖的線程,則使用firstReader記錄第一個(gè)獲取讀鎖的線程對(duì)象,并設(shè)置第一個(gè)獲取讀鎖的線程的重入次數(shù)firstReaderHoldCount為1。
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果當(dāng)前線程是第一次獲取鎖的線程
} else if (firstReader == current) {
// 將第一次獲取鎖的線程的重入次數(shù)加1
firstReaderHoldCount++;
} else {
// 如果不是第一個(gè)獲取讀鎖的線程
// 獲取上一次獲取讀鎖的線程的相關(guān)記錄(上次獲取讀鎖的線程的id和重入次數(shù))
HoldCounter rh = cachedHoldCounter;
// 如果沒(méi)有上一個(gè)獲取獲取讀鎖的線程的信息(第一個(gè)獲取讀鎖的線程不算)||上一個(gè)獲取讀鎖的線程id不等于當(dāng)前的線程的id
if (rh == null || rh.tid != getThreadId(current))
// 從TreadLocal中獲取當(dāng)前線程的信息(重入信息),將上次獲取讀鎖的線程信息設(shè)置為當(dāng)前獲取讀鎖的線程信息。
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 當(dāng)前線程的重入次數(shù)加1
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
非公平獲取讀鎖:在線程獲取讀鎖的過(guò)程中,有一步很關(guān)鍵,就是通過(guò)NonfairSync#readerShouldBlock()判斷阻塞隊(duì)列的對(duì)頭的等待線程是請(qǐng)求讀鎖還是寫(xiě)鎖。如果對(duì)頭的等待線程請(qǐng)求的是讀鎖,則當(dāng)前線程不需要加入阻塞隊(duì)列,可以非公平的競(jìng)爭(zhēng)讀鎖。如果對(duì)頭的等待線程獲取的寫(xiě)鎖,則當(dāng)前線程加入阻塞隊(duì)列并掛起。
/*ReentrantReadWriteLock$NonfairSync#readerShouldBlock()*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
/*AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive()*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/*AbstractQueuedSynchronizer#isShared()*/
final boolean isShared() {
return nextWaiter == SHARED;
}
ReentrantReadWriteLock寫(xiě)鎖加鎖過(guò)程
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
}
}
/*ReentrantReadWriteLock$WriteLock#lock()*/
public void lock() {
sync.acquire(1);
}
/*AbstractQueuedSynchronizer#acquire()*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/*ReentrantReadWriteLock$Sync#tryAcquire()*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 獲取AQS中的狀態(tài)碼state
int c = getState();
// 獲取低16位獨(dú)占線程的重入次數(shù)
int w = exclusiveCount(c);
if (c != 0) {
// 如果獨(dú)占線程的重入次數(shù)w為0(狀態(tài)碼c不為0,獨(dú)占線程的重入次數(shù)w為0說(shuō)明當(dāng)前有讀鎖還沒(méi)釋放,不能加寫(xiě)鎖)|| 獨(dú)占線程的重入次數(shù)w為不為零但是當(dāng)前線程不是獲得寫(xiě)鎖得獨(dú)占線程。
if (w == 0 || current != getExclusiveOwnerThread())
// 加鎖失敗
return false;
// 獨(dú)占線程的重入次數(shù)w加1后如果大于最大得重入次數(shù),則拋出異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 獨(dú)占線程的重入次數(shù)w加1,不需要CAS進(jìn)行加一,因?yàn)槭仟?dú)占線程地重入(不存在并發(fā))
setState(c + acquires);
// 加鎖成功
return true;
}
// 在狀態(tài)碼c=0時(shí),說(shuō)明當(dāng)前沒(méi)有任何線程獲取鎖。寫(xiě)線程可以直接獲取寫(xiě)鎖,不需要阻塞,所以writerShouldBlock()直接返回false, 并使用CAS對(duì)寫(xiě)?yīng)氄季€程的重入次數(shù)加1
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 設(shè)獨(dú)占線程為當(dāng)前線程
setExclusiveOwnerThread(current);
return true;
}
/*ReentrantReadWriteLock$NonfairSync#writerShouldBlock*/
final boolean writerShouldBlock() {
return false; // writers can always barge
}
寫(xiě)鎖的加鎖過(guò)程比較簡(jiǎn)單,直接注釋就能看懂,這里不在贅述。
ReentrantReadWriteLock讀鎖阻塞過(guò)程
線程在獲取讀鎖時(shí),有兩種情況會(huì)導(dǎo)致線程阻塞:
- 當(dāng)前已經(jīng)有線程獲取寫(xiě)鎖,并且還沒(méi)有釋放寫(xiě)鎖。
- 已經(jīng)有線程獲取了讀鎖同時(shí)沒(méi)有釋放讀鎖并且阻塞隊(duì)列的第一個(gè)阻塞線程請(qǐng)求寫(xiě)鎖。
第一種情況很好理解,當(dāng)已經(jīng)有線程獲取了寫(xiě)鎖時(shí),任何其他線程嘗試加鎖都會(huì)失敗并并加入阻塞隊(duì)列。我們下面來(lái)看下第二種情況。
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
// 線程1獲取讀鎖
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
}
}, "Thread1-tryRead");
thread1.start();
// 線程2獲取寫(xiě)鎖
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
writeLock.lock();
}
}, "Thread2-tryWrite");
thread2.start();
// 線程3獲取讀鎖
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
}
}, "Thread3-tryRead");
thread3.start();
}
}
加入阻塞隊(duì)列。head指向阻塞隊(duì)列的頭節(jié)點(diǎn)(頭節(jié)點(diǎn)不存具體阻塞線程的信息,只是用來(lái)指向阻塞隊(duì)列的第一個(gè)阻塞線程對(duì)應(yīng)的Node),head指向隊(duì)列的節(jié)點(diǎn),tail指向隊(duì)列的尾節(jié)點(diǎn)。
我們先開(kāi)看下thread請(qǐng)求寫(xiě)鎖的失敗后的操作,即調(diào)用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的源碼,下面來(lái)看下阻塞隊(duì)列的插入過(guò)程addWaiter(Node.EXCLUSIVE)。Node.EXCLUSIVE為空,表示當(dāng)前阻塞線程請(qǐng)求的是寫(xiě)鎖。
/*ReentrantReadWriteLock$WriteLock#lock()*/
public void lock() {
sync.acquire(1);
}
/*AbstractQueuedSynchronizer#acquire()*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/*AbstractQueuedSynchronizer#acquire()*/
private Node addWaiter(Node mode) {// Node.EXCLUSIVE(Node EXCLUSIVE = null);
// 為當(dāng)前阻塞線程創(chuàng)建Node節(jié)點(diǎn)用于存儲(chǔ)阻塞線程的基本信息
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾節(jié)點(diǎn)為空,則創(chuàng)建首尾節(jié)點(diǎn),并使用尾插法在尾節(jié)點(diǎn)tail后面插入當(dāng)前node
enq(node);
return node;
}
/*AbstractQueuedSynchronizer$Node#Node(Thread thread, Node mode)*/
Node(Thread thread, Node mode) {
// nextWaiter = null代表當(dāng)前阻塞線程請(qǐng)求獲取寫(xiě)鎖
this.nextWaiter = mode;
// 用于thread用于存儲(chǔ)阻塞線程對(duì)象
this.thread = thread;
}
/*AbstractQueuedSynchronizer$Node#enq(final Node node)*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 使用CAS使head指向新建的頭節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
// 由于當(dāng)前阻塞隊(duì)列只有一個(gè)頭節(jié)點(diǎn),因此頭節(jié)點(diǎn)就是尾節(jié)點(diǎn)
tail = head;
} else {
// 新的阻塞節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)指尾節(jié)點(diǎn)tail
node.prev = t;
// 使用CAS將尾節(jié)點(diǎn)指針tail指向當(dāng)前節(jié)點(diǎn)
if (compareAndSetTail(t, node)) {
// 前任尾節(jié)點(diǎn)的后一個(gè)節(jié)點(diǎn)指向新的尾節(jié)點(diǎn)
t.next = node;
return t;
}
}
}
}
addWaiter(Node.EXCLUSIVE)執(zhí)行完后,節(jié)點(diǎn)插入到阻塞隊(duì)列,如下圖所示,head指向阻塞隊(duì)列的頭節(jié)點(diǎn)(頭節(jié)點(diǎn)不存具體阻塞線程的信息,只是用來(lái)指向阻塞隊(duì)列的第一個(gè)阻塞線程對(duì)應(yīng)的Node),head指向隊(duì)列的節(jié)點(diǎn),tail指向隊(duì)列的尾節(jié)點(diǎn)。在線程2對(duì)應(yīng)的阻塞節(jié)點(diǎn)插入阻塞隊(duì)列后,阻塞隊(duì)列如下圖所示:
線程2對(duì)應(yīng)的阻塞節(jié)點(diǎn)插入到阻塞隊(duì)列后,線程2還未掛起。還要繼續(xù)執(zhí)行acquireQueued(node, arg)方法,下面來(lái)看下acquireQueued(final Node node, int arg)源碼。
/*AbstractQueuedSynchronizer#acquireQueued(final Node node, int arg)*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 獲取當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)
final Node p = node.predecessor();
// 前一個(gè)節(jié)點(diǎn)如果是頭節(jié)點(diǎn)則再次嘗試獲取寫(xiě)鎖
if (p == head && tryAcquire(arg)) {
// 如果寫(xiě)鎖獲取成功,將當(dāng)前線程對(duì)應(yīng)的阻塞節(jié)點(diǎn)從阻塞隊(duì)列中移除
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire(p,node)會(huì)將當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus設(shè)為-1,并再次進(jìn)去for循環(huán),如果還是不能獲取寫(xiě)鎖則掛起當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued(final Node node, int arg)方法會(huì)進(jìn)行兩次for循環(huán)讓線程2嘗試獲取寫(xiě)鎖,當(dāng)然嘗試獲取寫(xiě)鎖的前提的線程2對(duì)應(yīng)的阻塞節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn)。顯然這里滿足這個(gè)條件,于是線程2會(huì)兩次嘗試獲取寫(xiě)鎖,但是還是以失敗告終,線程2最終調(diào)用parkAndCheckInterrupt()掛起。在線程2掛起后阻塞隊(duì)列圖下圖所示。
最后我們來(lái)看看線程3獲取讀鎖的過(guò)程,判斷線程3是否需要阻塞的條件是隊(duì)列的第一個(gè)阻塞線程請(qǐng)求的是否是讀鎖,如果請(qǐng)求的是讀鎖,線程3嘗試獲取讀鎖(非公平)。如果請(qǐng)求的是寫(xiě)鎖,則線程3需要阻塞。
/*ReentrantReadWriteLock$ReadLock#lock()*/
public void lock() {
sync.acquireShared(1);
}
/*AbstractQueuedSynchronizer#acquireShared()*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/*ReentrantReadWriteLock$Sync#tryAcquireShared()*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 獲取AQS中的狀態(tài)碼state
int c = getState();
// 如果低16位的獨(dú)占寫(xiě)線程的重入次數(shù)不為0,并且獨(dú)占線程不等于當(dāng)前線程則返回-1。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 獲取高16位所有讀線程的重入次數(shù)之和
int r = sharedCount(c);
// 如果阻塞隊(duì)列為空或者阻塞隊(duì)列第一個(gè)線程獲取讀鎖&&高16位所有讀線程的重入次數(shù)之和小于65535則當(dāng)前線程允許獲取讀鎖&&使用CAS將高16位所有讀線程的重入次數(shù)之和加一成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
/*ReentrantReadWriteLock$NonfairSync#readerShouldBlock()*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
/*AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive()*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/*AbstractQueuedSynchronizer#isShared()*/
final boolean isShared() {
return nextWaiter == SHARED;
}
顯然,當(dāng)前阻塞隊(duì)列的第一個(gè)阻塞線程2嘗試獲取的是寫(xiě)鎖,接下來(lái)運(yùn)行阻塞方法doAcquireShared(arg); doAcquireShared(arg)會(huì)將線程3對(duì)應(yīng)的阻塞節(jié)點(diǎn)插入到阻塞隊(duì)列中,并兩次嘗試獲取讀鎖,如果還是獲取不到,則掛起當(dāng)前線程。顯然這里線程3最后還是會(huì)獲取讀鎖失敗。
/*AbstractQueuedSynchronizer#doAcquireShared()*/
private void doAcquireShared(int arg) {
// 插入當(dāng)前線程對(duì)應(yīng)的阻塞節(jié)點(diǎn)到阻塞隊(duì)列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果當(dāng)前線程對(duì)應(yīng)的阻塞節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn),直接嘗試獲取讀鎖
if (p == head) {
// 獲取讀鎖成功
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire(p,node)會(huì)將當(dāng)前節(jié)點(diǎn)的前一個(gè)節(jié)點(diǎn)的waitStatus設(shè)為-1,并再次進(jìn)去for循環(huán),如果還是不能獲取讀鎖則掛起當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/*AbstractQueuedSynchronizer#acquire()*/
private Node addWaiter(Node mode) {// Node.SHARED(Node SHARED = new Node();)
// 為當(dāng)前阻塞線程創(chuàng)建Node節(jié)點(diǎn)用于存儲(chǔ)阻塞線程的基本信息
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 在尾部插入線程3的阻塞節(jié)點(diǎn)
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
在線程3掛起后阻塞隊(duì)列如下圖所示。
總結(jié)
以上就是ReentrantReadWriteLock的獲取讀寫(xiě)鎖的全部?jī)?nèi)容,關(guān)于解鎖部分的原理,這里不在贅述。需要注意一下幾點(diǎn):
- 如果已經(jīng)有線程獲取了寫(xiě)鎖,則其他任何線程如果嘗試獲取讀寫(xiě)鎖都會(huì)失敗。
- 如果已經(jīng)有線程獲取了讀鎖,其他線程嘗試獲取讀鎖時(shí),需要判斷阻塞隊(duì)列的第一個(gè)阻塞線程嘗試獲取的鎖的類(lèi)型,如果是讀鎖,則可以非公平的競(jìng)爭(zhēng)讀鎖。如果是寫(xiě)鎖,則加入阻塞隊(duì)列,在兩次嘗試獲取讀鎖失敗后被掛起。