原子累加器

JDK1.8時(shí),java.util.concurrent.atomic包中提供了一個(gè)新的原子類:LongAdder。提供了原子累計(jì)值的方法。
根據(jù)Oracle官方文檔的介紹,LongAdder在高并發(fā)的場(chǎng)景下會(huì)比它的前輩————AtomicLong 具有更好的性能,代價(jià)是消耗更多的內(nèi)存空間:

  • 在并發(fā)量較低的環(huán)境下,線程沖突的概率比較小,自旋的次數(shù)不會(huì)很多。但是,高并發(fā)環(huán)境下,N個(gè)線程同時(shí)進(jìn)行自旋操作,會(huì)出現(xiàn)大量失敗并不斷自旋的情況,此時(shí)AtomicLong的自旋會(huì)成為瓶頸。
  • 這就是LongAdder引入的初衷——解決高并發(fā)環(huán)境下AtomicLong的自旋瓶頸問題。

在大數(shù)據(jù)處理過程,為了方便監(jiān)控,需要統(tǒng)計(jì)數(shù)據(jù),少不了原子計(jì)數(shù)器。為了盡量?jī)?yōu)化性能,需要采用高效的原子計(jì)數(shù)器。
在jdk8中,引入了LongAddr,非常適合多線程原子計(jì)數(shù)器。與AtomicLong做了一個(gè)測(cè)試,LongAdder在多線程環(huán)境中,原子自增長(zhǎng)性能要好很多。它常用于狀態(tài)采集、統(tǒng)計(jì)等場(chǎng)景。

  • AtomicLong


    image.png
  • LongAdder


    image.png
