1.簡介
LongAdder是Java8中新的并發(fā)包類,相比較之前的原子類AtomicXXX,LongAdder在低并發(fā)的情況下性能和原子類基本持平,但在高并發(fā)的情況下性能優(yōu)于原子類
2.源碼分析
/**
* One or more variables that together maintain an initially zero
* {@code long} sum. When updates (method {@link #add}) are contended
* across threads, the set of variables may grow dynamically to reduce
* contention. Method {@link #sum} (or, equivalently, {@link
* #longValue}) returns the current total combined across the
* variables maintaining the sum.
*
* <p>This class is usually preferable to {@link AtomicLong} when
* multiple threads update a common sum that is used for purposes such
* as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar
* characteristics. But under high contention, expected throughput of
* this class is significantly higher, at the expense of higher space
* consumption.
* @since 1.8
* @author Doug Lea
*/
public class LongAdder extends Striped64 implements Serializable
注釋的意思是,1個或多個值的和被保存在一個初始化值為0的"sum"中,當調用add()方法的并發(fā)量很大時,那么存儲值的容器會動態(tài)擴容以降低碰撞幾率(類似hash碰撞),方法sum()返回并發(fā)操作后得到的最終結果。
通常情況下,LongAdder在統(tǒng)計數據更新時比AtomicLong更適用。但不應該用于細粒化的同步控制。LongAdder在低并發(fā)的情況下性能和AtomicLong基本持平,但在高并發(fā)的情況下性能優(yōu)于AtomicLong。
那我們就來看看add(long x)和sum()方法的具體實現吧!
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
//嘗試更新base,若并發(fā)不高,能更新成功,退出條件(cas更新)
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
/**
* 嘗試第二次更新值,取得Cell[]中的一個值,getProbe() & m是
* 取得一個隨機整數然后和m(數組大小-1)做與運算,得出的值在
* 0~(as.length - 1)之間,更新成功則退出條件
*/
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//更新失敗后進入
longAccumulate(x, null, uncontended);
}
}
//這是存儲實體Cell的源碼,對CAS不了解的同學可以看看Atomic包
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 取得一個隨機整數,設置是否并發(fā)競爭為true
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current();
h = getProbe();
wasUncontended = true;
}
//是否碰撞,參考hashmap
boolean collide = false;
for (;;) {
Cell[] as; Cell a; int n; long v;
if ((as = cells) != null && (n = as.length) > 0) {
//如果數組指定位置為空,則賦值
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) {
Cell r = new Cell(x);
//創(chuàng)建Cell時,casCellsBusy()鎖住數組
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try {
Cell[] rs; int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue;
}
}
//指定數組位置為null則說明沒有碰撞
collide = false;
}
else if (!wasUncontended)
wasUncontended = true;
/**
* a不為null,更新a的value,成功則退出。更新失敗則
* 說明多個線程同時更新一個Cell,并發(fā)量大,碰撞幾率高
* 可能需要擴容
*/
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
/**
* 若數組大小大于CPU個數,則說明碰撞不是由于數組過小導致
* 則重新嘗試更新數據
*/
else if (n >= NCPU || cells != as)
collide = false;
else if (!collide)
collide = true;
//擴容數組,擴容時鎖住數組
else if (cellsBusy == 0 && casCellsBusy()) {
try {
if (cells == as) {
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue;
}
h = advanceProbe(h);
}
//若cell[]為空,初始化
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
//加鎖方法
final boolean casCellsBusy() {
return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}
/**
* 返回各個線程操作總和,LongAdder在統(tǒng)計的時候如果有并發(fā)更新
* 可能導致統(tǒng)計的數據有誤差,因為volatile并不能保證并發(fā)安全
*/
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
3.總結
LongAdder在AtomicLong的基礎上將單點的更新壓力分散到各個節(jié)點,在低并發(fā)的時候通過對base的直接更新可以很好的保障和AtomicLong的性能基本保持一致,而在高并發(fā)的時候通過分散,讓各個線程操作不同的節(jié)點,提高了性能。