分享Java鎖機制實現(xiàn)原理,細節(jié)涉及volatile修飾符、CAS原子操作、park阻塞線程與unpark喚醒、雙向鏈表、鎖的公平性與非公平性、獨占鎖和共享鎖、線程等待await、線程中斷interrupt。Java ReentrantLock鎖機制源碼篇
一、看下面的場景
外賣某商家上線了一個【10元一只雞】拉新活動,外賣APP定時推送活動日營業(yè)額。
假如模擬1000個用戶同時進行10元購,統(tǒng)計商家日營業(yè)額,模擬的腳手架代碼實現(xiàn)如下:
public static void main(String[] arg) throws InterruptedException{
//外賣商家
BaseMerchant merchantRunnable = new MerchantRunnableUnsafe();
//模擬1000個用戶
for (int i = 0; i < 1000; i++) {
Thread client = new Thread(merchantRunnable);
client.setName("Client-" + i);
client.start();
}
Thread.sleep(2000);
System.out.println("今天的營業(yè)額是 " + merchantRunnable.getTodayRMB());
}
//外賣商家
public static class MerchantRunnableUnsafe extends BaseMerchant {
@Override
public void run() {
try {
Thread.sleep(1);//耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
}
// todayRMB = todayRMB + 10; 非原子操作
long temp = todayRMB;
long update = temp + 10;
todayRMB = update;
System.out.println(Thread.currentThread().getName() + " todayRMB:" + todayRMB);
}
}
public static abstract class BaseMerchant implements Runnable {
protected volatile long todayRMB = 0;
public long getTodayRMB() {
//10元購商家日營業(yè)額統(tǒng)計
return todayRMB;
}
}
MerchantRunnableUnsafe執(zhí)行結果:今天的營業(yè)額是 9810
線程非安全問題
大家能看出 MerchantRunnableUnsafe實現(xiàn)方式,存在線程安全問題,如下圖,打印了兩次80。Client-6把80賦值給update后時間片到,壓棧后CPU執(zhí)行權交給Client-7,Client-7順利把todayRMB更新為80后,CPU執(zhí)行權讓回給Client-6,Client-6出棧后把值為80的update更新給todayRMB,就出現(xiàn)了Client-6和Client-7分別累加10,只生效了一次的線程安全問題。

使用synchronized關鍵字解決線程非安全問題
public static class MerchantRunnableSync extends BaseMerchant {
@Override
public void run() {
synchronized (MerchantRunnableSync.class) {
try {
Thread.sleep(1);//耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
}
long temp = todayRMB;
long update = temp + 10;
todayRMB = update;
System.out.println(Thread.currentThread().getName() + " todayRMB:" + todayRMB);
}
}
}
MerchantRunnableSync執(zhí)行結果:今天的營業(yè)額是 10000
使用ReentrantLock解決線程非安全問題
private static ReentrantLock mLock = new ReentrantLock();
public static class MerchantRunnableLock extends BaseMerchant {
@Override
public void run() {
mLock.lock();
try {
Thread.sleep(10);//耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
}
long temp = todayRMB;
long update = temp + 10;
todayRMB = update;
System.out.println(Thread.currentThread().getName() + " todayRMB:" + todayRMB);
mLock.unlock();
}
}
MerchantRunnableLock執(zhí)行結果:今天的營業(yè)額是 10000
在1.5之前synchronized效率比ReentrantLock低,并且synchronized屬于系統(tǒng)關鍵字不方便查看實現(xiàn)源碼,所以我們研究ReentrantLock實現(xiàn)原理。
ReentrantLock.lock()和 ReentrantLock.unlock() 對應 synchronized 代碼塊;
ReentrantLock.newCondition.await() 對應 object.wait();
ReentrantLock.newCondition.signal() 對應 object.notify();
要完全搞懂鎖,涉及大量概念,再對概念的源碼實現(xiàn),就好比你第一次吃大閘蟹,不告訴你,黃色的是蟹黃,說不定就被你扔了,因為它黃黃的黏黏的像那個啥。。。
所以分兩篇介紹鎖,概念篇、源碼篇。
二、概念篇
ReentrantLock內(nèi)部是如何實現(xiàn)的?我們先一起來猜想下ReentrantLock內(nèi)部實現(xiàn)原理。

