
1. Usage example
import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantReadWriteLockTest {
    static class Queue3 {
        // Shared data: only one thread may write it at a time, but several threads may read it concurrently.
        private Object data = null;
        private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        public void get() {
            rwl.readLock().lock(); // Take the read lock: other threads may read but not write
            try {
                System.out.println(Thread.currentThread().getName() + " be ready to read data!");
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(Thread.currentThread().getName() + " have read data: " + data);
            } finally {
                rwl.readLock().unlock(); // Release the read lock in finally so it is released on every path
            }
        }
        public void put(Object data) {
            rwl.writeLock().lock(); // Take the write lock: no other thread may read or write
            try {
                System.out.println(Thread.currentThread().getName() + " be ready to write data!");
                try {
                    Thread.sleep((long) (Math.random() * 1000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                this.data = data;
                System.out.println(Thread.currentThread().getName() + " have write data: " + data);
            } finally {
                rwl.writeLock().unlock(); // Release the write lock
            }
        }
    }
    public static void main(String[] args) {
        final Queue3 q3 = new Queue3();
        // Three reader threads that read in an endless loop
        for (int i = 0; i < 3; i++) {
            new Thread() {
                @Override
                public void run() {
                    while (true) {
                        q3.get();
                    }
                }
            }.start();
        }
        // Three writer threads that write random values in an endless loop
        for (int i = 0; i < 3; i++) {
            new Thread() {
                @Override
                public void run() {
                    while (true) {
                        q3.put(new Random().nextInt(10000));
                    }
                }
            }.start();
        }
    }
}
2. Overall call structure
2.1 The read lock and the write lock share one Sync
By default the lock is non-fair:
public ReentrantReadWriteLock() {
    this(false);
}
The read lock and the write lock end up pointing at the same Sync instance: they are separate objects in form, but the implementation shares a single Sync.
public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}
public static class ReadLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -5992448646407690164L;
    private final Sync sync;
    /**
     * Constructor for use by subclasses
     *
     * @param lock the outer lock object
     * @throws NullPointerException if the lock is null
     */
    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    // ... (remaining members omitted)
}
public static class WriteLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = -4992448646407690164L;
    private final Sync sync;
    /**
     * Constructor for use by subclasses
     *
     * @param lock the outer lock object
     * @throws NullPointerException if the lock is null
     */
    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }
    // ... (remaining members omitted)
}
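Because both views delegate to one Sync, a hold taken through the write lock is immediately visible to a thread going through the read lock. The following is a minimal sketch of that effect using only the public API; the class name SharedSyncDemo and the helper method are invented for this illustration.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class SharedSyncDemo {
    // Hypothetical helper: reports whether a second thread could take the read
    // lock while the calling thread holds the write lock of the same instance.
    static boolean otherThreadCanReadWhileWriteHeld() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] acquired = new boolean[1];
        rwl.writeLock().lock();
        try {
            Thread reader = new Thread(() -> {
                // tryLock() never blocks, so the demo cannot hang here
                acquired[0] = rwl.readLock().tryLock();
                if (acquired[0]) {
                    rwl.readLock().unlock();
                }
            });
            reader.start();
            try {
                reader.join(); // join() also makes the reader's result visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        } finally {
            rwl.writeLock().unlock();
        }
        return acquired[0];
    }

    public static void main(String[] args) {
        System.out.println("reader admitted while write lock held: "
                + otherThreadCanReadWhileWriteHeld());
    }
}
```

The reader is refused even though it only ever touches the ReadLock object, which is what sharing one Sync implies.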
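2.2 Acquiring and releasing the read lock
public void lock() {
    sync.acquireShared(1);
}
This calls AQS's acquireShared. It follows the same template-method pattern as ReentrantLock, except that ReentrantLock goes through the exclusive-mode acquire while the read lock uses the shared-mode entry point:
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
The part that is specific to the read lock is Sync.tryAcquireShared.
public void unlock() {
    sync.releaseShared(1);
}
This calls AQS's releaseShared:
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
Again, the part that is specific to the read lock is Sync.tryReleaseShared.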
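To see the shared-mode template in isolation, here is a minimal sketch of a one-shot latch built on AbstractQueuedSynchronizer. It is not part of ReentrantReadWriteLock; the class SharedModeLatch and its methods are hypothetical, and only tryAcquireShared and tryReleaseShared are overridden, just as Sync does for the read lock.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class SharedModeLatch {
    // Hypothetical synchronizer: state 0 means closed, state 1 means open.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int unused) {
            // A negative result sends the caller into the AQS wait queue
            return getState() != 0 ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int unused) {
            setState(1);
            return true; // true tells releaseShared to wake queued threads
        }

        boolean isOpen() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    public void await() { sync.acquireShared(1); }   // same call as ReadLock.lock()
    public void open() { sync.releaseShared(1); }    // same call as ReadLock.unlock()
    public boolean isOpen() { return sync.isOpen(); }

    // Starts the given number of threads that call await(), opens the latch,
    // and returns how many of them got through.
    static int releaseAll(int waiters) {
        SharedModeLatch latch = new SharedModeLatch();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                latch.await();
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        latch.open();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println("threads released: " + releaseAll(3));
    }
}
```

Queueing, parking and wake-up all come from AQS; the subclass only decides whether an acquire or release succeeds.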
2.3 Acquiring and releasing the write lock
public void lock() {
    sync.acquire(1);
}
This calls AQS's acquire:
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
The part that is specific to the write lock is Sync.tryAcquire.
public void unlock() {
    sync.release(1);
}
This calls AQS's release:
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
The part that is specific to the write lock is Sync.tryRelease.
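The exclusive-mode template can be sketched the same way. The class below is a hypothetical, non-reentrant mutex (ExclusiveModeMutex is an invented name, not JDK code) that overrides only tryAcquire and tryRelease and leaves queueing to acquire and release.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

public class ExclusiveModeMutex {
    // Hypothetical synchronizer: state 0 means free, state 1 means held.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // acquire() will enqueue and park the caller
        }

        @Override
        protected boolean tryRelease(int unused) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true; // release() will unpark the successor, if any
        }
    }

    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }     // same call as WriteLock.lock()
    public void unlock() { sync.release(1); }   // same call as WriteLock.unlock()

    // Increments a plain int from several threads under the mutex and
    // returns the final value.
    static int incrementConcurrently(int threads, int perThread) {
        ExclusiveModeMutex mutex = new ExclusiveModeMutex();
        int[] counter = new int[1];
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    mutex.lock();
                    try {
                        counter[0]++;
                    } finally {
                        mutex.unlock();
                    }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter[0];
    }

    public static void main(String[] args) {
        System.out.println("final count: " + incrementConcurrently(4, 1000));
    }
}
```

Unlike the write lock, this sketch does not count reentrant holds; a thread that called lock() twice would block itself.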
3. How are the read and write locks separated? How is the state split?
abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 6317671515068378041L;
    /*
     * Read vs write count extraction constants and functions.
     * Lock state is logically divided into two unsigned shorts:
     * The lower one representing the exclusive (writer) lock hold count,
     * and the upper the shared (reader) hold count.
     */
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = (1 << SHARED_SHIFT);
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    /** Returns the number of shared holds represented in count */
    static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
    /** Returns the number of exclusive holds represented in count */
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
    // ... (remaining members omitted)
}
The read-write lock has to keep the synchronization state of multiple reader threads and one writer thread in the single AQS state field. It does so by splitting the int:
- the high 16 bits hold the read (shared) count
- the low 16 bits hold the write (exclusive) count
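The bit arithmetic can be checked on its own. The sketch below copies the constants and the two extraction functions from the excerpt above into a stand-alone class; StatePacking and pack are hypothetical names, and the packed value is purely arithmetic (it does not go through a real lock).

```java
public class StatePacking {
    static final int SHARED_SHIFT = 16;
    static final int SHARED_UNIT = (1 << SHARED_SHIFT);
    static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

    // Hypothetical helper: builds a state value from a read count and a write count.
    static int pack(int readHolds, int writeHolds) {
        return readHolds * SHARED_UNIT + writeHolds;
    }

    public static void main(String[] args) {
        int state = pack(3, 2); // three read holds, two write holds
        System.out.println("state = 0x" + Integer.toHexString(state)); // prints state = 0x30002
        System.out.println("shared = " + sharedCount(state));          // prints shared = 3
        System.out.println("exclusive = " + exclusiveCount(state));    // prints exclusive = 2
        System.out.println("max per half = " + MAX_COUNT);             // prints max per half = 65535
    }
}
```

Adding SHARED_UNIT bumps the read count without touching the low half, which is why tryAcquireShared uses c + SHARED_UNIT while tryAcquire uses c + acquires.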
getWriteHoldCount() returns the number of times the current thread has acquired the write lock (0 if it does not hold it):
public int getWriteHoldCount() {
    return sync.getWriteHoldCount();
}
final int getWriteHoldCount() {
    return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}
getReadLockCount returns the total number of read holds currently outstanding. This is not necessarily the number of threads holding the read lock, because one thread may acquire it repeatedly.
final int getReadLockCount() {
    return sharedCount(getState());
}
getReadHoldCount returns the number of times the current thread has acquired the read lock:
public int getReadHoldCount() {
    return sync.getReadHoldCount();
}
final int getReadHoldCount() {
    if (getReadLockCount() == 0)
        return 0;
    Thread current = Thread.currentThread();
    if (firstReader == current)
        return firstReaderHoldCount;
    HoldCounter rh = cachedHoldCounter;
    if (rh != null && rh.tid == getThreadId(current))
        return rh.count;
    int count = readHolds.get().count;
    if (count == 0) readHolds.remove();
    return count;
}
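A small experiment makes the difference between the three counters concrete. This is a sketch against the public API only; HoldCountDemo and measure are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class HoldCountDemo {
    // Returns {total read holds while two threads hold the read lock,
    //          read holds of the second thread,
    //          read holds of the calling thread,
    //          write holds of the calling thread}.
    static int[] measure() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        int[] result = new int[4];
        rwl.readLock().lock();
        rwl.readLock().lock(); // reentrant read acquire by the calling thread
        Thread other = new Thread(() -> {
            rwl.readLock().lock();
            try {
                result[0] = rwl.getReadLockCount(); // all holds, across threads
                result[1] = rwl.getReadHoldCount(); // holds of this thread only
            } finally {
                rwl.readLock().unlock();
            }
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        result[2] = rwl.getReadHoldCount();
        rwl.readLock().unlock();
        rwl.readLock().unlock();
        rwl.writeLock().lock();
        rwl.writeLock().lock(); // reentrant write acquire
        result[3] = rwl.getWriteHoldCount();
        rwl.writeLock().unlock();
        rwl.writeLock().unlock();
        return result;
    }

    public static void main(String[] args) {
        int[] r = measure();
        System.out.println("getReadLockCount (two threads) = " + r[0]);
        System.out.println("getReadHoldCount (second thread) = " + r[1]);
        System.out.println("getReadHoldCount (main thread) = " + r[2]);
        System.out.println("getWriteHoldCount (main thread) = " + r[3]);
    }
}
```

Two threads hold the read lock, yet getReadLockCount reports three holds because the main thread acquired it twice.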
3.1 The three mechanisms behind getReadHoldCount
3.1.1 firstReader and firstReaderHoldCount
/**
 * firstReader is the first thread to have acquired the read lock.
 * firstReaderHoldCount is firstReader's hold count.
 *
 * <p>More precisely, firstReader is the unique thread that last
 * changed the shared count from 0 to 1, and has not released the
 * read lock since then; null if there is no such thread.
 *
 * <p>Cannot cause garbage retention unless the thread terminated
 * without relinquishing its read locks, since tryReleaseShared
 * sets it to null.
 *
 * <p>Accessed via a benign data race; relies on the memory
 * model's out-of-thin-air guarantees for references.
 *
 * <p>This allows tracking of read holds for uncontended read
 * locks to be very cheap.
 */
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
firstReader is the first thread to have acquired the read lock. More precisely, it is the thread that most recently changed the shared count from 0 to 1 and has not released the read lock since; if there is no such thread, it is null. firstReaderHoldCount is the number of read holds of that thread.
How tryAcquireShared updates firstReader and firstReaderHoldCount:
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
How tryReleaseShared updates firstReader and firstReaderHoldCount:
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
3.1.2 cachedHoldCounter: a simple caching mechanism
/**
 * The hold count of the last thread to successfully acquire
 * readLock. This saves ThreadLocal lookup in the common case
 * where the next thread to release is the last one to
 * acquire. This is non-volatile since it is just used
 * as a heuristic, and would be great for threads to cache.
 *
 * <p>Can outlive the Thread for which it is caching the read
 * hold count, but avoids garbage retention by not retaining a
 * reference to the Thread.
 *
 * <p>Accessed via a benign data race; relies on the memory
 * model's final field and out-of-thin-air guarantees.
 */
private transient HoldCounter cachedHoldCounter;
cachedHoldCounter is the hold counter of the last thread that successfully acquired the read lock. In the common case where the next thread to release is the one that most recently acquired, it saves a ThreadLocal lookup. The field is non-volatile because it is used only as a heuristic, and a plain field is one that threads can readily cache.
/**
 * A counter for per-thread read hold counts.
 * Maintained as a ThreadLocal; cached in cachedHoldCounter
 */
static final class HoldCounter {
    int count = 0;
    // Use id, not reference, to avoid garbage retention
    final long tid = getThreadId(Thread.currentThread());
}
As the code in 3.1.1 shows, cachedHoldCounter acts as a simple one-entry cache: whenever the current thread's id equals the thread id recorded in cachedHoldCounter, the cached counter is used directly and the ThreadLocal readHolds is not consulted.
3.1.3 readHolds: the thread-local mechanism
/**
 * ThreadLocal subclass. Easiest to explicitly define for sake
 * of deserialization mechanics.
 */
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
/**
 * The number of reentrant read locks held by current thread.
 * Initialized only in constructor and readObject.
 * Removed whenever a thread's read hold count drops to 0.
 */
private transient ThreadLocalHoldCounter readHolds;
Initialization:
Sync() {
    readHolds = new ThreadLocalHoldCounter();
    setState(getState()); // ensures visibility of readHolds
}
readHolds works together with the cachedHoldCounter cache: the cache is tried first, and readHolds is consulted only on a cache miss.
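The cache-then-ThreadLocal lookup can be sketched outside the lock. The class below is a deliberately simplified illustration, not the JDK code: CachedThreadLocalCounter, its fields and the threadLocalLookups instrumentation are all hypothetical, it uses Thread.getId() where the JDK uses an internal getThreadId helper, and it never removes the ThreadLocal entry when a count drops to 0.

```java
public class CachedThreadLocalCounter {
    static final class HoldCounter {
        int count;
        // Store the id rather than the Thread to avoid retaining the thread
        final long tid = Thread.currentThread().getId();
    }

    private final ThreadLocal<HoldCounter> holds = ThreadLocal.withInitial(HoldCounter::new);
    private HoldCounter cached;  // plain field: a heuristic, like cachedHoldCounter
    int threadLocalLookups;      // demo-only instrumentation, not thread-safe

    public void increment() {
        HoldCounter rh = cached;
        if (rh == null || rh.tid != Thread.currentThread().getId()) {
            threadLocalLookups++;       // cache miss: fall back to the ThreadLocal
            cached = rh = holds.get();
        }
        rh.count++;
    }

    public int current() {
        HoldCounter rh = cached;
        if (rh != null && rh.tid == Thread.currentThread().getId())
            return rh.count;            // cache hit
        return holds.get().count;
    }

    // Returns {lookups after three increments on the calling thread,
    //          lookups after a second thread incremented twice,
    //          lookups after the calling thread incremented once more,
    //          final count of the calling thread}.
    static int[] demo() {
        CachedThreadLocalCounter c = new CachedThreadLocalCounter();
        int[] out = new int[4];
        c.increment();
        c.increment();
        c.increment();
        out[0] = c.threadLocalLookups;
        Thread other = new Thread(() -> {
            c.increment();
            c.increment();
        });
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        out[1] = c.threadLocalLookups;
        c.increment(); // the cache now belongs to the other thread: one more miss
        out[2] = c.threadLocalLookups;
        out[3] = c.current();
        return out;
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("lookups: " + r[0] + ", " + r[1] + ", " + r[2] + "; count: " + r[3]);
    }
}
```

Repeated operations by the same thread cost one ThreadLocal lookup in total; a lookup is paid again only when a different thread was the last to touch the cache.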
4. Acquiring and releasing the write lock
As section 2.3 shows, the parts to look at are Sync.tryAcquire and Sync.tryRelease.
Acquiring, Sync.tryAcquire:
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
writerShouldBlock() is where fair and non-fair modes differ: a fair lock checks whether other threads are queued ahead, while a non-fair lock always tries to barge.
Releasing, Sync.tryRelease:
/*
 * Note that tryRelease and tryAcquire can be called by
 * Conditions. So it is possible that their arguments contain
 * both read and write holds that are all released during a
 * condition wait and re-established in tryAcquire.
 */
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
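The reentrant bookkeeping in tryAcquire and tryRelease can be observed from outside. A minimal sketch using the public API follows; WriteReentrancyDemo and demo are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class WriteReentrancyDemo {
    // Returns {write lock still held after releasing one of two holds,
    //          write lock free after releasing the second hold,
    //          a third unlock throws IllegalMonitorStateException}.
    static boolean[] demo() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] out = new boolean[3];
        rwl.writeLock().lock();
        rwl.writeLock().lock();   // reentrant acquire: state goes from 1 to 2
        rwl.writeLock().unlock(); // state back to 1, owner unchanged
        out[0] = rwl.isWriteLocked() && rwl.getWriteHoldCount() == 1;
        rwl.writeLock().unlock(); // state 0: tryRelease reports the lock as free
        out[1] = !rwl.isWriteLocked();
        try {
            rwl.writeLock().unlock(); // not held any more
        } catch (IllegalMonitorStateException e) {
            out[2] = true;
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("held after first unlock: " + r[0]);
        System.out.println("free after second unlock: " + r[1]);
        System.out.println("extra unlock rejected: " + r[2]);
    }
}
```

Only the unlock that brings the exclusive count to 0 makes tryRelease return true, and only then does release wake a queued successor.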
5. Acquiring and releasing the read lock
As section 2.2 shows, the parts to look at are Sync.tryAcquireShared and Sync.tryReleaseShared.
Acquiring, Sync.tryAcquireShared:
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
If another thread holds the write lock, the acquire fails. If the current thread itself holds the write lock, the acquire is not rejected.
readerShouldBlock() is where fair and non-fair modes differ: a fair lock always checks whether any thread is queued ahead, while a non-fair lock only checks whether the first queued thread is a writer.
Releasing, Sync.tryReleaseShared:
protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;
    }
}
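Two consequences of tryReleaseShared can be checked from the public API: a writer gets in only after the last read hold is released, and an unlock without a matching lock is rejected. The following is a sketch; ReadReleaseDemo, writerCanEnter and demo are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class ReadReleaseDemo {
    // Asks a fresh thread to tryLock() the write lock and reports whether it succeeded.
    private static boolean writerCanEnter(ReentrantReadWriteLock rwl) {
        boolean[] ok = new boolean[1];
        Thread t = new Thread(() -> {
            ok[0] = rwl.writeLock().tryLock();
            if (ok[0]) {
                rwl.writeLock().unlock();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ok[0];
    }

    // Returns {writer admitted while one read hold remains,
    //          writer admitted after the last read hold is released,
    //          unmatched read unlock throws IllegalMonitorStateException}.
    static boolean[] demo() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] out = new boolean[3];
        rwl.readLock().lock();
        rwl.readLock().lock();
        rwl.readLock().unlock();      // shared count 2 -> 1
        out[0] = writerCanEnter(rwl);
        rwl.readLock().unlock();      // shared count 1 -> 0
        out[1] = writerCanEnter(rwl);
        try {
            rwl.readLock().unlock();  // this thread holds no read lock any more
        } catch (IllegalMonitorStateException e) {
            out[2] = true;
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("writer admitted with one read hold left: " + r[0]);
        System.out.println("writer admitted after last release: " + r[1]);
        System.out.println("unmatched unlock rejected: " + r[2]);
    }
}
```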
6. Fair and non-fair locks
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -8159625535654395037L;
    final boolean writerShouldBlock() {
        return false; // writers can always barge
    }
    final boolean readerShouldBlock() {
        /* As a heuristic to avoid indefinite writer starvation,
         * block if the thread that momentarily appears to be head
         * of queue, if one exists, is a waiting writer. This is
         * only a probabilistic effect since a new reader will not
         * block if there is a waiting writer behind other enabled
         * readers that have not yet drained from the queue.
         */
        return apparentlyFirstQueuedIsExclusive();
    }
}
/**
 * Returns {@code true} if the apparent first queued thread, if one
 * exists, is waiting in exclusive mode. If this method returns
 * {@code true}, and the current thread is attempting to acquire in
 * shared mode (that is, this method is invoked from {@link
 * #tryAcquireShared}) then it is guaranteed that the current thread
 * is not the first queued thread. Used only as a heuristic in
 * ReentrantReadWriteLock.
 */
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next) != null &&
        !s.isShared() &&
        s.thread != null;
}
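In non-fair mode writerShouldBlock always returns false: the write lock is exclusive and non-fair, so a writer may always barge.
For the non-fair read lock, which is shared, readerShouldBlock checks whether the thread of the first waiting node in the queue is a writer; it returns true if so and false otherwise.
static final class FairSync extends Sync {
    private static final long serialVersionUID = -2274990926593161451L;
    final boolean writerShouldBlock() {
        return hasQueuedPredecessors();
    }
    final boolean readerShouldBlock() {
        return hasQueuedPredecessors();
    }
}
In fair mode both readers and writers must queue whenever another thread is already waiting ahead of them.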
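The non-fair reader heuristic can be observed with a queued writer. The sketch below relies on the implementation shown above (the timed tryLock goes through tryAcquireShared and so consults readerShouldBlock, while the untimed tryLock() barges); QueuedWriterDemo and its helpers are hypothetical names, and the observed results are implementation behavior rather than a documented guarantee.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class QueuedWriterDemo {
    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Timed tryLock with a zero timeout: consults the queue policy, never parks.
    private static boolean tryReadHonoringPolicy(ReentrantReadWriteLock rwl) {
        try {
            return rwl.readLock().tryLock(0, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Returns {new reader admitted by the policy-honoring path,
    //          new reader admitted by barging tryLock(),
    //          reentrant reader admitted by the policy-honoring path}.
    static boolean[] demo() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); // non-fair
        boolean[] out = new boolean[3];
        rwl.readLock().lock(); // the calling thread is a reader
        Thread writer = new Thread(() -> {
            rwl.writeLock().lock(); // parks at the head of the queue
            rwl.writeLock().unlock();
        });
        writer.start();
        // Wait until the writer is enqueued and parked
        while (!(rwl.hasQueuedThread(writer)
                && writer.getState() == Thread.State.WAITING)) {
            Thread.yield();
        }
        Thread newReader = new Thread(() -> {
            out[0] = tryReadHonoringPolicy(rwl);
            if (out[0]) rwl.readLock().unlock();
            out[1] = rwl.readLock().tryLock();
            if (out[1]) rwl.readLock().unlock();
        });
        newReader.start();
        join(newReader);
        out[2] = tryReadHonoringPolicy(rwl); // reentrant read by the current holder
        if (out[2]) rwl.readLock().unlock();
        rwl.readLock().unlock(); // last read hold released: the writer proceeds
        join(writer);
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("new reader, policy path: " + r[0]);
        System.out.println("new reader, barging tryLock(): " + r[1]);
        System.out.println("reentrant reader, policy path: " + r[2]);
    }
}
```

A new reader is turned away because a writer appears first in the queue, while a thread that already holds the read lock may still re-acquire it; otherwise a reentrant reader could deadlock against the waiting writer.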
7. Lock downgrading
class CachedData {
    Object data;
    volatile boolean cacheValid;
    final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    void processCachedData() {
        rwl.readLock().lock();
        if (!cacheValid) {
            // Must release read lock before acquiring write lock
            rwl.readLock().unlock();
            rwl.writeLock().lock();
            try {
                // Recheck state because another thread might have
                // acquired write lock and changed state before we did.
                if (!cacheValid) {
                    data = ...
                    cacheValid = true;
                }
                // Downgrade by acquiring read lock before releasing write lock
                rwl.readLock().lock();
            } finally {
                rwl.writeLock().unlock(); // Unlock write, still hold read
            }
        }
        try {
            use(data);
        } finally {
            rwl.readLock().unlock();
        }
    }
}
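When the cache is invalid it must be refreshed, which requires releasing the read lock and acquiring the write lock. After the update, the thread acquires the read lock again and only then releases the write lock, after which it can use the refreshed cache data.
Is lock downgrading necessary?
Yes. If the thread released the write lock without first acquiring the read lock, another thread could acquire the write lock and change the data in between, and the current thread would then no longer be working with the data it had just written.
How is lock downgrading implemented?
When tryAcquireShared acquires the read lock, it fails only if another thread holds the write lock; if the current thread itself holds the write lock, it can still acquire the read lock.
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    // ... (rest of the method as shown in section 5)
}
Lock upgrading is not supported. If the read lock were held by several threads and any one of them were allowed to acquire the write lock and update the data, that update would not be visible to the other threads still holding the read lock. Memory visibility is guaranteed through the volatile state field.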
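Both directions can be tried without risking a deadlock by using tryLock(). This is a minimal single-threaded sketch; DowngradeDemo and demo are hypothetical names.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class DowngradeDemo {
    // Returns {read lock acquired while holding the write lock (downgrade),
    //          write lock acquired while holding only the read lock (upgrade)}.
    static boolean[] demo() {
        ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        boolean[] out = new boolean[2];

        // Downgrade: write -> read, then drop the write lock
        rwl.writeLock().lock();
        out[0] = rwl.readLock().tryLock();
        rwl.writeLock().unlock(); // the read hold taken above is still in place

        // Upgrade attempt: read -> write
        out[1] = rwl.writeLock().tryLock();
        if (out[1]) {
            rwl.writeLock().unlock();
        }
        if (out[0]) {
            rwl.readLock().unlock();
        }
        return out;
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("downgrade (write then read) succeeded: " + r[0]);
        System.out.println("upgrade (read then write) succeeded: " + r[1]);
    }
}
```

The upgrade attempt uses tryLock() on purpose: a blocking writeLock().lock() by a thread that still holds the read lock would wait forever, because the shared count cannot reach 0 while that thread is parked.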