Java并發(fā)系列之Atomic

0. 分享背景

  • 回顧Java中并發(fā)編程的相關(guān)知識(shí)點(diǎn)
  • 了解其內(nèi)部實(shí)現(xiàn)機(jī)制原理
  • 總結(jié)并討論實(shí)際項(xiàng)目運(yùn)用

1. Atomic類

Java.util.concurrent中提供了atomic原子包,可以實(shí)現(xiàn)原子操作(atomic operation)

  • 相關(guān)類AtomicBoolean , AtomicInteger, AtomicLong, AtomicReference

2. AtomicBoolean Demo

Demo UML


未命名.jpg
  • 線程安全類SafePerson
public class SafePerson extends Person{
    private AtomicBoolean isReady = new AtomicBoolean(false);
    public void doWork() {
        Thread current = Thread.currentThread();

        if (isReady.compareAndSet(false, true)) {
            System.out.println(current.getId() + " Preparing....");
        }else {
            System.out.println(current.getId() + " Everything is ready, do something...");
        }
    }
}
  • 非線程安全類UnsafePerson
public class UnsafePerson extends Person{
    private boolean isReady = false;
    public void doWork() {
        Thread current = Thread.currentThread();
        if (!isReady) {
            System.out.println(current.getId() + " Preparing....");
            isReady = true;
        }else {
            System.out.println(current.getId() + " Everything is ready, do something...");
        }
    }
}
  • PersonRunnable類
public class PersonRunnable implements Runnable{
    private Person person;

    public PersonRunnable(Person person) {
        this.person = person;
    }
    public void run() {
        person.doWork();
    }
}
  • 驗(yàn)證Main方法
 public static void main( String[] args )
    {
        int threadCount = 10;
        Person unsafePerson = new UnsafePerson();
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(threadCount);

        for (int i = 0 ; i < threadCount; i++) {
            fixedThreadPool.execute(new PersonRunnable(unsafePerson));
        }
    }
  • 測(cè)試UnsafePerson類執(zhí)行結(jié)果, 從執(zhí)行結(jié)果中可以看出,本來僅想執(zhí)行一次準(zhǔn)備工作的代碼,被多個(gè)線程所執(zhí)行,并且,每次能夠執(zhí)行的線程數(shù)也是不確定的。
9 Preparing....
13 Preparing....
12 Preparing....
10 Preparing....
11 Preparing....
16 Everything is ready, do something...
15 Everything is ready, do something...
14 Everything is ready, do something...
17 Everything is ready, do something...
18 Everything is ready, do something...
  • 測(cè)試SafePerson執(zhí)行結(jié)果,僅有單個(gè)線程能夠進(jìn)入并執(zhí)行準(zhǔn)備階段代碼
9 Preparing....
13 Everything is ready, do something...
10 Everything is ready, do something...
12 Everything is ready, do something...
15 Everything is ready, do something...
16 Everything is ready, do something...
10 Everything is ready, do something...
13 Everything is ready, do something...
11 Everything is ready, do something...
9 Everything is ready, do something...

3. AtomicBoolean 源碼跟蹤

  • JDK部分 sun/misc/Unsafe
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicBoolean.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     *
     * @param expect the expected value
     * @param update the new value
     * @return true if successful. False return indicates that
     * the actual value was not equal to the expected value.
     */
    public final boolean compareAndSet(boolean expect, boolean update) {
        int e = expect ? 1 : 0;
        int u = update ? 1 : 0;
        return unsafe.compareAndSwapInt(this, valueOffset, e, u);
    }
  • JVM Hotspot部分
    hotspot/src/share/vm/prims/unsafe.cpp
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
  UnsafeWrapper("Unsafe_CompareAndSwapInt");
  oop p = JNIHandles::resolve(obj);
  jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
  // e 期望值,x 要更新值
  return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

hotspot/src/share/vm/runtime/atomic.cpp

jbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {
  assert(sizeof(jbyte) == 1, "assumption.");
  uintptr_t dest_addr = (uintptr_t)dest;
  uintptr_t offset = dest_addr % sizeof(jint);
  volatile jint* dest_int = (volatile jint*)(dest_addr - offset);
  jint cur = *dest_int;
  jbyte* cur_as_bytes = (jbyte*)(&cur);
  jint new_val = cur; //期望值
  jbyte* new_val_as_bytes = (jbyte*)(&new_val);
  new_val_as_bytes[offset] = exchange_value;
  while (cur_as_bytes[offset] == compare_value) {
    jint res = cmpxchg(new_val, dest_int, cur);
    if (res == cur) break;
    cur = res;
    new_val = cur;
    new_val_as_bytes[offset] = exchange_value;
  }
  return cur_as_bytes[offset];
}

hotspot/src/os_cpu/linux_x86/vm

inline jint     Atomic::cmpxchg    (jint     exchange_value, volatile jint*     dest, jint     compare_value) {
  int mp = os::is_MP();
  __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                    : "=a" (exchange_value)
                    : "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
                    : "cc", "memory");
  return exchange_value;
}
  • "cmpxchgl"指令的作用,CAS

4. CAS原理(Compare And Swap)

4.1 CAS

CAS指令的三個(gè)操作數(shù),當(dāng)前內(nèi)存值,預(yù)期值,新值。
當(dāng)且僅當(dāng)預(yù)期值和內(nèi)存值相同時(shí),將內(nèi)存值修改為新值,否則什么都不做

123.jpg

4.2 CAS優(yōu)點(diǎn)

  • 非阻塞算法
  • 原子操作成本低

4.3 CAS不足

  1. ABA問題。
    如果一個(gè)值原來是A,變成了B,又變成了A,那么使用CAS進(jìn)行檢查時(shí)會(huì)發(fā)現(xiàn)它的值沒有發(fā)生變化,但是實(shí)際上卻變化了。ABA問題的解決思路就是使用版本號(hào)。在變量前面追加上版本號(hào),每次變量更新的時(shí)候把版本號(hào)加一,那么A-B-A 就會(huì)變成1A-2B-3A。(AKKA/GCC4.7引入STM)
    從Java1.5開始JDK的atomic包里提供了一個(gè)類AtomicStampedReference來解決ABA問題。這個(gè)類的compareAndSet方法作用是首先檢查當(dāng)前引用是否等于預(yù)期引用,并且當(dāng)前標(biāo)志是否等于預(yù)期標(biāo)志,如果全部相等,則以原子方式將該引用和該標(biāo)志的值設(shè)置為給定的更新值。

  2. 循環(huán)時(shí)間長(zhǎng)開銷大。自旋CAS如果長(zhǎng)時(shí)間不成功,會(huì)給CPU帶來非常大的執(zhí)行開銷。

  3. 只能保證一個(gè)共享變量的原子操作。JDK提供了AtomicReference類來保證引用對(duì)象之間的原子性,你可以把多個(gè)變量放在一個(gè)對(duì)象里來進(jìn)行CAS操作。

5.CPU如何保證原子性

關(guān)于CPU的鎖有如下3種:

1. 處理器自動(dòng)保證基本內(nèi)存操作的原子性

首先處理器會(huì)自動(dòng)保證基本的內(nèi)存操作的原子性。處理器保證從系統(tǒng)內(nèi)存當(dāng)中讀取或者寫入一個(gè)字節(jié)是原子的,意思是當(dāng)一個(gè)處理器讀取一個(gè)字節(jié)時(shí),其他處理器不能訪問這個(gè)字節(jié)的內(nèi)存地址。奔騰6和最新的處理器能自動(dòng)保證單處理器對(duì)同一個(gè)緩存行里進(jìn)行16/32/64位的操作是原子的,但是復(fù)雜的內(nèi)存操作處理器不能自動(dòng)保證其原子性,比如跨總線寬度,跨多個(gè)緩存行,跨頁(yè)表的訪問。但是處理器提供總線鎖定和緩存鎖定兩個(gè)機(jī)制來保證復(fù)雜內(nèi)存操作的原子性。

2. 使用總線鎖保證原子性

第一個(gè)機(jī)制是通過總線鎖保證原子性。如果多個(gè)處理器同時(shí)對(duì)共享變量進(jìn)行讀改寫(i++就是經(jīng)典的讀改寫操作)操作,那么共享變量就會(huì)被多個(gè)處理器同時(shí)進(jìn)行操作,這樣讀改寫操作就不是原子的,操作完之后共享變量的值會(huì)和期望的不一致,

