20210411_AQS鎖互斥源碼學(xué)習(xí)筆記總結(jié)
1概述
AQS是一個(gè)用來(lái)構(gòu)建鎖和同步器的框架,Lock包中的鎖(ReentrantLock獨(dú)占模式、ReadWriteLock)、Semaphore共享模式、CoundDownLoatch、Jdk之前的FutureTask等均基于AQS來(lái)構(gòu)建。
本文基于源碼進(jìn)行相關(guān)知識(shí)點(diǎn)進(jìn)行總結(jié)。
1.1主要知識(shí)點(diǎn)
基于NoFaire非公平、重入鎖ReentrantLock,模擬3個(gè)線(xiàn)程,第一個(gè)線(xiàn)程比較耗時(shí)。
-
后續(xù)2個(gè)線(xiàn)程首先嘗試獲取鎖
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }2.1.如果tryAcquire獲取鎖成功,則執(zhí)行業(yè)務(wù)處理。
2.2.如果tryAcquire獲取鎖失敗,則創(chuàng)建獨(dú)占模式的Node節(jié)點(diǎn)會(huì)進(jìn)入行FIFO雙向隊(duì)列,即addWaiter。然后走基類(lèi)AQS中的acquireQueued(注意加到隊(duì)列中的節(jié)點(diǎn)都是按順序去獲取鎖,判斷是否是頭結(jié)點(diǎn))。
2.3.如果是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為head,則有一次機(jī)會(huì)再次嘗試獲取鎖tryAcquire,如果獲取鎖成功,則執(zhí)行業(yè)務(wù)處理。
否則,并Park阻塞。
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
等待第一個(gè)線(xiàn)程執(zhí)行完畢,會(huì)通知unPark同步器中的隊(duì)列首個(gè)線(xiàn)程節(jié)點(diǎn)。
加鎖lock源碼分析。
解鎖unlock源碼分析。

AQS數(shù)據(jù)結(jié)構(gòu)圖

2代碼示例
package com.kikop.myjuclockstudy.myaqs.myreentrantlock;
import com.kikop.util2.MyDateUtil;
import com.kikop.util2.RandomUtil;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author kikop
* @version 1.0
* @project Name: technicalskill
* @file Name: AtomicDemoTest
* @desc 功能描述 ReentrantLock:默認(rèn)非公平獨(dú)占鎖、可重入鎖、獨(dú)占鎖、可中斷鎖
* @date 2020/6/7
* @time 17:47
* @by IDE: IntelliJ IDEA
*/
public class MyReenLockSimpleTest {
// static存存在jvm元數(shù)據(jù)區(qū)
// 加入FIFO隊(duì)列策略:誰(shuí)先加入,完全由Cpu時(shí)間片切換決定
// FIFO隊(duì)列節(jié)點(diǎn):誰(shuí)是第一個(gè)頭節(jié)點(diǎn)先執(zhí)行:判斷前面是否有頭結(jié)點(diǎn)
private static Lock lock = new ReentrantLock(); // ReentrantLock:默認(rèn)非公平獨(dú)占鎖、可重入鎖、獨(dú)占鎖、可中斷鎖
// 線(xiàn)程個(gè)數(shù)
private static int THREAD_COUNT = 3;
// 臨界區(qū)資源
private static int globalVar = 0;
public static void inc() {
try {
System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申請(qǐng)鎖,即將加入FIFO隊(duì)列...");
lock.lock(); // 加鎖
System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申請(qǐng)鎖-->獲得鎖成功!");
System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":開(kāi)始業(yè)務(wù)邏輯執(zhí)行.");
String currentThreadName = Thread.currentThread().getName();
if ("T1".equalsIgnoreCase(currentThreadName)) { // 模擬第一個(gè)線(xiàn)程耗時(shí)較長(zhǎng)時(shí)間:1分鐘,后續(xù)線(xiàn)程將如隊(duì)列
Thread.sleep(3 * 60 * 1000);
} else {
Thread.sleep(RandomUtil.getSpecialRangeRandomValue(100));
}
globalVar++;
System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":完成業(yè)務(wù)邏輯執(zhí)行.");
} catch (InterruptedException e) {
System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---釋放鎖異常!");
e.printStackTrace();
} finally {
System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---釋放鎖!");
lock.unlock(); // 釋放鎖
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] threads = new Thread[THREAD_COUNT];
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i] =
new Thread(() -> {
inc();
}, String.format("T%s", i + 1)
);
}
for (int i = 0; i < THREAD_COUNT; i++) {
threads[i].start();
Thread.sleep(100);
}
TimeUnit.MINUTES.sleep(30);
System.out.println("Result:" + globalVar);
}
}
3NoFair源碼分析
3.1AQS同步器初始化
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
3.2雙向FIFO隊(duì)列結(jié)構(gòu)
首先,當(dāng)T2,T3線(xiàn)程入隊(duì)列后,(小技巧:斷點(diǎn)加到 lock.unlock() 上一行),Sync同步器節(jié)點(diǎn)結(jié)構(gòu)如下圖:

當(dāng)前執(zhí)行線(xiàn)程exclusiveOwnerThread為:T1,state=1表示當(dāng)前鎖被使用中。

查看此時(shí)的head節(jié)點(diǎn)(prev=null,waitStatus=-1,持有線(xiàn)程:null)

查看此時(shí)的head節(jié)點(diǎn)的next節(jié)點(diǎn)(prev=null,waitStatus=-1,持有線(xiàn)程:T2)

查看此時(shí)的head節(jié)點(diǎn)的next節(jié)點(diǎn)(prev=T2持有,waitStatus=-1,持有線(xiàn)程:T3)

查看此時(shí)的tail節(jié)點(diǎn)(prev=T2持有,waitStatus=0,持有線(xiàn)程:T3)
3.3加鎖lock源碼分析

static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 獲取鎖lock()失敗,非阻塞等待,釋放CPU資源,執(zhí)行 acquire(1)
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
3.3.1acquire
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
// 1.再次嘗試獲取鎖 tryAcquire
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // acquireQueued::for (;;) {,這里將阻塞,等待 unPark喚醒
// 2.嘗試獲取鎖失敗, 則addWaiter,將節(jié)點(diǎn)加到FIFO隊(duì)列( 默認(rèn)獨(dú)占節(jié)點(diǎn))
// 3.acquireQueued
selfInterrupt(); // 4.獲取鎖成功,線(xiàn)程進(jìn)行自我中斷
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3.3.1.1addWaiter
// AbstractQueuedSynchronizer.java
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 首次入隊(duì)初始化 init(),先構(gòu)建一個(gè)FIFO空節(jié)點(diǎn)
return node; // 返回當(dāng)前New節(jié)點(diǎn)
}
3.3.1.1.1enq
tail為null時(shí),表示首次,需完成 FIFO鏈表的初始化
第二次將參數(shù):Node節(jié)點(diǎn)入隊(duì)列。
private Node enq(final Node node) {
for (;;) { // 無(wú)限循環(huán)
Node t = tail;
if (t == null) { // 第一次循環(huán),Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else { // 第二次循環(huán)
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.3.1.2acquireQueued
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // FIFO隊(duì)列按順序,否則節(jié)點(diǎn)變化太頻繁,
// 判斷node的頭節(jié)點(diǎn)是否為head,若果是head且獲取鎖成功,則設(shè)置head=node
setHead(node);
p.next = null; // 斷開(kāi)head,help GC
failed = false;
return interrupted;
}
// FailedAcquire 未獲取到鎖
// 將pred前一節(jié)點(diǎn) waitStatus由默認(rèn)值改為 Node.SIGNAL
// 修改完成后則 parkAndCheckInterrupt
// 等待某個(gè)時(shí)刻被喚醒后,喚醒后執(zhí)行 Thread.interrupted,表示線(xiàn)程被中斷喚醒(同時(shí)清除標(biāo)志位)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.1.1.2.1shouldParkAfterFailedAcquire
// 該值默認(rèn)0從達(dá)到小,CANCELLED SIGNAL CONDITION PROPAGATE
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
這里我們需重點(diǎn)分析一下 隊(duì)列中除tail節(jié)點(diǎn) pred Node waitStatus的流轉(zhuǎn)流程,該值默認(rèn)0-->-1,表示后面節(jié)點(diǎn)需要喚醒。
// init:
// pred:前驅(qū)節(jié)點(diǎn),init head節(jié)點(diǎn):0
// node:當(dāng)前節(jié)點(diǎn):0
// exec result:
// pred:前驅(qū)節(jié)點(diǎn),init head節(jié)點(diǎn):0
// node:當(dāng)前節(jié)點(diǎn):0(最后一個(gè)節(jié)點(diǎn)都是0)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 設(shè)置頭節(jié)點(diǎn)waitStatus:-1,unlock時(shí)會(huì)用到,具體看3.2節(jié)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
3.1.1.2.2parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 判斷當(dāng)前線(xiàn)程是否被中斷,并且清除中斷標(biāo)志位
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}
3.1.1.3selfInterrupt
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();
synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}
3.4解鎖unlock源碼分析

