原子類

一、原子類縱覽

類型 具體類
Atomic* 基本類型原子類 AtomicInteger、AtomicLong、AtomicBoolean
Atomic*Array 數(shù)組類型原子類 AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray
Atomic*Reference 引用類型原子類 AtomicReference、AtomicStampedReference、AtomicMarkableReference
Atomic*FieldUpdater 升級(jí)類型原子類 AtomicIntegerfieldupdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater
Adder 累加器 LongAdder、DoubleAdder
Accumulator 積累器 LongAccumulator、DoubleAccumulator
  • AtomicInteger 類常用方法
    1. public final int get() //獲取當(dāng)前的值
    2. public final int getAndSet(int newValue) //獲取當(dāng)前的值,并設(shè)置新的值
    3. public final int getAndIncrement() //獲取當(dāng)前的值,并自增
    4. public final int getAndDecrement() //獲取當(dāng)前的值,并自減
    5. public final int getAndAdd(int delta) //獲取當(dāng)前的值,并加上預(yù)期的值
    6. boolean compareAndSet(int expect, int update) //如果輸入的數(shù)值等于預(yù)期值,則以原子方式將該值更新為輸入值(update)
  • Atomic*Array 數(shù)組類型原子類
    1. AtomicIntegerArray:整形數(shù)組原子類
    2. AtomicLongArray:長(zhǎng)整形數(shù)組原子類
    3. AtomicReferenceArray :引用類型數(shù)組原子類
  • Atomic*Reference 引用類型原子類
    1. AtomicStampedReference:它是對(duì) AtomicReference 的升級(jí),在此基礎(chǔ)上還加了時(shí)間戳,用于解決 CAS 的 ABA 問題。
    2. AtomicMarkableReference:和 AtomicReference 類似,多了一個(gè)綁定的布爾值,可以用于表示該對(duì)象已刪除等場(chǎng)景。
  • Atomic*FieldUpdater 原子更新器
    如果我們之前已經(jīng)有了一個(gè)變量,比如是整型的 int,實(shí)際它并不具備原子性??墒悄疽殉芍?,這個(gè)變量已經(jīng)被定義好了,此時(shí)我們有沒有辦法可以讓它擁有原子性呢?辦法是有的,就是利用 Atomic*FieldUpdater,如果它是整型的,就使用 AtomicIntegerFieldUpdater 把已經(jīng)聲明的變量進(jìn)行升級(jí),這樣一來這個(gè)變量就擁有了 CAS 操作的能力。
    public class AtomicIntegerFieldUpdaterDemo implements Runnable{
       static Score math;
       static Score computer;
       public static AtomicIntegerFieldUpdater<Score> scoreUpdater 
          = AtomicIntegerFieldUpdater.newUpdater(Score.class, "score");
    
       @Override
       public void run() {
           for (int i = 0; i < 1000; i++) {
               computer.score++;
               scoreUpdater.getAndIncrement(math);
           }
       }
    
       public static class Score {
           volatile int score;
       }
    
       public static void main(String[] args) throws InterruptedException {
           math =new Score();
           computer =new Score();
           AtomicIntegerFieldUpdaterDemo2 r 
               = new AtomicIntegerFieldUpdaterDemo2();
           Thread t1 = new Thread(r);
           Thread t2 = new Thread(r);
           t1.start();
           t2.start();
           t1.join();
           t2.join();
           System.out.println("普通變量的結(jié)果:"+ computer.score);
           System.out.println("升級(jí)后的結(jié)果:"+ math.score);
       }
    }
    
    1. AtomicIntegerFieldUpdater:原子更新整形的更新器
    2. AtomicLongFieldUpdater:原子更新長(zhǎng)整形的更新器
    3. AtomicReferenceFieldUpdater:原子更新引用的更新器
  • Adder 加法器
    它里面有兩種加法器,分別叫作 LongAdder 和 DoubleAdder。
  • Accumulator 積累器
    最后一種叫 Accumulator 積累器,分別是 LongAccumulator 和 DoubleAccumulator。