原因是有可能多個(gè)處理器同時(shí)從各自的緩存中讀取變量i,分別進(jìn)行加一操作,然后分別寫入系統(tǒng)內(nèi)存當(dāng)中。那么想要保證讀改寫共享變量的操作是原子的,就必須保證CPU1讀改寫共享變量的時(shí)候,CPU2不能操作緩存了該共享變量?jī)?nèi)存地址的緩存。

處理器使用總線鎖就是來解決這個(gè)問題的。所謂總線鎖就是使用處理器提供的一個(gè)LOCK#信號(hào),當(dāng)一個(gè)處理器在總線上輸出此信號(hào)時(shí),其他處理器的請(qǐng)求將被阻塞住,那么該處理器可以獨(dú)占使用共享內(nèi)存。

3. 使用緩存鎖保證原子性

第二個(gè)機(jī)制是通過緩存鎖定保證原子性。在同一時(shí)刻我們只需保證對(duì)某個(gè)內(nèi)存地址的操作是原子性即可,但總線鎖定把CPU和內(nèi)存之間通信鎖住了,這使得鎖定期間,其他處理器不能操作其他內(nèi)存地址的數(shù)據(jù),所以總線鎖定的開銷比較大,最近的處理器在某些場(chǎng)合下使用緩存鎖定代替總線鎖定來進(jìn)行優(yōu)化。

頻繁使用的內(nèi)存會(huì)緩存在處理器的L1,L2和L3高速緩存里,那么原子操作就可以直接在處理器內(nèi)部緩存中進(jìn)行,并不需要聲明總線鎖,在奔騰6和最近的處理器中可以使用“緩存鎖定”的方式來實(shí)現(xiàn)復(fù)雜的原子性。所謂“緩存鎖定”就是如果緩存在處理器緩存行中內(nèi)存區(qū)域在LOCK操作期間被鎖定,當(dāng)它執(zhí)行鎖操作回寫內(nèi)存時(shí),處理器不在總線上聲言LOCK#信號(hào),而是修改內(nèi)部的內(nèi)存地址,并允許它的緩存一致性機(jī)制來保證操作的原子性,因?yàn)榫彺嬉恢滦詸C(jī)制會(huì)阻止同時(shí)修改被兩個(gè)以上處理器緩存的內(nèi)存區(qū)域數(shù)據(jù),當(dāng)其他處理器回寫已被鎖定的緩存行的數(shù)據(jù)時(shí)會(huì)起緩存行無效,在例1中,當(dāng)CPU1修改緩存行中的i時(shí)使用緩存鎖定,那么CPU2就不能同時(shí)緩存了i的緩存行。

但是有兩種情況下處理器不會(huì)使用緩存鎖定。第一種情況是:當(dāng)操作的數(shù)據(jù)不能被緩存在處理器內(nèi)部,或操作的數(shù)據(jù)跨多個(gè)緩存行(cache line),則處理器會(huì)調(diào)用總線鎖定。第二種情況是:有些處理器不支持緩存鎖定。對(duì)于Inter486和奔騰處理器,就算鎖定的內(nèi)存區(qū)域在處理器的緩存行中也會(huì)調(diào)用總線鎖定。

以上兩個(gè)機(jī)制我們可以通過Inter處理器提供了很多LOCK前綴的指令來實(shí)現(xiàn)。比如位測(cè)試和修改指令BTS,BTR,BTC,交換指令XADD,CMPXCHG和其他一些操作數(shù)和邏輯指令,比如ADD(加),OR(或)等,被這些指令操作的內(nèi)存區(qū)域就會(huì)加鎖,導(dǎo)致其他處理器不能同時(shí)訪問它。

6. 總結(jié)

  • 同步計(jì)數(shù)器等
  • 簡(jiǎn)單類型的共享變量操作
  • 結(jié)合volatile構(gòu)建其他樂觀鎖
  • read-modify-write來實(shí)現(xiàn)Lock-Free算法
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容