在上一章中,我們知道多線程操作共享資源時,會出現(xiàn)三個問題:可見性、有序性以及原子性。
一般情況下,我們采用synchronized同步鎖(獨占鎖、互斥鎖),即同一時間只有一個線程能夠修改共享變量,其他線程必須等待。但是這樣的話就相當于單線程,體現(xiàn)不出來多線程的優(yōu)勢。
那么我們有沒有另一種方式來解決這三個問題呢?
在上一章中,我們提到了一個volatile關鍵字,它可以解決可見性和有序性的問題。而且如果操作的共享變量是基本數(shù)據類型,并且同一時間只對變量進行讀取或者寫入的操作,那么原子性問題也得到了解決,就不會產生多線程問題了。
但是通常,我們都要先讀取共享變量,然后操作共享變量,最后寫入共享變量,那么這個時候怎么保證整個操作的原子性呢?一種解決方式就是CAS技術。
CAS(Compare and Swap)即比較并交換。在講解這個之前,先了解兩個重要概念:悲觀鎖與樂觀鎖。
一. 悲觀鎖與樂觀鎖
- 悲觀鎖: 假定會發(fā)生并發(fā)沖突,即共享資源會被某個線程更改。所以當某個線程獲取共享資源時,會阻止別的線程獲取共享資源。也稱獨占鎖或者互斥鎖,例如java中的synchronized同步鎖。
- 樂觀鎖: 假設不會發(fā)生并發(fā)沖突,只有在最后更新共享資源的時候會判斷一下在此期間有沒有別的線程修改了這個共享資源。如果發(fā)生沖突就重試,直到沒有沖突,更新成功。CAS就是一種樂觀鎖實現(xiàn)方式。
悲觀鎖會阻塞其他線程。樂觀鎖不會阻塞其他線程,如果發(fā)生沖突,采用死循環(huán)的方式一直重試,直到更新成功。
二. CAS的實現(xiàn)原理
CAS的原理很簡單,包含三個值當前內存值(V)、預期原來的值(A)以及期待更新的值(B)。
如果內存位置V的值與預期原值A相匹配,那么處理器會自動將該位置值更新為新值B,返回true。否則處理器不做任何操作,返回false。
實現(xiàn)CAS最重要的一點,就是比較和交換操作的一致性,否則就會產生歧義。
比如當前線程比較成功后,準備更新共享變量值的時候,這個共享變量值被其他線程更改了,那么CAS函數(shù)必須返回false。
要實現(xiàn)這個需求,java中提供了Unsafe類,它提供了三個函數(shù),分別用來操作基本類型int和long,以及引用類型Object
public final native boolean compareAndSwapObject
(Object obj, long valueOffset, Object expect, Object update);
public final native boolean compareAndSwapInt
(Object obj, long valueOffset, int expect, int update);
public final native boolean compareAndSwapLong
(Object obj, long valueOffset, long expect, long update);
參數(shù)的意義:
- obj 和 valueOffset:表示這個共享變量的內存地址。這個共享變量是obj對象的一個成員屬性,valueOffset表示這個共享變量在obj類中的內存偏移量。所以通過這兩個參數(shù)就可以直接在內存中修改和讀取共享變量值。
- expect: 表示預期原來的值。
- update: 表示期待更新的值。
接下來我們來看看java并發(fā)框架下的atomic包是如何使用CAS的。
三. JUC并發(fā)框架下的原子類(atomic)
調用JUC并發(fā)框架下原子類的方法時,不需要考慮多線程問題。那么我們分析它是怎么解決多線程問題的。以AtomicInteger類為例
3.1 成員變量
// 通過它來實現(xiàn)CAS操作的。因為是int類型,所以調用它的compareAndSwapInt方法
private static final Unsafe unsafe = Unsafe.getUnsafe();
// value這個共享變量在AtomicInteger對象上內存偏移量,
// 通過它直接在內存中修改value的值,compareAndSwapInt方法中需要這個參數(shù)
private static final long valueOffset;
// 通過靜態(tài)代碼塊,在AtomicInteger類加載時就會調用
static {
try {
// 通過unsafe類,獲取value變量在AtomicInteger對象上內存偏移量
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
// 共享變量,AtomicInteger就保證了對它多線程操作的安全性。
// 使用volatile修飾,解決了可見性和有序性問題。
private volatile int value;
有三個重要的屬性:
- unsafe: 通過它實現(xiàn)CAS操作,因為共享變量是int類型,所以調用compareAndSwapInt方法。
- valueOffset: 共享變量value在AtomicInteger對象上內存偏移量
- value: 共享變量,使用volatile修飾,解決了可見性和有序性問題。
3.2 重要方法
3.2.1 get與set方法
// 直接讀取。因為是volatile關鍵子修飾的,總是能看到(任意線程)對這個volatile變量最新的寫入
public final int get() {
return value;
}
// 直接寫入。因為是volatile關鍵子修飾的,所以它修改value變量也會立即被別的線程讀取到。
public final void set(int newValue) {
value = newValue;
}
因為value變量是volatile關鍵字修飾的,它總是能讀取(任意線程)對這個volatile變量最新的寫入。它修改value變量也會立即被別的線程讀取到。
3.2.2 compareAndSet方法
// 如果value變量的當前值(內存值)等于期望值(expect),那么就把update賦值給value變量,返回true。
// 如果value變量的當前值(內存值)不等于期望值(expect),就什么都不做,返回false。
// 這個就是CAS操作,使用unsafe.compareAndSwapInt方法,保證整個操作過程的原子性
public final boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
通過調用unsafe的compareAndSwapInt方法實現(xiàn)CAS函數(shù)的。但是CAS函數(shù)只能保證比較并交換操作的原子性,但是更新操作并不一定會執(zhí)行。比如我們想讓共享變量value自增。
共享變量value自增是三個操作,1.讀取value值,2.計算value+1的值,3.將value+1的值賦值給value。分析這三個操作:
- 讀取value值,因為value變量是volatile關鍵字修飾的,能夠讀取到任意線程對它最后一次修改的值,所以沒問題。
- 計算value+1的值:這個時候就有問題了,可能在計算這個值的時候,其他線程更改了value值,因為沒有加同步鎖,所以其他線程可以更改value值。
- 將value+1的值賦值給value: 使用CAS函數(shù),如果返回false,說明在當前線程讀取value值到調用CAS函數(shù)方法前,共享變量被其他線程修改了,那么value+1的結果值就不是我們想要的了,因為要重新計算。
3.2.3 getAndAddInt方法
public final int getAndAddInt(Object obj, long valueOffset, int var) {
int expect;
// 利用循環(huán),直到更新成功才跳出循環(huán)。
do {
// 獲取value的最新值
expect = this.getIntVolatile(obj, valueOffset);
// expect + var表示需要更新的值,如果compareAndSwapInt返回false,說明value值被其他線程更改了。
// 那么就循環(huán)重試,再次獲取value最新值expect,然后再計算需要更新的值expect + var。直到更新成功
} while(!this.compareAndSwapInt(obj, valueOffset, expect, expect + var));
// 返回當前線程在更改value成功后的,value變量原先值。并不是更改后的值
return expect;
}
這個方法在Unsafe類中,利用do_while循環(huán),先利用當前值,計算更新值,然后通過compareAndSwapInt方法設置value變量,如果compareAndSwapInt方法返回失敗,表示value變量的值被別的線程更改了,所以循環(huán)獲取value變量最新值,再通過compareAndSwapInt方法設置value變量。直到設置成功。跳出循環(huán),返回更新前的值。
// 將value的值當前值的基礎上加1,并返回當前值
public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 將value的值當前值的基礎上加-1,并返回當前值
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
// 將value的值當前值的基礎上加delta,并返回當前值
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
// 將value的值當前值的基礎上加1,并返回更新后的值(即當前值加1)
public final int incrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
}
// 將value的值當前值的基礎上加-1,并返回更新后的值(即當前值加-1)
public final int decrementAndGet() {
return unsafe.getAndAddInt(this, valueOffset, -1) - 1;
}
// 將value的值當前值的基礎上加delta,并返回更新后的值(即當前值加delta)
public final int addAndGet(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta) + delta;
}
都是利用unsafe.getAndAddInt方法實現(xiàn)的。
四.重要示例
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
class Data {
AtomicInteger num;
public Data(int num) {
this.num = new AtomicInteger(num);
}
public int getAndDecrement() {
return num.getAndDecrement();
}
}
class MyRun implements Runnable {
private Data data;
// 用來記錄所有賣出票的編號
private List<Integer> list;
private CountDownLatch latch;
public MyRun(Data data, List<Integer> list, CountDownLatch latch) {
this.data = data;
this.list = list;
this.latch = latch;
}
@Override
public void run() {
try {
action();
} finally {
// 釋放latch共享鎖
latch.countDown();
}
}
// 進行買票操作,注意這里沒有使用data.num>0作為判斷條件,直到賣完線程退出。
// 那么做會導致這兩處使用了共享變量data.num,那么做多線程同步時,就要考慮更多條件。
// 這里只for循環(huán)了5次,表示每個線程只賣5張票,并將所有賣出去編號存入list集合中。
public void action() {
for (int i = 0; i < 5; i++) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
int newNum = data.getAndDecrement();
System.out.println("線程"+Thread.currentThread().getName()+" num=="+newNum);
list.add(newNum);
}
}
}
public class ThreadTest {
public static void startThread(Data data, String name, List<Integer> list,CountDownLatch latch) {
Thread t = new Thread(new MyRun(data, list, latch), name);
t.start();
}
public static void main(String[] args) {
// 使用CountDownLatch來讓主線程等待子線程都執(zhí)行完畢時,才結束
CountDownLatch latch = new CountDownLatch(6);
long start = System.currentTimeMillis();
// 這里用并發(fā)list集合
List<Integer> list = new CopyOnWriteArrayList();
Data data = new Data(30);
startThread(data, "t1", list, latch);
startThread(data, "t2", list, latch);
startThread(data, "t3", list, latch);
startThread(data, "t4", list, latch);
startThread(data, "t5", list, latch);
startThread(data, "t6", list, latch);
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
// 處理一下list集合,進行排序和翻轉
Collections.sort(list);
Collections.reverse(list);
System.out.println(list);
long time = System.currentTimeMillis() - start;
// 輸出一共花費的時間
System.out.println("\n主線程結束 time=="+time);
}
}
結果輸出
線程t1 num==30
線程t2 num==29
線程t3 num==28
線程t5 num==26
線程t4 num==27
線程t6 num==25
線程t1 num==24
線程t2 num==23
線程t6 num==22
線程t4 num==19
線程t5 num==20
線程t3 num==21
線程t2 num==18
線程t3 num==13
線程t5 num==14
線程t1 num==15
線程t6 num==17
線程t4 num==16
線程t2 num==12
線程t1 num==9
線程t5 num==10
線程t3 num==11
線程t4 num==7
線程t6 num==8
線程t5 num==5
線程t4 num==4
線程t1 num==6
線程t2 num==3
線程t3 num==2
線程t6 num==1
[30, 29, 28, 27, 26, 25, 24, 23, 22, 21, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
主線程結束 time==58
我們使用AtomicInteger,代替同步鎖來解決多線程安全的。