package com.conrrentcy.atomic;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderVSAtomicLongTest {

    public static void main(String[] args){
        testAtomicLongVSLongAdder(1, 10000000);
        testAtomicLongVSLongAdder(10, 10000000);
        testAtomicLongVSLongAdder(20, 10000000);
        testAtomicLongVSLongAdder(40, 10000000);
        testAtomicLongVSLongAdder(80, 10000000);
    }

    static void testAtomicLongVSLongAdder(final int threadCount, final int times){
        try {
            System.out.println("threadCount:" + threadCount + ", times:" + times);
            long start = System.currentTimeMillis();
            testLongAdder(threadCount, times);
            System.out.println("LongAdder elapse:" + (System.currentTimeMillis() - start) + "ms");

            long start2 = System.currentTimeMillis();
            testAtomicLong(threadCount, times);
            System.out.println("AtomicLong elapse:" + (System.currentTimeMillis() - start2) + "ms");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static void testAtomicLong(final int threadCount, final int times) throws InterruptedException {
        AtomicLong atomicLong = new AtomicLong();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    atomicLong.incrementAndGet();
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }

    static void testLongAdder(final int threadCount, final int times) throws InterruptedException {
        LongAdder longAdder = new LongAdder();
        List<Thread> list = new ArrayList<>();
        for (int i=0;i<threadCount;i++){
            list.add(new Thread(() -> {
                for (int j = 0; j<times; j++){
                    longAdder.add(1);
                }
            }));
        }

        for (Thread thread : list){
            thread.start();
        }

        for (Thread thread : list){
            thread.join();
        }
    }
}

LongAdder原理

LongAdder的原理是,在最初無(wú)競(jìng)爭(zhēng)時(shí),只更新base的值,當(dāng)有多線程競(jìng)爭(zhēng)時(shí)通過分段的思想,讓不同的線程更新不同的段,最后把這些段相加就得到了完整的LongAdder存儲(chǔ)的值。


image.png

AtomicLong中有個(gè)內(nèi)部變量value保存著實(shí)際的long值,所有的操作都是針對(duì)該變量進(jìn)行。也就是說,高并發(fā)環(huán)境下,value變量其實(shí)是一個(gè)熱點(diǎn),也就是N個(gè)線程競(jìng)爭(zhēng)一個(gè)熱點(diǎn)。
LongAdder的基本思路就是分散熱點(diǎn),將value值分散到一個(gè)數(shù)組中,不同線程會(huì)命中到數(shù)組的不同槽中,各個(gè)線程只對(duì)自己槽中的那個(gè)值進(jìn)行CAS操作,
這樣熱點(diǎn)就被分散了,沖突的概率就小很多。如果要獲取真正的long值,只要將各個(gè)槽中的變量值累加返回。
AtomicLong是多個(gè)線程針對(duì)單個(gè)熱點(diǎn)值value進(jìn)行原子操作。而LongAdder是每個(gè)線程擁有自己的槽,各個(gè)線程一般只對(duì)自己槽中的那個(gè)值進(jìn)行CAS操作。

LongAdder繼承自Striped64抽象類,Striped64中定義了Cell內(nèi)部類和各重要屬性。
唯一會(huì)制約AtomicLong高效的原因是高并發(fā),高并發(fā)意味著CAS的失敗幾率更高, 重試次數(shù)更多,越多線程重試,CAS失敗幾率又越高,變成惡性循環(huán),AtomicLong效率降低。
那怎么解決? LongAdder給了一個(gè)非常容易想到的解決方案:減少并發(fā),將單一value的更新壓力分擔(dān)到多個(gè)value中去,降低單個(gè)value的 “熱度”,分段更新。
這樣,線程數(shù)再多也會(huì)分擔(dān)到多個(gè)value上去更新,只需要增加value就可以降低 value的 “熱度” ,AtomicLong中的 惡性循環(huán)就解決了。
cells 就是這個(gè) “段” cell中的value 就是存放更新值的, 這樣,當(dāng)需要總數(shù)時(shí),把cells 中的value都累加一下就可以了。

Code 解析


    /**
     * Padded variant of AtomicLong supporting only raw accesses plus CAS.
     *
     * JVM intrinsics note: It would be possible to use a release-only
     * form of CAS here, if it were provided.
     */
    @jdk.internal.vm.annotation.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return VALUE.compareAndSet(this, cmp, val);
        }
        final void reset() {
            VALUE.setVolatile(this, 0L);
        }
        final void reset(long identity) {
            VALUE.setVolatile(this, identity);
        }
        final long getAndSet(long val) {
            return (long)VALUE.getAndSet(this, val);
        }

        // VarHandle mechanics
        private static final VarHandle VALUE;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                VALUE = l.findVarHandle(Cell.class, "value", long.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }

    /** Number of CPUS, to place bound on table size */
    static final int NCPU = Runtime.getRuntime().availableProcessors();

    /**
     * Table of cells. When non-null, size is a power of 2.
     */
    transient volatile Cell[] cells;

    /**
     * Base value, used mainly when there is no contention, but also as
     * a fallback during table initialization races. Updated via CAS.
     */
    transient volatile long base;

    /**
     * Spinlock (locked via CAS) used when resizing and/or creating Cells.
     */
    transient volatile int cellsBusy;

@sun.misc.Contended

緩存模型

image.png

CPU和主內(nèi)存之間有好幾層緩存,因?yàn)榕ccpu的速度相比,訪問主內(nèi)存的速度是非常慢的。如果頻繁對(duì)同一個(gè)數(shù)據(jù)做運(yùn)算,每次都從內(nèi)存中加載,運(yùn)算完之后再寫回到主內(nèi)存中,將會(huì)嚴(yán)重拖累cpu的計(jì)算資源。因此,為了充分發(fā)揮CPU的計(jì)算性能和吞吐量,平衡CPU和主內(nèi)存之間的速度差距,現(xiàn)代CPU引入了一級(jí)緩存、二級(jí)緩存和三級(jí)緩存,結(jié)構(gòu)如下圖所示:


image.png

越靠近CPU的緩存存儲(chǔ)速度越快,但是容量也越小。所以一級(jí)緩存(L1 Cache)最小最快,并且緊靠著在使用它的CPU內(nèi)核。L2大一些,也慢一些,并且仍然只能被一個(gè)單獨(dú)的 CPU 核使用。L3在現(xiàn)代多核機(jī)器中更普遍,仍然更大,更慢,并且被單個(gè)插槽上的所有 CPU 核共享。最后,你擁有一塊主存,由全部插槽上的所有 CPU 核共享。
當(dāng)CPU運(yùn)算過程中需要加載數(shù)據(jù)時(shí),首先去L1去尋找需要的數(shù)據(jù),如果沒有則去L2尋找,接著從L3中尋找,如果都沒有,則從內(nèi)存中讀取數(shù)據(jù)。所以,如果某些數(shù)據(jù)需要經(jīng)常被訪問,那么這些數(shù)據(jù)存放在L1中的效率會(huì)最高。
而一般緩存未命中消耗的時(shí)鐘周期及時(shí)間數(shù)據(jù)如下:


image.png

緩存行Cache Line