二、以 AtomicInteger 為例,分析在 Java 中如何利用 CAS 實(shí)現(xiàn)原子操作?

  • getAndAdd方法
    //JDK 1.8實(shí)現(xiàn)
    public final int getAndAdd(int delta) {
       return unsafe.getAndAddInt(this, valueOffset, delta);
    }
    
    可以看出,里面使用了 Unsafe 這個(gè)類,并且調(diào)用了 unsafe.getAndAddInt 方法。所以這里需要簡(jiǎn)要介紹一下 Unsafe 類。
  • Unsafe
    Unsafe 其實(shí)是 CAS 的核心類。由于 Java 無法直接訪問底層操作系統(tǒng),而是需要通過 native 方法來實(shí)現(xiàn)。不過盡管如此,JVM 還是留了一個(gè)后門,在 JDK 中有一個(gè) Unsafe 類,它提供了硬件級(jí)別的原子操作,我們可以利用它直接操作內(nèi)存數(shù)據(jù)。
    public class AtomicInteger extends Number 
       implements java.io.Serializable {
      private static final Unsafe unsafe = Unsafe.getUnsafe();
      private static final long valueOffset;
    
      static {
          try {
              valueOffset = unsafe.objectFieldOffset
                  (AtomicInteger.class.getDeclaredField("value"));
          } catch (Exception ex) { throw new Error(ex); }
      }
    
      private volatile int value;
      public final int get() {return value;}
      ...
    }
    
    1. 首先還獲取了 Unsafe 實(shí)例,并且定義了 valueOffset
    2. static 代碼塊,這個(gè)代碼塊會(huì)在類加載的時(shí)候執(zhí)行,執(zhí)行時(shí)我們會(huì)調(diào)用 Unsafe 的 objectFieldOffset 方法,從而得到當(dāng)前這個(gè)原子類的 value 的偏移量,并且賦給 valueOffset 變量,這樣一來我們就獲取到了 value 的偏移量,它的含義是在內(nèi)存中的偏移地址,因?yàn)?Unsafe 就是根據(jù)內(nèi)存偏移地址獲取數(shù)據(jù)的原值的,這樣我們就能通過 Unsafe 來實(shí)現(xiàn) CAS 了。
    3. value 是用 volatile 修飾的,它就是我們?cè)宇惔鎯?chǔ)的值的變量,由于它被 volatile 修飾,我們就可以保證在多線程之間看到的 value 是同一份,保證了可見性。
  • Unsafe 中的 getAndAddInt 方法
    public final int getAndAddInt(Object var1, long var2, int var4) {
       int var5;
       do {
           var5 = this.getIntVolatile(var1, var2);
       } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
       return var5;
    }
    
    1. 首先我們看一下結(jié)構(gòu),它是一個(gè) do-while 循環(huán),所以這是一個(gè)死循環(huán),直到滿足循環(huán)的退出條件時(shí)才可以退出。
    2. do 后面的這一行代碼 var5 = this.getIntVolatile(var1, var2) 這是個(gè) native 方法,作用就是獲取在 var1 中的 var2 偏移處的值。
    3. 傳入的兩個(gè)參數(shù),第一個(gè)就是當(dāng)前原子類,第二個(gè)是我們最開始獲取到的 offset,這樣一來我們就可以獲取到當(dāng)前內(nèi)存中偏移量的值,并且保存到 var5 里面。此時(shí) var5 實(shí)際上代表當(dāng)前時(shí)刻下的原子類的數(shù)值。
    4. while 的退出條件,也就是 compareAndSwapInt 這個(gè)方法,它一共傳入了 4 個(gè)參數(shù),這 4 個(gè)參數(shù)是 var1、var2、var5、var5 + var4,為了方便理解,我們給它們?nèi)×诵铝俗兞棵謩e object、offset、expectedValue、newValue,具體含義如下:
      • 第一個(gè)參數(shù) object 就是將要操作的對(duì)象,傳入的是 this,也就是 atomicInteger 這個(gè)對(duì)象本身
      • 第二個(gè)參數(shù)是 offset,也就是偏移量,借助它就可以獲取到 value 的數(shù)值
      • 第三個(gè)參數(shù) expectedValue,代表“期望值”,傳入的是剛才獲取到的 var5
      • 最后一個(gè)參數(shù) newValue 是希望修改的數(shù)值 ,等于之前取到的數(shù)值 var5 再加上 var4,而 var4 就是我們之前所傳入的 delta,delta 就是我們希望原子類所改變的數(shù)值,比如可以傳入 +1,也可以傳入 -1
      • 所以 compareAndSwapInt 方法的作用就是,判斷如果現(xiàn)在原子類里 value 的值和之前獲取到的 var5 相等的話,那么就把計(jì)算出來的 var5 + var4 給更新上去,所以說這行代碼就實(shí)現(xiàn)了 CAS 的過程

