
溫馨提示:本文內容較長廢話較多,如有心臟病、精神病史等請酌情查看。
一、概述
本文源碼基于openJDK8u。在閱讀本文前,你需要對并發(fā)有所了解。
在并發(fā)中,為了解決程序中多個進程和線程對資源的搶占問題,在 Java 中引入了鎖的概念。
各種各樣的鎖,對于初碰 Java 并發(fā)的同學來說,面對多達 20 種的鎖,瞬間懵逼,退游戲這把雞勞資不吃了......
其實不要緊張,雖然鎖的種類很多,但是都是根據其特性衍生出來的概念而已,如果你對 Java 鎖不是很清晰,希望這篇文章能夠對你有所幫助。朋友們,如果你不會做飯或者不知道吃什么請關注我麻辣德子感謝您的雙...呸,學好 Java , 拒絕沉迷某音,哈哈哈哈哈~
下面,是一張關于鎖的思維導圖,帶大家有一個總體的認識,配合此圖食用效果更佳哈。

ps:如果看不清楚可以點擊點擊原圖查看哦。
二、synchronized 帶你跑毒
為了方便大家由淺入深(懷疑作者開車但是沒有證據...),我們從大家比較熟悉的 synchronized 說起。對于 Java 使用者來說,synchronized 關鍵字是實現鎖的一種重要方式。
package com.aysaml.demo.test;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* SynchronizedDemo 示例
*
* @author wangning
* @date 2019-11-26
*/
public class SynchronizedDemo {
private static int count = 0;
private static void addCount() {
count++;
}
public static void main(String[] args) {
int loopCount = 1000;
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();
ExecutorService executorService = new ThreadPoolExecutor(10, 1000,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
for (int i = 0; i < loopCount; i++) {
Runnable r = SynchronizedDemo::addCount;
executorService.execute(r);
}
executorService.shutdown();
System.out.println(SynchronizedDemo.count);
}
}
措不及防的代碼粘貼,哈哈哈。
上面是比較經典的線程并發(fā)問題示例。運行這段代碼,得到的結果多種多樣,996、997、998.....
結果并不像我們預期的那樣是1000,遇到多線程資源競爭時我們可能第一反應的就是加 synchronized,簡單粗暴,于是變成下邊這樣:
private static synchronized void addCount() {
count++;
}
在累加方法加上 synchronized,就給這個方法加了鎖,如此,每次執(zhí)行的結果就符合了我們的預期。
Java 內置了一個反編譯工具 javap 可以反編譯字節(jié)碼文件,通過執(zhí)行命令 javap -verbose -p SynchronizedDemo.class 可以看到上述這個類的字節(jié)碼文件。

查找被加鎖的方法 addCount() 發(fā)現,synchronized 修飾方法 時是通過 ACC_SYNCHRONIZED 標記符號指定該方法是一個同步方法,從而執(zhí)行相應的同步調用。
同樣查看字節(jié)碼,可以知道 synchronized 修飾代碼塊 時 通過monitorenter 和 monitorexit 指令來解決同步問題,其中 monitorenter 指令指向同步代碼塊的開始位置,monitorexit 指令指明同步代碼塊的結束位置。
三、各種鎖的解釋,猥瑣發(fā)育撿槍撿子彈
這部分只是簡單說一下各種鎖以及相關概念,讓大家有一個簡單了解,其中相應的在 Java 中的實現會用較長的篇幅做介紹。
偏向鎖、輕量級鎖、重量級鎖
在程序第一次執(zhí)行到 synchronized 代碼塊的時候,鎖對象變成 偏向鎖 ,即偏向于第一個獲得它的線程的鎖。在程序第二次執(zhí)行到改代碼塊時,線程會判斷此時持有鎖的線程是否就是它自己,如果是就繼續(xù)往下面執(zhí)行。值得注意的是,在第一次執(zhí)行完同步代碼塊時,并不會釋放這個偏向鎖。從效率角度來看,如果第二次執(zhí)行同步代碼塊的線程一直是一個,并不需要重新做加鎖操作,沒有額外開銷,效率極高。
前面說的只有一個線程同步執(zhí)行代碼塊只是理想的狀態(tài)下(如果只有一個線程也不用考慮并發(fā)的問題了,雖然這么說有點不太嚴謹哈...),一旦有第二個線程加入 鎖競爭 ,偏向鎖就自動升級為 輕量級鎖 。而這里不同情況需值得注意:當第二個線程想要獲取鎖時,且這個鎖是偏向鎖時,會判斷當前持有鎖的線程是否仍然存活,如果該持有鎖的線程沒有存活,那么偏向鎖并不會升級為輕量級鎖 。什么是鎖競爭:即一個線程想要獲取另一個線程持有的鎖 。
在此狀態(tài)下各個線程繼續(xù)做鎖競爭,沒有搶到鎖的線程循環(huán)判斷是否能夠成功獲取鎖,這種狀態(tài)稱為 自旋 ,故輕量級鎖是一種 自旋鎖 。虛擬機中有個計數器用來記錄自旋次數,默認允許循環(huán)10次,這個值可以通過虛擬機參數-XX:PreBlockSpin來進行修改。如果鎖競爭特別嚴重,達到這個自旋次數最大的限度,輕量級鎖就會升級為重量級鎖。當其他線程嘗試獲取鎖的時候,發(fā)現現在的鎖是重量級鎖,則直接將自己掛起,等待將來被喚醒。
關于這塊的更多信息,可以參考《Synchronized與三種鎖態(tài)》
公平鎖、非公平鎖
當一個線程持有的鎖釋放時,其他線程按照先后順序,先申請的先得到鎖,那么這個鎖就是公平鎖。反之,如果后申請的線程有可能先獲取到鎖,就是非公平鎖 。
Java 中的 ReentrantLock 可以通過其構造函數來指定是否是公平鎖,默認是非公平鎖。一般來說,使用非公平鎖可以獲得較大的吞吐量,所以推薦優(yōu)先使用非公平鎖。

synchronized 就是一種非公平鎖。
樂觀鎖、悲觀鎖
先說悲觀鎖,即在讀數據的時候總認為其他線程會對數據進行修改,所以采取加鎖的形式,一旦本線程要讀取數據時,就加鎖,其他線程被阻塞,等待鎖的釋放。所以悲觀鎖總結為悲觀加鎖阻塞線程。
在讀數據時總認為其他線程不會對數據做修改,在更新數據時會判斷其他線程有沒有更新數據,如果有更新,則重新讀取,再次嘗試更新,循環(huán)上述步驟直到更新成功,即為樂觀鎖。
這樣來看樂觀鎖實際上是沒有鎖的,只是通過一種比較交換的方法來保證數據同步,總結為樂觀無鎖回滾重試。
CAS(比較和交換)
CAS, 英文直譯為 compare and swap,即比較和交換。上面樂觀鎖也說了,其實就是一種比較與交換的過程。
簡單描述一下就是:讀取到一個值為 A ,在要將這個值更新為B 之前,檢查是否等于 A (比較),如果是則將 A 更新為 B(交換) ,否則什么都不做。
通過這種方式,可以實現不必使用加鎖的方式,就能保證資源在多線程之間的同步,顯然,不阻塞線程,可以大大提高吞吐量。方式雖好,但是也存在問題。
-
ABA 問題,即如果一個值從 A 變?yōu)?B 再變回 A 時,這樣 CAS 就會認為值沒有發(fā)生變化。
- 對于這個問題,已經有了使用版本號的解決方式,即每次變量更新的時候變量的版本號都 +1,即由
A->B->A就變成了1A->2B->3A。
- 對于這個問題,已經有了使用版本號的解決方式,即每次變量更新的時候變量的版本號都 +1,即由
- 循環(huán)時間長開銷大,如果鎖的競爭比較激烈,就會導致 CAS 不斷的重復執(zhí)行,一直循環(huán),耗費 CPU 資源。
- 只能保證一個變量的同步,顯然,由于其特性,CAS 只能保證一個共享變量的原子操作。
可重入鎖
可重入鎖即允許多個線程多次獲取同一把鎖,那從鎖本身的角度來看,就是可以重新進入該鎖。比如有一個遞歸函數里面有加鎖操作,如果這個鎖不阻塞自己,就是可重入鎖,故也稱遞歸鎖 。
再看上面的偏向鎖、輕量級鎖、重量級鎖可以知道synchronized關鍵字加鎖是可重入的 ,不僅如此,JDK 中實現 Lock 接口的鎖都是可重入的 。感興趣的讀者可以自行了解怎么實現不可重入鎖,這里只講一下鎖的定義。
可中斷鎖
如果線程A持有鎖,線程B等待獲取該鎖。由于線程A持有鎖的時間過長,線程B不想繼續(xù)等待了,我們可以讓線程B中斷自己或者在別的線程里中斷它,這種就是可中斷鎖。
在 Java 中,synchronized就是不可中斷鎖,而Lock的實現類都是可中斷鎖。
獨享鎖、共享鎖
獨享鎖亦稱互斥鎖,排它鎖,容易理解這種鎖每次只允許一個線程持有。反之,就是共享鎖啦。
讀鎖、寫鎖
上面說獨享鎖和共享鎖,其實讀寫鎖就是其最典型的鎖。寫鎖是獨享鎖,讀鎖是共享鎖。在后面我們會著重說一下Java 中的讀寫鎖實現。
三、CAS 在 Java 中的實現,帶上這把 M4-CAS
通過上面對 CAS 的簡單介紹,相信大家對 CAS 也有了一個比較簡單的概念:通過比較和交換實現單個變量的線程安全 。
JDK 中對 CAS 的實現在 java.util.concurrent.atomic 包中:

| 類名 | 描述 |
|---|---|
| AtomicBoolean | 可以用原子方式更新的 boolean 值。 |
| AtomicInteger | 可以用原子方式更新的 int 值。 |
| AtomicIntegerArray | 可以用原子方式更新其元素的 int 數組。 |
| AtomicIntegerFieldUpdater | 基于反射的實用工具,可以對指定類的指定 volatile int 字段進行原子更新。 |
| AtomicLong | 可以用原子方式更新的 long 值。 |
| AtomicLongArray | 可以用原子方式更新其元素的 long 數組。 |
| AtomicLongFieldUpdater | 基于反射的實用工具,可以對指定類的指定 volatile long 字段進行原子更新。 |
| AtomicMarkableReference |
AtomicMarkableReference 維護帶有標記位的對象引用,可以原子方式對其進行更新。用來解決 ABA 問題,只關心有沒有被修改過。 |
| AtomicReference | 可以用原子方式更新的對象引用。 |
| AtomicReferenceArray | 可以用原子方式更新其元素的對象引用數組。 |
| AtomicReferenceFieldUpdater | 基于反射的實用工具,可以對指定類的指定 volatile 字段進行原子更新。 |
| AtomicStampedReference |
AtomicStampedReference 維護帶有整數“標志”的對象引用,可以用原子方式對其進行更新。用來解決 ABA 問題,與上面的 AtomicMarkableReference 相比,除了關系有沒有被修改過之外,還關心修改了幾次。 |
以 AtomicInteger 為例,看看它是如何保證對 int 的操作線程安全的。
package java.util.concurrent.atomic;
import java.util.function.IntUnaryOperator;
import java.util.function.IntBinaryOperator;
import sun.misc.Unsafe;
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// 使用 Unsafe.compareAndSwapInt 進行數據更新
private static final Unsafe unsafe = Unsafe.getUnsafe();
// 內存偏移量,即內存地址
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
/**
* 帶有初值的構造函數
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* 無參構造函數默認值為0
*/
public AtomicInteger() {
}
/**
* 獲得當前值
*
* @return the current value
*/
public final int get() {
return value;
}
/**
* 設置所給值,因為value是使用volatile關鍵字修飾,所以一經修改,其他線程會立即看到value的修改
*
* @param newValue the new value
*/
public final void set(int newValue) {
value = newValue;
}
/**
* 設置給定的值,通過調用Unsafe的延遲設置方法不保證結果被其他線程立即看到
*
* @param newValue the new value
* @since 1.6
*/
public final void lazySet(int newValue) {
unsafe.putOrderedInt(this, valueOffset, newValue);
}
/**
* 原子方式設置新值,返回舊值
*
* @param newValue the new value
* @return the previous value
*/
public final int getAndSet(int newValue) {
return unsafe.getAndSetInt(this, valueOffset, newValue);
}
/**
* 使用 CAS 方式設置新值,成功返回true
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that
* the actual value was not equal to the expected value.
*/
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 使用 CAS 方式更新值
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
*/
public final boolean weakCompareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 原子增加
*
* @return the previous value
*/
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
/**
* 原子減1
*
* @return the previous value
*/
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
/**
* 原子增加給定值
*
* @param delta the value to add
* @return the previous value
*/
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
/**
* Atomically increments by one the current value.
*
* @return the updated value
*/
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
/**
* Atomically decrements by one the current value.
*
* @return the updated value
*/
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
/**
* Atomically adds the given value to the current value.
*
* @param delta the value to add
* @return the updated value
*/
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
/**
* Atomically updates the current value with the results of
* applying the given function, returning the previous value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the previous value
* @since 1.8
*/
public final int getAndUpdate(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return prev;
}
/**
* Atomically updates the current value with the results of
* applying the given function, returning the updated value. The
* function should be side-effect-free, since it may be re-applied
* when attempted updates fail due to contention among threads.
*
* @param updateFunction a side-effect-free function
* @return the updated value
* @since 1.8
*/
public final int updateAndGet(IntUnaryOperator updateFunction) {
int prev, next;
do {
prev = get();
next = updateFunction.applyAsInt(prev);
} while (!compareAndSet(prev, next));
return next;
}
/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the previous value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the previous value
* @since 1.8
*/
public final int getAndAccumulate(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return prev;
}
/**
* Atomically updates the current value with the results of
* applying the given function to the current and given values,
* returning the updated value. The function should be
* side-effect-free, since it may be re-applied when attempted
* updates fail due to contention among threads. The function
* is applied with the current value as its first argument,
* and the given update as the second argument.
*
* @param x the update value
* @param accumulatorFunction a side-effect-free function of two arguments
* @return the updated value
* @since 1.8
*/
public final int accumulateAndGet(int x,
IntBinaryOperator accumulatorFunction) {
int prev, next;
do {
prev = get();
next = accumulatorFunction.applyAsInt(prev, x);
} while (!compareAndSet(prev, next));
return next;
}
/**
* Returns the String representation of the current value.
* @return the String representation of the current value
*/
public String toString() {
return Integer.toString(get());
}
/**
* Returns the value of this {@code AtomicInteger} as an {@code int}.
*/
public int intValue() {
return get();
}
/**
* Returns the value of this {@code AtomicInteger} as a {@code long}
* after a widening primitive conversion.
* @jls 5.1.2 Widening Primitive Conversions
*/
public long longValue() {
return (long)get();
}
/**
* Returns the value of this {@code AtomicInteger} as a {@code float}
* after a widening primitive conversion.
* @jls 5.1.2 Widening Primitive Conversions
*/
public float floatValue() {
return (float)get();
}
/**
* Returns the value of this {@code AtomicInteger} as a {@code double}
* after a widening primitive conversion.
* @jls 5.1.2 Widening Primitive Conversions
*/
public double doubleValue() {
return (double)get();
}
}
源碼中的注釋寫的很詳細,寫注釋寫到一半決定不做谷歌翻譯了...主要是通過 Unsafe 提供的 compareAndSwapInt(Object var1, long var2, int var4, int var5) 等方法來實現 CAS 原子操作,Unsafe 提供了執(zhí)行低級別、不安全操作的方法,如直接訪問系統(tǒng)內存資源、自主管理內存資源等。
CAS操作包含三個操作數---內存位置、預期原值及新值。執(zhí)行 CAS 操作的時候,將內存位置的值與預期原值比較,如果相匹配,那么處理器會自動將該位置值更新為新值,否則,處理器不做任何操作。我們都知道,CAS 是一條 CPU 的原子指令( cmpxchg 指令),不會造成所謂的數據不一致問題,Unsafe 提供的 CAS 方法(如 compareAndSwapXXX )底層實現即為 CPU 指令 cmpxchg 。
如果小伙伴對其感興趣可以參考 《Java魔法類:Unsafe應用解析》
四、AQS 給你穿上三級甲
AQS,全名 AbstractQueuedSynchronizer,直譯為抽象隊列同步器,是構建鎖或者其他同步組件的基礎框架,可以解決大部分同步問題。實現原理可以簡單理解為:同步狀態(tài)( state ) + FIFO 線程等待隊列 。
-
資源 state
AQS使用了一個 int 類型的成員變量 state 來表示同步狀態(tài),使用了volatile關鍵字來保證線程間的可見性,當 state > 0 時表示已經獲取了鎖,當 state = 0 時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態(tài)state進行操作,確保對state的操作是安全的。
? 而對于不同的鎖,state 也有不同的值:- 獨享鎖中 state =0 代表釋放了鎖,state = 1 代表獲取了鎖。
- 共享鎖中 state 即持有鎖的數量。
- 可重入鎖 state 即代表重入的次數。
- 讀寫鎖比較特殊,因 state 是 int 類型的變量,為 32 位,所以采取了中間切割的方式,高 16 位標識讀鎖的數量 ,低 16 位標識寫鎖的數量 。
FIFO 線程等待隊列
實現隊列的方式無外乎兩種,一是使用數組,二是使用 Node 。AQS 使用了 Node 的方式實現隊列。
static final class Node {
/** 標記一個節(jié)點是在共享模式,默認為共享模式 */
static final Node SHARED = new Node();
/** 標記一個節(jié)點為獨占模式 */
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/** 以此變量來表示當前線程的狀態(tài) */
volatile int waitStatus;
/** 前驅 */
volatile Node prev;
/** 后繼 */
volatile Node next;
/** 用于保存線程 */
volatile Thread thread;
/** 保存下一個處于等待狀態(tài)的Node */
Node nextWaiter;
/**
* 用來判斷是否是共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // 無參構造方法,默認為共享模式
}
Node(Thread thread, Node mode) { // 用于構造下一個等待線程Node節(jié)點
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // 用于構造帶有本線程狀態(tài)的Node
this.waitStatus = waitStatus;
this.thread = thread;
}
}
在 AQS 中定義了兩個節(jié)點,分別為頭尾節(jié):
/** 等待隊列的頭結點,作為隊列的初始化節(jié)點,只能通過setHead()方法設置值,
* 而這個方法將Node的變量值都置空,便于及時GC。當其有值時,必須保證waiteStatus為CANCELLED狀態(tài)。
*/
private transient volatile Node head;
/**
* 用于保存線程等待隊列的尾結點。
* 通過enq()方法設置值。
*/
private transient volatile Node tail;
這個隊列的結構見下圖:

AQS 的一堆方法,按照獲取鎖和解鎖的維度可以分為下面這樣:
獲取鎖相關方法
| 方法 | 描述 |
|---|---|
| acquire(int arg) | 獨占模式獲取鎖,忽略中斷。 |
| acquireInterruptibly(int arg) | 獨占模式獲取鎖,如果被中斷則中止。 |
| acquireShared(int arg) | 共享模式獲取鎖,忽略中斷。 |
| acquireSharedInterruptibly(int arg) | 共享模式獲取鎖,如果被中斷則中止。 |
| tryAcquire(int arg) | 嘗試在獨占模式獲取鎖。由子類自行實現。 |
| tryAcquireNanos(int arg, long nanosTimeout) | 嘗試在獨占模式獲取鎖,如果被中斷則中止,如果到了給定超時時間 nanosTimeout ,則會失敗。 |
| tryAcquireShared(int arg) | 嘗試在共享模式獲取鎖。 |
| tryAcquireSharedNanos(int arg, long nanosTimeout) | 嘗試在共享模式獲取鎖,如果被中斷則中止,如果到了給定超時時間 nanosTimeout ,則會失敗。 |
| addWaiter(Node mode) | 將當前線程加入到CLH隊列隊尾。 |
| acquireQueued(final Node node, int arg) | 當前線程會根據公平性原則來進行阻塞等待,直到獲取鎖為止;并且返回當前線程在等待過程中有沒有中斷過。 |
| selfInterrupt() | 產生一個中斷。 |
解鎖相關方法
| 方法 | 描述 |
|---|---|
| release(int arg) | 以獨占模式釋放對象。 |
| releaseShared(int arg) | 以共享模式釋放對象。 |
| tryRelease(int arg) | 試圖設置狀態(tài)來反映獨占模式下的一個釋放。由子類自行實現。 |
| tryReleaseShared(int arg) | 試圖設置狀態(tài)來反映共享模式下的一個釋放。 |
| unparkSuccessor(Node node) | 用來喚醒節(jié)點。 |
五、Lock 帶你吃雞
除了 synchronized 外,在 Java 中還有 Lock 接口的一系列實現來加鎖。

如上綠色虛線表示實現,WriteLock、ReadLock、ReentrantLock 都實現了 Lock 接口,三者分別對應讀鎖、寫鎖和可重入鎖,其中 ReadWriteLock 定義了讀鎖和寫鎖,ReentrantReadWriteLock 以靜態(tài)內部類的形式實現了讀寫鎖。
首先創(chuàng)建一個單例讀寫鎖:
package com.aysaml.demo.test;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 單例讀寫鎖
*
* @author wangning
* @date 2019-11-26
*/
public enum Locker {
instance;
private Locker() {
}
private static final ReadWriteLock lock = new ReentrantReadWriteLock();
public Lock writeLock() {
return lock.writeLock();
}
}
如此可以在上述的累加方法上面加鎖做同步:
private static void addCount() {
Lock locker = Locker.instance.writeLock();
locker.lock();
count++;
locker.unlock();
}
看下運行結果,如愿以償(總感覺這個詞用在這不太對,真是實在想不出什么詞了,哈哈,意思你們懂就好)。

兩種加鎖方式比較:
synchronized屬于互斥鎖,任何時候只允許一個線程的讀寫操作,其他線程必須等待;
ReadWriteLock允許多個線程獲得讀鎖,但只允許一個線程獲得寫鎖,效率相對較高。
看了上面的 Lock 接口的實現圖,我們知道在 Java 中鎖有三個重要實現,下面一一來看。
讀鎖寫鎖抽象隊列同步器三級甲
上面說了讀寫鎖屬于共享鎖,即允許同一時刻有多個線程獲取鎖。在一些業(yè)務中,讀的操作比寫的操作多,相比較 synchronized 而言,讀寫鎖采用 CAS 方式保證資源同步,所以使用讀寫鎖可以大大增加吞吐量。
在Java 中 ReentrantReadWriteLock 中以內部類的形式實現了讀寫鎖,如下:

再接著分別看他們的實現:
- ReadLock

- WriteLock

可以看到兩個鎖中的加鎖操作都有一個關鍵的東西 Sync :

Sync 是 ReentrantReadWriteLock 的一個抽象內部類,它繼承了 AbstractQueuedSynchronizer 并實現了共享與獨享方式的同步操作。讀寫鎖正好是一對共享&獨占鎖,而同步的隊列也有共享和獨占之分,那我們就從它們的加鎖和解鎖分別來看 AQS 的工作流程:
寫鎖(獨占式)
加鎖
public void lock() {
sync.acquire(1);
}
這是 WriteLock 的加鎖方法,可以看到加鎖實際上是調用了 Sync 的 acquire(int arg) 方法,而這個方法是在 AQS 中實現的,它使用 final 關鍵字來做修飾,在子類中不可重寫。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
那 acquire(int arg) 做了什么呢,可以看到執(zhí)行了四個方法:
- tryAcquire:嘗試獲取鎖,獲取成功則設置鎖狀態(tài)并返回 true ,否則返回 false 。需子類自行實現。
- addWaiter:將當前線程加入到 CLH 隊列隊尾。已有實現。
- acquireQueued:當前線程會根據公平性原則來進行阻塞等待,直到獲取鎖為止;并且返回當前線程在等待過程中有沒有中斷過。已有實現。
- selfInterrupt:產生中斷。已有實現。
前面我們說 AQS 由線程狀態(tài) state 和 線程等待隊列組成,AQS 加鎖解鎖的過程實際上就是對線程狀態(tài)的修改和等待隊列的出入隊列操作,而 AQS 的子類可以通過重寫 tryAcquire(int acquires) 方法來對 state 進行修改操作。于是就有 ReentrantReadWriteLock 中的 Sync 重寫了 tryAcquire 方法:
protected final boolean tryAcquire(int acquires) {
// 獲取當前線程
Thread current = Thread.currentThread();
// 拿到state變量,即鎖的個數
int c = getState();
// 獲得寫鎖的數量,前邊說了低16位表示寫鎖個數
int w = exclusiveCount(c);
// 若該線程已經持有鎖
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 如果寫鎖數量為0或者持有鎖的線程不是當前線程,返回 false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果寫入鎖數量大于最大數量(65535),跑出異常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 如果寫線程數為0,并且當前線程需要阻塞那么就返回失??;或者如果通過CAS增加寫線程數失敗也返回失敗。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 設置當前線程為鎖的擁有者
setExclusiveOwnerThread(current);
return true;
}
再來看下 addWaiter 方法:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 通過 CAS 設置尾結點
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果不成功則多次嘗試
enq(node);
return node;
}
enq(Node node) 方法如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
可以看到 enq 方法使用了死循環(huán)的方式一致嘗試設置尾結點,直到成功。
如此入隊操作就可以簡單理解為:tail 指向新節(jié)點、新節(jié)點的 prev 指向當前最后的節(jié)點,當前最后一個節(jié)點的 next 指向當前節(jié)點。
解鎖
public void unlock() {
sync.release(1);
}
解鎖調用了 Sync 的 release 方法,下面看看這個方法都做了什么:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 喚醒節(jié)點
unparkSuccessor(h);
return true;
}
return false;
}
與加鎖類似,tryRelease(arg) 由 AQS 的子類自行重寫,
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
釋放鎖比較簡單,**釋放鎖的實現是通過 CAS 修改 waitStatus 為 0 來實現的,然后通過 LockSupport.unpark(s.thread) 喚醒線程 **。
為了方便理解,在這里總結一下 WriteLock 的工作流程圖:
? 加鎖操作

