CAS和AQS

[TOC]

CAS

全稱(Compare And Swap),比較交換

Unsafe類是CAS的核心類,提供硬件級(jí)別的原子操作。

// 對(duì)象、對(duì)象的地址、預(yù)期值、修改值
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

缺點(diǎn):

  1. 開銷大:在并發(fā)量比較高的情況下,如果反復(fù)嘗試更新某個(gè)變量,卻又一直更新不成功,會(huì)給CPU帶來較大的壓力
  2. ABA問題:當(dāng)變量從A修改為B在修改回A時(shí),變量值等于期望值A(chǔ),但是無法判斷是否修改,CAS操作在ABA修改后依然成功。
    • 如何避免:Java提供了AtomicStampedReference和AtomicMarkableReference來解決。AtomicStampedReference通過包裝[E,Integer]的元組來對(duì)對(duì)象標(biāo)記版本戳stamp,對(duì)于ABA問題其解決方案是加上版本號(hào),即在每個(gè)變量都加上一個(gè)版本號(hào),每次改變時(shí)加1,即A —> B —> A,變成1A —> 2B —> 3A。
  3. 不能保證代碼塊的原子性:CAS機(jī)制所保證的只是一個(gè)變量的原子性操作,而不能保證整個(gè)代碼塊的原子性。
public class Test {
    private static AtomicInteger atomicInteger = new AtomicInteger(100);
    private static AtomicStampedReference atomicStampedReference = new AtomicStampedReference(100,1);

    public static void main(String[] args) throws InterruptedException {

        //AtomicInteger
        Thread at1 = new Thread(new Runnable() {
            @Override
            public void run() {
                atomicInteger.compareAndSet(100,110);
                atomicInteger.compareAndSet(110,100);
            }
        });

        Thread at2 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(2);      // at1,執(zhí)行完
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("AtomicInteger:" + atomicInteger.compareAndSet(100,120));
            }
        });

        at1.start();
        at2.start();

        at1.join();
        at2.join();

        //AtomicStampedReference

        Thread tsf1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //讓 tsf2先獲取stamp,導(dǎo)致預(yù)期時(shí)間戳不一致
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 預(yù)期引用:100,更新后的引用:110,預(yù)期標(biāo)識(shí)getStamp() 更新后的標(biāo)識(shí)getStamp() + 1
                atomicStampedReference.compareAndSet(100,110,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
                atomicStampedReference.compareAndSet(110,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp() + 1);
            }
        });

        Thread tsf2 = new Thread(new Runnable() {
            @Override
            public void run() {
                int stamp = atomicStampedReference.getStamp();

                try {
                    TimeUnit.SECONDS.sleep(2);      //線程tsf1執(zhí)行完
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("AtomicStampedReference:" +atomicStampedReference.compareAndSet(100,120,stamp,stamp + 1));
            }
        });

        tsf1.start();
        tsf2.start();
    }

}

AQS(AbstractQueuedSynchronizer)

維護(hù)一個(gè)volatile int state(代表共享資源狀態(tài))和一個(gè)FIFO線程等待隊(duì)列。

模板方法基本分為三類:

  • 獨(dú)占鎖
  • 共享鎖
  • 釋放鎖

資源共享的方式

  1. Exclusive(獨(dú)占,只有一個(gè)線程能執(zhí)行,如ReentrantLock)
  2. Share(共享,多個(gè)線程可以同時(shí)執(zhí)行,如Semaphore/CountDownLatch)

同步隊(duì)列

AQS依靠同步隊(duì)列(一個(gè)FIFO的雙向隊(duì)列)來完成同步狀態(tài)的管理。當(dāng)當(dāng)前線程獲取狀態(tài)失敗后,同步器會(huì)將當(dāng)前線程以及等待信息構(gòu)造成一個(gè)節(jié)點(diǎn)(Node),并嘗試將他加入到同步隊(duì)列。Head節(jié)點(diǎn)不保存等待的線程信息,僅通過next指向隊(duì)列中第一個(gè)保存等待線程信息的Node。

雙向同步隊(duì)列

Node類

源碼(中字注釋)

static final class Node {
    /** 代表共享模式 */
    static final Node SHARED = new Node();
    /** 代表獨(dú)占模式 */
    static final Node EXCLUSIVE = null;

    /** 以下四個(gè)狀態(tài)解釋見下文等待狀態(tài) */
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** 標(biāo)識(shí)等待狀態(tài),通過CAS操作更新,原子操作不會(huì)被打斷*/
    volatile int waitStatus;