緩存是由緩存行組成的。一個(gè)緩存行存儲(chǔ)字節(jié)的數(shù)量為2的倍數(shù),在不同的機(jī)器上,緩存行大小為32字節(jié)到256字節(jié)不等,目前通常為64字節(jié)。
緩存行是按塊加載到緩存中,一次性從內(nèi)存中加載或者寫入64字節(jié)的內(nèi)容。假如一個(gè)Java對(duì)象有8個(gè)long類型的成員變量,那么一個(gè)緩存行最多可以一次性加載這8個(gè)long類型的變量到內(nèi)存中。那么cpu在運(yùn)算中,如果需要這8個(gè)變量,就可以直接從緩存中讀取數(shù)據(jù),而不需要從主內(nèi)存中加載。

緩存一致性協(xié)議

由于一級(jí)緩存和二級(jí)緩存是被CPU核單獨(dú)擁有的,當(dāng)CPU核修改緩存中的內(nèi)容時(shí),緩存需要寫回主內(nèi)存,然后通知其他CPU核中對(duì)該緩存塊進(jìn)行失效處理。例如Core1和Core2并發(fā)讀寫同一個(gè)對(duì)象,該對(duì)象所屬的內(nèi)存地址塊,會(huì)被同時(shí)緩存到一級(jí)緩存中。當(dāng)Core1修改這個(gè)對(duì)象的變量時(shí),改動(dòng)會(huì)寫回主內(nèi)存,并且會(huì)讓Core2緩存的該對(duì)象的緩存行失效。

偽共享(False Sharing)

偽共享,翻譯為錯(cuò)誤的共享。下面的圖說明了偽共享的問題:


image.png
  • 假設(shè)Core1更新了X值,那么緩存中的緩存行和內(nèi)存中的值都會(huì)被改變,并且Core2的緩存行也都會(huì)失效,因?yàn)樗彺娴腦不是最新值了。那么Core2僅僅只是想讀X值,卻需要重新從主內(nèi)存去加載,從而被緩存未命中拖慢了讀取速度。
  • 假設(shè)在核心1上運(yùn)行的線程想更新變量X,同時(shí)核心2上的線程想要更新變量Y, 而這兩個(gè)變量在同一個(gè)緩存行中。每個(gè)線程都要去競(jìng)爭(zhēng)緩存行的所有權(quán)來更新變量。如果Core1獲得了所有權(quán),改變了X值,那么Core2中對(duì)應(yīng)的緩存行失效。而Core2去讀取Y時(shí),發(fā)現(xiàn)緩存未命中,需要重新讀取緩存行。然后當(dāng)Core2去更新Y值時(shí),也會(huì)使Core1的緩存行失效,從而使Core1引發(fā)一次緩存未命中。這會(huì)來來回回的經(jīng)過L3緩存,大大影響了性能。如果互相競(jìng)爭(zhēng)的核心位于不同的插槽,就要額外橫跨插槽連接,問題可能更加嚴(yán)重。

偽共享解決辦法

緩存行填充

disrupter框架通過增加字段,補(bǔ)全緩存行來確保ring buffer的序列號(hào)不會(huì)和其他東西同時(shí)存在于一個(gè)緩存行中,從而解決偽共享的問題:
private long p1, p2, p3, p4, p5, p6, p7;
private final long indexMask = 0;
private long p8, p9, p10, p11, p12, p13, p14;

@sun.misc.Contended注解

在Java 8中,提供了@sun.misc.Contended注解來避免偽共享,原理是在使用此注解的對(duì)象或字段的前后各增加128字節(jié)大小的padding,使用2倍于大多數(shù)硬件緩存行的大小來避免相鄰扇區(qū)預(yù)取導(dǎo)致的偽共享沖突。

   /** The current seed for a ThreadLocalRandom */
   @sun.misc.Contended("tlr")
   long threadLocalRandomSeed;
   /** Probe hash value; nonzero if threadLocalRandomSeed initialized */
   @sun.misc.Contended("tlr")
   int threadLocalRandomProbe;
   /** Secondary seed isolated from public ThreadLocalRandom sequence */
   @sun.misc.Contended("tlr")
   int threadLocalRandomSecondarySeed;

例如,Thread中有三個(gè)變量threadLocalRandomSeed,threadLocalRandomProbe,threadLocalRandomSecondarySeed就是通過注解的方法來操作緩存行的。不同的是,這里是讓三個(gè)變量處于同一個(gè)緩存行,從而達(dá)到任意使一個(gè)變量改變,其他兩個(gè)值也必須重新加載的目的。

總結(jié)

