
使用 `synchronize` 來做同步處理時(shí),鎖的獲取和釋放都是隱式的,實(shí)現(xiàn)的原理是通過編譯后加上不同的機(jī)器指令來實(shí)現(xiàn)。
而 ReentrantLock 就是一個(gè)普通的類,它是基于 AQS(AbstractQueuedSynchronizer)來實(shí)現(xiàn)的。
是一個(gè)重入鎖:一個(gè)線程獲得了鎖之后仍然可以反復(fù)的加鎖,不會(huì)出現(xiàn)自己阻塞自己的情況。
AQS是Java并發(fā)包里實(shí)現(xiàn)鎖、同步的一個(gè)重要的基礎(chǔ)框架。
鎖類型
ReentrantLock 分為公平鎖和非公平鎖,可以通過構(gòu)造方法來指定具體類型:
//默認(rèn)非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
//公平鎖
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默認(rèn)一般使用非公平鎖,它的效率和吞吐量都比公平鎖高的多(后面會(huì)分析具體原因)。
獲取鎖
通常的使用方式如下:
private ReentrantLock lock = new ReentrantLock();
public void run() {
lock.lock();
try {
//do bussiness
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
公平鎖獲取鎖
首先看下獲取鎖的過程:
public void lock() {
sync.lock();
}
可以看到是使用 sync的方法,而這個(gè)方法是一個(gè)抽象方法,具體是由其子類(FairSync)來實(shí)現(xiàn)的,以下是公平鎖的實(shí)現(xiàn):
final void lock() {
acquire(1);
}
//AbstractQueuedSynchronizer 中的 acquire()
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
第一步是嘗試獲取鎖(tryAcquire(arg)),這個(gè)也是由其子類實(shí)現(xiàn):
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
首先會(huì)判斷 AQS 中的 state 是否等于 0,0 表示目前沒有其他線程獲得鎖,當(dāng)前線程就可以嘗試獲取鎖。
注意:嘗試之前會(huì)利用 hasQueuedPredecessors() 方法來判斷 AQS 的隊(duì)列中中是否有其他線程,如果有則不會(huì)嘗試獲取鎖(這是公平鎖特有的情況)。
如果隊(duì)列中沒有線程就利用 CAS 來將 AQS 中的 state 修改為1,也就是獲取鎖,獲取成功則將當(dāng)前線程置為獲得鎖的獨(dú)占線程(setExclusiveOwnerThread(current))。
如果 state 大于 0 時(shí),說明鎖已經(jīng)被獲取了,則需要判斷獲取鎖的線程是否為當(dāng)前線程(ReentrantLock 支持重入),是則需要將 state + 1,并將值更新。
寫入隊(duì)列
如果 tryAcquire(arg) 獲取鎖失敗,則需要用 addWaiter(Node.EXCLUSIVE) 將當(dāng)前線程寫入隊(duì)列中。
寫入之前需要將當(dāng)前線程包裝為一個(gè) Node 對(duì)象(addWaiter(Node.EXCLUSIVE))。
AQS 中的隊(duì)列是由 Node 節(jié)點(diǎn)組成的雙向鏈表實(shí)現(xiàn)的。
包裝代碼:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
首先判斷隊(duì)列是否為空,不為空時(shí)則將封裝好的 Node 利用 CAS 寫入隊(duì)尾,如果出現(xiàn)并發(fā)寫入失敗就需要調(diào)用 enq(node); 來寫入了。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
這個(gè)處理邏輯就相當(dāng)于自旋加上 CAS 保證一定能寫入隊(duì)列。
掛起等待線程
寫入隊(duì)列之后需要將當(dāng)前線程掛起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)):
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
首先會(huì)根據(jù) node.predecessor() 獲取到上一個(gè)節(jié)點(diǎn)是否為頭節(jié)點(diǎn),如果是則嘗試獲取一次鎖,獲取成功就萬事大吉了。
如果不是頭節(jié)點(diǎn),或者獲取鎖失敗,則會(huì)根據(jù)上一個(gè)節(jié)點(diǎn)的 waitStatus 狀態(tài)來處理(shouldParkAfterFailedAcquire(p, node))。
waitStatus 用于記錄當(dāng)前節(jié)點(diǎn)的狀態(tài),如節(jié)點(diǎn)取消、節(jié)點(diǎn)等待等。
shouldParkAfterFailedAcquire(p, node) 返回當(dāng)前線程是否需要掛起,如果需要?jiǎng)t調(diào)用 parkAndCheckInterrupt():
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
他是利用 LockSupport 的 part 方法來掛起當(dāng)前線程的,直到被喚醒。
非公平鎖獲取鎖
公平鎖與非公平鎖的差異主要在獲取鎖:
公平鎖就相當(dāng)于買票,后來的人需要排到隊(duì)尾依次買票,不能插隊(duì)。
而非公平鎖則沒有這些規(guī)則,是搶占模式,每來一個(gè)人不會(huì)去管隊(duì)列如何,直接嘗試獲取鎖。
非公平鎖:
final void lock() {
//直接嘗試獲取鎖
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
公平鎖:
final void lock() {
acquire(1);
}
還要一個(gè)重要的區(qū)別是在嘗試獲取鎖時(shí)tryAcquire(arg),非公平鎖是不需要判斷隊(duì)列中是否還有其他線程,也是直接嘗試獲取鎖:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//沒有 !hasQueuedPredecessors() 判斷
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
釋放鎖
公平鎖和非公平鎖的釋放流程都是一樣的:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//喚醒被掛起的線程
unparkSuccessor(h);
return true;
}
return false;
}
//嘗試釋放鎖
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
首先會(huì)判斷當(dāng)前線程是否為獲得鎖的線程,由于是重入鎖所以需要將 state 減到 0 才認(rèn)為完全釋放鎖。
釋放之后需要調(diào)用 unparkSuccessor(h) 來喚醒被掛起的線程。
總結(jié)
由于公平鎖需要關(guān)心隊(duì)列的情況,得按照隊(duì)列里的先后順序來獲取鎖(會(huì)造成大量的線程上下文切換),而非公平鎖則沒有這個(gè)限制。
所以也就能解釋非公平鎖的效率會(huì)被公平鎖更高。
號(hào)外
最近在總結(jié)一些 Java 相關(guān)的知識(shí)點(diǎn),感興趣的朋友可以一起維護(hù)。
