[懷舊并發(fā)03]認識非阻塞的同步機制CAS

在研究線程池的執(zhí)行原理時,看到一段不斷循環(huán)重試的代碼,不理解它的原理,看注釋這是CAS的實現(xiàn),所以學會之后記錄下來。

鎖有什么劣勢

在多線程并發(fā)下,可以通過加鎖來保證線程安全性,但多個線程同時請求鎖,很多情況下避免不了要借助操作系統(tǒng),線程掛起和恢復(fù)會存在很大的開銷,并存在很長時間的中斷。一些細粒度的操作,例如同步容器,操作往往只有很少代碼量,如果存在鎖并且線程激烈地競爭,調(diào)度的代價很大。

總結(jié)來說,線程持有鎖,會讓其他需要鎖的線程阻塞,產(chǎn)生多種風險和開銷。加鎖是一種悲觀方法,線程總是設(shè)想在自己持有資源的同時,肯定有其他線程想要資源,不牢牢鎖住資源還不能放心呢。

在硬件的支持下,出現(xiàn)了非阻塞的同步機制,其中一種常用實現(xiàn)就是CAS。

什么是CAS

現(xiàn)代的處理器都包含對并發(fā)的支持,其中最通用的方法就是比較并交換(compare and swap),簡稱CAS。

CAS 操作包含三個操作數(shù) —— 內(nèi)存位置(V)、預(yù)期原值(A)和新值(B)。如果內(nèi)存位置的值與預(yù)期原值相匹配,那么處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論V值是否等于A值,都將返回V的原值。CAS 有效地說明了:我認為位置 V 應(yīng)該包含值 A;如果包含該值,則將 B 放到這個位置;否則,不要更改該位置,只告訴我這個位置現(xiàn)在的值即可。

當多個線程嘗試使用CAS同時更新一個變量,最終只有一個線程會成功,其他線程都會失敗。但和使用鎖不同,失敗的線程不會被阻塞,而是被告之本次更新操作失敗了,可以再試一次。此時,線程可以根據(jù)實際情況,繼續(xù)重試或者跳過操作,大大減少因為阻塞而損失的性能。所以,CAS是一種樂觀的操作,它希望每次都能成功地執(zhí)行更新操作。

public class SimulationCAS {
    private int value;

    public synchronized int get() {
        return value;
    }

    public synchronized boolean compareAndSet(int expectedValue, int newValue) {
        if (expectedValue == compareAndSwap(expectedValue, newValue)) {
            return true;
        }
        return false;
    }

    public synchronized int compareAndSwap(int expectedValue, int newValue) {
        int oldValue = value;
        if (oldValue == expectedValue) {
            value = newValue;
        }
        return oldValue;
    }
}

上面的代碼模擬了CAS的操作,其中compareAndSwap是CAS語義的體現(xiàn),compareAndSet對value進行了更新操作,并返回成功與否。

幾行代碼就實現(xiàn)了CAS,是不是覺得很簡單呢?但你要知道,CAS僅僅告訴你操作結(jié)果,操作失敗后一系列重試回退放棄等操作都要自己實現(xiàn),開發(fā)起來遠比使用鎖復(fù)雜。

Atom原子類

JVM是支持CAS的,體現(xiàn)在我們常用的Atom原子類,拿AtomicInteger分析一下源碼。

    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

對AtomicInteger進行+1操作,循環(huán)里,會將當前值和+1后的目標值傳入compareAndSet,直到成功才跳出方法。compareAndSet是不是很熟悉呢,接著來看看它的代碼。

// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();

 public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

compareAndSet調(diào)用了unsafe.compareAndSwapInt,這是一個native方法,原理就是調(diào)用硬件支持的CAS方法。看懂這個應(yīng)該就能明白Atom類的原理,其他方法的實現(xiàn)是類似的。

線程池里的CAS

有了CAS的基礎(chǔ)后,可以來研究那段我未看懂的代碼。

提交一個執(zhí)行任務(wù),線程池會嘗試增加一個工作線程去處理任務(wù)。下面是ThreadPoolExecutor里addWorker的一段代碼:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        //其他省略

在內(nèi)循環(huán)里,會調(diào)用compareAndIncrementWorkerCount方法增加一個工作線程,原理和AtomicInteger的getAndIncrement方法是一樣的。如果增加成功,直接跳出循環(huán),否則在檢查線程池狀態(tài)后,再次在內(nèi)循環(huán)調(diào)用compareAndIncrementWorkerCount,直到添加成功。

現(xiàn)在再看代碼,瞬間就明白了。


如有紕漏,請聯(lián)系我。歡迎留言和轉(zhuǎn)發(fā)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容