2021-04-11_AQS鎖互斥源碼學(xué)習(xí)筆記總結(jié)

20210411_AQS鎖互斥源碼學(xué)習(xí)筆記總結(jié)

1概述

AQS是一個(gè)用來(lái)構(gòu)建鎖和同步器的框架,Lock包中的鎖(ReentrantLock獨(dú)占模式、ReadWriteLock)、Semaphore共享模式、CoundDownLoatch、Jdk之前的FutureTask等均基于AQS來(lái)構(gòu)建。

本文基于源碼進(jìn)行相關(guān)知識(shí)點(diǎn)進(jìn)行總結(jié)。

1.1主要知識(shí)點(diǎn)

  1. 基于NoFaire非公平、重入鎖ReentrantLock,模擬3個(gè)線(xiàn)程,第一個(gè)線(xiàn)程比較耗時(shí)。

  2. 后續(xù)2個(gè)線(xiàn)程首先嘗試獲取鎖

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    2.1.如果tryAcquire獲取鎖成功,則執(zhí)行業(yè)務(wù)處理。

    2.2.如果tryAcquire獲取鎖失敗,則創(chuàng)建獨(dú)占模式的Node節(jié)點(diǎn)會(huì)進(jìn)入行FIFO雙向隊(duì)列,即addWaiter。然后走基類(lèi)AQS中的acquireQueued(注意加到隊(duì)列中的節(jié)點(diǎn)都是按順序去獲取鎖,判斷是否是頭結(jié)點(diǎn))。

    2.3.如果是當(dāng)前節(jié)點(diǎn)的前驅(qū)節(jié)點(diǎn)為head,則有一次機(jī)會(huì)再次嘗試獲取鎖tryAcquire,如果獲取鎖成功,則執(zhí)行業(yè)務(wù)處理。

    否則,并Park阻塞。

    // C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
  1. 等待第一個(gè)線(xiàn)程執(zhí)行完畢,會(huì)通知unPark同步器中的隊(duì)列首個(gè)線(xiàn)程節(jié)點(diǎn)。

  2. 加鎖lock源碼分析。

  3. 解鎖unlock源碼分析。

image-20210411204836987.png

AQS數(shù)據(jù)結(jié)構(gòu)圖

image-20210411230021227.png

2代碼示例

package com.kikop.myjuclockstudy.myaqs.myreentrantlock;

import com.kikop.util2.MyDateUtil;
import com.kikop.util2.RandomUtil;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author kikop
 * @version 1.0
 * @project Name: technicalskill
 * @file Name: AtomicDemoTest
 * @desc 功能描述 ReentrantLock:默認(rèn)非公平獨(dú)占鎖、可重入鎖、獨(dú)占鎖、可中斷鎖
 * @date 2020/6/7
 * @time 17:47
 * @by IDE: IntelliJ IDEA
 */
public class MyReenLockSimpleTest {

    // static存存在jvm元數(shù)據(jù)區(qū)
    // 加入FIFO隊(duì)列策略:誰(shuí)先加入,完全由Cpu時(shí)間片切換決定
    // FIFO隊(duì)列節(jié)點(diǎn):誰(shuí)是第一個(gè)頭節(jié)點(diǎn)先執(zhí)行:判斷前面是否有頭結(jié)點(diǎn)
    private static Lock lock = new ReentrantLock(); // ReentrantLock:默認(rèn)非公平獨(dú)占鎖、可重入鎖、獨(dú)占鎖、可中斷鎖

    // 線(xiàn)程個(gè)數(shù)
    private static int THREAD_COUNT = 3;

    // 臨界區(qū)資源
    private static int globalVar = 0;


    public static void inc() {

        try {
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申請(qǐng)鎖,即將加入FIFO隊(duì)列...");
            lock.lock();  // 加鎖
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":申請(qǐng)鎖-->獲得鎖成功!");

            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":開(kāi)始業(yè)務(wù)邏輯執(zhí)行.");
            String currentThreadName = Thread.currentThread().getName();
            if ("T1".equalsIgnoreCase(currentThreadName)) { // 模擬第一個(gè)線(xiàn)程耗時(shí)較長(zhǎng)時(shí)間:1分鐘,后續(xù)線(xiàn)程將如隊(duì)列
                Thread.sleep(3 * 60 * 1000);
            } else {
                Thread.sleep(RandomUtil.getSpecialRangeRandomValue(100));
            }
            globalVar++;
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":完成業(yè)務(wù)邏輯執(zhí)行.");
        } catch (InterruptedException e) {
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---釋放鎖異常!");
            e.printStackTrace();
        } finally {
            System.out.println(MyDateUtil.getCurrentDateStrByDefaultFormat() + "@" + Thread.currentThread().getName() + ":---釋放鎖!");
            lock.unlock(); // 釋放鎖
        }
    }

    public static void main(String[] args) throws InterruptedException {

        Thread[] threads = new Thread[THREAD_COUNT];

        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i] =
                    new Thread(() -> {
                        inc();
                    }, String.format("T%s", i + 1)
                    );
        }

        for (int i = 0; i < THREAD_COUNT; i++) {
            threads[i].start();
            Thread.sleep(100);
        }

        TimeUnit.MINUTES.sleep(30);
        System.out.println("Result:" + globalVar);
    }
}

