
特性
- 顯示鎖,自旋鎖,可重入鎖
- aqs隊(duì)列鎖的實(shí)現(xiàn)
- 支持多條件喚醒
- 支持打斷
- 支持公平,非公平鎖
- 可嘗試加鎖
使用方式
ReentrantLock lock = new ReentrantLock();
try {
// 加鎖
lock.lock();
// do something
} finally {
// 在finally中解鎖,避免出現(xiàn)異常導(dǎo)致 沒(méi)釋放鎖而死鎖
lock.unlock();
}
源碼
1. lock() 公平鎖
final void lock() {
acquire(1);
}
AbstractQueuedSynchronizer. acquire(1)
public final void acquire(int arg) {
// !tryAcquire(arg) 嘗試加鎖
// acquireQueued(addWaiter(Node.EXCLUSIVE) 加入隊(duì)列
// selfInterrupt 自我打斷
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(int acquires)
protected final boolean tryAcquire(int acquires) {
// 獲取當(dāng)前線程
final Thread current = Thread.currentThread();
// 判斷當(dāng)前鎖的狀態(tài), 0 代表沒(méi)有人持有
int c = getState();
if (c == 0) {
// 判斷隊(duì)列中是否有線程在等待,如果沒(méi)有則cas改變鎖的狀態(tài)
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// cas成功,設(shè)置當(dāng)前線程為持有鎖的線程
setExclusiveOwnerThread(current);
return true;
}
}
// 有線程占有鎖,判斷持有線程是否有是自身,是的話重入
else if (current == getExclusiveOwnerThread()) {
// 鎖狀態(tài) + 1
int nextc = c + acquires;
// 鎖狀態(tài) < 0 ? 加鎖次數(shù)太多,導(dǎo)致整形溢出?
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 設(shè)置鎖的狀態(tài)
setState(nextc);
return true;
}
// 搶鎖失敗
return false;
}
hasQueuedPredecessors()
判斷隊(duì)列中是否有線程在排隊(duì)等鎖
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 頭結(jié)點(diǎn)一般都是 空結(jié)點(diǎn)
// 頭結(jié)點(diǎn) != 尾結(jié)點(diǎn)
// 頭結(jié)點(diǎn)下的next為空 或者 頭結(jié)點(diǎn)的后一個(gè)結(jié)點(diǎn)的線程不是當(dāng)前線程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
acquireQueued(final Node node, int arg) : 入隊(duì)睡眠
如果當(dāng)前鎖被其他線程持有(非自身重入), 或者隊(duì)列中有線程在排隊(duì),則加鎖失敗,需要?jiǎng)?chuàng)建Node節(jié)點(diǎn),入隊(duì)(如果隊(duì)列還沒(méi)有初始化則會(huì)初始化隊(duì)列),并且入隊(duì)之后,會(huì)判斷自己Node的前驅(qū)節(jié)點(diǎn)是不是head,是的話,會(huì)自旋拿一次鎖。
自旋拿鎖成功會(huì)把自己做為新的頭節(jié)點(diǎn),空Node
addWaiter(Node mode) : 創(chuàng)建Node節(jié)點(diǎn)
// 創(chuàng)建線程為當(dāng)前線程的Node對(duì)象
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 獲取隊(duì)列中的尾節(jié)點(diǎn)
Node pred = tail;
// 如果尾節(jié)點(diǎn)不為空
if (pred != null) {
// 將新創(chuàng)建的Node節(jié)點(diǎn)的前節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)
node.prev = pred;
// 將新創(chuàng)建的Node節(jié)點(diǎn)設(shè)置為新的尾節(jié)點(diǎn)
if (compareAndSetTail(pred, node)) {
// 之前的尾節(jié)點(diǎn)的后驅(qū)節(jié)點(diǎn)設(shè)置為 新創(chuàng)建的Node對(duì)象
pred.next = node;
// 返回
return node;
}
}
// 如果尾節(jié)點(diǎn)為空,初始化隊(duì)列
enq(node);
return node;
enq(Node mode) : 入隊(duì)
如果隊(duì)列還沒(méi)有初始化,就會(huì)新建隊(duì)列,隊(duì)列的頭結(jié)點(diǎn)和尾節(jié)點(diǎn)都為空的Node
將當(dāng)前線程的Node節(jié)點(diǎn)接在之前的尾節(jié)點(diǎn)的后面,并設(shè)置成新的尾節(jié)點(diǎn)
private Node enq(final Node node) {
for (;;) {
// 尾節(jié)點(diǎn)
Node t = tail;
// 第一次進(jìn)來(lái),如果為空,說(shuō)明隊(duì)列還沒(méi)有初始化
// 第二次進(jìn)來(lái)就不為空了
if (t == null) { // Must initialize
// 第一次進(jìn)來(lái),創(chuàng)建空的Node,將頭節(jié)點(diǎn)和尾節(jié)點(diǎn)都設(shè)置成這個(gè)空的Node
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 不為空走這里,傳進(jìn)來(lái)的Node的前驅(qū)設(shè)置成之前的尾節(jié)點(diǎn)
node.prev = t;
// 傳進(jìn)來(lái)的Node成為新的尾節(jié)點(diǎn),
if (compareAndSetTail(t, node)) {
// 之前的尾節(jié)點(diǎn)的后驅(qū)節(jié)點(diǎn)設(shè)置為傳進(jìn)來(lái)的Node
t.next = node;
return t;
}
}
}
}
acquireQueued :自旋,失敗后睡眠
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 拿前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
// 判斷是不是頭節(jié)點(diǎn),是的自旋一次
if (p == head && tryAcquire(arg)) {
// 自己會(huì)變成空節(jié)點(diǎn),并且成為頭節(jié)點(diǎn)
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 設(shè)置前驅(qū)節(jié)點(diǎn)為需喚醒狀態(tài),第二次循環(huán)的時(shí)候,睡眠
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire(Node pred, Node node)
Node的狀態(tài)
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
// 被取消
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
// 釋放鎖的時(shí)候需要叫醒自身后面的節(jié)點(diǎn)
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
// 條件喚醒
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
更改前驅(qū)節(jié)點(diǎn)的狀態(tài) :
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 判斷前驅(qū)Node,是不是喚醒狀態(tài),是的話直接返回true,讓當(dāng)前Node去睡眠
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果是> 0,說(shuō)明是取消狀態(tài)
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 一直往前查找,知道找到非取消的狀態(tài)的節(jié)點(diǎn),作為當(dāng)前Node的前驅(qū)
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 如果是0, cas將前驅(qū)Node的狀態(tài)改為Node.SIGNAL, 外面第二次循環(huán),直接返回true,讓當(dāng)前節(jié)點(diǎn)睡眠
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
parkAndCheckInterrupt()
阻塞當(dāng)前線程,直到被打斷,打斷后會(huì)清空打斷標(biāo)記,在下一次循環(huán)中繼續(xù)進(jìn)來(lái)阻塞。造成一種無(wú)法打斷的假象。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
加鎖流程
-
當(dāng)鎖為無(wú)鎖狀態(tài)時(shí) :
image -
線程t1來(lái)加鎖 : 持有鎖的線程改成t1,鎖的狀態(tài)改為1:被占有狀態(tài)
image -
線程t1沒(méi)有釋放鎖,線程t2來(lái)?yè)屾i, t2會(huì)創(chuàng)建隊(duì)列,并且排隊(duì)
image
- 線程t1沒(méi)有釋放鎖,線程t2入隊(duì),線程t3來(lái)?yè)屾i

