AQS源碼分析

一、前言

問題引入

@Controller
public class TradeController {
    @Autowired
    private TradeService tradeService;

    @RequestMapping("/order")
    public String order() {
        tradeService.decStock();
        return "success";
    }
}

@Service
public class TradeService {
    Logger logger = Logger.getLogger(TradeService.class);
    @Autowired
    JdbcTemplate jdbcTemplate;
    /**
     * 扣減庫存
     * @return
     */
    public String decStock() {
        Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
        if (stock <= 0) {
            logger.info("庫存不足,下單失??!");
            return "庫存不足,下單失??!";
        }
        stock--;
        jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
        logger.info("下單成功,當(dāng)前剩余庫存:" + stock);
        return "下單成功,當(dāng)前剩余庫存:" + stock;
    }
}

如上代碼,用Jmeter模擬30個(gè)請(qǐng)求同時(shí)下單,結(jié)果30個(gè)請(qǐng)求都下單成功,產(chǎn)生了超賣問題。

自定義AQS解決

下面實(shí)現(xiàn)自定義一個(gè)同步器來實(shí)現(xiàn)自定義鎖

/**
 * 2021/7/1
 * 自定義AQS實(shí)現(xiàn)
 */
public class MyLock {

    private volatile int state = 0;

    private Thread lockHolder;

    // 要用線程安全的隊(duì)列作為等待隊(duì)列,基于CAS實(shí)現(xiàn)
    // linkedBlockedQueue基于AQS實(shí)現(xiàn),不能用
    private ConcurrentLinkedDeque<Thread> waiters = new ConcurrentLinkedDeque<>();

    public int getState() {
        return state;
    }
    public void setState(int state) {
        this.state = state;
    }
    public Thread getLockHolder() {
        return lockHolder;
    }
    public void setLockHolder(Thread lockHolder) {
        this.lockHolder = lockHolder;
    }

    public void lock() {
        Thread currentThread = Thread.currentThread();
        if (acquire()) {
            return;
        }
        waiters.add(currentThread);
        // 自旋
        for (; ; ) {
            // 隊(duì)列里第一個(gè)線程才能搶鎖
            if (currentThread == waiters.peek() && acquire()) {
                // 隊(duì)列頭線程拿到鎖 踢出等待隊(duì)列
                waiters.poll();
                return;
            }
            // 阻塞當(dāng)前線程 放棄CPU使用權(quán)
            LockSupport.park();
        }
    }

    public void unLock() {
        if (Thread.currentThread() != lockHolder) {
            throw new RuntimeException("lockHolder is not current thread");
        }
        if (compareAndSwapState(getState(), 0)) {
            setLockHolder(null);
            // 喚醒隊(duì)列里第一個(gè)線程
            Thread first = waiters.peek();
            if (first != null) {
                LockSupport.unpark(first);
            }
        }
    }

    // 是否能加鎖成功
    private boolean acquire() {
        Thread currentThread = Thread.currentThread();
        if (getState() == 0) { // 同步器尚未被持有
            // 沒人排隊(duì)/自己是隊(duì)列頭,才能去嘗試原子操作改變state
            if ((waiters.size() == 0 || currentThread == waiters.peek()) && compareAndSwapState(0, 1)) {
                setLockHolder(currentThread);
                return true;
            }
        }
        return false;
    }


    // 利用Unsafe類實(shí)現(xiàn)原子操作改變值
    public final boolean compareAndSwapState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    private static final Unsafe unsafe = reflectGetUnsafe();
    // 偏移量
    private static final long stateOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(MyLock.class.getDeclaredField("state"));
        } catch (Exception e) {
            throw new Error();
        }
    }

    // 反射獲取Unsafe類
    private static Unsafe reflectGetUnsafe() {
        try {
            Field field = Unsafe.class.getDeclaredField("theUnsafe");
            field.setAccessible(true);
            return (Unsafe) field.get(null);
        } catch (Exception e) {
            return null;
        }
    }
}
public class TradeService {
    Logger logger = Logger.getLogger(TradeService.class);
    @Autowired
    JdbcTemplate jdbcTemplate;
    // 單例 創(chuàng)建鎖對(duì)象
    MyLock myLock = new MyLock();
    public String decStock() {
        myLock.lock();  // 加鎖
        Integer stock = jdbcTemplate.queryForObject("select stock from goods_stock where id = 1", Integer.class);
        if (stock <= 0) {
            logger.info("庫存不足,下單失??!");
            myLock.unLock();    // 業(yè)務(wù)失敗 釋放鎖
            return "庫存不足,下單失敗!";
        }
        stock--;
        jdbcTemplate.update("update goods_stock set stock = ? where id = 1", stock);
        logger.info("下單成功,當(dāng)前剩余庫存:" + stock);
        myLock.unLock();    // 業(yè)務(wù)成功 釋放鎖
        return "下單成功,當(dāng)前剩余庫存:" + stock;
    }
}