三、AtomicInteger 和 AtomicLong 存在的問題


每一個(gè)線程是運(yùn)行在自己的 core 中的,并且它們都有一個(gè)本地內(nèi)存是自己獨(dú)用的。在本地內(nèi)存下方,有兩個(gè) CPU 核心共用的共享內(nèi)存。

對(duì)于 AtomicLong 內(nèi)部的 value 屬性而言,也就是保存當(dāng)前 AtomicLong 數(shù)值的屬性,它是被 volatile 修飾的,所以它需要保證自身可見性。

這樣一來,每一次它的數(shù)值有變化的時(shí)候,它都需要進(jìn)行 flush 和 refresh。比如說,如果開始時(shí),ctr 的數(shù)值為 0 的話,那么如圖所示,一旦 core 1 把它改成 1 的話,它首先會(huì)在左側(cè)把這個(gè) 1 的最新結(jié)果給 flush 到下方的共享內(nèi)存。然后,再到右側(cè)去往上 refresh 到核心 2 的本地內(nèi)存。這樣一來,對(duì)于核心 2 而言,它才能感知到這次變化。

由于競(jìng)爭(zhēng)很激烈,這樣的 flush 和 refresh 操作耗費(fèi)了很多資源,而且 CAS 也會(huì)經(jīng)常失敗。

  • LongAdder 帶來的改進(jìn)和原理
    1. LongAdder 引入了分段累加的概念,內(nèi)部一共有兩個(gè)參數(shù)參與計(jì)數(shù):第一個(gè)叫作 base,它是一個(gè)變量,第二個(gè)是 Cell[] ,是一個(gè)數(shù)組。
    2. 其中的 base 是用在競(jìng)爭(zhēng)不激烈的情況下的,可以直接把累加結(jié)果改到 base 變量上。
    3. 當(dāng)競(jìng)爭(zhēng)激烈的時(shí)候,就要用到我們的 Cell[] 數(shù)組了。一旦競(jìng)爭(zhēng)激烈,各個(gè)線程會(huì)分散累加到自己所對(duì)應(yīng)的那個(gè) Cell[] 數(shù)組的某一個(gè)對(duì)象中,而不會(huì)大家共用同一個(gè)。
    4. LongAdder 會(huì)把不同線程對(duì)應(yīng)到不同的 Cell 上進(jìn)行修改,降低了沖突的概率,這是一種分段的理念,提高了并發(fā)性,這就和 Java 7 的 ConcurrentHashMap 的 16 個(gè) Segment 的思想類似。
    5. LongAdder 會(huì)通過計(jì)算出每個(gè)線程的 hash 值來給線程分配到不同的 Cell 上去,每個(gè) Cell 相當(dāng)于是一個(gè)獨(dú)立的計(jì)數(shù)器,這樣一來就不會(huì)和其他的計(jì)數(shù)器干擾,Cell 之間并不存在競(jìng)爭(zhēng)關(guān)系,所以在自加的過程中,就大大減少了剛才的 flush 和 refresh,以及降低了沖突的概率,這就是為什么 LongAdder 的吞吐量比 AtomicLong 大的原因,本質(zhì)是空間換時(shí)間,因?yàn)樗卸鄠€(gè)計(jì)數(shù)器同時(shí)在工作,所以占用的內(nèi)存也要相對(duì)更大一些。
    6. 那么 LongAdder 最終是如何實(shí)現(xiàn)多線程計(jì)數(shù)的呢?答案就在最后一步的求和 sum 方法,執(zhí)行 LongAdder.sum() 的時(shí)候,會(huì)把各個(gè)線程里的 Cell 累計(jì)求和,并加上 base,形成最終的總和。代碼如下:
      public long sum() {
         Cell[] as = cells; Cell a;
         long sum = base;
         if (as != null) {
             for (int i = 0; i < as.length; ++i) {
                 if ((a = as[i]) != null)
                     sum += a.value;
             }
         }
         return sum;
      }
      
    7. 在這個(gè) sum 方法中可以看到,思路非常清晰。先取 base 的值,然后遍歷所有 Cell,把每個(gè) Cell 的值都加上去,形成最終的總和。由于在統(tǒng)計(jì)的時(shí)候并沒有進(jìn)行加鎖操作,所以這里得出的 sum 不一定是完全準(zhǔn)確的,因?yàn)橛锌赡茉谟?jì)算 sum 的過程中 Cell 的值被修改了。
  • AtomicLong 可否被 LongAdder 替代
    不能,得區(qū)分場(chǎng)景
    LongAdder 只提供了 add、increment 等簡(jiǎn)單的方法,適合的是統(tǒng)計(jì)求和計(jì)數(shù)的場(chǎng)景,場(chǎng)景比較單一,而 AtomicLong 還具有 compareAndSet 等高級(jí)方法,可以應(yīng)對(duì)除了加減之外的更復(fù)雜的需要 CAS 的場(chǎng)景。

