Java1.8新特性 LongAdder源碼學習

1.簡介

LongAdder是Java8中新的并發(fā)包類,相比較之前的原子類AtomicXXX,LongAdder在低并發(fā)的情況下性能和原子類基本持平,但在高并發(fā)的情況下性能優(yōu)于原子類

2.源碼分析

/**
 * One or more variables that together maintain an initially zero
 * {@code long} sum.  When updates (method {@link #add}) are contended
 * across threads, the set of variables may grow dynamically to reduce
 * contention. Method {@link #sum} (or, equivalently, {@link
 * #longValue}) returns the current total combined across the
 * variables maintaining the sum.
 *
 * <p>This class is usually preferable to {@link AtomicLong} when
 * multiple threads update a common sum that is used for purposes such
 * as collecting statistics, not for fine-grained synchronization
 * control.  Under low update contention, the two classes have similar
 * characteristics. But under high contention, expected throughput of
 * this class is significantly higher, at the expense of higher space
 * consumption.
 * @since 1.8
 * @author Doug Lea
*/
public class LongAdder extends Striped64 implements Serializable

注釋的意思是,1個或多個值的和被保存在一個初始化值為0的"sum"中,當調用add()方法的并發(fā)量很大時,那么存儲值的容器會動態(tài)擴容以降低碰撞幾率(類似hash碰撞),方法sum()返回并發(fā)操作后得到的最終結果。
通常情況下,LongAdder在統(tǒng)計數據更新時比AtomicLong更適用。但不應該用于細粒化的同步控制。LongAdder在低并發(fā)的情況下性能和AtomicLong基本持平,但在高并發(fā)的情況下性能優(yōu)于AtomicLong。

那我們就來看看add(long x)和sum()方法的具體實現吧!

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        //嘗試更新base,若并發(fā)不高,能更新成功,退出條件(cas更新)
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            /**
            * 嘗試第二次更新值,取得Cell[]中的一個值,getProbe() & m是
            * 取得一個隨機整數然后和m(數組大小-1)做與運算,得出的值在
            * 0~(as.length - 1)之間,更新成功則退出條件
            */
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                //更新失敗后進入
                longAccumulate(x, null, uncontended);
        }
    }
//這是存儲實體Cell的源碼,對CAS不了解的同學可以看看Atomic包
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        // 取得一個隨機整數,設置是否并發(fā)競爭為true
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); 
            h = getProbe();
            wasUncontended = true;
        }
        //是否碰撞,參考hashmap
        boolean collide = false; 
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                //如果數組指定位置為空,則賦值
                if ((a = as[(n - 1) & h]) == null) {
                    if (cellsBusy == 0) {       
                        Cell r = new Cell(x);
                        //創(chuàng)建Cell時,casCellsBusy()鎖住數組
                        if (cellsBusy == 0 && casCellsBusy()) {
                            boolean created = false;
                            try {               
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null &&
                                    (m = rs.length) > 0 &&
                                    rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;
                            }
                            if (created)
                                break;
                            continue;
                        }
                    }
                    //指定數組位置為null則說明沒有碰撞
                    collide = false;
                }
                else if (!wasUncontended)
                    wasUncontended = true;
                /**
                * a不為null,更新a的value,成功則退出。更新失敗則
                * 說明多個線程同時更新一個Cell,并發(fā)量大,碰撞幾率高
                * 可能需要擴容
                */
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                /**
                * 若數組大小大于CPU個數,則說明碰撞不是由于數組過小導致
                * 則重新嘗試更新數據
                */
                else if (n >= NCPU || cells != as)
                    collide = false;            
                else if (!collide)
                    collide = true;
                //擴容數組,擴容時鎖住數組
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   
                }
                h = advanceProbe(h);
            }
            //若cell[]為空,初始化
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base
        }
    }
//加鎖方法
final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

/**
* 返回各個線程操作總和,LongAdder在統(tǒng)計的時候如果有并發(fā)更新
* 可能導致統(tǒng)計的數據有誤差,因為volatile并不能保證并發(fā)安全
*/
public long sum() {
    Cell[] as = cells; Cell a;
    long sum = base;
    if (as != null) {
        for (int i = 0; i < as.length; ++i) {
            if ((a = as[i]) != null)
                sum += a.value;
        }
    }
    return sum;
}

3.總結

LongAdder在AtomicLong的基礎上將單點的更新壓力分散到各個節(jié)點,在低并發(fā)的時候通過對base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并發(fā)的時候通過分散,讓各個線程操作不同的節(jié)點,提高了性能。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容