Atomic Operation Classes: LongAdder, LongAccumulator, DoubleAdder and DoubleAccumulator

1.Striped64

This class maintains a lazily-initialized table of atomically
updated variables, plus an extra "base" field. The table size
is a power of two. Indexing uses masked per-thread hash codes.
Nearly all declarations in this class are package-private,
accessed directly by subclasses.

Table entries are of class Cell; a variant of AtomicLong padded
(via @sun.misc.Contended) to reduce cache contention. Padding
is overkill for most Atomics because they are usually
irregularly scattered in memory and thus don't interfere much
with each other. But Atomic objects residing in arrays will
tend to be placed adjacent to each other, and so will most
often share cache lines (with a huge negative performance
impact) without this precaution.

In part because Cells are relatively large, we avoid creating
them until they are needed.  When there is no contention, all
updates are made to the base field.  Upon first contention (a
failed CAS on base update), the table is initialized to size 2.
The table size is doubled upon further contention until
reaching the nearest power of two greater than or equal to the
number of CPUS. Table slots remain empty (null) until they are
needed.

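In short: the class keeps a lazily initialized table of atomically updated variables plus an extra "base" field. The table size is always a power of two, and a thread picks its slot by masking its per-thread hash code.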

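Table entries are Cells, a variant of AtomicLong padded with @sun.misc.Contended to reduce cache contention. Padding is unnecessary for most atomic variables because they are scattered irregularly in memory. Atomic objects stored in an array, however, sit next to each other. Without this precaution they would often share a cache line (false sharing), which hurts performance badly.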

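Cells are created lazily, only when needed, partly because they are relatively large. Without contention, every update goes to base. On the first contention (a failed CAS on base) the table is initialized with size 2. On further contention it doubles, up to the smallest power of two greater than or equal to the number of CPUs. Slots stay null until they are needed.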
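The sizing rule can be made concrete with a small sketch. The class and method names here are hypothetical and do not come from Striped64. The sketch assumes, as in the JDK 8 longAccumulate code shown later, that the table starts at 2 and doubles only while its length is below NCPU (Runtime.getRuntime().availableProcessors()). It also shows why a power-of-two length matters: the slot index is a cheap mask rather than a modulo.

```java
/** Hypothetical helpers illustrating Striped64's sizing rule; not JDK code. */
final class TableSizing {
    /** Final table length: start at 2 and double while still below ncpu. */
    static int maxTableSize(int ncpu) {
        int n = 2;                 // size after the first failed CAS on base
        while (n < ncpu) n <<= 1;  // mirrors the "n >= NCPU" stop condition
        return n;
    }

    /** Slot index for a hash; correct only because n is a power of two. */
    static int slotIndex(int n, int h) {
        return (n - 1) & h;        // same expression as as[(n - 1) & h]
    }

    public static void main(String[] args) {
        for (int ncpu : new int[] {1, 4, 6, 8, 12}) {
            System.out.println(ncpu + " CPUs -> max table size " + maxTableSize(ncpu));
        }
        // Even a negative hash lands inside [0, n) after masking.
        System.out.println("slot for h=-7 in a table of 8: " + slotIndex(8, -7));
    }
}
```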

A single spinlock ("cellsBusy") is used for initializing and
resizing the table, as well as populating slots with new Cells.
There is no need for a blocking lock; when the lock is not
available, threads try other slots (or the base).  During these
retries, there is increased contention and reduced locality,
which is still better than alternatives.

The Thread probe fields maintained via ThreadLocalRandom serve
as per-thread hash codes. We let them remain uninitialized as
zero (if they come in this way) until they contend at slot
0. They are then initialized to values that typically do not
often conflict with others.  Contention and/or table collisions
are indicated by failed CASes when performing an update
operation. Upon a collision, if the table size is less than
the capacity, it is doubled in size unless some other thread
holds the lock. If a hashed slot is empty, and lock is
available, a new Cell is created. Otherwise, if the slot
exists, a CAS is tried.  Retries proceed by "double hashing",
using a secondary hash (Marsaglia XorShift) to try to find a
free slot.

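A single spinlock, cellsBusy, guards table initialization and resizing, as well as the installation of new Cells into slots. A blocking lock is unnecessary: when the spinlock is unavailable, a thread simply tries another slot (or base). Contention rises and locality drops during these retries, but this is still better than the alternatives.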

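Each Thread's probe field, maintained by ThreadLocalRandom, serves as the per-thread hash code. It stays uninitialized (0) until the thread contends at slot 0. It is then initialized to a value that usually does not collide with other threads' values. Contention and table collisions show up as failed CASes during updates. On a collision, the table is doubled if it is smaller than its capacity, unless another thread holds the lock. If the hashed slot is empty and the lock is available, a new Cell is created. If the slot is occupied, a CAS on it is tried. Retries use "double hashing": a secondary hash (Marsaglia XorShift) picks another slot in search of a free one.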
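The rehash step is just three shift-and-xor operations on the probe. The sketch below reproduces the xorshift used by JDK 8's Striped64.advanceProbe (shifts 13, 17 and 5). It leaves out the Unsafe write that stores the new probe back into the Thread, and the class name is hypothetical. Note that 0 is a fixed point of this map, which is one reason a zero probe must be replaced with a real value before it can be used for rehashing.

```java
/** Hypothetical demo of the xorshift rehash applied after a collision. */
final class ProbeRehash {
    /** One Marsaglia xorshift step; a bijection that maps nonzero ints to nonzero ints. */
    static int advance(int h) {
        h ^= h << 13;
        h ^= h >>> 17;
        h ^= h << 5;
        return h;
    }

    public static void main(String[] args) {
        int n = 8;   // assumed table size (a power of two)
        int h = 1;   // assumed starting probe
        for (int i = 0; i < 5; i++) {
            System.out.println("probe=" + h + " slot=" + ((n - 1) & h));
            h = advance(h);
        }
    }
}
```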

The table size is capped because, when there are more threads
than CPUs, supposing that each thread were bound to a CPU,
there would exist a perfect hash function mapping threads to
slots that eliminates collisions. When we reach capacity, we
search for this mapping by randomly varying the hash codes of
colliding threads.  Because search is random, and collisions
only become known via CAS failures, convergence can be slow,
and because threads are typically not bound to CPUS forever,
may not occur at all. However, despite these limitations,
observed contention rates are typically low in these cases.

It is possible for a Cell to become unused when threads that
once hashed to it terminate, as well as in the case where
doubling the table causes no thread to hash to it under
expanded mask.  We do not try to detect or remove such cells,
under the assumption that for long-running instances, observed
contention levels will recur, so the cells will eventually be
needed again; and for short-lived ones, it does not matter.

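The table size is capped at the smallest power of two not less than the CPU count. If each thread were bound to its own CPU, a perfect hash mapping threads to slots without collisions would exist. Once the cap is reached, Striped64 searches for such a mapping by randomly changing the hash codes of colliding threads. The search is random, and collisions are only detected through failed CASes, so convergence can be slow. Because threads are rarely bound to CPUs forever, convergence may never happen at all. Even so, the observed contention rate in these cases is usually low.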

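A Cell can become unused when the threads that once hashed to it terminate, or when doubling the table leaves no thread hashing to it under the larger mask. Such cells are neither detected nor removed. For long-running instances the observed contention tends to recur, so the cells will eventually be needed again; for short-lived instances it does not matter.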

    /**
     * Returns the probe value for the current thread.
     * Duplicated from ThreadLocalRandom because of packaging restrictions.
     */
    static final int getProbe() {
        return UNSAFE.getInt(Thread.currentThread(), PROBE);
    }

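getProbe reads the current thread's threadLocalRandomProbe field. java.lang.Thread declares it alongside the other ThreadLocalRandom fields: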


    // The following three initially uninitialized fields are exclusively
    // managed by class java.util.concurrent.ThreadLocalRandom. These
    // fields are used to build the high-performance PRNGs in the
    // concurrent code, and we can not risk accidental false sharing.
    // Hence, the fields are isolated with @Contended.

    /** The current seed for a ThreadLocalRandom */
    @sun.misc.Contended("tlr")
    long threadLocalRandomSeed;

    /** Probe hash value; nonzero if threadLocalRandomSeed initialized */
    @sun.misc.Contended("tlr")
    int threadLocalRandomProbe;

    /** Secondary seed isolated from public ThreadLocalRandom sequence */
    @sun.misc.Contended("tlr")
    int threadLocalRandomSecondarySeed;

The core of Striped64 is the longAccumulate method:

    /**
     * Handles cases of updates involving initialization, resizing,
     * creating new Cells, and/or contention. See above for
     * explanation. This method suffers the usual non-modularity
     * problems of optimistic retry code, relying on rechecked sets of
     * reads.
     *
     * @param x the value
     * @param fn the update function, or null for add (this convention
     * avoids the need for an extra field or function in LongAdder).
     * @param wasUncontended false if CAS failed before call
     */
    final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); // force initialization
            h = getProbe();
            wasUncontended = true;
        }
        boolean collide = false;                // True if last slot nonempty
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       // Try to attach new Cell
                        Cell r = new Cell(x);   // Optimistically create
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               // Recheck under lock
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;           // Slot is now non-empty
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
                h = advanceProbe(h);
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }

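The method breaks down into the following steps:

  • step1. cells is already initialized. This handles the two cases LongAdder passes in: the thread's slot is empty, or the CAS on that slot failed (a collision).
    step1-1. If the slot has no Cell yet, create one under the cellsBusy lock.
    step1-2. If LongAdder's CAS on the cell already failed (wasUncontended == false), set wasUncontended to true and rehash with advanceProbe. The next iteration then tries a different slot and skips this step.
    step1-3. CAS the value into the slot's Cell. The following steps run only if this CAS fails, i.e. on a collision.
    step1-4. If the table has reached its maximum size (n >= NCPU), or cells has been replaced by another thread's resize, set collide to false.
    step1-5. Otherwise, if collide is false, set it to true and rehash. The table is expanded only after a second consecutive collision.
    step1-6. Reaching this point means collide was already true, so the table is doubled under the cellsBusy lock.
  • step2. cells is not initialized. Take the cellsBusy lock, create a cells array of size 2, and put a new Cell holding x into one of its two slots.
  • step3. If both attempts above fail (cells is not initialized and another thread is initializing it), fall back to accumulating into base.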
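
The Cell class (JDK 8; the Unsafe fields UNSAFE and valueOffset are omitted):

    @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); }
    }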

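Summary

  • 1) On a collision, double hashing (a Marsaglia XorShift) picks another slot at random. Each thread starts from different values because ThreadLocalRandom initializes every thread's probe and seed from shared generators; the seed comes from a shared seeder advanced by SEEDER_INCREMENT = 0xbb67ae8584caa73bL.
  • 2) Without contention, updates go to base. Under contention they are spread across the cells array, which grows when appropriate, so the load is shared among several variables.
  • 3) False sharing is avoided by annotating Cell with @sun.misc.Contended.
  • 4) The same technique is used in ConcurrentHashMap's addCount (its CounterCell array).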
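The ideas in this summary can be tied together in a small, hypothetical striped counter. It is a sketch in the spirit of Striped64/LongAdder, not the JDK code. AtomicLong plays the role of Cell (with no @Contended padding), an AtomicInteger plays the role of cellsBusy, and a ThreadLocal int stands in for the Thread probe. The collide flag is left out for brevity. Growth copies cell references rather than values, so threads still CASing on a cell from the old array do not lose updates.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

/** Hypothetical simplified striped counter; not the JDK implementation. */
final class StripedCounter {
    static final int MAX_CELLS = nextPowerOfTwo(Runtime.getRuntime().availableProcessors());

    private final AtomicLong base = new AtomicLong();
    private final AtomicInteger busy = new AtomicInteger();   // spinlock in the role of cellsBusy
    private volatile AtomicReferenceArray<AtomicLong> cells;  // lazily created, power-of-two length

    // Per-thread hash (stand-in for the Thread probe); forced odd so it is never 0.
    private static final ThreadLocal<int[]> PROBE =
            ThreadLocal.withInitial(() -> new int[] {ThreadLocalRandom.current().nextInt() | 1});

    static int nextPowerOfTwo(int n) {
        int p = 2;
        while (p < n) p <<= 1;
        return p;
    }

    void add(long x) {
        long b = base.get();
        if (cells == null && base.compareAndSet(b, b + x)) return;   // no contention: use base
        int[] probe = PROBE.get();
        for (;;) {
            AtomicReferenceArray<AtomicLong> cs = cells;
            int h = probe[0];
            if (cs != null) {
                int i = (cs.length() - 1) & h;
                AtomicLong c = cs.get(i);
                if (c == null) {                                     // empty slot: try to install a cell
                    if (busy.compareAndSet(0, 1)) {
                        try {
                            if (cells == cs && cs.get(i) == null) { cs.set(i, new AtomicLong(x)); return; }
                        } finally {
                            busy.set(0);
                        }
                    }
                } else {
                    long v = c.get();
                    if (c.compareAndSet(v, v + x)) return;           // CAS on this thread's cell
                    if (cs.length() < MAX_CELLS && busy.compareAndSet(0, 1)) {
                        try {
                            if (cells == cs) {                       // double the table, copying references
                                AtomicReferenceArray<AtomicLong> grown =
                                        new AtomicReferenceArray<>(cs.length() << 1);
                                for (int j = 0; j < cs.length(); j++) grown.set(j, cs.get(j));
                                cells = grown;
                            }
                        } finally {
                            busy.set(0);
                        }
                        continue;                                    // retry with the larger table
                    }
                }
                h ^= h << 13; h ^= h >>> 17; h ^= h << 5;           // xorshift rehash
                probe[0] = h;
            } else if (busy.compareAndSet(0, 1)) {                   // first contention: table of 2
                try {
                    if (cells == null) {
                        AtomicReferenceArray<AtomicLong> fresh = new AtomicReferenceArray<>(2);
                        fresh.set(h & 1, new AtomicLong(x));
                        cells = fresh;
                        return;
                    }
                } finally {
                    busy.set(0);
                }
            } else {
                b = base.get();
                if (base.compareAndSet(b, b + x)) return;            // lock busy: fall back on base
            }
        }
    }

    long sum() {                                                     // not an atomic snapshot
        long s = base.get();
        AtomicReferenceArray<AtomicLong> cs = cells;
        if (cs != null) {
            for (int i = 0; i < cs.length(); i++) {
                AtomicLong c = cs.get(i);
                if (c != null) s += c.get();
            }
        }
        return s;
    }
}
```

The AtomicReferenceArray is used because it gives volatile semantics for each slot, so a cell installed by one thread is safely published to others without relying on Unsafe.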

2.LongAdder

One or more variables that together maintain an initially zero long 
sum. When updates (method add(long)) are contended across 
threads, the set of variables may grow dynamically to reduce 
contention. Method sum() (or, equivalently, longValue()) returns 
the current total combined across the variables maintaining the 
sum.

This class is usually preferable to AtomicLong when multiple 
threads update a common sum that is used for purposes such as 
collecting statistics, not for fine-grained synchronization control. 
Under low update contention, the two classes have similar 
characteristics. But under high contention, expected throughput of 
this class is significantly higher, at the expense of higher space 
consumption.

LongAdders can be used with a ConcurrentHashMap to maintain 
a scalable frequency map (a form of histogram or multiset). For 
example, to add a count to a 
ConcurrentHashMap<String,LongAdder> freqs, initializing if not 
already present, you can use freqs.computeIfAbsent(key, k -> new 
LongAdder()).increment();

This class extends Number, but does not define methods such as 
equals, hashCode and compareTo because instances are 
expected to be mutated, and so are not useful as collection keys.

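In short: one or more variables together maintain a long sum that starts at 0. When add calls contend, the set of variables may grow to reduce contention. sum (or longValue) returns the current total across all of them.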

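LongAdder is preferable to AtomicLong when many threads update a shared value used for statistics rather than for fine-grained synchronization control. Under low contention the two perform about the same. Under high contention LongAdder has much higher throughput, at the cost of more memory.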

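LongAdders can be combined with ConcurrentHashMap to maintain a scalable frequency map (a histogram or multiset). For example, with a ConcurrentHashMap<String,LongAdder> freqs, the following creates the counter for a given key if it is absent and then increments it: freqs.computeIfAbsent(key, k -> new LongAdder()).increment();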

The class extends Number but does not define equals, hashCode or compareTo, because its instances are mutable and therefore unsuitable as collection keys.
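The frequency-map idiom is easiest to see in a word count. In the minimal sketch below, WordFrequency and count are hypothetical names. A parallel stream supplies the concurrent updates, computeIfAbsent creates each LongAdder the first time a word is seen, and the adders are read with sum() once all updates have finished.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical word count using the ConcurrentHashMap + LongAdder idiom. */
final class WordFrequency {
    static Map<String, Long> count(String... words) {
        ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>();
        // Many threads may increment the same word; each word's adder absorbs the contention.
        Arrays.stream(words).parallel()
              .forEach(key -> freqs.computeIfAbsent(key, k -> new LongAdder()).increment());
        // All updates are done here, so each sum() is exact.
        Map<String, Long> result = new HashMap<>();
        freqs.forEach((k, adder) -> result.put(k, adder.sum()));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(count("a", "b", "a", "c", "a"));
    }
}
```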

Example:

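import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

public class LongAdderDemo {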
    private static final int MAX_THREADS = 10;
    private static final int TASK_COUNT = 20;
    private static final int TARGET_COUNT = 10000000;

    private AtomicLong acount = new AtomicLong(0L);
    private LongAdder lacount = new LongAdder();
    private long count = 0;

    private static CountDownLatch cdlatomic = new CountDownLatch(TASK_COUNT);
    private static CountDownLatch cdladdr = new CountDownLatch(TASK_COUNT);


    public class AtomicThread implements Runnable {
        protected String name;
        protected long starttime;

        public AtomicThread(long starttime) {
            this.starttime = starttime;
        }

        @Override
        public void run() {
            long i = 0;
            while (i < TARGET_COUNT) {
                acount.incrementAndGet();
                i++;
            }
            cdlatomic.countDown();
        }
    }

    public void testAtomic() throws InterruptedException {
        ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
        long starttime = System.currentTimeMillis();
        AtomicThread atomic = new AtomicThread(starttime);
        for (int i = 0; i < TASK_COUNT; i++) {
            exe.submit(atomic);
        }
        cdlatomic.await();
        exe.shutdown();
        long v = acount.get();
        long endtime = System.currentTimeMillis();
        System.out.println("AtomicThread spend:" + (endtime - starttime) + "ms" + " v" + v);
    }

    public class LongAdderThread implements Runnable {
        protected String name;
        protected long starttime;

        public LongAdderThread(long starttime) {
            this.starttime = starttime;
        }

        @Override
        public void run() {
            long i = 0;
            while (i < TARGET_COUNT) {
                lacount.increment();
                i++;
            }
            cdladdr.countDown();
        }

    }

    public void testLongAdder() throws InterruptedException {
        ExecutorService exe = Executors.newFixedThreadPool(MAX_THREADS);
        long starttime = System.currentTimeMillis();
        LongAdderThread atomic = new LongAdderThread(starttime);
        for (int i = 0; i < TASK_COUNT; i++) {
            exe.submit(atomic);
        }
        cdladdr.await();
        exe.shutdown();
        long v = lacount.sum();
        long endtime = System.currentTimeMillis();
        System.out.println("LongAdderThread spend:" + (endtime - starttime) + "ms" + " v" + v);
    }

    public static void main(String[] args) throws InterruptedException {
        LongAdderDemo demo = new LongAdderDemo();
        demo.testAtomic();
        demo.testLongAdder();
    }
}

Result:

AtomicThread spend:3807ms v200000000
LongAdderThread spend:727ms v200000000

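In this run LongAdder took about a fifth of AtomicLong's time (727 ms vs 3807 ms) for the same 200,000,000 increments. It is a single run without JIT warm-up, so treat the ratio as indicative rather than exact.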

Constructor:

    public LongAdder() {
    }

Adding:

    public void increment() {
        add(1L);
    }
    public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

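Summary of the steps in add:

  • step1. A non-null cells means contention has already occurred. While it is null there has been no contention, so add first tries to accumulate into base with casBase.
  • step2. as == null || (m = as.length - 1) < 0 means the cells array is not initialized yet; (a = as[getProbe() & m]) == null means the thread's slot is empty. In both cases add calls longAccumulate(x, null, uncontended) to initialize the table or create the Cell.
  • step3. If cells[] is initialized and the slot's Cell exists, add CASes directly into that Cell. If that CAS fails, uncontended becomes false and longAccumulate handles the collision.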

Sum:

    public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }
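sum() simply adds base and every non-null cell without any locking, so it is not an atomic snapshot. Updates that race with the loop may or may not be included, and the JDK Javadoc says the same. The single-threaded sketch below (LongAdderApi is a hypothetical name) exercises the public methods around it, where the results are exact because nothing runs concurrently.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical tour of LongAdder's update and read methods (single-threaded, so results are exact). */
final class LongAdderApi {
    static long[] run() {
        LongAdder adder = new LongAdder();
        adder.increment();                    // +1
        adder.add(10);                        // +10
        adder.decrement();                    // -1
        long total = adder.sum();             // base + all cells
        long drained = adder.sumThenReset();  // returns the sum, then resets base and cells to 0
        long afterReset = adder.sum();
        return new long[] {total, drained, afterReset};
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("sum=" + r[0] + " sumThenReset=" + r[1] + " after=" + r[2]);
    }
}
```

sumThenReset and reset share the same caveat as sum(): they are only reliable when no other thread is updating the adder at the same time.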

3.LongAccumulator

One or more variables that together maintain a running long value 
updated using a supplied function. When updates (method 
accumulate(long)) are contended across threads, the set of 
variables may grow dynamically to reduce contention. Method 
get() (or, equivalently, longValue()) returns the current value 
across the variables maintaining updates.

This class is usually preferable to AtomicLong when multiple 
threads update a common value that is used for purposes such 
as collecting statistics, not for fine-grained synchronization 
control. Under low update contention, the two classes have 
similar characteristics. But under high contention, expected 
throughput of this class is significantly higher, at the expense of 
higher space consumption.

The order of accumulation within or across threads is not 
guaranteed and cannot be depended upon, so this class is only 
applicable to functions for which the order of accumulation does 
not matter. The supplied accumulator function should be side-
effect-free, since it may be re-applied when attempted updates 
fail due to contention among threads. The function is applied with 
the current value as its first argument, and the given update as 
the second argument. For example, to maintain a running 
maximum value, you could supply Long::max along with 
Long.MIN_VALUE as the identity.

Class LongAdder provides analogs of the functionality of this 
class for the common special case of maintaining counts and 
sums. The call new LongAdder() is equivalent to new 
LongAccumulator((x, y) -> x + y, 0L).

This class extends Number, but does not define methods such as 
equals, hashCode and compareTo because instances are 
expected to be mutated, and so are not useful as collection keys.

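In short: a long value is maintained using a supplied function. Under contention the set of variables grows dynamically. get (or longValue) returns the current value.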

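The order of accumulation is not guaranteed and must not be relied on, so the class only suits functions for which order does not matter. The function should be side-effect-free, because it may be applied again when an update fails due to contention. It is called with the current value as its first argument and the given update as its second.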

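In other words, accumulatorFunction should be associative and commutative. identity should also be an identity element for it (as Long.MIN_VALUE is for Long::max), because base starts at identity and get() folds it into the result.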
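These rules can be checked with a few concrete accumulators. In the sketch below, AccumulatorChoices and its methods are hypothetical names, and a parallel stream supplies the concurrent updates. max and sum satisfy the rules. maxWithWrongIdentity shows what goes wrong when the identity is not neutral: base starts at 0, so for all-negative input the result is 0 however the updates are interleaved.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.LongStream;

/** Hypothetical examples of accumulator functions and identities. */
final class AccumulatorChoices {
    // Associative and commutative, with Long.MIN_VALUE as a true identity: safe.
    static long max(long... values) {
        LongAccumulator acc = new LongAccumulator(Long::max, Long.MIN_VALUE);
        LongStream.of(values).parallel().forEach(acc::accumulate);
        return acc.get();
    }

    // Behaves like a LongAdder, per the Javadoc's "(x, y) -> x + y, 0L" equivalence.
    static long sum(long... values) {
        LongAccumulator acc = new LongAccumulator((x, y) -> x + y, 0L);
        LongStream.of(values).parallel().forEach(acc::accumulate);
        return acc.get();
    }

    // Wrong identity: 0 is not neutral for max, so it leaks into the result.
    static long maxWithWrongIdentity(long... values) {
        LongAccumulator acc = new LongAccumulator(Long::max, 0L);
        LongStream.of(values).parallel().forEach(acc::accumulate);
        return acc.get();
    }
}
```

A non-commutative function such as (x, y) -> x - y would make the result depend on how updates happen to be split between base and the cells, which is exactly what the Javadoc rules out.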

    public LongAccumulator(LongBinaryOperator accumulatorFunction,
                           long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
    }
    public void accumulate(long x) {
        Cell[] as; long b, v, r; int m; Cell a;
        if ((as = cells) != null ||
            (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                  (r = function.applyAsLong(v = a.value, x)) == v ||
                  a.cas(v, r)))
                longAccumulate(x, function, uncontended);
        }
    }
    public long get() {
        Cell[] as = cells; Cell a;
        long result = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    result = function.applyAsLong(result, a.value);
            }
        }
        return result;
    }

Usage example:

import java.util.Random;
import java.util.concurrent.atomic.LongAccumulator;

public class LongAccumulatorDemo {

    // Find the maximum value
    public static void main(String[] args) throws InterruptedException {
        LongAccumulator accumulator = new LongAccumulator(Long::max, Long.MIN_VALUE);
        Thread[] ts = new Thread[1000];

        for (int i = 0; i < 1000; i++) {
            ts[i] = new Thread(() -> {
                Random random = new Random();
                long value = random.nextLong();
                accumulator.accumulate(value); // keep the larger of value and the current result
            });
            ts[i].start();
        }
        for (int i = 0; i < 1000; i++) {
            ts[i].join();
        }
        System.out.println(accumulator.longValue());
    }
}
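The title also mentions DoubleAdder and DoubleAccumulator. In JDK 8 these also extend Striped64, storing each double's raw bits in the long cells and updating them through doubleAccumulate. A minimal usage sketch follows; DoubleCounters and its methods are hypothetical names. One caution applies: floating-point addition is not associative, so when values of very different magnitudes are summed the result can vary with accumulation order. Here all addends are 1.0, so the total is exact.

```java
import java.util.concurrent.atomic.DoubleAccumulator;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.stream.IntStream;

/** Hypothetical usage of DoubleAdder and DoubleAccumulator. */
final class DoubleCounters {
    /** Adds 1.0 per element from a parallel stream; exact because every addend is 1.0. */
    static double total(int n) {
        DoubleAdder adder = new DoubleAdder();
        IntStream.range(0, n).parallel().forEach(i -> adder.add(1.0));
        return adder.sum();
    }

    /** Minimum of i * 0.5 for i in [1, n]; POSITIVE_INFINITY is the identity for Math::min. */
    static double min(int n) {
        DoubleAccumulator acc = new DoubleAccumulator(Math::min, Double.POSITIVE_INFINITY);
        IntStream.rangeClosed(1, n).parallel().forEach(i -> acc.accumulate(i * 0.5));
        return acc.get();
    }

    public static void main(String[] args) {
        System.out.println(total(1000) + " " + min(10));
    }
}
```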
© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
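Platform statement: the article content (including any images or videos) was uploaded and published by the author and represents only the author's views. Jianshu is an information publishing platform and only provides information storage services.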
