無鎖AtomicXXX源碼分析

無鎖即無障礙的運(yùn)行, 所有線程都可以到達(dá)臨界區(qū), 接近于無等待.無鎖采用CAS(compare and swap)算法來處理線程沖突, 其原理如下
CAS原理
CAS包含3個(gè)參數(shù)CAS(V,E,N).V表示要更新的變量, E表示預(yù)期值, N表示新值.僅當(dāng)V值等于E值時(shí), 才會(huì)將V的值設(shè)為N, 如果V值和E值不同, 則說明已經(jīng)有其他線程做了更新, 則當(dāng)前線程什么都不做. 最后, CAS返回當(dāng)前V的真實(shí)值. CAS操作是抱著樂觀的態(tài)度進(jìn)行的, 它總是認(rèn)為自己可以成功完成操作.

當(dāng)多個(gè)線程同時(shí)使用CAS操作一個(gè)變量時(shí), 只有一個(gè)會(huì)勝出, 并成功更新, 其余均會(huì)失敗.失敗的線程不會(huì)被掛起,僅是被告知失敗, 并且允許再次嘗試, 當(dāng)然也允許失敗的線程放棄操作.基于這樣的原理, CAS操作即時(shí)沒有鎖,也可以發(fā)現(xiàn)其他線程對(duì)當(dāng)前線程的干擾, 并進(jìn)行恰當(dāng)?shù)奶幚?
CPU指令
另外, 雖然上述步驟繁多, 實(shí)際上CAS整一個(gè)操作過程是一個(gè)原子操作, 它是由一條CPU指令完成的,從指令層保證操作可靠, 不會(huì)被多線程干擾.

無鎖與volatile
當(dāng)給變量加了volatile關(guān)鍵字, 表示該變量對(duì)所有線程可見, 但不保證原子性.

AtomicInteger

// 取得當(dāng)前值
public final int get() 
// 設(shè)置當(dāng)前值
public final void set(int newValue)
// 設(shè)置新值,并返回舊值
public final int getAndSet(int newValue)
// 如果當(dāng)前值為expect,則設(shè)置為u
public final boolean compareAndSet(int expect, int u)
// 當(dāng)前值加1,返回舊值
public final int getAndIncrement()
// 當(dāng)前值減1,返回舊值
public final int getAndDecrement() 
// 當(dāng)前值增加delta,返回舊值
public final int getAndAdd(int delta)
// 當(dāng)前值加1,返回新值
public final int incrementAndGet() 
// 當(dāng)前值減1,返回新值
public final int decrementAndGet() 
// 當(dāng)前值增加delta,返回新值
public final int addAndGet(int delta)
// 封裝了一個(gè)int對(duì)其加減
    private volatile int value;
    .......
    public final boolean compareAndSet(int expect, int update) {
    // 通過unsafe 基于CPU的CAS指令來實(shí)現(xiàn), 可以認(rèn)為無阻塞.
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    .......
    public final int getAndIncrement() {
        for (;;) {
        // 當(dāng)前值
            int current = get();
        // 預(yù)期值
            int next = current + 1;
            if (compareAndSet(current, next)) {
        // 如果加成功了, 則返回當(dāng)前值
                return current;
        }
        // 如果加失敗了, 說明其他線程已經(jīng)修改了數(shù)據(jù), 與期望不相符,
        // 則繼續(xù)無限循環(huán), 直到成功. 這種樂觀鎖, 理論上只要等兩三個(gè)時(shí)鐘周期就可以設(shè)值成功
        // 相比于直接通過synchronized獨(dú)占鎖的方式操作int, 要大大節(jié)約等待時(shí)間.
        }
    }

demo
使用10個(gè)線程打印0-10000, 最終得到結(jié)果10w.

import java.util.concurrent.atomic.AtomicInteger;

public class AtomicIntegerDemo {
    static AtomicInteger i = new AtomicInteger();

    public static class AddThread implements Runnable {
        public void run() {
            for (int k = 0; k < 10000; k++) {
                i.incrementAndGet();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0; k < 10; k++) {
            ts[k] = new Thread(new AddThread());
        }
        for (int k = 0; k < 10; k++) {
            ts[k].start();
        }
        for (int k = 0; k < 10; k++) {
            ts[k].join();
        }
        System.out.println(i);
    }
}

Unsafe

Unsafe類是在sun.misc包下, 可以用于一些非安全的操作,比如:
根據(jù)偏移量設(shè)置值, 線程park(), 底層的CAS操作等等.

 // 獲取類實(shí)例中變量的偏移量
 valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
 // 基于偏移量對(duì)值進(jìn)行操作
 unsafe.compareAndSwapInt(this, valueOffset, expect, update);

主要接口

// 獲得給定對(duì)象偏移量上的int值
public native int getInt(Object o, long offset);
// 設(shè)置給定對(duì)象偏移量上的int值
public native void putInt(Object o, long offset, int x);
// 獲得字段在對(duì)象中的偏移量
public native long objectFieldOffset(Field f);
// 設(shè)置給定對(duì)象的int值,使用volatile語義
public native void putIntVolatile(Object o, long offset, int x);
// 獲得給定對(duì)象對(duì)象的int值,使用volatile語義
public native int getIntVolatile(Object o, long offset);
// 和putIntVolatile()一樣,但是它要求被操作字段就是volatile類型的
public native void putOrderedInt(Object o, long offset, int x);

AtomicReference
與AtomicInteger類似, 只是里面封裝了一個(gè)對(duì)象, 而不是int, 對(duì)引用進(jìn)行修改

主要接口

1 get()
2 set(V)
3 compareAndSet()
4 getAndSet(V)

demo
使用10個(gè)線程, 同時(shí)嘗試修改AtomicReference中的String, 最終只有一個(gè)線程可以成功.

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferenceTest {
    public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc");

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            new Thread() {
                public void run() {
                    try {
                        Thread.sleep(Math.abs((int) (Math.random() * 100)));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if (attxnicStr.compareAndSet("abc", "def")) {
                        System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get());
                    } else {
                        System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!");
                    }
                }
            }.start();
        }
    }
}