3NoFair源碼分析

3.1AQS同步器初始化

static {
    try {
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}

3.2雙向FIFO隊(duì)列結(jié)構(gòu)

首先,當(dāng)T2,T3線(xiàn)程入隊(duì)列后,(小技巧:斷點(diǎn)加到 lock.unlock() 上一行),Sync同步器節(jié)點(diǎn)結(jié)構(gòu)如下圖:

image-20210411192219099.png

當(dāng)前執(zhí)行線(xiàn)程exclusiveOwnerThread為:T1,state=1表示當(dāng)前鎖被使用中。

image-20210411192507543.png

查看此時(shí)的head節(jié)點(diǎn)(prev=null,waitStatus=-1,持有線(xiàn)程:null)

image-20210411192723536.png

查看此時(shí)的head節(jié)點(diǎn)的next節(jié)點(diǎn)(prev=null,waitStatus=-1,持有線(xiàn)程:T2)

image-20210411192814560.png

查看此時(shí)的head節(jié)點(diǎn)的next節(jié)點(diǎn)(prev=T2持有,waitStatus=-1,持有線(xiàn)程:T3)

image-20210411192553272.png

查看此時(shí)的tail節(jié)點(diǎn)(prev=T2持有,waitStatus=0,持有線(xiàn)程:T3)

3.3加鎖lock源碼分析

image-20210411230105933.png
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1); // 獲取鎖lock()失敗,非阻塞等待,釋放CPU資源,執(zhí)行 acquire(1)
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

3.3.1acquire

// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    // 1.再次嘗試獲取鎖 tryAcquire
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // acquireQueued::for (;;) {,這里將阻塞,等待 unPark喚醒 
        // 2.嘗試獲取鎖失敗, 則addWaiter,將節(jié)點(diǎn)加到FIFO隊(duì)列( 默認(rèn)獨(dú)占節(jié)點(diǎn))
        // 3.acquireQueued
        selfInterrupt(); // 4.獲取鎖成功,線(xiàn)程進(jìn)行自我中斷
}
// NonfairSync
protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
// FairSync
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

3.3.1.1addWaiter

// AbstractQueuedSynchronizer.java

/**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
private transient volatile Node tail;

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node); // 首次入隊(duì)初始化 init(),先構(gòu)建一個(gè)FIFO空節(jié)點(diǎn)
    return node; // 返回當(dāng)前New節(jié)點(diǎn)
}
3.3.1.1.1enq

tail為null時(shí),表示首次,需完成 FIFO鏈表的初始化

第二次將參數(shù):Node節(jié)點(diǎn)入隊(duì)列。

private Node enq(final Node node) {
    for (;;) { // 無(wú)限循環(huán)
        Node t = tail;
        if (t == null) { // 第一次循環(huán),Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else { // 第二次循環(huán)
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

3.3.1.2acquireQueued

/**
 * Acquires in exclusive uninterruptible mode for thread already in
 * queue. Used by condition wait methods as well as acquire.
 *
 * @param node the node
 * @param arg the acquire argument
 * @return {@code true} if interrupted while waiting
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // FIFO隊(duì)列按順序,否則節(jié)點(diǎn)變化太頻繁,
                // 判斷node的頭節(jié)點(diǎn)是否為head,若果是head且獲取鎖成功,則設(shè)置head=node
                setHead(node);
                p.next = null; // 斷開(kāi)head,help GC
                failed = false;
                return interrupted;
            }
            // FailedAcquire 未獲取到鎖
            // 將pred前一節(jié)點(diǎn) waitStatus由默認(rèn)值改為 Node.SIGNAL
            // 修改完成后則 parkAndCheckInterrupt
            // 等待某個(gè)時(shí)刻被喚醒后,喚醒后執(zhí)行 Thread.interrupted,表示線(xiàn)程被中斷喚醒(同時(shí)清除標(biāo)志位)
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
3.1.1.2.1shouldParkAfterFailedAcquire
// 該值默認(rèn)0從達(dá)到小,CANCELLED SIGNAL CONDITION PROPAGATE
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED =  1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL    = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
 * waitStatus value to indicate the next acquireShared should
 * unconditionally propagate
 */
