無鎖即無障礙的運(yùn)行, 所有線程都可以到達(dá)臨界區(qū), 接近于無等待.無鎖采用CAS(compare and swap)算法來處理線程沖突, 其原理如下
CAS原理
CAS包含3個(gè)參數(shù)CAS(V,E,N).V表示要更新的變量, E表示預(yù)期值, N表示新值.僅當(dāng)V值等于E值時(shí), 才會(huì)將V的值設(shè)為N, 如果V值和E值不同, 則說明已經(jīng)有其他線程做了更新, 則當(dāng)前線程什么都不做. 最后, CAS返回當(dāng)前V的真實(shí)值. CAS操作是抱著樂觀的態(tài)度進(jìn)行的, 它總是認(rèn)為自己可以成功完成操作.
當(dāng)多個(gè)線程同時(shí)使用CAS操作一個(gè)變量時(shí), 只有一個(gè)會(huì)勝出, 并成功更新, 其余均會(huì)失敗.失敗的線程不會(huì)被掛起,僅是被告知失敗, 并且允許再次嘗試, 當(dāng)然也允許失敗的線程放棄操作.基于這樣的原理, CAS操作即時(shí)沒有鎖,也可以發(fā)現(xiàn)其他線程對(duì)當(dāng)前線程的干擾, 并進(jìn)行恰當(dāng)?shù)奶幚?
CPU指令
另外, 雖然上述步驟繁多, 實(shí)際上CAS整一個(gè)操作過程是一個(gè)原子操作, 它是由一條CPU指令完成的,從指令層保證操作可靠, 不會(huì)被多線程干擾.
無鎖與volatile
當(dāng)給變量加了volatile關(guān)鍵字, 表示該變量對(duì)所有線程可見, 但不保證原子性.
AtomicInteger
// 取得當(dāng)前值
public final int get()
// 設(shè)置當(dāng)前值
public final void set(int newValue)
// 設(shè)置新值,并返回舊值
public final int getAndSet(int newValue)
// 如果當(dāng)前值為expect,則設(shè)置為u
public final boolean compareAndSet(int expect, int u)
// 當(dāng)前值加1,返回舊值
public final int getAndIncrement()
// 當(dāng)前值減1,返回舊值
public final int getAndDecrement()
// 當(dāng)前值增加delta,返回舊值
public final int getAndAdd(int delta)
// 當(dāng)前值加1,返回新值
public final int incrementAndGet()
// 當(dāng)前值減1,返回新值
public final int decrementAndGet()
// 當(dāng)前值增加delta,返回新值
public final int addAndGet(int delta)
// 封裝了一個(gè)int對(duì)其加減
private volatile int value;
.......
public final boolean compareAndSet(int expect, int update) {
// 通過unsafe 基于CPU的CAS指令來實(shí)現(xiàn), 可以認(rèn)為無阻塞.
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
.......
public final int getAndIncrement() {
for (;;) {
// 當(dāng)前值
int current = get();
// 預(yù)期值
int next = current + 1;
if (compareAndSet(current, next)) {
// 如果加成功了, 則返回當(dāng)前值
return current;
}
// 如果加失敗了, 說明其他線程已經(jīng)修改了數(shù)據(jù), 與期望不相符,
// 則繼續(xù)無限循環(huán), 直到成功. 這種樂觀鎖, 理論上只要等兩三個(gè)時(shí)鐘周期就可以設(shè)值成功
// 相比于直接通過synchronized獨(dú)占鎖的方式操作int, 要大大節(jié)約等待時(shí)間.
}
}
demo
使用10個(gè)線程打印0-10000, 最終得到結(jié)果10w.
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicIntegerDemo {
static AtomicInteger i = new AtomicInteger();
public static class AddThread implements Runnable {
public void run() {
for (int k = 0; k < 10000; k++) {
i.incrementAndGet();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for (int k = 0; k < 10; k++) {
ts[k] = new Thread(new AddThread());
}
for (int k = 0; k < 10; k++) {
ts[k].start();
}
for (int k = 0; k < 10; k++) {
ts[k].join();
}
System.out.println(i);
}
}
Unsafe
Unsafe類是在sun.misc包下, 可以用于一些非安全的操作,比如:
根據(jù)偏移量設(shè)置值, 線程park(), 底層的CAS操作等等.
// 獲取類實(shí)例中變量的偏移量
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
// 基于偏移量對(duì)值進(jìn)行操作
unsafe.compareAndSwapInt(this, valueOffset, expect, update);
主要接口
// 獲得給定對(duì)象偏移量上的int值
public native int getInt(Object o, long offset);
// 設(shè)置給定對(duì)象偏移量上的int值
public native void putInt(Object o, long offset, int x);
// 獲得字段在對(duì)象中的偏移量
public native long objectFieldOffset(Field f);
// 設(shè)置給定對(duì)象的int值,使用volatile語義
public native void putIntVolatile(Object o, long offset, int x);
// 獲得給定對(duì)象對(duì)象的int值,使用volatile語義
public native int getIntVolatile(Object o, long offset);
// 和putIntVolatile()一樣,但是它要求被操作字段就是volatile類型的
public native void putOrderedInt(Object o, long offset, int x);
AtomicReference
與AtomicInteger類似, 只是里面封裝了一個(gè)對(duì)象, 而不是int, 對(duì)引用進(jìn)行修改
主要接口
1 get()
2 set(V)
3 compareAndSet()
4 getAndSet(V)
demo
使用10個(gè)線程, 同時(shí)嘗試修改AtomicReference中的String, 最終只有一個(gè)線程可以成功.
import java.util.concurrent.atomic.AtomicReference;
public class AtomicReferenceTest {
public final static AtomicReference<String> attxnicStr = new AtomicReference<String>("abc");
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
try {
Thread.sleep(Math.abs((int) (Math.random() * 100)));
} catch (InterruptedException e) {
e.printStackTrace();
}
if (attxnicStr.compareAndSet("abc", "def")) {
System.out.println("Thread:" + Thread.currentThread().getId() + " change value to " + attxnicStr.get());
} else {
System.out.println("Thread:" + Thread.currentThread().getId() + " change failed!");
}
}
}.start();
}
}
}
AtomicStampedReference
也是封裝了一個(gè)引用, 主要解決ABA問題.
ABA問題
線程一準(zhǔn)備用CAS將變量的值由A替換為B, 在此之前線程二將變量的值由A替換為C, 線程三又將C替換為A, 然后線程一執(zhí)行CAS時(shí)發(fā)現(xiàn)變量的值仍然為A, 所以線程一CAS成功.
// 比較設(shè)置 參數(shù)依次為:期望值 寫入新值 期望時(shí)間戳 新時(shí)間戳
public boolean compareAndSet(V expectedReference,V newReference,int expectedStamp,int newStamp)
// 獲得當(dāng)前對(duì)象引用
public V getReference()
// 獲得當(dāng)前時(shí)間戳
public int getStamp()
// 設(shè)置當(dāng)前對(duì)象引用和時(shí)間戳
public void set(V newReference, int newStamp)
源碼分析
// 內(nèi)部封裝了一個(gè)Pair對(duì)象, 每次對(duì)對(duì)象操作的時(shí)候, stamp + 1
private static class Pair<T> {
final T reference;
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
// 進(jìn)行cas操作的時(shí)候, 會(huì)對(duì)比stamp的值
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
后臺(tái)使用多個(gè)線程對(duì)用戶充值, 要求只能充值一次
public class AtomicStampedReferenceDemo {
static AtomicStampedReference<Integer> money=new AtomicStampedReference<Integer>(19,0);
public staticvoid main(String[] args) {
//模擬多個(gè)線程同時(shí)更新后臺(tái)數(shù)據(jù)庫,為用戶充值
for(int i = 0 ; i < 3 ; i++) {
final int timestamp=money.getStamp();
newThread() {
public void run() {
while(true){
while(true){
Integerm=money.getReference();
if(m<20){
if(money.compareAndSet(m,m+20,timestamp,timestamp+1)){
System.out.println("余額小于20元,充值成功,余額:"+money.getReference()+"元");
break;
}
}else{
//System.out.println("余額大于20元,無需充值");
break ;
}
}
}
}
}.start();
}
//用戶消費(fèi)線程,模擬消費(fèi)行為
new Thread() {
publicvoid run() {
for(int i=0;i<100;i++){
while(true){
int timestamp=money.getStamp();
Integer m=money.getReference();
if(m>10){
System.out.println("大于10元");
if(money.compareAndSet(m, m-10,timestamp,timestamp+1)){
System.out.println("成功消費(fèi)10元,余額:"+money.getReference());
break;
}
}else{
System.out.println("沒有足夠的金額");
break;
}
}
try {Thread.sleep(100);} catch (InterruptedException e) {}
}
}
}.start();
}
}
AtomicIntegerArray
支持無鎖的數(shù)組
// 獲得數(shù)組第i個(gè)下標(biāo)的元素
public final int get(int i)
// 獲得數(shù)組的長度
public final int length()
// 將數(shù)組第i個(gè)下標(biāo)設(shè)置為newValue,并返回舊的值
public final int getAndSet(int i, int newValue)
// 進(jìn)行CAS操作,如果第i個(gè)下標(biāo)的元素等于expect,則設(shè)置為update,設(shè)置成功返回true
public final boolean compareAndSet(int i, int expect, int update)
// 將第i個(gè)下標(biāo)的元素加1
public final int getAndIncrement(int i)
// 將第i個(gè)下標(biāo)的元素減1
public final int getAndDecrement(int i)
// 將第i個(gè)下標(biāo)的元素增加delta(delta可以是負(fù)數(shù))
public final int getAndAdd(int i, int delta)
// 數(shù)組本身基地址
private static final int base = unsafe.arrayBaseOffset(int[].class);
// 封裝了一個(gè)數(shù)組
private final int[] array;
static {
// 數(shù)組中對(duì)象的寬度, int類型, 4個(gè)字節(jié), scale = 4;
int scale = unsafe.arrayIndexScale(int[].class);
if ((scale & (scale - 1)) != 0)
throw new Error("data type scale not a power of two");
// 前導(dǎo)0 : 一個(gè)數(shù)字轉(zhuǎn)為二進(jìn)制后, 他前面0的個(gè)數(shù)
// 對(duì)于4來講, 他就是00000000 00000000 00000000 00000100, 他的前導(dǎo)0 就是29
// 所以shift = 2
shift = 31 - Integer.numberOfLeadingZeros(scale);
}
// 獲取第i個(gè)元素
public final int get(int i) {
return getRaw(checkedByteOffset(i));
}
// 第i個(gè)元素, 在數(shù)組中的偏移量是多少
private long checkedByteOffset(int i) {
if (i < 0 || i >= array.length)
throw new IndexOutOfBoundsException("index " + i);
return byteOffset(i);
}
// base : 數(shù)組基地址, i << shift, 其實(shí)就是i * 4, 因?yàn)檫@邊是int array.
private static long byteOffset(int i) {
// i * 4 + base
return ((long) i << shift) + base;
}
// 根據(jù)偏移量從數(shù)組中獲取數(shù)據(jù)
private int getRaw(long offset) {
return unsafe.getIntVolatile(array, offset);
}
Demo
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicArrayDemo {
static AtomicIntegerArray arr = new AtomicIntegerArray(10);
public static class AddThread implements Runnable {
public void run() {
for (int k = 0; k < 10000; k++) {
arr.incrementAndGet(k % arr.length());
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread[] ts = new Thread[10];
for (int k = 0; k < 10; k++) {
ts[k] = new Thread(new AddThread());
}
for (int k = 0; k < 10; k++) {
ts[k].start();
}
for (int k = 0; k < 10; k++) {
ts[k].join();
}
System.out.println(arr);
}
}
跟之前的AtomicInteger沒太多區(qū)別,只是需要對(duì)于傳入的i需要先判斷在數(shù)組length范圍內(nèi),再轉(zhuǎn)換成內(nèi)存地址。類中其他方法類同。