前言
ReentrantLock這個類,相信大家多多少少在項目里都會去用到這個類,但我相信大部分人都沒去研究過源碼。我在這里把我學習這個類的一些經驗和心得分享出來,希望對大家有所幫助,水平有限,文章中有錯誤的地方也請不吝指正,共同進步。
學習基礎
這個類依舊是根據(jù)aqs框架去實現(xiàn)的,如果不知道什么是aqs的,可以去看看我寫的aqs的學習筆記aqs獨占鎖和aqs共享鎖的源碼,aqs是基礎,關于aqs的部分,本文將不再贅述。
引導
關于ReentrantLock的源碼解析,百度上沒有一萬個人寫,也至少有八千個人寫,大家隨便百度一篇原創(chuàng),我相信大家都會有很大收獲。所以這篇文章,我想從產品設計的角度去還原Doug Lea是如何開發(fā)出來ReentrantLock這個類的。還原僅靠猜測,不對的地方還原指正。
功能規(guī)劃
我們假設自己就是Doug Lea,當前jdk是1.4版本。我們需要在源碼里提供一個非常方便的加鎖的工具類,名字已經想好了,就叫ReentrantLock,目的是開發(fā)者在使用的時候可以很方便的進行加鎖,解鎖。而此時java中只有一個synchronized關鍵字。既然我們要開發(fā)一個新的工具類,我們肯定是希望比synchronized更好用,覆蓋的場景更廣泛。所以我們首先分析synchronized的缺點(1.4版本的缺點):
- synchronized加鎖的代價太大。
- synchronized不支持操作部分線程,釋放鎖的時候等同于喚醒全部。
- synchronized毫無公平性可言。
- 鎖的粒度太粗,一個實例的多個synchronized方法不能同時被多個線程訪問。
- synchronized在阻塞的過程中不可被中斷。
- synchronized超時時會一直鎖住,甚至會造成死鎖。
設計
第一點:
早期synchronized獲取不到鎖就會被阻塞,而阻塞線程需要cpu從用戶態(tài)轉換到內核態(tài),這是一個很消耗資源的操作,所以我們需要開發(fā)一個api級別的框架,這個框架就是aqs,aqs利用cas自旋和阻塞,多次cas獲取不到資源的時候再去阻塞,被喚醒的線程再使用cas去獲取資源,不是單純的cas自旋或者阻塞,這樣就同時兼顧了性能和效率。這樣,我們開發(fā)了一個aqs框架作為ReentrantLock的基礎,來解決synchronized加鎖代價大的問題。同時也提供了一個可擴展的aqs框架。
(synchronized以前確實是很笨重, 但是在各個版本優(yōu)化之后,現(xiàn)在的性能和ReentrantLock是相差無幾的,官方建議是如果synchronized能實現(xiàn)你的需求,那么盡量使用synchronized,具體的優(yōu)化大家可以百度一下。)
第二點:
我們想自定義規(guī)則讓部分線程去爭搶資源,讓部分線程不去爭搶資源。我們需要使用到aqs的node節(jié)點鏈表。但是我們知道,node隊列是輪詢喚醒全部,沒有對線程做區(qū)分,我們只想要操作部分線程。于是我們想到一種方法:
我們根據(jù)我們的定義,把線程歸類,然后把歸好類的線程放到一個個單獨的隊列里,我們需要操作哪部分線程, 就把這部分線程所在的隊列的所有線程,放到aqs隊列里,剩下的事情我們交給aqs,這樣我們就做到了操作部分線程,而不影響其他線程。
第三點
我們需要設計一個公平鎖,但是非公平鎖也肯定要有,因為非公平鎖性能更好,而且大部分人可能并不關心公平性。所以我們需要在代碼里對公平和非公平分開實現(xiàn)。
第四點
synchronized鎖粒度太粗,這跟synchronized的實現(xiàn)機制有關,因為synchronized是jvm去實現(xiàn)的,是對對象和類的操控。所以synchronized方法獲取的是對象鎖或者類鎖,當synchronized鎖住實例或者類對象的時候,其他線程再去獲取會被阻塞。而我們設計了ReentrantLock,這樣我們需要幾把鎖,在我們的類里定義幾個成員變量就可以了,各個ReentrantLock對象之間互不影響,這樣這個問題也就不解自解了。
第五點
synchronized在阻塞的過程中是忽略中斷的,只有在獲取到鎖的時候才會去響應中斷。為了解決這個問題我們需要在ReentrantLock設計一個可以檢測中斷的lock方法, 我們暫且命名為lockInterruptibly()。aqs中分為cas和阻塞喚醒兩個過程,我們可以在自旋cas中加入檢測中斷的邏輯,這樣我們在獲取鎖的部分過程中就做到了檢測中斷。
第六點
synchronized代碼塊中,如果碰到一段代碼偶爾需要執(zhí)行1分鐘或者兩分鐘,那其他的線程獲取鎖等待的時間就太長了,那顯然不是我們能接受的,所以我們在ReentrantLock設計一個根據(jù)我們設定時間去獲取鎖的操作,超過我們期望的時間就不再去獲取鎖, 直接返回結果。
設計工作已經做完了,我們接下來就是把我們的需求翻譯成代碼。
實現(xiàn)公平非公平(copy)
公平鎖和非公平鎖,區(qū)別只有在tryAcquire()方法上,所有會有很多可以共用的代碼。為了代碼美觀,我們先寫一個公平鎖和非公平鎖都可以繼承的基類,這個類就是Sync,Sync的功能依賴aqs,所以我們要繼承aqs。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
// 非公平鎖獲取資源
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// 釋放資源
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 判斷當前占用線程是否是當前線程
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
// Methods relayed from outer class
// 獲取當前占用線程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 判斷當前線程占用了多少資源
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 判斷是否已加鎖
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
然后我們定義一個公平鎖類和一個非公平鎖類,繼承這個Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
這樣,公平鎖和非公平鎖就實現(xiàn)了,他們倆唯一的區(qū)別就是公平鎖在獲取資源的時候,會調用hasQueuedPredecessors()方法判斷一下是否存在等待隊列,如果有就去乖乖排隊,這樣就保證了公平性,先來后到。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
我們用構造方法區(qū)控制使用者想要什么類型的鎖,至此,公平和非公平我們已經實現(xiàn)。
實現(xiàn)操作部分線程
我們已經有了思路,定義一個個隊列來分別存放線程,那么我們可以把這一個個線程定義為對象的一個屬性,所以我們定義一個接口,來操作這些線程。這個類就是Condition
public interface Condition {
void await() throws InterruptedException;
void awaitUninterruptibly();
long awaitNanos(long nanosTimeout) throws InterruptedException;
boolean await(long time, TimeUnit unit) throws InterruptedException;
boolean awaitUntil(Date deadline) throws InterruptedException;
void signal();
void signalAll();
}
同樣,定義一個類來保存這個隊列。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
我們只需要調用newCondition()方法就可以獲得一個保存一類線程的對象。ConditionObject對象中還有很多方法,我想單獨寫一篇文章來說明。
至此,操作部分線程的功能我們也實現(xiàn)了,這其實就是ReentrantLock的Condition場景的應用。
實現(xiàn)可中斷
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
// 獲取鎖之前校驗一下線程的中斷狀態(tài)
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 自旋里這個if判斷至少會走兩邊,每走一遍都去判斷一下線程的中斷狀態(tài)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
中斷操作已經在代碼里注釋說明。
獲取鎖設定超時時間
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
代碼很簡單,就是獲取鎖和每次自旋cas過程前,判斷一下是否超時,如果超時,直接返回false
總結
至此,ReentrantLock的大部分功能我們已經實現(xiàn)了,當你去翻閱源碼的時候。你會發(fā)現(xiàn),ReentrantLock里也就這么點東西,剩下的方法無非是開發(fā)者在開發(fā)時封裝了一些公用的方法。大家對比一下我們平時開發(fā)的過程,流程是不是非常的相似:
- 確定需求
- 確定需求實現(xiàn)的方式
- 書寫代碼
- 對代碼重構,封裝公共方法
由此可見,一個類,我們只要知道了設計這個類的初衷是為了解決什么問題,然后我們再去看源碼里作者是如何解決這個問題的,用什么方式去解決這個問題。
再深入一點我們可以研究一下,用這種方式去解決這個問題,但是代碼為什么要這么寫,這么寫有什么好處?如果是我們來實現(xiàn)這個功能,我們的代碼跟作者的代碼孰優(yōu)孰劣?一番比較下來,我們可以學習源碼里對方法的優(yōu)雅封裝,對變量的合理使用以及一些對異常情況的處理方式,源碼里真的是行行都是精髓。
寫后感
自從開始寫博客到現(xiàn)在,我發(fā)現(xiàn)我對于一件事情的梳理更快速和規(guī)范了,我覺得這有利于我思考,這得益于我每寫一篇博客做的總結,這也是一種我認為的提升自己的方式。
比如這篇文章所講的ReentrantLock和synchronized,在寫這篇文章之前,如果你問我他們兩個有什么區(qū)別,我的回答就是不知道,因為我不能條理清晰的去告訴你這兩個的區(qū)別和優(yōu)劣。也就是說原理我懂一點,但是我不知道如何組織語言。現(xiàn)在如果你們再問我,那么我可以很自信的跟你說個一二三來。這篇文章中有我以前就懂的知識,也有我在寫的時候一知半解而去學習的知識。我把這些知識用我的語言梳理脈絡去講給你們聽,我自身也在一個高度學習的狀態(tài)。贈人玫瑰,手有余香。如果大家到了一個學習的瓶頸期,不妨也試試寫博客這種方式。