二、概述

抽象同步框架,可以用來實(shí)現(xiàn)一個(gè)依賴狀態(tài)的同步器

1.可以實(shí)現(xiàn)獨(dú)占與共享兩種模式:

  • 獨(dú)占:互斥,資源只能同時(shí)被一個(gè)線程占有,如ReentrantLock、Mutex

  • 共享:資源不互斥,可以被多個(gè)線程占有,如Semaphre、CountDownLatch

    一般來說同時(shí)只會(huì)實(shí)現(xiàn)一種模式,但也有ReentrantReadWriteLock同時(shí)實(shí)現(xiàn)獨(dú)占和共享兩種方式

2.核心屬性:

  • state:同步器的狀態(tài),即共享資源。訪問方式為:getState()、setState()、compareAndSetState()

  • exclusiveOwnerThread:當(dāng)前持有共享資源的線程

  • head:同步隊(duì)列頭

  • tail:同步隊(duì)列尾

3.特性:

  • 可中斷

  • 可重入

4.自定義AQS

一般自定義同步器的時(shí)候,只需要自定義共享資源的獲取和釋放方式。至于等待隊(duì)列的維護(hù),AQS已經(jīng)定義好了,不需要重寫。所以主要重寫以下幾種方法:

  • isHeldExclusively():該線程是否正在獨(dú)占資源。只有用到condition才需要去實(shí)現(xiàn)它。

  • tryAcquire(int):獨(dú)占方式。嘗試獲取資源,成功則返回true,失敗則返回false。

  • tryRelease(int):獨(dú)占方式。嘗試釋放資源,成功則返回true,失敗則返回false。

  • tryAcquireShared(int):共享方式。嘗試獲取資源。負(fù)數(shù)表示失?。?表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。

  • tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點(diǎn)返回true,否則返回false。

[圖片上傳失敗...(image-ed6b0e-1649076723271)]

三、源碼

waitStatus表示Node節(jié)點(diǎn)的等待狀態(tài)

waitStatus 判斷結(jié)果 說明
0 初始化狀態(tài) 該節(jié)點(diǎn)尚未被初始化完成
1 取消狀態(tài)(CANCELLED) 說明該線程中斷或者等待超時(shí),需要移除該線程,進(jìn)入該狀態(tài)后節(jié)點(diǎn)不會(huì)變化
-1 有效狀態(tài)(SIGNAL) 下一個(gè)節(jié)點(diǎn)等著自己?jiǎn)拘?。?jié)點(diǎn)入隊(duì)會(huì)把前繼節(jié)點(diǎn)狀態(tài)更新為SIGNAL
-2 有效狀態(tài)(CONDITION) 結(jié)點(diǎn)等待在Condition上,當(dāng)其他線程調(diào)用了Condition的signal()方法后,CONDITION狀態(tài)的結(jié)點(diǎn)將從等待隊(duì)列轉(zhuǎn)移到同步隊(duì)列中,等待獲取同步鎖。
-3 有效狀態(tài)(PROPAGATE) 共享模式下,自己不僅會(huì)喚醒后繼節(jié)點(diǎn),同時(shí)也可能喚醒后繼節(jié)點(diǎn)的后繼節(jié)點(diǎn)

AQS中阻塞隊(duì)列采用雙向鏈表結(jié)構(gòu),使用prev、next連接;而條件隊(duì)列采用單向鏈表,采用nextWaiter連接

nextWaiter狀態(tài)標(biāo)志 說明
SHARED(共享模式) = new Node() 立即喚醒下一個(gè)節(jié)點(diǎn)
EXCLUSIVE(獨(dú)占模式) = null 等待當(dāng)前線程執(zhí)行完再喚醒
其他非空值 根據(jù)條件決定怎么喚醒下一個(gè)節(jié)點(diǎn)

1.acquire()