public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) { // 釋放鎖成功(本質(zhì)就是修改標(biāo)志位)
Node h = head; // 每次都是從head節(jié)點(diǎn)開(kāi)始遍歷
if (h != null && h.waitStatus != 0) // h.waitStatus=-1
unparkSuccessor(h);
return true;
}
return false;
}
3.4.1tryRelease
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // c==0,表示可以釋放鎖了
if (Thread.currentThread() != getExclusiveOwnerThread()) // 不能釋放別的線(xiàn)程的鎖
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null); // 清空 AQS.exclusiveThread
}
setState(c); //設(shè)置AQS.state=0
return free;
}
3.4.1.1.1unparkSuccessor
T1線(xiàn)程業(yè)務(wù)處理完成,喚醒后繼節(jié)點(diǎn),這里即T2線(xiàn)程。
// node為當(dāng)前AQS的head頭節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus; // 頭結(jié)點(diǎn) waitStatus當(dāng)前為:-1
if (ws < 0) // 設(shè)置head Node waitStatus:-1--> 0,表示后繼節(jié)點(diǎn)喚醒流程引導(dǎo)完成
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next; // waitStatus 初始化完成后,基本上都是 -1 -1 0。
if (s == null || s.waitStatus > 0) { // 防止節(jié)點(diǎn)線(xiàn)程自我取消了。
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 這里喚醒線(xiàn)程 AQS 中的第一個(gè)非空節(jié)點(diǎn)T2
LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
if (thread != null)
UNSAFE.unpark(thread); // 回到:3.1.1.2.2,即return Thread.interrupted();
}
4總結(jié)
4.1公平與非公平鎖本質(zhì)區(qū)別分析
公平與非公平鎖本質(zhì)就是在調(diào)用基類(lèi)AQS中的acquire方法內(nèi)部tryAcquire方法,會(huì)根據(jù)子類(lèi)AQS子類(lèi)Sync、NonfairSync去調(diào)用不同的實(shí)現(xiàn)。
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
公平鎖:tryAcquire:hasQueuedPredecessors
// FairSync
final void lock() {
acquire(1);
}
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
非公平鎖:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
4.2鎖同步
AQS數(shù)據(jù)結(jié)構(gòu)為FIFO雙向鏈表。
條件變量Condition的wait,signal等價(jià)于Jdk原生對(duì)象Object的wait,Notify,NotifyAll。
ConditionObject可構(gòu)建多個(gè)等待隊(duì)列,Lock(同步隊(duì)列Q1)和Condition(條件等待隊(duì)列Q2),其實(shí)就是兩個(gè)隊(duì)列的互相移動(dòng)。