lock() : 非公平鎖
非公平鎖的加鎖流程與公平鎖不同的是,非公平鎖在加鎖的時(shí)候,不會(huì)判斷隊(duì)列中是否有線程在排隊(duì),直接cas加鎖
final void lock() {
// cas改鎖的狀態(tài)
if (compareAndSetState(0, 1))
// 修改成功直接搶占鎖,當(dāng)鎖的線程改為當(dāng)前線程
setExclusiveOwnerThread(Thread.currentThread());
else
// 嘗試加鎖
acquire(1);
}
tryAcquire() & nonfairTryAcquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平鎖這里會(huì)判斷隊(duì)列是否有人在排序,非公平鎖直接cas上手
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
再后面搶不到鎖,進(jìn)入隊(duì)列之后和公平鎖的邏輯是一模一樣的。
公平和非公平之前在于 入隊(duì)之前嘗試加鎖,看不看隊(duì)列中有無(wú)線程排隊(duì),如果加鎖失敗,入隊(duì)了,那么就再?zèng)]有公平和非公平之分
2. unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 解鎖, state一直減到0,鎖才會(huì)被完全釋放
if (tryRelease(arg)) {
Node h = head;
// 當(dāng)鎖完全釋放后
if (h != null && h.waitStatus != 0)
// 叫醒后面排隊(duì)的線程
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
釋放鎖,state - 1,減到0,鎖才會(huì)被完全釋放,當(dāng)前持有鎖線程設(shè)置為null
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 狀態(tài)為0,鎖才完全釋放, 重入時(shí),state會(huì)被累加到 > 1,需要解鎖多次
if (c == 0) {
// 完全釋放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor
:喚醒隊(duì)列自己后面最近的一個(gè)非取消狀態(tài)的線程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 下一個(gè)節(jié)點(diǎn)
Node s = node.next;
// 當(dāng)前下一個(gè)節(jié)點(diǎn)的是取消狀態(tài)的
if (s == null || s.waitStatus > 0) {
s = null;
// 則從尾節(jié)點(diǎn)向前遍歷,找到自己后面最近的非取消狀態(tài)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 進(jìn)行喚醒
if (s != null)
LockSupport.unpark(s.thread);
}
解鎖流程


3.lockInterruptibly()
和lock方法不同的是,該加鎖方法會(huì)響應(yīng)打斷異常
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果再睡眠過(guò)程中被打斷,會(huì)拋出異常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
4.Condition 條件喚醒
await() : 等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 創(chuàng)建condition等待隊(duì)列
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 睡眠
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter()
如果尾節(jié)點(diǎn)為空,新建一個(gè)Node節(jié)點(diǎn)(持有當(dāng)前線程),state為-2, 頭節(jié)點(diǎn)和尾節(jié)點(diǎn)都等于新建的Node節(jié)點(diǎn)
如果尾節(jié)點(diǎn)不為空,新建一個(gè)Node節(jié)點(diǎn)(持有當(dāng)前線程),接在尾節(jié)點(diǎn)的后面,并作為新的尾節(jié)點(diǎn)
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal() : 喚醒
喚醒條件隊(duì)列的第一個(gè)Node
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 頭節(jié)點(diǎn)
Node first = firstWaiter;
if (first != null)
// 喚醒
doSignal(first);
}
doSignal
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
transferForSignal(Node node)
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 將當(dāng)前節(jié)點(diǎn)從條件隊(duì)列加入到 等待隊(duì)列中
// 返回是上一個(gè)節(jié)點(diǎn)
Node p = enq(node);
int ws = p.waitStatus;
// 然后將上一個(gè)節(jié)點(diǎn)的狀態(tài)改為SIGNAL -1
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 如果cas失敗,unpark
LockSupport.unpark(node.thread);
return true;
}
doSignalAll : 喚醒所有
將條件隊(duì)列中所有的Node按順序 從頭到尾加到等待隊(duì)列中
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 無(wú)限循環(huán),直到條件隊(duì)列中沒(méi)有元素
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
// 從頭到尾按順序,加到等待隊(duì)列中
transferForSignal(first);
first = next;
} while (first != null);
}


