
LongAdder.png
Cell
// 避免偽共享 jdk1.8后加入的
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
// CAS操作,設(shè)置value
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
sum
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
// base + N * cell.value
sum += a.value;
}
}
return sum;
}
reset
// reset base & cell.value
public void reset() {
Cell[] as = cells; Cell a;
base = 0L;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
a.value = 0L;
}
}
}
add
public void add(long x) {
Cell[] as; long b, v; int m; Cell a;
// 如果有cell的部分,說明要累加到cell
// 如果cell為空,那么累加base看看
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
// 如果cells還沒有初始化,直接進(jìn)入if
// 或cells的長(zhǎng)度為0
// 或當(dāng)前線程的cell位置為空,沒有被累加過
// 或者cell位置不為空,且累加失敗,有競(jìng)爭(zhēng)
// 以上情況,全部進(jìn)入下面的longAccumulate
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
longAccumulate(x, null, uncontended);
}
}
longAccumulate
final void longAccumulate(long x, LongBinaryOperator fn,
boolean wasUncontended) {
int h;
// 看下ThreadLocalRandom是否初始化
if ((h = getProbe()) == 0) {
ThreadLocalRandom.current(); // force initialization
h = getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty
// 自旋
for (;;) {
Cell[] as; Cell a; int n; long v;
// 如果cells有東西
if ((as = cells) != null && (n = as.length) > 0) {
// 如果線程對(duì)應(yīng)的坑位為空
if ((a = as[(n - 1) & h]) == null) {
// cellsbusy為0,代表現(xiàn)在cells穩(wěn)定,那么可以開始針對(duì)當(dāng)前線程增加坑位
// 換句話說,如果發(fā)現(xiàn)cellsbusy為1,坑位就沒機(jī)會(huì)增加了。
if (cellsBusy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
// 到這里,準(zhǔn)備正式開始,第一時(shí)間先鎖定cells,也就是先設(shè)置cellsbusy為1
if (cellsBusy == 0 && casCellsBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs; int m, j;
// 再次定位到坑位
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
// 將cell填充進(jìn)去,不要忘記這個(gè)坑位記錄了你要put的value
rs[j] = r;
created = true;
}
} finally {
// 最終釋放cellsbusy,也就是解鎖cells
cellsBusy = 0;
}
// 如果已經(jīng)成功創(chuàng)建坑位,那么退出自旋,否則繼續(xù)自旋
if (created)
break;
continue; // Slot is now non-empty
}
}
// 到這里說明cells正在被其他線程鎖定,記錄下有沖突的事實(shí),準(zhǔn)備去rehash
collide = false;
}
// 這里說明外部調(diào)用處在設(shè)置線程對(duì)應(yīng)的坑位時(shí)失敗,有競(jìng)爭(zhēng)線程
// wasUncontended的意義在于,如果有競(jìng)爭(zhēng),說明線程的probe沖突,準(zhǔn)備去rehash
// 對(duì)應(yīng)下面的advanceProbe,這樣再次自旋后,會(huì)去關(guān)注新的坑位,也就沒有沖突一說了
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
// 到這里,說明該線程的坑位已經(jīng)有了,直接累加就好,如果成功當(dāng)然最好了
else if (a.cas(v = a.value, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break;
// 如果上面累加失敗,看下cells是否已經(jīng)到了上限或cells已經(jīng)變更了,比如擴(kuò)容了。
// 這里也記錄下有沖突,準(zhǔn)備去rehash
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
// 設(shè)置沖突標(biāo)識(shí),準(zhǔn)備去rehash
else if (!collide)
collide = true;
// 到這里說明,就算有了cells,但是該位置上累加失敗,數(shù)組還可以擴(kuò)容
// 既然你不讓我加,競(jìng)爭(zhēng)這么厲害,那么擴(kuò)容試試看
// 當(dāng)然了,先鎖定cells
else if (cellsBusy == 0 && casCellsBusy()) {
try {
// 如果cells沒變化
if (cells == as) { // Expand table unless stale
// 容量擴(kuò)大一倍
Cell[] rs = new Cell[n << 1];
將舊的轉(zhuǎn)移到新的cells里面
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
// 解鎖cells
cellsBusy = 0;
}
collide = false;
// 擴(kuò)容完,再自旋看看,能否成功
continue; // Retry with expanded table
}
// 重新計(jì)算新的probe值以對(duì)應(yīng)到不同的下標(biāo)元素,然后重試。
h = advanceProbe(h);
}
// cells還未初始化或?yàn)榭?,先鎖定cells
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
// cells的大小必須是2的冪,好length-1&h進(jìn)行求余
Cell[] rs = new Cell[2];
// 設(shè)置h對(duì)應(yīng)的下標(biāo)位置
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
// 解鎖
cellsBusy = 0;
}
// 如果初始化完畢,退出自旋
if (init)
break;
}
// 說明cells還沒有初始化,但是cells被別人鎖定了
// 那么嘗試著加到base看看,如果成功,那么退出自旋
else if (casBase(v = base, ((fn == null) ? v + x :
fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}