1.緩存時(shí)為了平衡CPU和內(nèi)存之間的速度差異,cpu核獨(dú)自擁有L1、L2緩存,公用L3緩存,且越靠近CPU的緩存存儲(chǔ)速度越快、容量越小。
2.緩存中的內(nèi)容是按塊一次性從主內(nèi)存中加載到緩存行。
3.L1、L2緩存中的緩存行修改時(shí),會(huì)寫回到L3和主內(nèi)存,并且使其他核上的還緩存行失效。
4.多個(gè)CPU核一寫一讀,或者同時(shí)寫同一塊緩存行的時(shí)候,會(huì)導(dǎo)致偽共享問題。
5.假如存在多個(gè)線程同時(shí)修改一個(gè)類的不同字段的場(chǎng)景,必須考慮使用緩存行填充或者@sun.misc.Contended注解防止偽共享。

Cell數(shù)組

image.png

increase 流程

Increase() -> add(1L)

  public void add(long x) {
      Cell[] cs; long b, v; int m; Cell c;
      if ((cs = cells) != null || !casBase(b = base, b + x)) {
          boolean uncontended = true;
          if (cs == null || (m = cs.length - 1) < 0 ||
              (c = cs[getProbe() & m]) == null ||
              !(uncontended = c.cas(v = c.value, v + x)))
              longAccumulate(x, null, uncontended);
      }
  }
image.png

if ((as = cells) != null || !casBase(b = base, b + x)) {
如果cells不為空,說明有競(jìng)爭(zhēng), 如果直接更新失敗,說明有競(jìng)爭(zhēng),進(jìn)入到多核邏輯.

對(duì)于上面的,有興趣的可以看看是怎么找到指定的Cell的,在上面的a = as[getProbe() & m]中,其中m=數(shù)組的長(zhǎng)度-1,其實(shí)這里也是一個(gè)取余的運(yùn)算,而getProbe()這個(gè)方法是用于獲取當(dāng)前線程的threadLocalRandomProb(當(dāng)前本地線程探測(cè)值,初始值為0),其實(shí)也就是一個(gè)隨機(jī)數(shù)啊,然后對(duì)數(shù)組的長(zhǎng)度取余得到的就是對(duì)應(yīng)的數(shù)組的索引,首次調(diào)用這個(gè)方法是數(shù)組的第一個(gè)元素,如果數(shù)組的第一個(gè)元素為null,那么就說明沒有找到對(duì)應(yīng)的Cell;
對(duì)于取余運(yùn)算,舉個(gè)簡(jiǎn)單的例子吧,對(duì)于2的n次方取余,比如隨機(jī)數(shù)9要對(duì)4進(jìn)行取余,我們可以9&(4-1)=9&3=1001&0011=1,利用位運(yùn)算取余了解一下;

Probe是什么

ConcurrentHashMap 在累加鍵值對(duì)個(gè)數(shù)的 addCount 函數(shù)中,使用 ThreadLocalRandom.getProbe() 得到線程的探針哈希值。
在這里,這個(gè)探針哈希值的作用是哈希線程,將線程和數(shù)組中的不用元素對(duì)應(yīng)起來,盡量避免線程爭(zhēng)用同一數(shù)組元素。探針哈希值和 map 里使用的哈希值的區(qū)別是,當(dāng)線程發(fā)生數(shù)組元素爭(zhēng)用后,可以改變線程的探針哈希值,讓線程去使用另一個(gè)數(shù)組元素,而 map 中 key 對(duì)象的哈希值,由于有定位 value 的需求,所以它是一定不能變的。
那么這個(gè)探針哈希值是在哪計(jì)算的呢?帶著這個(gè)問題我們繼續(xù)往下看。
ThreadLocalRandom.getProbe() 方法如下:

/**
 * Returns the probe value for the current thread without forcing
 * initialization. Note that invoking ThreadLocalRandom.current()
 * can be used to force initialization on zero return.
 */
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
  }
 
 // Unsafe mechanics
  private static final sun.misc.Unsafe UNSAFE;
  ...
  private static final long PROBE;
  ...
  static {
      try {
          UNSAFE = sun.misc.Unsafe.getUnsafe();
          Class<?> tk = Thread.class;
          ...
          PROBE = UNSAFE.objectFieldOffset
              (tk.getDeclaredField("threadLocalRandomProbe"));
          ...
      } catch (Exception e) {
          throw new Error(e);
        }
    }  