讀鎖 (共享式)
讀鎖的加鎖操作與寫鎖類似:
public void lock() {
sync.acquireShared(1);
}
調用自定義的 tryAcquireShared(arg) 方法獲取同步狀態(tài)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
如果獲取失敗,調用 doAcquireShared(int arg) 方法自旋方式獲取同步狀態(tài):
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
再看一下 tryAcquireShared(int unused) 方法:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 如果其他線程已經獲取了寫鎖,則當前線程獲取讀鎖失敗,進入等待狀態(tài)
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
在這個方法中可以知道,如果其他線程已經獲取了寫鎖,則當前線程獲取讀鎖失敗,進入等待狀態(tài)。如果當前線程獲取了寫鎖或者寫鎖未被獲取,則當前線程增加讀狀態(tài),成功獲取讀鎖。
六、結語
可能讀完之后小伙伴并沒有整明白,反而更懵逼了;或者根本沒有讀完。概念比較多,貼的代碼也比較多,可以配合其中幾個比較重要的圖去理解,幾個比較關鍵的點:鎖的分類思維導圖、AQS、CAS、寫鎖的加鎖實現過程圖。由于筆者水平所限,如上都是在學習的過程中總結、整理所得,僅供參考。
歡迎訪問個人博客 獲取更多知識分享。