    /** 當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn) */
    volatile Node prev;

    /** 當(dāng)前節(jié)點(diǎn)的后置節(jié)點(diǎn) */
    volatile Node next;

    /** 該節(jié)點(diǎn)關(guān)聯(lián)的線程(未能獲取鎖,進(jìn)入等待的線程) */
    volatile Thread thread;

    /** 指向下一個(gè)在某個(gè)條件上等待的節(jié)點(diǎn),或者指向 SHARE 節(jié)點(diǎn),表明當(dāng)前處于共享模式*/
    Node nextWaiter;

    /**
     * 判斷是否處于共享模式
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    /** 
     * 返回當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn) 
     * 會(huì)做對(duì)前置節(jié)點(diǎn)空值判斷 
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}
等待狀態(tài):

等待狀態(tài)的修改是CAS原子操作

  • CANCELED: 1,因?yàn)榈却瑫r(shí) (timeout)或者中斷(interrupt),節(jié)點(diǎn)會(huì)被置為取消狀態(tài)。處于取消狀態(tài)的節(jié)點(diǎn)不會(huì)再去競(jìng)爭(zhēng)鎖,也就是說不會(huì)再被阻塞。節(jié)點(diǎn)會(huì)一直保持取消狀態(tài),而不會(huì)轉(zhuǎn)換為其他狀態(tài)。處于 CANCELED 的節(jié)點(diǎn)會(huì)被移出隊(duì)列,被 GC 回收。
  • SIGNAL: -1,表明當(dāng)前的后繼結(jié)點(diǎn)正在或者將要被阻塞(通過使用 LockSupport.pack 方法),因此當(dāng)前的節(jié)點(diǎn)被釋放(release)或者被取消時(shí)(cancel)時(shí),要喚醒它的后繼結(jié)點(diǎn)(通過 LockSupport.unpark 方法)。
  • CONDITION: -2,表明當(dāng)前節(jié)點(diǎn)在條件隊(duì)列中,因?yàn)榈却硞€(gè)條件而被阻塞。
  • PROPAGATE: -3,在共享模式下,可以認(rèn)為資源有多個(gè),因此當(dāng)前線程被喚醒之后,可能還有剩余的資源可以喚醒其他線程。該狀態(tài)用來表明后續(xù)節(jié)點(diǎn)會(huì)傳播喚醒的操作。需要注意的是只有頭節(jié)點(diǎn)才可以設(shè)置為該狀態(tài)(This is set (for head node only) in doReleaseShared to ensure propagation continues, even if other operations have since intervened.)。
  • 0:新創(chuàng)建的節(jié)點(diǎn)會(huì)處于這種狀態(tài)

鎖的獲取與釋放:

獲取獨(dú)占鎖
獲取獨(dú)占鎖
  • acquire方法
public final void acquire(int arg) {
    // 首先嘗試獲取鎖,如果獲取失敗,會(huì)先調(diào)用 addWaiter 方法創(chuàng)建節(jié)點(diǎn)并追加到隊(duì)列尾部
    // 然后調(diào)用 acquireQueued 阻塞或者循環(huán)嘗試獲取鎖
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
        // 在 acquireQueued 中,如果線程是因?yàn)橹袛喽顺龅淖枞麪顟B(tài)會(huì)返回 true
        // 這里的 selfInterrupt 主要是為了恢復(fù)線程的中斷狀態(tài)
        selfInterrupt();
    }
}
釋放獨(dú)占鎖
釋放獨(dú)占鎖

? 在獨(dú)占模式中,鎖的釋放由于沒有其他線程競(jìng)爭(zhēng),相對(duì)簡(jiǎn)單。鎖釋放失敗的原因是由于該線程本身不擁有鎖,而非多線程競(jìng)爭(zhēng)。鎖釋放成功后會(huì)檢查后置節(jié)點(diǎn)的狀態(tài),找到合適的節(jié)點(diǎn),調(diào)用unparkSuccessor方法喚醒該節(jié)點(diǎn)所關(guān)聯(lián)的線程。

  • release方法
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // waitStatus 為 0,證明是初始化的空隊(duì)列或者后繼結(jié)點(diǎn)已經(jīng)被喚醒了
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
獲取共享鎖
獲取共享鎖
  • acquireShared方法

    通過該方法可以申請(qǐng)鎖

public final void acquireShared(int arg) {
    // 如果返回結(jié)果小于0,證明沒有獲取到共享資源
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
  • doAcquireShared
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
釋放共享鎖

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容