看上圖,三個線程A\B\C,同時進入run()方法,若要確保線程安全,必須保證多線程串行的方式執(zhí)行運行態(tài),也就是,A\B\C同時執(zhí)行Lock時,只能有一個通過,假如是A通過,只有A執(zhí)行完unLock后,B\C其中一個才能再通過Lock,進入運行態(tài),重復以上步驟運行下去,直到三個線程都運行完畢。
上述過程有幾個問題
- Lock環(huán)節(jié),多線程時保證有且僅有一個線程通過Lock環(huán)節(jié)(獲得鎖資源),這個應該怎么實現(xiàn)?有同學回答我:用synchronized。MyGod??!ReentrantLock出現(xiàn)原因就是要取代synchronized
- 線程A通過了Lock環(huán)節(jié)執(zhí)行運行態(tài)代碼時,獲鎖失敗后的B和C線程要去做什么?
- unLock后線程A釋放鎖資源,是如何找到、通知線程B或C去競爭鎖資源的?
- 鎖是個什么類型?int?boolean?object?
下面我們一個問題一個問題的來解決。
問題1 CAS原子操作——多線程時如何保證有且僅有一個線程通過Lock環(huán)節(jié)?
用一個全局的boolean型變量?
public static class MerchantRunnableBoolean extends BaseMerchant {
@Override
public void run() {
lock();
try {
Thread.sleep(1);//放棄CPU執(zhí)行權,有可能再也拿不到了
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
long temp = todayRMB;
long update = temp + 10;
todayRMB = update;
System.out.println(Thread.currentThread().getName() + " todayRMB:" + todayRMB);
mBooleanLock = false;
}
private void lock() {
while (true) {
if (cas(false, true)) {
break;
}
}
}
}
// true:鎖資源已經(jīng)被占用 false:鎖資源未被占用
private static boolean mBooleanLock = false;
private static boolean cas(boolean expect, boolean update) {
if (mBooleanLock == expect) {
mBooleanLock = update;
return true;
}
return false;
}
MerchantRunnableBoolean:今天的營業(yè)額是 3790
哎,還是存在不安全問題,問題是,無法保證執(zhí)行函數(shù)cas(false, true)時不被其他線程打斷,也就是滿足原子性。
去趟衛(wèi)生間
。。。。
。。。。
回來后發(fā)現(xiàn)鼠標指針出奇的卡,MAC風扇呼呼轉,再一看,run運行臺提示代碼運行沒有結束??!我擦,上面代碼有嚴重毛?。。。。?/p>
不信的話,各位客官你們瞪大眼睛仔細看代碼

用volatile修飾符保證變量的可見性
什么是可見性?可見性:當多線程訪問某一個(同一個)變量時,其中一條線程對此變量作出修改,其他線程可以立刻讀取到最新修改后的變量。 volatile 變量修飾符可保證變量可見性。 支持并發(fā)三要素:可見性、原子性、有序性。
private static volatile boolean mBooleanLock = false;

運行臺,最終結果9800,并且顯示運行已結束。原子性怎么辦?
用CAS保證原子性
boolean型變量替換為int變量,代碼如下
Runnable mRunnable = new Runnable() {
@Override
public void run() {
lock();
doSomething();
unlock();
}
};
private void lock() {
//獲得鎖資源
if (compareAndSetState(0, 1)) {
mRunningThread = Thread.currentThread();
} else {
//當前線程掛起自己,并且不再往后執(zhí)行
}
}
/**
* lockState 0:鎖資源未占用 1:鎖資源被占用
*/
private int lockState = 0;
/**
* @param expect 期望值
* @param update 更新值
* @return
*/
private final boolean compareAndSetState(int expect, int update) {
if (expect == lockState) {
lockState = update;
return true;
}
return false;
}
現(xiàn)在我們把函數(shù)compareAndSetState()簡稱為 CAS。lockState代表鎖資源。如果能保證CAS操作為原子操作,就可以保證只有一個線程獲得鎖資源,其他線程執(zhí)行else邏輯。
好消息是 CPU底層指令支持CAS原子操作。Java通過Unsafe.java執(zhí)行CAS。
/**
* From Unsafe.java
*
* @param object 變量所在的當前類
* @param valueOffset 變量在內(nèi)存中偏移地址
* 獲取方法:unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
* @param expect 期望變量的當前值
* @param update 滿足期望值時,變量要更新的值
* @return ture:成功
*/
public final native boolean compareAndSwapInt(Object object, int valueOffset, int expect, int update);
為什么使用偏移地址而不直接用變量值?是因為直接用變量值,不能保證可見性。
到此,我們借用CPU提供的CAS原子操作命令,解決了如何保證只有一個線程獲得鎖資源的問題。
CAS屬于樂觀鎖、非阻塞鎖,樂觀鎖:先執(zhí)行如果遇到錯誤了再匯報,非阻塞:在不阻塞線程的情況下,借助原子操作保證線程安全。相比于阻塞鎖,沒有切換線程的開銷,所以更高效。CAS是AtomicInteger的基礎,AtomicInteger是線程池的基礎,是ReentrantLock的基礎,ReentrantLock是阻塞隊列BlockingQueue的基礎??梢奀AS的重要性。

使用CAS解決線程非安全問題
有兩個思路,思路一:todayRMB自身作為鎖資源,思路二:新增一個變量作為鎖資源,思路一代碼如下:
//10元購 商家日營業(yè)額統(tǒng)計
public static class MerchantRunnableCAS extends BaseMerchant {
@Override
public void run() {
try {
Thread.sleep(10);//耗時操作
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
int reCount = 0;
for (; ; ) {
reCount++;
System.out.println(Thread.currentThread().getName());
long temp = todayRMB;
long update = temp + 10;
if (compareAndSetState(temp, update)) {
System.out.println(Thread.currentThread().getName() + " reCount:" + reCount);
break;
}
}
}
protected final boolean compareAndSetState(long expect, long update) {
return U.compareAndSwapLong(this, TODAY_RMB, expect, update);
}
private static final sun.misc.Unsafe U = getUnsafe();
private static final long TODAY_RMB;
static {
try {
TODAY_RMB = U.objectFieldOffset(getDeclaredField(MerchantRunnableCAS.class, "todayRMB"));
} catch (Exception e) {
throw new Error(e);
}
}
public static Field getDeclaredField(Class clazz, String fieldName) {
Field field = null;
for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
try {
field = clazz.getDeclaredField(fieldName);
return field;
} catch (Exception e) {
//這里甚么都不要做!并且這里的異常必須這樣寫,不能拋出去。
//如果這里的異常打印或者往外拋,則就不會執(zhí)行clazz = clazz.getSuperclass(),最后就不會進入到父類中了
}
}
return null;
}
static Unsafe getUnsafe() {
try {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (Unsafe) f.get(null);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
MerchantRunnableCAS執(zhí)行結果:今天的營業(yè)額是 10000

從運行過程截圖看出,在CAS原子鎖下,線程Client-451 執(zhí)行了兩次compareAndSetState(),第一次執(zhí)行失敗,第二次執(zhí)行成功,消除了MerchantRunnableUnsafe類中運行結果被覆蓋的非安全問題。
問題2 Unsafe.park、unpark ——如何管理獲鎖失敗后的B和C線程?
上面用CAS解決的多線程問題,會發(fā)現(xiàn)獲鎖失敗后,一般通過死循環(huán)的方式重新獲取鎖,直到成功為止,這種方案叫自旋鎖,對耗時任務加鎖情況下,缺點很明顯,CPU資源耗盡、應用卡死、手機發(fā)燙,還有其他做法么?
當然有,ReentrantLock的做法是,第一次獲鎖失敗后,將當前線程插入雙向鏈表的末尾,然后阻塞當前線程,有鎖資源時再被喚醒,解決了自旋鎖的缺點。這種方案叫阻塞鎖。
如何阻塞、喚醒當前線程呢?系統(tǒng)提供的API如下:
阻塞當前線程
LockSupport.park(Thread.currentThread());
喚醒當前線程
LockSupport.unpark(Thread.currentThread());
如下源碼,Unsafe.class好眼熟,上文實現(xiàn)原子性核心函數(shù)用的也是Unsafe.class內(nèi)的方法,so,大家自己看下Unsafe源碼。
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
public static void unpark(Thread thread) {
if (thread != null)
U.unpark(thread);
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
問題3 公平鎖和非公平鎖
上文提到,有鎖資源時,雙向鏈表中阻塞線程會被再次喚醒,假如有鎖資源時,恰好又來了個新線程D,這個時候,新線程D和阻塞隊列中線程B,要競爭鎖資源,應該給誰呢?這個地方存在鎖的公平性的問題。
公平鎖:鎖給線程B,新線程D加入FIFO鏈表對尾并阻塞。先來先執(zhí)行。
非公平鎖:鎖給新線程D,線程B繼續(xù)阻塞。先來不一定先執(zhí)行。
問題4 鎖的可重入性 獨占鎖和共享鎖
回頭看MerchantRunnableBoolean類中 lock方法。如果一個線程多次執(zhí)行l(wèi)ock會發(fā)生死鎖的后果。鎖的可重入性是指,同一個線程可以執(zhí)行多次lock。
用途:考慮到代碼的復用性,重構如下,funcA和funcB存在多線程訪問,要用到mLock的可重入性,否則funcB()存在線程安全。
private void funcA() {
mLock.lock();
doSomething();
funcB();
mLock.unlock();
}
private void funcB() {
mLock.lock();
doSomething();
mLock.unlock();
}
優(yōu)化讀、寫頻繁的DB模塊,首要任務是獨占鎖替換為共享鎖。
讀操作加讀鎖,寫操作加寫鎖,共享鎖有N個讀鎖資源,一個寫鎖資源,并且讀和寫互斥,共享鎖允許多線程同時執(zhí)行讀操作,減少了讀操作時阻塞、喚醒的次數(shù),而獨占鎖不區(qū)分讀、寫,只有一個鎖。
可重入性鎖和共享鎖里的讀鎖,有一個相似之處,都是可以多次。可重入鎖可以讓一個線程多次執(zhí)行l(wèi)ock操作,讀鎖可以讓多個線程分別執(zhí)行l(wèi)ock操作,一個是作用于單線程,一個作用于多線程,后面分析源碼時,你會發(fā)現(xiàn),他們的實現(xiàn)原理卻是一樣的。
問題5 鎖是個什么類型?int?boolean?object?
int型。boolean型無法實現(xiàn)鎖的可重入特性。
問題6 執(zhí)行await后,線程狀態(tài)有什么變化?如何被恢復?
await()后,線程A放棄CPU執(zhí)行權,釋放鎖資源,并且進入等待隊列,而不是阻塞隊列,當結束等待條件滿足后,也就是其他線程執(zhí)行signal(),線程A將會從等待隊列 轉移到 阻塞對列,當競爭到鎖資源后,才進入運行態(tài)。
新生-->就緒(Runnable)-->運行(Running)-->遇到wait造成的等待需要喚醒notify,醒了后-->回到就緒(Runnable)-->運行(Running)-->死亡

問題7 執(zhí)行Thread.interrupt()后,線程為什么不停止運行?
因為執(zhí)行Thread.interrupt(),只會引起如下兩個地方發(fā)生變化,不會拋出InterruptedException異常,不會停止當前線程。
Thread內(nèi)Interrupted狀態(tài)位被設置為true。
-
從 LockSupport.park() 阻塞態(tài)退出,進入運行態(tài),執(zhí)行后續(xù)代碼。
private static boolean TestPark() { threadPark = new Thread(new Runnable() { @Override public void run() { mLock.lock(); try { System.out.println("1 before park"); LockSupport.park(Thread.currentThread()); System.out.println("3 after park"); //Thread.interrupted(); System.out.println("4 isInterrupted " + threadPark.isInterrupted()); try { mCondition.await(); } catch (Exception e) { System.out.println("5 發(fā)生中斷 " + e.getMessage()); } } finally { mLock.unlock(); } } }); threadPark.start(); try { Thread.sleep(1000); System.out.println("Thread.sleep(1000)"); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("2 執(zhí)行interrupt()"); threadPark.interrupt(); return true; }
無Thread.interrupted()的執(zhí)行結果
1 before park
Thread.sleep(1000)
2 執(zhí)行interrupt()
3 after park
4 isInterrupted true
5 發(fā)生中斷 null
Process finished with exit code 0
再介紹一個常用函數(shù)Thread.interrupted() ,具有消費Interrupted消息能力,運行結果如下:
有Thread.interrupted()的執(zhí)行結果
1 before park
Thread.sleep(1000)
2 執(zhí)行interrupt()
3 after park
4 isInterrupted false
Process finished with exit code 130 (interrupted by signal 2: SIGINT)
可以看出,Interrupted消息被提前消費后,mCondition.await()不再發(fā)生中斷。
有什么用呢?用于區(qū)分處理不同階段發(fā)生的中斷。比如執(zhí)行await()后,線程進入等待態(tài),當執(zhí)行signal()由等待態(tài)進入阻塞態(tài)。通過Thread.interrupt()可以區(qū)分等待態(tài)還是阻塞態(tài)發(fā)生的中斷,最終做區(qū)分處理,如下:
- 等待態(tài)執(zhí)行 Thread.interrupt(),提前進入阻塞態(tài),當獲得鎖資源后,會拋出InterruptedException異常,由用戶catch自行處理。
- 阻塞態(tài)執(zhí)行 Thread.interrupt(),無反應,會再次進入阻塞態(tài),當獲得鎖資源后,不拋出異常,需要用戶自行判斷處理。
概念匯總