可以看到 PROBE 表示的是 Thread 類 threadLocalRandomProbe 字段的偏移量。所以 getProbe 方法的功能就是簡(jiǎn)單的返回當(dāng)前線程 threadLocalRandomProbe 字段的值。
接著去 Thread 類看看這個(gè) threadLocalRandomProbe 字段,

 /** Probe hash value; nonzero if threadLocalRandomSeed initialized */
  @sun.misc.Contended("tlr")
  int threadLocalRandomProbe;

Thread 類僅僅是定義了這個(gè)字段,并沒有將其初始化,其初始化工作由 ThreadLocalRandom 類來做。
ThreadLocalRandom 類的 localInit 方法完成初始化工作,

 /**
   * Initialize Thread fields for the current thread.  Called only
   * when Thread.threadLocalRandomProbe is zero, indicating that a
   * thread local seed value needs to be generated. Note that even
   * though the initialization is purely thread-local, we need to
   * rely on (static) atomic generators to initialize the values.
   */
  static final void localInit() {
    // probeGenerator 是一個(gè) AtomicInteger 類型
    // PROBE_INCREMENT 是一個(gè)靜態(tài)常量,值為 0x9e3779b9
      int p = probeGenerator.addAndGet(PROBE_INCREMENT);
      int probe = (p == 0) ? 1 : p; // skip 0
      long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
      Thread t = Thread.currentThread(); // 獲取當(dāng)前線程
      // 通過 Unsafe 對(duì)象初始化當(dāng)前線程的 threadLocalRandomSeed 字段
      UNSAFE.putLong(t, SEED, seed);
      // 通過 Unsafe 對(duì)象初始化當(dāng)前線程的 threadLocalRandomProbe 字段
      UNSAFE.putInt(t, PROBE, probe);
  }

SEED 和 PROBE 類似,它表示的是 Thread 類 threadLocalRandomSeed 字段的偏移量。
在 ThreadLocalRandom 類的這個(gè) localInit 方法里,同時(shí)初始化了當(dāng)前線程的 threadLocalRandomSeed 字段和 threadLocalRandomProbe 字段。
所以在 Thread 類 threadLocalRandomProbe 字段上的注釋中說:nonzero if threadLocalRandomSeed initialized。就是說如果 threadLocalRandomSeed 字段被初始化了,threadLocalRandomProbe 字段就非零。因?yàn)樗鼈z是同時(shí)被初始化的。
除此之外,也可以通過 ThreadLocalRandom 類的 advanceProbe 方法更改當(dāng)前線程 threadLocalRandomProbe 的值。

 /**
   * Pseudo-randomly advances and records the given probe value for the
   * given thread.
   */
  static final int advanceProbe(int probe) {
      probe ^= probe << 13;   // xorshift
      probe ^= probe >>> 17;
      probe ^= probe << 5;
      UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
      return probe;
  }

ConcurrentHashMap 里的 fullAddCount 方法會(huì)調(diào)用 ThreadLocalRandom.localInit() 初始化當(dāng)前線程的探針哈希值;當(dāng)發(fā)生線程爭(zhēng)用后,也會(huì)調(diào)用 ThreadLocalRandom.advanceProbe(h) 更改當(dāng)前線程的探針哈希值
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
...
h = ThreadLocalRandom.advanceProbe(h);
...
}

longAccumulate

final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {

if ((as = cells) != null && (n = as.length) > 0) {
// 不是第一次進(jìn)入

` if ((a = as[(n - 1) & h]) == null) {`
image.png
    //還沒分配cell
            if (cellsBusy == 0 && casCellsBusy()) {
               //獲得鎖
                  if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                      //再次檢查
                  }
           }
    }
image.png
              else if (c.cas(v = c.value,
                               (fn == null) ? v + x : fn.applyAsLong(v, x)))
                    break;
                else if (n >= NCPU || cells != cs)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == cs)        // Expand table unless stale
                            cells = Arrays.copyOf(cs, n << 1);
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);

h = advanceProbe(h); // 總是不成功,改變探針,讓線程落入另外一個(gè)cell

} else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
// cells 不存在 或者未加鎖 或者 未新建 ->第一次進(jìn)入


image.png

}else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))

//再嘗試一下累加
}
最終拿到多線程,用同一個(gè)LongAdder對(duì)象進(jìn)行累加的結(jié)果。非原子操作,但是保證最終正確。

    public long sum() {
        Cell[] cs = cells;
        long sum = base;
        if (cs != null) {
            for (Cell c : cs)
                if (c != null)
                    sum += c.value;
        }
        return sum;
    }

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容