特點
- 讀讀共享
- 讀寫互斥
- 寫寫互斥
結(jié)構(gòu)

鎖的狀態(tài)表示
state
繼承AQS的類都需要使用state變量代表鎖的資源占用情況

高16位 表示讀鎖的上鎖次數(shù)(由于讀讀共享,可能由多個線程累加),
低16位 表示寫鎖的重入次數(shù), 由于寫寫互斥,讀寫互斥, 只能有一個線程重入累加而得
讀鎖 : ReadLock
lock() :加鎖
/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
Sync.tryAcquireShared():
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
// 鎖狀態(tài)
int c = getState();
/**
* exclusiveCount(c)
*
* 寫鎖的重入的次數(shù) state & 11111 1111 1111 1111
* static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
*
**/
// 如果此時有寫鎖,寫鎖重入次數(shù)應(yīng)該 > 0 ,并且不是當前線程,由于讀寫互斥,寫寫互斥,不管當前方法是讀寫還是寫鎖,這種情況都會加鎖失敗
// 如果是當前線程持有的寫鎖則 把寫鎖降級為讀鎖
// 如果當前線程并沒有寫鎖, 則直接 獲取讀鎖
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 獲取讀鎖上鎖次數(shù)
// state 右移 16位,把 用于 表示寫鎖的加鎖次數(shù) 的低16位 給 抵消掉,剩下的就是 高16位
int r = sharedCount(c);
// 如果隊列中沒有線程在排隊
// 這里會有公平非公平之分
// 公平鎖 會判斷 隊列是否有線程在排隊
// 非公平鎖 只判斷隊頭 是不是 寫鎖,是寫鎖 才會 等待
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// cas
// 讀鎖上鎖,直接 state + 1 0000 0000 0000 0000
// SHARED_UNIT 是 1 左移 16位
// 這樣就可以對表示讀鎖上鎖次數(shù)的高16位 + 1
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果上鎖次數(shù)為0,當前鎖沒有線程持有其讀鎖
if (r == 0) {
// 第一次上讀鎖的線程設(shè)置為當前線程
firstReader = current;
firstReaderHoldCount = 1;
// 第一次上讀鎖的線程重入
} else if (firstReader == current) {
// 首次上讀鎖 線程重入次數(shù) + 1
firstReaderHoldCount++;
} else {
// HoldCounter 主要有線程id和這個線程加鎖次數(shù)count, 用于統(tǒng)計某個線程 加鎖的次數(shù)
// 從緩存拿HoldCounter對象
HoldCounter rh = cachedHoldCounter;
// 如果緩存等于空 或者 緩存的線程id和當前線程id不相等
if (rh == null || rh.tid != getThreadId(current))
// 從線程的threadLocal里HoldCounter拿對象,如果沒有,會初始化HoldCounter
// 里面的count = 0,并且賦值給cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 對當前線程的HoldCounter的加鎖次數(shù)進行 ++
rh.count++;
}
return 1;
}
// 感覺跟上面代碼邏輯差不多 : cas 失敗,再來一遍
return fullTryAcquireShared(current);
}
doAcquireShared :
自旋 ,自旋失敗 入隊,阻塞
private void doAcquireShared(int arg) {
// 創(chuàng)建Node節(jié)點,節(jié)點類型為SHARED,持有當前線程
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 1. 如果當前節(jié)點的前驅(qū)節(jié)點時頭結(jié)點
// 2. 如果阻塞的線程被喚醒,由于是無限循環(huán),會繼續(xù)走到這里,頭結(jié)點永遠是空Node,這時會繼續(xù)去拿鎖。由于是讀鎖,可以接著上鎖
if (p == head) {
// 1. 自旋一次
int r = tryAcquireShared(arg);
if (r >= 0) {
// 1. 自旋成功,設(shè)置新的Node為head,并喚醒后一個線程
// 2. 第二次循環(huán)進來會接著設(shè)置新的Node為head,并喚醒下一個等待讀鎖的線程
// 直到Node的狀態(tài)不是Shared為止
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 1. 把前驅(qū)節(jié)點設(shè)置為Signal狀態(tài),阻塞自身節(jié)點
// 2. 如果線程被叫醒,會繼續(xù)循環(huán)
// 3. 以此類推,一直喚醒等待讀鎖的線程,直到遇到不是SHARED狀態(tài)的節(jié)點。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
讀鎖 Node 結(jié)構(gòu) :
{thread:線程id,nextWaiter = new Node() },沒有waitStatus
setHeadAndPropagate
設(shè)置新的頭節(jié)點,并釋放后面的SHARED狀態(tài)的節(jié)點
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果當前節(jié)點的后面一個節(jié)點是Shared狀態(tài)
if (s == null || s.isShared())
// 釋放后 一個節(jié)點
doReleaseShared();
}
}
doReleaseShared : 釋放下一個線程(head.next)
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 叫醒下一個線程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor(h) :喚醒下一個線程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 找到最近一個非CANCELLED(1)狀態(tài)的線程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 喚醒它
LockSupport.unpark(s.thread);
}
總結(jié)
- 讀鎖加鎖時,如果有寫鎖持有鎖則上鎖失敗,進入隊列排隊, 如果是讀鎖則可以共享這把鎖,不用排隊,只是多了一個計數(shù)的步驟。
- 首次加鎖失敗,入隊 后,如果自己是head, 會搶鎖一次,搶鎖成功,會喚醒隊列后面所有SHARED節(jié)點
unlock() : 解鎖
讀鎖狀態(tài) - 1,并判斷讀鎖是否完全釋放
public final boolean releaseShared(int arg) {
// 讀鎖狀態(tài) - 1。并判斷是否完全釋放
if (tryReleaseShared(arg)) {
// 如果讀鎖完全釋放,那么喚醒下一個 等待寫鎖的線程 醒來搶寫鎖
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(int unused)
state讀鎖狀態(tài)-1,其他讀鎖上鎖次數(shù) -1
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果當前線程是第一個上讀鎖的線程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果第一次上讀鎖的線程只上過一次讀鎖
if (firstReaderHoldCount == 1)
// 清空第一次上讀鎖的線程
firstReader = null;
else
// 第一次上讀鎖的線程上鎖次數(shù) --
firstReaderHoldCount--;
} else {
// 否則,從ThreadLocal中取出上讀鎖次數(shù)-1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 高16讀鎖狀態(tài)位 - 1 = (- 1 << 16 : 1 0000 0000 0000 0000)
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 返回 是不是 讀鎖完全釋放。 讀鎖狀態(tài)為 = 0
return nextc == 0;
}
}
doReleaseShared() : 釋放下一個線程(head.next)
這個方法和讀鎖獲取到鎖之后,喚醒隊列中下一個等待讀鎖的線程是一樣的。如果下一個是SHARED狀態(tài)的話,會調(diào) doReleaseShared()。
而這里,如果讀鎖全部釋放,就會喚醒 隊列中 下一個等待寫鎖的 線程來獲取寫鎖。
總結(jié)
讀鎖被喚醒時, 如果當前被持有鎖是讀鎖,則自己也可以共享這把鎖,并且喚醒隊列中自己后面的一個狀態(tài)SHARED的節(jié)點來搶鎖。 以此類推,隊列中最近的的所有等待讀鎖的線程都會被喚醒,并共享這把鎖。
比如 W 表示寫鎖,R表示 讀鎖 隊列中R1,R2,R3,R4,W1,R5 :
R1被喚醒, 搶到讀鎖時,會喚醒 R2,R3,R4 來共享鎖,但是 R5不會,因為中間隔著一把寫鎖W1
寫鎖 : WriteLock
lock() :加鎖
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// 嘗試加鎖
if (!tryAcquire(arg) &&
// 嘗試加鎖失敗,入隊等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg) :嘗試加鎖
這個方法和ReentrantLock實現(xiàn)的方法有一點點不一樣 :
只要有讀鎖或者寫鎖(不是自己重入)就加鎖失敗
cas更改state失敗表示加鎖失敗
如果是公平鎖隊列中有線程在排隊,則加鎖失敗
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
// 獲取到state
int c = getState();
// 獲取寫鎖上鎖次數(shù)
int w = exclusiveCount(c);
// 如果有讀鎖或者寫鎖
if (c != 0) {
// w==0表示沒有寫鎖,但是c!=0,所以只有讀鎖
// 或者 : current != getExclusiveOwnerThread() 有寫鎖,但是別的線程上的,不是自己重入的情況
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 第一個判斷 沒return false的情況下,那么就是 沒有讀鎖,并且 有寫鎖,但是是 當前線程重入
// 自己重入, 寫狀態(tài) + 1
setState(c + acquires);
return true;
}
// 能走到這,就說明 c==0,因為 c!=0 在上面這個判斷中 是肯定會返回的
// writerShouldBlock() 判斷是否應(yīng)該排隊,公平鎖返回的是 隊列中是否有線程,非公平鎖恒返回false
if (writerShouldBlock() ||
// cas失敗
!compareAndSetState(c, c + acquires))
return false;
// 否則加鎖成功
setExclusiveOwnerThread(current);
return true;
}
加鎖失敗,則入隊睡眠
這個和 ReentrantLock是一樣的
寫鎖 Node 結(jié)構(gòu) :和ReentrantLock的鎖結(jié)構(gòu)一樣
總結(jié)
當(讀鎖 不存在 并且 沒有寫鎖 ) 或者 寫鎖 只有 自己重入才能加鎖成功
unlock():解鎖
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 嘗試解鎖,返回是否釋放鎖的結(jié)果
if (tryRelease(arg)) {
Node h = head;
// 如果寫鎖已經(jīng)完全釋放,喚醒隊列中的頭節(jié)點.next
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int releases)
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 低16位寫鎖狀態(tài)位 減 1
int nextc = getState() - releases;
// 是否完全釋放,低16位 == 0 : 重入鎖需要解鎖多次
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 完全釋放,設(shè)置為無鎖狀態(tài)
setExclusiveOwnerThread(null);
// 重新設(shè)置寫鎖狀態(tài)位
setState(nextc);
return free;
}
總結(jié)
寫鎖解鎖 減 低16位寫狀態(tài)就行,重入鎖需多次解鎖,直至完全釋放,才會喚醒隊列中等待的線程。
鎖降級
在ReentrantReadWriteLock 是支持鎖降級的。但不支持鎖的升級。
鎖降級
從寫鎖 降級到讀鎖 :
為什么可以 ?
因為是從寫鎖降級到讀鎖,寫鎖只能由一個線程持有,此時降級為讀鎖,其他線程上讀鎖或者寫鎖,因為此時寫鎖的狀態(tài)還沒釋放。所以自始至終只可能存在同一個線程在同時持有讀鎖和者寫鎖。
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 是否有寫鎖
if (exclusiveCount(c) != 0 &&
// 有寫鎖,但是 寫鎖持有線程就是當前線程,鎖降級到讀鎖
getExclusiveOwnerThread() != current)
return -1;
// 下面就是上讀鎖 邏輯了
int r = sharedCount(c);
// ................ 省略部分代碼
return fullTryAcquireShared(current);
}
為什么不支持鎖升級
可能會造成死鎖。
鎖升級 : 是讀鎖升級到寫鎖
假設(shè)有 A,B 和 C 三個線程,它們都已持有讀鎖。假設(shè)線程 A 嘗試從讀鎖升級到寫鎖。那么它必須等待 B 和 C 釋放掉已經(jīng)獲取到的讀鎖。如果隨著時間推移,B 和 C 逐漸釋放了它們的讀鎖,此時線程 A 確實是可以成功升級并獲取寫鎖。
但是我們考慮一種特殊情況。假設(shè)線程 A 和 B 都想升級到寫鎖,那么對于線程 A 而言,它需要等待其他所有線程,包括線程 B 在內(nèi)釋放讀鎖。而線程 B 也需要等待所有的線程,包括線程 A 釋放讀鎖。這就是一種非常典型的死鎖的情況。誰都愿不愿意率先釋放掉自己手中的鎖。