acquire()是獲取共享資源的頂級(jí)入口,獲取到資源則直接返回,否則入等待隊(duì)列,直到獲取到共享資源,這個(gè)過程忽略中斷的影響(如果需要可中斷,可以調(diào)用acquireInterruptibly())。獲取到資源之后就可以執(zhí)行臨界區(qū)的代碼了

   public final void acquire(int arg) {
        // 嘗試獲取共享資源
        if (!tryAcquire(arg) &&
            // 
            acquireQueued(
                // 加入等待隊(duì)列尾部
                addWaiter(Node.EXCLUSIVE), arg)
           )
            // 如果在acquireQueued()被中斷過 這里自己補(bǔ)一個(gè)中斷 
            selfInterrupt();
    }

1.1tryAcquire()

AQS中,這個(gè)方法不能被直接調(diào)用,需要子類重寫。這里不定義為抽象方法的好處在于,獨(dú)占模式下只需重寫tryAcquire(),共享模式寫重寫tryAcquireShared(),如果定義為抽象方法則都需要實(shí)現(xiàn)。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

以ReentrantLock為例,非公平鎖中中,會(huì)直接CAS嘗試獲取共享資源,公平鎖中,會(huì)先檢查隊(duì)列中有沒有正在等待的線程,才去獲取共享資源

1.2addWaiter()

如果tryAcquire()加鎖失敗,會(huì)addWaiter(Node.EXCLUSIVE), arg),創(chuàng)建一個(gè)獨(dú)占模式的節(jié)點(diǎn)加入到隊(duì)列尾部

   private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);

        // 嘗試快速入隊(duì)
        // 這里直接進(jìn)行了一次CAS加到尾部的嘗試,失敗才去自旋 為什么要這樣呢?直接調(diào)用enq(node)效果似乎也一樣?
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
       // 自旋 空隊(duì)列則CAS地初始化隊(duì)列,隊(duì)列有節(jié)點(diǎn)就把節(jié)點(diǎn)掛到隊(duì)列尾部
        enq(node);
        return node;
    }

1.3acquireQueued()

把當(dāng)前節(jié)點(diǎn)加入隊(duì)列尾部之后,acquireQueued()

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 獲取自己的前驅(qū)節(jié)點(diǎn)
                final Node p = node.predecessor();
                // 如果前驅(qū)節(jié)點(diǎn)是頭結(jié)點(diǎn)(正持有資源的節(jié)點(diǎn)) 自己是第二個(gè)節(jié)點(diǎn),那么可能有機(jī)會(huì)獲得資源 
                // 嘗試獲取資源(非公平鎖直接獲取,公平鎖會(huì)先檢查隊(duì)列中有沒有線程在等待再獲取)
                // 第一次進(jìn)來就直接看能不能獲取資源 也可能是等待后,被喚醒(前驅(qū)節(jié)點(diǎn)釋放資源/中斷)循環(huán)走到這里
                if (p == head && tryAcquire(arg)) {
                    // 獲取到資源 把當(dāng)前節(jié)點(diǎn)設(shè)為頭結(jié)點(diǎn),前驅(qū)節(jié)點(diǎn)置空,保存的線程置空(頭結(jié)點(diǎn)沒必要保存線程了)
                    setHead(node);
                    // 去掉之前的頭節(jié)點(diǎn)
                    p.next = null; // help GC
                    failed = false;
                    
                    // 返回等待過程中有沒有被中斷過
                    return interrupted;
                }
                // 如果自己不是第二個(gè)節(jié)點(diǎn) 或者自己雖然是第二個(gè)節(jié)點(diǎn),但是由于非公平鎖,新來的線程可以不入隊(duì)列,直接獲取鎖,所以這里可能被其他線程搶先了
                // 檢查是否可以休息,找到可以安心休息的地方。將前驅(qū)節(jié)點(diǎn)waitStatus改為-1,即后續(xù)來喚醒自己
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // park 阻塞等待
                    // 檢查中斷標(biāo)志 如果是被中斷了而不是被unPark()
                    // 后面tryAcquire方法()獲取到資源之后會(huì)返回中斷標(biāo)志,acquire()里自行產(chǎn)生一個(gè)中斷標(biāo)志
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