static final int PROPAGATE = -3;

volatile int waitStatus;

這里我們需重點(diǎn)分析一下 隊(duì)列中除tail節(jié)點(diǎn) pred Node waitStatus的流轉(zhuǎn)流程,該值默認(rèn)0-->-1,表示后面節(jié)點(diǎn)需要喚醒。

// init:
// pred:前驅(qū)節(jié)點(diǎn),init head節(jié)點(diǎn):0
// node:當(dāng)前節(jié)點(diǎn):0
// exec result:
// pred:前驅(qū)節(jié)點(diǎn),init head節(jié)點(diǎn):0
// node:當(dāng)前節(jié)點(diǎn):0(最后一個(gè)節(jié)點(diǎn)都是0)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 設(shè)置頭節(jié)點(diǎn)waitStatus:-1,unlock時(shí)會(huì)用到,具體看3.2節(jié)
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
3.1.1.2.2parkAndCheckInterrupt
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
// 判斷當(dāng)前線(xiàn)程是否被中斷,并且清除中斷標(biāo)志位
public static boolean interrupted() {
    return currentThread().isInterrupted(true);
}

3.1.1.3selfInterrupt

static void selfInterrupt() {
    Thread.currentThread().interrupt();
}
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();

    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

3.4解鎖unlock源碼分析

image-20210411230148669.png
public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) { // 釋放鎖成功(本質(zhì)就是修改標(biāo)志位)
        Node h = head; // 每次都是從head節(jié)點(diǎn)開(kāi)始遍歷
        if (h != null && h.waitStatus != 0) // h.waitStatus=-1
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3.4.1tryRelease

protected final boolean tryRelease(int releases) {
    int c = getState() - releases; // c==0,表示可以釋放鎖了
    if (Thread.currentThread() != getExclusiveOwnerThread()) // 不能釋放別的線(xiàn)程的鎖
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null); // 清空 AQS.exclusiveThread
    }
    setState(c); //設(shè)置AQS.state=0
    return free;
}
3.4.1.1.1unparkSuccessor

T1線(xiàn)程業(yè)務(wù)處理完成,喚醒后繼節(jié)點(diǎn),這里即T2線(xiàn)程。

// node為當(dāng)前AQS的head頭節(jié)點(diǎn)
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus; // 頭結(jié)點(diǎn) waitStatus當(dāng)前為:-1
    if (ws < 0)  // 設(shè)置head Node waitStatus:-1--> 0,表示后繼節(jié)點(diǎn)喚醒流程引導(dǎo)完成
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next; // waitStatus 初始化完成后,基本上都是 -1 -1 0。
    if (s == null || s.waitStatus > 0) { // 防止節(jié)點(diǎn)線(xiàn)程自我取消了。
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null) // 這里喚醒線(xiàn)程 AQS 中的第一個(gè)非空節(jié)點(diǎn)T2
        LockSupport.unpark(s.thread);
}
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread); // 回到:3.1.1.2.2,即return Thread.interrupted();
}

4總結(jié)

4.1公平與非公平鎖本質(zhì)區(qū)別分析

公平與非公平鎖本質(zhì)就是在調(diào)用基類(lèi)AQS中的acquire方法內(nèi)部tryAcquire方法,會(huì)根據(jù)子類(lèi)AQS子類(lèi)Sync、NonfairSync去調(diào)用不同的實(shí)現(xiàn)。

// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

公平鎖:tryAcquire:hasQueuedPredecessors

// FairSync
final void lock() {
    acquire(1);
}
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
/**
 * Fair version of tryAcquire.  Don't grant access unless
 * recursive call or no waiters or is first.
 */
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

非公平鎖:

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// C:\Program Files\Java\jdk1.8.0_60\src.zip!\java\util\concurrent\locks\AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
/**
 * Performs non-fair tryLock.  tryAcquire is implemented in
 * subclasses, but both need nonfair try for trylock method.
 */
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

4.2鎖同步

AQS數(shù)據(jù)結(jié)構(gòu)為FIFO雙向鏈表。

條件變量Condition的wait,signal等價(jià)于Jdk原生對(duì)象Object的wait,Notify,NotifyAll。

ConditionObject可構(gòu)建多個(gè)等待隊(duì)列,Lock(同步隊(duì)列Q1)和Condition(條件等待隊(duì)列Q2),其實(shí)就是兩個(gè)隊(duì)列的互相移動(dòng)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容