四、AtomicInteger 和 synchronized 的異同點(diǎn)

  • 原理不同
    1. synchronized 背后的 monitor 鎖,也就是 synchronized 原理,同步方法和同步代碼塊的背后原理會(huì)有少許差異,但總體思想是一致的:在執(zhí)行同步代碼之前,需要首先獲取到 monitor 鎖,執(zhí)行完畢后,再釋放鎖。
    2. 原子類保證線程安全的原理是利用了 CAS 操作。
  • 使用范圍不同
    1. synchronized 既可以修飾一個(gè)方法,又可以修飾一段代碼,相當(dāng)于可以根據(jù)我們的需要,非常靈活地去控制它的應(yīng)用范圍
    2. 對(duì)于原子類而言,它的使用范圍是比較局限的。因?yàn)橐粋€(gè)原子類僅僅是一個(gè)對(duì)象,不夠靈活,僅有少量的場(chǎng)景,例如計(jì)數(shù)器等場(chǎng)景,我們可以使用原子類
  • 粒度的區(qū)別
    原子變量的粒度是比較小的,它可以把競(jìng)爭(zhēng)范圍縮小到變量級(jí)別。通常情況下,synchronized 鎖的粒度都要大于原子變量的粒度。如果我們只把一行代碼用 synchronized 給保護(hù)起來的話,有一點(diǎn)殺雞焉用牛刀的感覺。
  • 性能區(qū)別
    1. synchronized 是一種典型的悲觀鎖,悲觀鎖的操作相對(duì)來講是比較重量級(jí)的。因?yàn)?synchronized 在競(jìng)爭(zhēng)激烈的情況下,會(huì)讓拿不到鎖的線程阻塞,但是悲觀鎖的開銷是固定的,也是一勞永逸的。隨著時(shí)間的增加,這種開銷并不會(huì)線性增長(zhǎng)
    2. 原子利用的是樂觀鎖,永遠(yuǎn)不會(huì)讓線程阻塞,雖然在短期內(nèi)的開銷不大,但是隨著時(shí)間的增加,它的開銷也是逐步上漲的

