前言
CAS(compare and swap, 比較并交換),是原子操作的一種,可用于在多線程編程中實(shí)現(xiàn)不被打斷的數(shù)據(jù)交換操作,從而避免多線程同時(shí)改寫某一數(shù)據(jù)時(shí)由于執(zhí)行順序不確定性以及中斷的不可預(yù)知性產(chǎn)生的數(shù)據(jù)不一致問題。
簡(jiǎn)單來說,CAS可以保證多線程對(duì)數(shù)據(jù)寫操作時(shí)數(shù)據(jù)的一致性。
CAS的思想:三個(gè)參數(shù),一個(gè)當(dāng)前內(nèi)存值V、舊的預(yù)期值A(chǔ)、即將更新的值B,當(dāng)且僅當(dāng)預(yù)期值A(chǔ)和內(nèi)存值V相同時(shí),將內(nèi)存值修改為B并返回true,否則什么都不做,并返回false。
數(shù)據(jù)不一致問題
一個(gè)n++的問題
public class Case {
public volatile int n;
public void add() {
n++;
}
}
通過javap -verbose Case查看add方法的字節(jié)碼指令
public void add();
flags: ACC_PUBLIC
Code:
stack=3, locals=1, args_size=1
0: aload_0
1: dup
2: getfield #2 // Field n:I
5: iconst_1
6: iadd
7: putfield #2 // Field n:I
10: return
可以看到n++的操作分為以下三步
-
getfield獲得n的值 -
idd進(jìn)行加1操作 -
putfield把累加后的值寫回給n
變量n通過volatile修飾,可保證其在線程之間的可見性,但不能保證該3個(gè)指令的原子執(zhí)行。例如:線程A、B同時(shí)操作n++,n應(yīng)該等于4,但結(jié)果為3。
n++數(shù)據(jù)不一致.png
解決辦法
- 在
add上添加synchronized修飾解決。但性能較差 - 將n的類型由
int改為AtomicInterger原子變量
原子變量的實(shí)現(xiàn)
原子變量是如何保證數(shù)據(jù)的一致性?
答案:CAS
以AtomicInterger為例,
public class AtomicInteger extends Number implements java.io.Serializable {
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
// 變量偏移地址
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
public final int get() {
return value;
}
- Unsafe,是JDK中的一個(gè)內(nèi)部類,由于Java方法無法直接訪問底層系統(tǒng),需要通過本地(native)方法來訪問,Unsafe相當(dāng)于一個(gè)后門,基于該類可以直接操作特定內(nèi)存的數(shù)據(jù)。
- 變量valueOffset,表示該變量值在內(nèi)存中的偏移地址,因?yàn)閁nsafe就是根據(jù)內(nèi)存偏移地址獲取數(shù)據(jù)的。
- 變量value用volatile修飾,保證了多線程之間的內(nèi)存可見性。
看看AtomicInteger如何實(shí)現(xiàn)并發(fā)下的累加操作:
// delta:增加的值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
調(diào)用unsafe.getAndAddInt方法
// unsafe.getAndAddInt
// var1:原子變量對(duì)象
// var2:變量值的偏移地址
// var4:增加的值
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
// 根據(jù)偏移地址var2,從對(duì)象var1中獲得變量值
var5 = this.getIntVolatile(var1, var2);
// 比較上一步獲得的變量值var5與當(dāng)前內(nèi)存中的變量值
// 若相同,則修改內(nèi)存值var5+var4,否則繼續(xù)循環(huán)
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
假設(shè)線程A和線程B同時(shí)執(zhí)行g(shù)etAndAdd操作(分別跑在不同CPU上):
-
AtomicInteger里面的value原始值為3,即主內(nèi)存中AtomicInteger的value為3,根據(jù)Java內(nèi)存模型,線程A和線程B各自持有一份value的副本,值為3。 - 線程A通過
getIntVolatile(var1, var2)拿到value值3,這時(shí)線程A被掛起。 - 線程B也通過
getIntVolatile(var1, var2)方法獲取到value值3,運(yùn)氣好,線程B沒有被掛起,并執(zhí)行compareAndSwapInt方法比較內(nèi)存值也為3,成功修改內(nèi)存值為2。 - 這時(shí)線程A恢復(fù),執(zhí)行
compareAndSwapInt方法比較,發(fā)現(xiàn)自己手里的值(3)和內(nèi)存的值(2)不一致,說明該值已經(jīng)被其它線程提前修改過了,那只能重新來一遍了。 - 重新獲取value值,因?yàn)樽兞縱alue被volatile修飾,所以其它線程對(duì)它的修改,線程A總是能夠看到,線程A繼續(xù)執(zhí)行
compareAndSwapInt進(jìn)行比較替換,直到成功。
整個(gè)過程中,利用CAS保證了對(duì)于value的修改的并發(fā)安全
Unsafe類的CAS實(shí)現(xiàn)
繼續(xù)深入看看Unsafe類中的compareAndSwapInt方法實(shí)現(xiàn)。
public final native boolean compareAndSwapInt(Object paramObject, long paramLong, int paramInt1, int paramInt2);
Unsafe類中的compareAndSwapInt,是一個(gè)本地方法,該方法的實(shí)現(xiàn)位于unsafe.cpp中
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj);
// 1. 獲得變量value在內(nèi)存中的地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
// 2. 通過Atomic::cmpxchg實(shí)現(xiàn)比較替換
// x是即將更新的值,e為原內(nèi)存的值
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END
下面再來看看操作系統(tǒng)中Atomic::cmpxchg的實(shí)現(xiàn)
下面重點(diǎn)了解下LOCK_IF_MP和LOCK前綴就行了,其他我也不懂
- Linux的x86實(shí)現(xiàn):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP(); //判斷是否是多處理器
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
#define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
- Windows的x86實(shí)現(xiàn):
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::isMP(); //判斷是否是多處理器
_asm {
mov edx, dest
mov ecx, exchange_value
mov eax, compare_value
LOCK_IF_MP(mp)
cmpxchg dword ptr [edx], ecx
}
}
// Adding a lock prefix to an instruction on MP machine
// VC++ doesn't like the lock prefix to be on a single line
// so we can't insert a label after the lock prefix.
// By emitting a lock prefix, we can define a label after it.
#define LOCK_IF_MP(mp) __asm cmp mp, 0 \
__asm je L0 \
__asm _emit 0xF0 \
__asm L0:
LOCK_IF_MP根據(jù)當(dāng)前系統(tǒng)是否為多核處理器決定是否為cmpxchg指令添加lock前綴。
- 如果是多處理器,為cmpxchg指令添加lock前綴。
- 反之,就省略lock前綴。(單處理器會(huì)不需要lock前綴提供的內(nèi)存屏障效果)
intel手冊(cè)對(duì)lock前綴的說明如下:
- 確保后續(xù)指令執(zhí)行的原子性。
在Pentium及之前的處理器中,帶有l(wèi)ock前綴的指令在執(zhí)行期間會(huì)鎖住總線,使得其它處理器暫時(shí)無法通過總線訪問內(nèi)存,很顯然,這個(gè)開銷很大。在新的處理器中,Intel使用緩存鎖定來保證指令執(zhí)行的原子性,緩存鎖定將大大降低lock前綴指令的執(zhí)行開銷。 - 禁止該指令與前面和后面的讀寫指令重排序。
- 把寫緩沖區(qū)的所有數(shù)據(jù)刷新到內(nèi)存中。
上面的第2點(diǎn)和第3點(diǎn)所具有的內(nèi)存屏障效果,保證了CAS同時(shí)具有volatile讀和volatile寫的內(nèi)存語義。
CAS的缺點(diǎn)
CAS存在三大問題:ABA問題,循環(huán)時(shí)間長(zhǎng)開銷大和只能保證一個(gè)共享變量的原子操作。其中ABA問題作為第三部分重點(diǎn)說明下。
循環(huán)時(shí)間長(zhǎng)開銷大
自旋CAS如果長(zhǎng)時(shí)間不成功,會(huì)給CPU帶來非常大的執(zhí)行開銷。如果JVM能支持處理器提供的pause指令那么效率會(huì)有一定的提升,pause指令有兩個(gè)作用,第一它可以延遲流水線執(zhí)行指令(de-pipeline),使CPU不會(huì)消耗過多的執(zhí)行資源,延遲的時(shí)間取決于具體實(shí)現(xiàn)的版本,在一些處理器上延遲時(shí)間是零。第二它可以避免在退出循環(huán)的時(shí)候因內(nèi)存順序沖突(memory order violation)而引起CPU流水線被清空(CPU pipeline flush),從而提高CPU的執(zhí)行效率。
只能保證一個(gè)共享變量的原子操作
當(dāng)對(duì)一個(gè)共享變量執(zhí)行操作時(shí),可以使用自旋CAS的方式來保證原子操作,但是對(duì)多個(gè)共享變量操作時(shí),自旋CAS就無法保證操作的原子性,這個(gè)時(shí)候就可以用鎖,或者有一個(gè)取巧的辦法,就是把多個(gè)共享變量合并成一個(gè)共享變量來操作。比如有兩個(gè)共享變量i=2,j=a,合并一下ij=2a,然后用CAS來操作ij。從Java1.5開始JDK提供了AtomicReference類來保證引用對(duì)象之間的原子性,可以把多個(gè)變量放在一個(gè)對(duì)象里來進(jìn)行CAS操作。
ABA問題
ABA問題:CAS需要在操作值的時(shí)候檢查下值有沒有發(fā)生變化,如果沒有發(fā)生變化則更新,但是如果一個(gè)值原來是A,變成了B,又變成了A,那么使用CAS進(jìn)行檢查時(shí)會(huì)認(rèn)為它的值沒有發(fā)生變化,但是實(shí)際上卻變化了。
一開始,我不明白ABA問題會(huì)有什么隱患,值沒發(fā)生變化會(huì)有什么影響嗎?
下面通過一個(gè)例子來說明ABA問題的隱患:

現(xiàn)有一個(gè)用單向鏈表實(shí)現(xiàn)的堆棧,棧頂為A,這時(shí)線程T1已經(jīng)知道A.next為B,然后希望用CAS將棧頂替換為B:
head.compareAndSet(A,B);
在T1執(zhí)行上面這條指令之前,線程T2介入,將A、B出棧,再push D、C、A,此時(shí)堆棧結(jié)構(gòu)如下圖,而對(duì)象B此時(shí)處于游離狀態(tài):

此時(shí)輪到線程T1執(zhí)行CAS操作,檢測(cè)發(fā)現(xiàn)棧頂仍為A,所以CAS成功,棧頂變?yōu)锽,但實(shí)際上B.next為null,所以此時(shí)的情況變?yōu)椋?br>

其中堆棧中只有B一個(gè)元素,C和D組成的鏈表不再存在于堆棧中,平白無故就把C、D丟掉了。
以上就是由于ABA問題帶來的隱患,各種樂觀鎖的實(shí)現(xiàn)中通常都會(huì)用版本戳version來對(duì)記錄或?qū)ο髽?biāo)記,避免并發(fā)操作帶來的問題,在Java中,
AtomicStampedReference<E>也實(shí)現(xiàn)了這個(gè)作用,它通過包裝[E,Integer]的元組來對(duì)對(duì)象標(biāo)記版本戳stamp,從而避免ABA問題。例如下面的代碼分別用AtomicInteger和AtomicStampedReference來對(duì)初始值為100的原子整型變量進(jìn)行更新,AtomicInteger會(huì)成功執(zhí)行CAS操作,而加上版本戳的AtomicStampedReference對(duì)于ABA問題會(huì)執(zhí)行CAS失?。?p>
public class ABA {
private static AtomicInteger atomicInt = new AtomicInteger(100);
private static AtomicStampedReference atomicStampedRef = new AtomicStampedReference(100, 0);
public static void main(String[] args) throws InterruptedException {
Thread intT1 = new Thread(new Runnable() {
public void run() {
atomicInt.compareAndSet(100, 101);
atomicInt.compareAndSet(101, 100);
}
});
Thread intT2 = new Thread(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
boolean c3 = atomicInt.compareAndSet(100, 101);
System.out.println(c3); // true
}
});
intT1.start();
intT2.start();
intT1.join();
intT2.join();
Thread refT1 = new Thread(new Runnable() {
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
}
atomicStampedRef.compareAndSet(100, 101, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
atomicStampedRef.compareAndSet(101, 100, atomicStampedRef.getStamp(), atomicStampedRef.getStamp() + 1);
}
});
Thread refT2 = new Thread(new Runnable() {
public void run() {
int stamp = atomicStampedRef.getStamp();
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
}
boolean c3 = atomicStampedRef.compareAndSet(100, 101, stamp, stamp + 1);
System.out.println(c3); // false
}
});
refT1.start();
refT2.start();
}
}