AtomicStampedReference
也是封裝了一個(gè)引用, 主要解決ABA問題.

ABA問題

線程一準(zhǔn)備用CAS將變量的值由A替換為B, 在此之前線程二將變量的值由A替換為C, 線程三又將C替換為A, 然后線程一執(zhí)行CAS時(shí)發(fā)現(xiàn)變量的值仍然為A, 所以線程一CAS成功.

// 比較設(shè)置 參數(shù)依次為:期望值 寫入新值 期望時(shí)間戳 新時(shí)間戳
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
// 獲得當(dāng)前對(duì)象引用
public V getReference()
// 獲得當(dāng)前時(shí)間戳
public int getStamp()
// 設(shè)置當(dāng)前對(duì)象引用和時(shí)間戳
public void set(V newReference, int newStamp)

源碼分析

// 內(nèi)部封裝了一個(gè)Pair對(duì)象, 每次對(duì)對(duì)象操作的時(shí)候, stamp + 1
    private static class Pair<T> {
        final T reference;
        final int stamp;
        private Pair(T reference, int stamp) {
            this.reference = reference;
            this.stamp = stamp;
        }
        static <T> Pair<T> of(T reference, int stamp) {
            return new Pair<T>(reference, stamp);
        }
    }

    private volatile Pair<V> pair;

    // 進(jìn)行cas操作的時(shí)候, 會(huì)對(duì)比stamp的值
    public boolean compareAndSet(V   expectedReference,
                                 V   newReference,
                                 int expectedStamp,
                                 int newStamp) {
        Pair<V> current = pair;
        return
            expectedReference == current.reference &&
            expectedStamp == current.stamp &&
            ((newReference == current.reference &&
              newStamp == current.stamp) ||
             casPair(current, Pair.of(newReference, newStamp)));
    }

后臺(tái)使用多個(gè)線程對(duì)用戶充值, 要求只能充值一次