五、Java 8 中 Adder 和 Accumulator 有什么區(qū)別

  • Adder 的介紹
    對(duì)于 Adder 而言,比如最典型的 LongAdder,在高并發(fā)下 LongAdder 比 AtomicLong 效率更高,因?yàn)閷?duì)于 AtomicLong 而言,它只適合用于低并發(fā)場(chǎng)景,否則在高并發(fā)的場(chǎng)景下,由于 CAS 的沖突概率大,會(huì)導(dǎo)致經(jīng)常自旋,影響整體效率。
    而 LongAdder 引入了分段鎖的概念,當(dāng)競(jìng)爭(zhēng)不激烈的時(shí)候,所有線程都是通過 CAS 對(duì)同一個(gè) Base 變量進(jìn)行修改,但是當(dāng)競(jìng)爭(zhēng)激烈的時(shí)候,LongAdder 會(huì)把不同線程對(duì)應(yīng)到不同的 Cell 上進(jìn)行修改,降低了沖突的概率,從而提高了并發(fā)性。
  • Accumulator 的介紹
    Accumulator 和 Adder 非常相似,實(shí)際上 Accumulator 就是一個(gè)更通用版本的 Adder,比如 LongAccumulator 是 LongAdder 的功能增強(qiáng)版,因?yàn)?LongAdder 的 API 只有對(duì)數(shù)值的加減,而 LongAccumulator 提供了自定義的函數(shù)操作。
    public class LongAccumulatorDemo {
        public static void main(String[] args) 
                                            throws InterruptedException {
            // 首先新建了一個(gè) LongAccumulator,同時(shí)給它傳入了兩個(gè)參數(shù)
            LongAccumulator accumulator 
                               = new LongAccumulator((x, y) -> x + y, 0);
            // 然后又新建了一個(gè) 8 線程的線程池
            ExecutorService executor 
                               = Executors.newFixedThreadPool(8);
            // 利用整形流也就是 IntStream 往線程池中提交了從 1 ~ 9 這 9 個(gè)任務(wù)
            IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));
            Thread.sleep(2000);
            System.out.println(accumulator.getThenReset());
        }
    }
    
    1. 這段代碼的運(yùn)行結(jié)果是 45,代表 0+1+2+3+...+8+9=45 的結(jié)果
    2. LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y, 0); 我們傳入了兩個(gè)參數(shù):
      • 第一個(gè)參數(shù)是二元表達(dá)式;
      • 第二個(gè)參數(shù)是 x 的初始值,傳入的是 0。在二元表達(dá)式中,x 是上一次計(jì)算的結(jié)果(除了第一次的時(shí)候需要傳入),y 是本次新傳入的值
    3. 當(dāng)執(zhí)行 accumulator.accumulate(1) 的時(shí)候,首先要知道這時(shí)候 x 和 y 是什么,第一次執(zhí)行時(shí), x 是 LongAccumulator 構(gòu)造函數(shù)中的第二個(gè)參數(shù),也就是 0,而第一次執(zhí)行時(shí)的 y 值就是本次 accumulator.accumulate(1) 方法所傳入的 1;然后根據(jù)表達(dá)式 x+y,計(jì)算出 0+1=1,這個(gè)結(jié)果會(huì)賦值給下一次計(jì)算的 x,而下一次計(jì)算的 y 值就是 accumulator.accumulate(2) 傳入的 2,所以下一次的計(jì)算結(jié)果是 1+2=3
    4. IntStream.range(1, 10).forEach(i -> executor.submit(() -> accumulator.accumulate(i))); 這一行語(yǔ)句中實(shí)際上利用了整型流,分別給線程池提交了從 1 ~ 9 這 9 個(gè)任務(wù),相當(dāng)于執(zhí)行了:
      accumulator.accumulate(1);
      accumulator.accumulate(2);
      accumulator.accumulate(3);
      ...
      accumulator.accumulate(8);
      accumulator.accumulate(9);
    5. 那么根據(jù)上面的這個(gè)推演,就可以得出它的內(nèi)部運(yùn)行,這也就意味著,LongAccumulator 執(zhí)行了:
      0+1=1;
      1+2=3;
      3+3=6;
      6+4=10;
      10+5=15;
      15+6=21;
      21+7=28;
      28+8=36;
      36+9=45;
    6. 這里需要指出的是,這里的加的順序是不固定的,并不是說會(huì)按照順序從 1 開始逐步往上累加,它也有可能會(huì)變,比如說先加 5、再加 3、再加 6。但總之,由于加法有交換律,所以最終加出來的結(jié)果會(huì)保證是 45。這就是這個(gè)類的一個(gè)基本的作用和用法。
    7. 拓展功能
      我們繼續(xù)看一下它的功能強(qiáng)大之處。舉幾個(gè)例子,剛才我們給出的表達(dá)式是 x + y,其實(shí)同樣也可以傳入 x * y,或者寫一個(gè) Math.min(x, y),相當(dāng)于求 x 和 y 的最小值。同理,也可以去求 Math.max(x, y),相當(dāng)于求一個(gè)最大值。根據(jù)業(yè)務(wù)的需求來選擇就可以了。代碼如下:
      LongAccumulator counter = new LongAccumulator((x, y) -> x + y, 0);
      LongAccumulator result = new LongAccumulator((x, y) -> x * y, 0);
      LongAccumulator min = new LongAccumulator((x, y) -> Math.min(x, y), 0);
      LongAccumulator max = new LongAccumulator((x, y) -> Math.max(x, y), 0);
    8. 在這里為什么不用 for 循環(huán)呢?
      確實(shí),用 for 循環(huán)也能滿足需求,但是用 for 循環(huán)的話,它執(zhí)行的時(shí)候是串行,它一定是按照 0+1+2+3+...+8+9 這樣的順序相加的,但是 LongAccumulator 的一大優(yōu)勢(shì)就是可以利用線程池來為它工作。一旦使用了線程池,那么多個(gè)線程之間是可以并行計(jì)算的,效率要比之前的串行高得多。這也是為什么剛才說它加的順序是不固定的,因?yàn)槲覀儾⒉荒鼙WC各個(gè)線程之間的執(zhí)行順序,所能保證的就是最終的結(jié)果是確定的。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容