1、ReentrantLock 可重入鎖
2、ReadWriteLock 讀寫鎖或共享鎖
3、StampedLock 獨(dú)占鎖或排它鎖
具體源碼講解
1.1 ReentrantLock 四種加鎖方式
lock 沒拿到鎖無限等待
trylock 沒拿到鎖立即返回
trylock(long timeout, TimeUnit unit) 獲取鎖超時即失敗
lockInterruptibly 沒拿到鎖無限等待,但能被中斷
以上都支持公平鎖,可重入
a、以非公平鎖加鎖實現(xiàn)為例解讀,其他方式只是個別不同
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加鎖
final void lock() {
// CAS獲取鎖
if (compareAndSetState(0, 1))
// CAS獲取成功則記錄當(dāng)前線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 沒有成功則再進(jìn)行嘗試,不成功則入隊
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
public final void acquire(int arg) {
// 進(jìn)行重試加鎖 或 進(jìn)行重入
if (!tryAcquire(arg) &&
// 重試或重入失敗,則進(jìn)行入隊
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 重試獲取鎖
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判斷是否可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 可重入即增加重入次數(shù)
setState(nextc);
return true;
}
return false;
}
// 入隊
private Node addWaiter(Node mode) {
// 為當(dāng)前加鎖線程創(chuàng)建節(jié)點
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果已有尾節(jié)點,則CAS嘗試入隊
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// CAS入隊失敗 或 隊列未初始化,則進(jìn)入
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 隊列未初始化,則進(jìn)行創(chuàng)建空節(jié)點,進(jìn)行初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// CAS 嘗試入隊,不成功則重試
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 判斷當(dāng)前線程是否是隊列頭結(jié)點(即是空節(jié)點的下一個節(jié)點)
// 如果是頭結(jié)點,tryAcquire一定會拿到鎖
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 這里會嘗試2次進(jìn)入阻塞狀態(tài)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前驅(qū)節(jié)點是等待信號狀態(tài),則返回true進(jìn)入阻塞
if (ws == Node.SIGNAL)
return true;
// 如果前驅(qū)節(jié)點大于0,則是取消狀態(tài),
if (ws > 0) {
// 向前移除已取消的節(jié)點,直到頭節(jié)點
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 不是前兩種情況,則進(jìn)入等待信號狀態(tài)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
b、trylock
// 只是判斷了是否可以獲取鎖,或者是否可重入,否則直接返回失敗
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
c、tryLock(long timeout, TimeUnit unit)
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 規(guī)定時間內(nèi)未獲取到鎖,則返回失敗
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 這里規(guī)定了鎖等待超時時間要大于1s,不然不會進(jìn)行阻塞,
// 這里考慮兩點,1、時間太短,進(jìn)行重試即可,2、調(diào)用阻塞的性能損耗較高,但是CAS重試也有性能損耗
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
d、lockInterruptibly
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果改線程已被中斷,則會拋出異常,即獲取不到鎖,且會拋錯
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
e、公平鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 獲取鎖之前會判斷是否當(dāng)前線程是否是頭結(jié)點,不然不會嘗試獲取鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2.1 ReentrantReadWriteLock
讀寫鎖有這幾種性質(zhì),讀讀共享,讀寫互斥,寫寫互斥
下面分析下讀鎖,寫鎖加鎖過程
a、讀鎖加鎖
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 低16位表示讀鎖次數(shù),高16位表示寫鎖次數(shù)
int c = getState();
// 如果存在寫鎖且占用線程不是當(dāng)前線程,則返回失敗
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 判斷當(dāng)前讀鎖是否需要阻塞,因為當(dāng)前線程存在已經(jīng)加了寫鎖未釋放情況
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 讀鎖次數(shù)為0,表示第一次加讀鎖,記錄第一讀鎖信息
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果是第一次讀鎖,則++
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 判斷當(dāng)前線程是否已存在加讀鎖,存在則++
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// CAS重試加讀鎖
return fullTryAcquireShared(current);
}
// 這個方法就是CAS重試,邏輯和tryAcquireShared沒什么區(qū)別
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}