嘗試獲取資源失敗后,檢查自己是否可以安心休息,如果不能就找到一個(gè)可以安心休息的地方

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
         // 前驅(qū)節(jié)點(diǎn)的狀態(tài)已經(jīng)是SIGNAL,到時(shí)候會(huì)來喚醒自己,所以可以安心休息
        if (ws == Node.SIGNAL)
            return true;
        // 前驅(qū)節(jié)點(diǎn)狀態(tài)是被取消 
        if (ws > 0) {
            do {
                // 一直往前找沒有取消的節(jié)點(diǎn)
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 把自己掛到它后面
            pred.next = node;
        } else {
            // 走到這里意味著狀態(tài)是0/PROPAGATE 則把前驅(qū)節(jié)點(diǎn)狀態(tài)置為SIGNAL 
            // 這里就算成功了,也不會(huì)直接返回true開始休息,而是再一輪嘗試獲取資源,獲取不到再park
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    private final boolean parkAndCheckInterrupt() {
        // 等待
        LockSupport.park(this);
        // 被喚醒 返回自己是不是因?yàn)橹袛啾粏拘? 中斷信號(hào)能打斷一個(gè)等待中的線程,終止等待
        return Thread.interrupted();
    }

總結(jié):如果當(dāng)前是第二個(gè)節(jié)點(diǎn),就去嘗試獲取資源,如果成功了直接返回。否則的話,尋找一個(gè)合適的休息點(diǎn),開始等待,直到被前驅(qū)節(jié)點(diǎn)unPark()或被中斷,才繼續(xù)判斷自己是否是第二個(gè)節(jié)點(diǎn),去嘗試獲取資源。這個(gè)方法會(huì)返回等待過程中是否被中斷過,后續(xù)用來自行產(chǎn)生一個(gè)中斷。這也是不響應(yīng)中斷的核心,等待過程不可中斷,只有獲取到資源之后,才可以被中斷

1.4selfInterrupt()

自行產(chǎn)生一個(gè)中斷

1.5總結(jié)

1.先嘗試獲取資源

2.獲取不到則把當(dāng)前線程加入隊(duì)列尾,標(biāo)記獨(dú)占模式

3.檢查自己是否為第二個(gè)節(jié)點(diǎn),是則嘗試獲取鎖,不是則進(jìn)入等待,等輪到自己了,前驅(qū)節(jié)點(diǎn)會(huì)unPark自己,自己再去嘗試獲取資源。如果被中斷過,則返回出去

4.獲取資源成功,如果被中斷過,則產(chǎn)生一個(gè)中斷

2.release()

釋放資源的頂級(jí)入口,釋放完資源,會(huì)喚醒隊(duì)列中后一個(gè)正在等待的線程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            // 判斷頭節(jié)點(diǎn)不為空,狀態(tài)不為0
            if (h != null && h.waitStatus != 0)
                // 喚醒后繼節(jié)點(diǎn) 找到隊(duì)列里離head最近的一個(gè)沒取消的node,unpark恢復(fù)其運(yùn)行
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

2.1tryRelease()

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

2.2unparkSuccessor()

   private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        // 當(dāng)前結(jié)點(diǎn)狀態(tài)為負(fù) 則置為0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        // 一般要喚醒的就是后一個(gè)節(jié)點(diǎn),但是可能該節(jié)點(diǎn)已取消 所以要從尾結(jié)點(diǎn)一直往前找,找到真正有效的節(jié)點(diǎn)
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

疑問:這里為什么要從后往前找?

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            // 要掛上去的節(jié)點(diǎn) prev是一定能成功指向之前的尾結(jié)點(diǎn)的
            node.prev = pred;
            // CAS把尾結(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)
            if (compareAndSetTail(pred, node)) {
                // 但這里可能還沒有執(zhí)行到 此時(shí)之前的尾結(jié)點(diǎn)指向null
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

3.acquireShared()

共享模式下,嘗試獲取指定量的資源,獲取失敗則進(jìn)入等待隊(duì)列,直到獲取到資源。該過程忽略中斷

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

3.1tryAcquireShared()

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

負(fù)值:失敗,需要執(zhí)行doAcquireShared()進(jìn)入隊(duì)列

0:成功,但沒有剩余資源

正值:成功,還有剩余資源,其他線程還可以獲取

3.2doAcquireShared()

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        // 自己成為head
        setHead(node);
 
        // 如果還有剩余量 繼續(xù)喚醒下一個(gè)線程
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

3.3總結(jié)

先嘗試獲取資源,沒有獲取到則進(jìn)入隊(duì)列等待,直到獲取到資源。與獨(dú)占模式相比,自己拿到資源之后,還會(huì)繼續(xù)喚醒下一個(gè)線程

4.releaseShared()

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//嘗試釋放資源
        doReleaseShared();//喚醒后繼結(jié)點(diǎn)
        return true;
    }
    return false;
}

4.1tryReleaseShared()

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

4.2doReleaseShared()

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//喚醒后繼
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head發(fā)生變化
            break;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容