public class AtomicStampedReferenceDemo {
 static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);
    public staticvoid main(String[] args) {
        //模擬多個(gè)線程同時(shí)更新后臺(tái)數(shù)據(jù)庫,為用戶充值
        for(int i = 0 ; i < 3 ; i++) {
            final int timestamp=money.getStamp();
            newThread() {  
                public void run() { 
                    while(true){
                       while(true){
                           Integerm=money.getReference();
                            if(m<20){
                         if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){
                          System.out.println("余額小于20元,充值成功,余額:"+money.getReference()+"元");
                                    break;
                                }
                            }else{
                               //System.out.println("余額大于20元,無需充值");
                                break ;
                             }
                       }
                    }
                } 
            }.start();
         }
        
       //用戶消費(fèi)線程,模擬消費(fèi)行為
        new Thread() { 
             publicvoid run() { 
                for(int i=0;i<100;i++){
                   while(true){
                        int timestamp=money.getStamp();
                        Integer m=money.getReference();
                        if(m>10){
                             System.out.println("大于10元");
                            if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){
                             System.out.println("成功消費(fèi)10元,余額:"+money.getReference());
                                 break;
                             }
                        }else{
                           System.out.println("沒有足夠的金額");
                             break;
                        }
                    }
                    try {Thread.sleep(100);} catch (InterruptedException e) {}
                 }
            } 
        }.start(); 
    }
 }

AtomicIntegerArray
支持無鎖的數(shù)組

// 獲得數(shù)組第i個(gè)下標(biāo)的元素
public final int get(int i)
// 獲得數(shù)組的長度
public final int length()
// 將數(shù)組第i個(gè)下標(biāo)設(shè)置為newValue,并返回舊的值
public final int getAndSet(int i, int newValue)
// 進(jìn)行CAS操作,如果第i個(gè)下標(biāo)的元素等于expect,則設(shè)置為update,設(shè)置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
// 將第i個(gè)下標(biāo)的元素加1
public final int getAndIncrement(int i)
// 將第i個(gè)下標(biāo)的元素減1
public final int getAndDecrement(int i)
// 將第i個(gè)下標(biāo)的元素增加delta(delta可以是負(fù)數(shù))
public final int getAndAdd(int i, int delta)
// 數(shù)組本身基地址
    private static final int base = unsafe.arrayBaseOffset(int[].class);

    // 封裝了一個(gè)數(shù)組
    private final int[] array;

    static {
        // 數(shù)組中對(duì)象的寬度, int類型, 4個(gè)字節(jié), scale = 4;
        int scale = unsafe.arrayIndexScale(int[].class);
        if ((scale & (scale - 1)) != 0)
            throw new Error("data type scale not a power of two");
        // 前導(dǎo)0 : 一個(gè)數(shù)字轉(zhuǎn)為二進(jìn)制后, 他前面0的個(gè)數(shù)
        // 對(duì)于4來講, 他就是00000000 00000000 00000000 00000100, 他的前導(dǎo)0 就是29
        // 所以shift = 2
        shift = 31 - Integer.numberOfLeadingZeros(scale);
    }

    // 獲取第i個(gè)元素
    public final int get(int i) {
        return getRaw(checkedByteOffset(i));
    }

    // 第i個(gè)元素, 在數(shù)組中的偏移量是多少
    private long checkedByteOffset(int i) {
        if (i < 0 || i >= array.length)
            throw new IndexOutOfBoundsException("index " + i);

        return byteOffset(i);
    }

    // base : 數(shù)組基地址, i << shift, 其實(shí)就是i * 4, 因?yàn)檫@邊是int array.
    private static long byteOffset(int i) {
        // i * 4 + base
        return ((long) i << shift) + base;
    }

    // 根據(jù)偏移量從數(shù)組中獲取數(shù)據(jù)
    private int getRaw(long offset) {
        return unsafe.getIntVolatile(array, offset);
    }

Demo

import java.util.concurrent.atomic.AtomicIntegerArray;

public class AtomicArrayDemo {
    static AtomicIntegerArray arr = new AtomicIntegerArray(10);

    public static class AddThread implements Runnable {
        public void run() {
            for (int k = 0; k < 10000; k++) {
                arr.incrementAndGet(k % arr.length());
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] ts = new Thread[10];
        for (int k = 0; k < 10; k++) {
            ts[k] = new Thread(new AddThread());
        }
        for (int k = 0; k < 10; k++) {
            ts[k].start();
        }
        for (int k = 0; k < 10; k++) {
            ts[k].join();
        }
        System.out.println(arr);
    }
}

跟之前的AtomicInteger沒太多區(qū)別,只是需要對(duì)于傳入的i需要先判斷在數(shù)組length范圍內(nèi),再轉(zhuǎn)換成內(nèi)存地址。類中其他方法類同。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容