一、cas自旋原理
1、概念
CAS的全稱(chēng)是Compare-And-Swap,它是CPU并發(fā)原語(yǔ),原語(yǔ)的執(zhí)行必須是連續(xù)的,在執(zhí)行過(guò)程中不允許被中斷,也就是說(shuō)CAS是一條CPU的原子指令,不會(huì)造成所謂的數(shù)據(jù)不一致性問(wèn)題,是線(xiàn)程安全的。CAS并發(fā)原語(yǔ)體現(xiàn)在Java語(yǔ)言中就是sun.misc.Unsafe類(lèi)的各個(gè)方法,調(diào)用UnSafe類(lèi)中的CAS方法。從其命名可以發(fā)現(xiàn),其本質(zhì)就是比較和替換。
2、手動(dòng)實(shí)現(xiàn)一個(gè)自旋鎖
private static int num = 0;
public static boolean add(int source, int target) {
int count = 0;
while (true) {
if (num == source) {
num = target;
return true;
} else {
count++;
if (count == 10) {
return false;
}
}
}
}
public static void main(String[] args) {
//線(xiàn)程?hào)艡?,等待所有線(xiàn)程準(zhǔn)備完畢后執(zhí)行
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
//內(nèi)部使用ReentrantLock重入鎖
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
boolean flag = add(0, 1);
if (flag) {
System.out.println(Thread.currentThread().getName() + "更新成功==================");
} else {
System.out.println(Thread.currentThread().getName() + "更新失敗");
}
}).start();
}
}
結(jié)果:只有一條更新成功
Thread-0更新失敗
Thread-7更新失敗
Thread-6更新失敗
Thread-5更新失敗
Thread-4更新失敗
Thread-3更新失敗
Thread-8更新成功==================
Thread-2更新失敗
Thread-1更新失敗
Thread-9更新失敗
3、底層核心
sun.misc.Unsafe是CAS的底層核心類(lèi),Unsafe類(lèi)中所有方法都是native修飾的,也就是說(shuō)Unsafe類(lèi)中的方法都直接調(diào)用操作系統(tǒng)底層資源執(zhí)行相應(yīng)任務(wù)。
以ava.util.concurrent.atomic.AtomicInteger的getAndIncrement方法源碼分析
/**
* 當(dāng)前值自增1
**/
public final int getAndIncrement() {
//valueOffset系統(tǒng)偏移量
return unsafe.getAndAddInt(this, valueOffset, 1);
}
/**
* 獲取當(dāng)前值var5,并加var4
**/
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
//獲取主內(nèi)存當(dāng)前值var5
var5 = this.getIntVolatile(var1, var2);
//cas循環(huán)等待替換,var5+var4是替換后的值
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
4、CAS缺點(diǎn)
1)循環(huán)時(shí)間長(zhǎng)CPU開(kāi)銷(xiāo)大
2)只能保證一個(gè)共享變量的原子操作
3)會(huì)引發(fā)ABA問(wèn)題
5、ABA問(wèn)題介紹及解決
簡(jiǎn)單通過(guò)代碼實(shí)現(xiàn)下ABA問(wèn)題,線(xiàn)程Thread-0先將num修改為了1,然后又將num修改成了0;線(xiàn)程Thread-1則認(rèn)為當(dāng)前num一直沒(méi)有經(jīng)過(guò)改變,而將其修改成了10。這里我們可以發(fā)現(xiàn)問(wèn)題,這時(shí)候num雖然值仍然是0,但是其實(shí)已經(jīng)不是最開(kāi)始那個(gè)0了,這樣在某些情況下就會(huì)導(dǎo)致問(wèn)題。
private static int num = 0;
public static boolean add(int source, int target) {
int count = 0;
while (true) {
if (num == source) {
num = target;
return true;
} else {
count++;
if (count == 10) {
return false;
}
}
}
}
public static void main(String[] args) {
new Thread(() -> {
//將數(shù)據(jù)更新為1
if (add(0, 1)) {
System.out.println(Thread.currentThread().getName() + "更新num為1成功");
} else {
System.out.println(Thread.currentThread().getName() + "更新num為1失敗");
}
//將數(shù)據(jù)更新為0
if (add(1, 0)) {
System.out.println(Thread.currentThread().getName() + "更新num為0成功");
} else {
System.out.println(Thread.currentThread().getName() + "更新num為0失敗");
}
}).start();
new Thread(() -> {
//將數(shù)據(jù)更新為10
if (add(0, 10)) {
System.out.println(Thread.currentThread().getName() + "更新num為10成功");
} else {
System.out.println(Thread.currentThread().getName() + "更新num為10失敗");
}
}).start();
}
結(jié)果:
Thread-0更新num為1成功
Thread-0更新num為0成功
Thread-1更新num為10成功
上面我自行實(shí)現(xiàn)的自旋鎖過(guò)程,下面看一個(gè)atomic原子類(lèi)的實(shí)現(xiàn)。非常簡(jiǎn)單
public static void main(String[] args) {
AtomicInteger atomicInteger = new AtomicInteger(0);
new Thread(()->{
atomicInteger.compareAndSet(0,1);
atomicInteger.compareAndSet(1,0);
}).start();
new Thread(()->{
boolean b = atomicInteger.compareAndSet(0, 10);
if (b){
System.out.println("更新為10成功");
}else{
System.out.println("更新為10失敗");
}
}).start();
}
結(jié)果:
更新為10成功
ABA問(wèn)題的解決:其實(shí)問(wèn)題的本質(zhì)原因在于我們的樂(lè)觀鎖只比較了值是否相等,可以通過(guò)增加其他屬性的比較,例如時(shí)間戳、版本號(hào)等。這里我們采用AtomicStampedReference類(lèi)解決該問(wèn)題。
/**
* 構(gòu)造方法
* @param initialRef 初始值
* @param initialStamp 初始版本戳
*/
public AtomicStampedReference(V initialRef, int initialStamp) {
pair = Pair.of(initialRef, initialStamp);
}
/**
* CAS方法
* @param expectedReference 初始值
* @param newReference 替換值
* @param expectedStamp 初始版本戳
* @param newStamp 新版本戳
* @return
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
AtomicStampedReference.Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, AtomicStampedReference.Pair.of(newReference, newStamp)));
}
實(shí)例:
public static void main(String[] args) {
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(0, 0);
new Thread(() -> {
atomicStampedReference.compareAndSet(0, 1, 0, 1);
atomicStampedReference.compareAndSet(1, 0, 1, 2);
}).start();
new Thread(() -> {
boolean b = atomicStampedReference.compareAndSet(0, 1, 0, 1);
if (b) {
System.out.println("更新為10成功");
} else {
System.out.println("更新為10失敗");
}
}).start();
}
結(jié)果:
更新為10失敗
二、ReentrantLock可重入鎖
在上一篇基礎(chǔ)概念中,我們使用ReentrantLock實(shí)現(xiàn)了線(xiàn)程同步問(wèn)題,代碼如下:
/**
* 庫(kù)存
*/
static class Inventory {
//初始化ReentrantLock實(shí)例
Lock lock = new ReentrantLock();
//庫(kù)存數(shù)量
private int num = 100;
//增加庫(kù)存
public void add(int n) {
//加鎖
lock.lock();
try {
num += n;
System.out.println("增加庫(kù)存后的數(shù)量=" + num);
} finally {
//釋放鎖
lock.unlock();
}
}
//減少庫(kù)存
public void sub(int n) {
//加鎖
lock.lock();
try {
num -= n;
System.out.println("減少庫(kù)存后的數(shù)量=" + num);
} finally {
//釋放鎖
lock.unlock();
}
}
}
public static void main(String[] args) {
Inventory inventory = new Inventory();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
inventory.add(1);
}).start();
}
for (int i = 0; i < 100; i++) {
new Thread(() -> {
inventory.sub(1);
}).start();
}
}
1、代碼內(nèi)部依賴(lài)關(guān)系
我們借ReentrantLock看下java內(nèi)鎖的底層結(jié)構(gòu),后續(xù)我們進(jìn)行逐個(gè)節(jié)點(diǎn)的分析

2、接下來(lái)我們來(lái)分析下底層原理:
ReentrantLock位于java.util.concurrent.locks包下,其實(shí)中包含三個(gè)內(nèi)部類(lèi)。
Syn:繼承AbstractQueuedSynchronizer(AQS),用于實(shí)現(xiàn)同步機(jī)制。
FairSync:公平鎖對(duì)象,繼承Syn。
NonfairSync:非公平鎖對(duì)象,繼承Syn。

2.1 AbstractQueuedSynchronizer(AQS)
用來(lái)構(gòu)建鎖或其他同步組件的框架,是JDK中實(shí)現(xiàn)并發(fā)編程的核心,它提供了一個(gè)基于FIFO隊(duì)列,平時(shí)我們工作中經(jīng)常用到的ReentrantLock,CountDownLatch等都是基于它來(lái)實(shí)現(xiàn)的。
分析其源碼,有兩個(gè)內(nèi)部類(lèi)

Node:同步隊(duì)列的模型
ConditionObject:等待隊(duì)列的模型
逐一看下其內(nèi)部源碼:
Node源碼:
static final class Node {
// 模式,分為共享與獨(dú)占
// 共享模式
static final Node SHARED = new Node();
// 獨(dú)占模式
static final Node EXCLUSIVE = null;
// 結(jié)點(diǎn)狀態(tài)
// CANCELLED,值為1,表示當(dāng)前的線(xiàn)程被取消
// SIGNAL,值為-1,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)包含的線(xiàn)程需要運(yùn)行,也就是unpark
// CONDITION,值為-2,表示當(dāng)前節(jié)點(diǎn)在等待condition,也就是在condition隊(duì)列中
// PROPAGATE,值為-3,表示當(dāng)前場(chǎng)景下后續(xù)的acquireShared能夠得以執(zhí)行
// 值為0,表示當(dāng)前節(jié)點(diǎn)在sync隊(duì)列中,等待著獲取鎖
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 結(jié)點(diǎn)狀態(tài)
volatile int waitStatus;
// 前驅(qū)結(jié)點(diǎn)
volatile Node prev;
// 后繼結(jié)點(diǎn)
volatile Node next;
// 結(jié)點(diǎn)所對(duì)應(yīng)的線(xiàn)程
volatile Thread thread;
// 下一個(gè)等待者
Node nextWaiter;
// 結(jié)點(diǎn)是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 獲取前驅(qū)結(jié)點(diǎn),若前驅(qū)結(jié)點(diǎn)為空,拋出異常
final Node predecessor() throws NullPointerException {
// 保存前驅(qū)結(jié)點(diǎn)
Node p = prev;
if (p == null) // 前驅(qū)結(jié)點(diǎn)為空,拋出異常
throw new NullPointerException();
else // 前驅(qū)結(jié)點(diǎn)不為空,返回
return p;
}
// 無(wú)參構(gòu)造函數(shù)
Node() { // Used to establish initial head or SHARED marker
}
// 構(gòu)造函數(shù)
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 構(gòu)造函數(shù)
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
ConditionObject 源碼
實(shí)現(xiàn)了condition接口,關(guān)于condition的學(xué)習(xí)請(qǐng)看下一小節(jié):三、Condition條件等待與通知
// 內(nèi)部類(lèi)
public class ConditionObject implements Condition, java.io.Serializable {
// 版本號(hào)
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
// condition隊(duì)列的頭結(jié)點(diǎn)
private transient Node firstWaiter;
/** Last node of condition queue. */
// condition隊(duì)列的尾結(jié)點(diǎn)
private transient Node lastWaiter;
/**
* 構(gòu)造函數(shù)
*/
public ConditionObject() { }
/**
* 添加新的waiter到wait隊(duì)列
*/
private Node addConditionWaiter() {
// 保存尾結(jié)點(diǎn)
Node t = lastWaiter;
// 尾結(jié)點(diǎn)不為空,并且尾結(jié)點(diǎn)的狀態(tài)不為CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除狀態(tài)不為CONDITION的結(jié)點(diǎn),對(duì)firstWaiter和lastWaiter重新賦值
unlinkCancelledWaiters();
// 將最后一個(gè)結(jié)點(diǎn)重新賦值給t
t = lastWaiter;
}
// 新建一個(gè)結(jié)點(diǎn)
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 尾結(jié)點(diǎn)為空
if (t == null)
// 設(shè)置condition隊(duì)列的頭結(jié)點(diǎn)
firstWaiter = node;
else
// 設(shè)置為節(jié)點(diǎn)的nextWaiter域?yàn)閚ode結(jié)點(diǎn)
t.nextWaiter = node;
// 更新condition隊(duì)列的尾結(jié)點(diǎn)
lastWaiter = node;
return node;
}
/**
* 轉(zhuǎn)移first節(jié)點(diǎn)到sync隊(duì)列
*/
private void doSignal(Node first) {
// 循環(huán)
do {
// 將下一個(gè)節(jié)點(diǎn)設(shè)為首節(jié)點(diǎn),如果為空
if ( (firstWaiter = first.nextWaiter) == null)
// 設(shè)置尾結(jié)點(diǎn)為空
lastWaiter = null;
// 設(shè)置first結(jié)點(diǎn)的nextWaiter域
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 將結(jié)點(diǎn)從condition隊(duì)列轉(zhuǎn)移到sync隊(duì)列失敗并且condition隊(duì)列中的頭結(jié)點(diǎn)不為空,一直循環(huán)
}
/**
* 轉(zhuǎn)移所有等待隊(duì)列的節(jié)點(diǎn)到同步隊(duì)列
*/
private void doSignalAll(Node first) {
// condition隊(duì)列的頭結(jié)點(diǎn)尾結(jié)點(diǎn)都設(shè)置為空
lastWaiter = firstWaiter = null;
// 循環(huán)
do {
// 獲取first結(jié)點(diǎn)的nextWaiter域結(jié)點(diǎn)
Node next = first.nextWaiter;
// 設(shè)置first結(jié)點(diǎn)的nextWaiter域?yàn)榭? first.nextWaiter = null;
// 將first結(jié)點(diǎn)從condition隊(duì)列轉(zhuǎn)移到sync隊(duì)列
transferForSignal(first);
// 重新設(shè)置first
first = next;
} while (first != null);
}
/**
* 過(guò)濾掉狀態(tài)不為CONDITION的節(jié)點(diǎn)
* 對(duì)firstWaiter和lastWaiter重新賦值
**/
private void unlinkCancelledWaiters() {
// 保存condition隊(duì)列頭結(jié)點(diǎn)
Node t = firstWaiter;
Node trail = null;
while (t != null) {
// 下一個(gè)結(jié)點(diǎn)
Node next = t.nextWaiter;
// t結(jié)點(diǎn)的狀態(tài)不為CONDTION狀態(tài)
if (t.waitStatus != Node.CONDITION) {
// 設(shè)置t節(jié)點(diǎn)的額nextWaiter域?yàn)榭? t.nextWaiter = null;
if (trail == null) // trail為空
// 重新設(shè)置condition隊(duì)列的頭結(jié)點(diǎn)
firstWaiter = next;
else
// 設(shè)置trail結(jié)點(diǎn)的nextWaiter域?yàn)閚ext結(jié)點(diǎn)
trail.nextWaiter = next;
if (next == null) // next結(jié)點(diǎn)為空
// 設(shè)置condition隊(duì)列的尾結(jié)點(diǎn)
lastWaiter = trail;
}
else // t結(jié)點(diǎn)的狀態(tài)為CONDTION狀態(tài)
// 設(shè)置trail結(jié)點(diǎn)
trail = t;
// 設(shè)置t結(jié)點(diǎn)
t = next;
}
}
/**
* 實(shí)現(xiàn)Condition接口的signal方法
*/
public final void signal() {
if (!isHeldExclusively()) // 不被當(dāng)前線(xiàn)程獨(dú)占,拋出異常
throw new IllegalMonitorStateException();
// 保存condition隊(duì)列頭結(jié)點(diǎn)
Node first = firstWaiter;
if (first != null) // 頭結(jié)點(diǎn)不為空
// 喚醒一個(gè)等待線(xiàn)程
doSignal(first);
}
/**
* 實(shí)現(xiàn)Condition的signalAll方法,喚醒所有線(xiàn)程
*/
public final void signalAll() {
if (!isHeldExclusively()) // 不被當(dāng)前線(xiàn)程獨(dú)占,拋出異常
throw new IllegalMonitorStateException();
// 保存condition隊(duì)列頭結(jié)點(diǎn)
Node first = firstWaiter;
if (first != null) // 頭結(jié)點(diǎn)不為空
// 喚醒所有等待線(xiàn)程
doSignalAll(first);
}
/**
* 與await()區(qū)別在于,使用await方法,調(diào)用interrupt()中斷后會(huì)報(bào)錯(cuò),而該方法不會(huì)報(bào)錯(cuò)。
*/
public final void awaitUninterruptibly() {
// 添加一個(gè)結(jié)點(diǎn)到等待隊(duì)列
Node node = addConditionWaiter();
// 獲取釋放的狀態(tài)
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) { //
// 阻塞當(dāng)前線(xiàn)程
LockSupport.park(this);
if (Thread.interrupted()) // 當(dāng)前線(xiàn)程被中斷
// 設(shè)置interrupted狀態(tài)
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}
/**
* 等待,當(dāng)前線(xiàn)程在接到信號(hào)或被中斷之前一直處于等待狀態(tài)
*/
public final void await() throws InterruptedException {
// 當(dāng)前線(xiàn)程被中斷,拋出異常
if (Thread.interrupted())
throw new InterruptedException();
// 將當(dāng)前線(xiàn)程包裝成Node,尾插入到等待隊(duì)列中
Node node = addConditionWaiter();
// 釋放當(dāng)前線(xiàn)程所占用的lock,在釋放的過(guò)程中會(huì)喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 當(dāng)前線(xiàn)程進(jìn)入到等待狀態(tài)
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 檢查結(jié)點(diǎn)等待時(shí)的中斷類(lèi)型
break;
}
// 自旋等待獲取到同步狀態(tài)(即獲取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 處理被中斷的情況
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 等待,當(dāng)前線(xiàn)程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* 等待,當(dāng)前線(xiàn)程在接到信號(hào)、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* 等待,當(dāng)前線(xiàn)程在接到信號(hào)、被中斷或到達(dá)指定等待時(shí)間之前一直處于等待狀態(tài)。此方法在行為上等
* 效于:awaitNanos(unit.toNanos(time)) > 0
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
// 1. 將當(dāng)前線(xiàn)程包裝成Node,尾插入到等待隊(duì)列中
Node node = addConditionWaiter();
// 2. 釋放當(dāng)前線(xiàn)程所占用的lock,在釋放的過(guò)程中會(huì)喚醒同步隊(duì)列中的下一個(gè)節(jié)點(diǎn)
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
大概了解源碼后我們通過(guò)圖看下同步隊(duì)列和等待隊(duì)列的關(guān)系:

同步隊(duì)列是一個(gè)雙向的鏈表,每個(gè)節(jié)點(diǎn)會(huì)存儲(chǔ)下一個(gè)節(jié)點(diǎn)的信息,是一種隊(duì)列的實(shí)現(xiàn)。
等待隊(duì)列是一個(gè)單向的鏈表,只有使用到Condition時(shí)才會(huì)存在,并且會(huì)存在多個(gè)。
當(dāng)?shù)却?duì)列的線(xiàn)程被喚醒會(huì)被添加到同步隊(duì)列的尾部。
2.2 公平鎖與非公平鎖
二者的區(qū)別主要在于獲取鎖是否和排隊(duì)順序有關(guān)。當(dāng)鎖唄一個(gè)線(xiàn)程持有,其他嘗試獲取鎖的線(xiàn)程會(huì)被掛起,加到等待隊(duì)列中,先被掛起的在隊(duì)列的最前端。當(dāng)鎖被釋放,需要通知隊(duì)列中的線(xiàn)程。作為公平鎖,會(huì)先喚醒隊(duì)列最前端的線(xiàn)程;而非公平鎖會(huì)喚醒所有線(xiàn)程,通過(guò)競(jìng)爭(zhēng)去獲取鎖,后來(lái)的線(xiàn)程有可能獲得鎖。
3.3 lock()和unlock()
我們通過(guò)本節(jié)的開(kāi)始時(shí)提供的例子,代碼跟蹤發(fā)現(xiàn)lock()默認(rèn)走的是非公平鎖:
public ReentrantLock() {
//初始化默認(rèn)是非公平鎖
sync = new NonfairSync();
}
可以通過(guò)設(shè)置boolean的值設(shè)置是公平鎖還是非公平鎖
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
lock()方法走NonFairLock的lock方法
public void lock() {
sync.lock();
}
/**
* 獲取鎖
*/
final void lock() {
//CAS嘗試設(shè)置鎖狀態(tài),占用鎖
if (compareAndSetState(0, 1))
//修改狀態(tài)成功,設(shè)置當(dāng)前線(xiàn)程為獨(dú)占鎖擁有者
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
只有一個(gè)線(xiàn)程的時(shí)候會(huì)直接獨(dú)占,當(dāng)存在線(xiàn)程競(jìng)爭(zhēng)的時(shí)候CAS獲取會(huì)返回false,走acquire(1);走到AQS的acquire方法。
public final void acquire(int arg) {
//走非公平鎖的獲取鎖方法
if (!tryAcquire(arg) &&
//鎖獲取失敗并且添加該線(xiàn)程到等待隊(duì)列中
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中斷當(dāng)前線(xiàn)程
selfInterrupt();
}
逐步看看上面代碼中的幾個(gè)方法
tryAcquire()走到獲取非公平鎖:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//獲取當(dāng)前狀態(tài)
int c = getState();
if (c == 0) {
// 活躍狀態(tài),再次嘗試獲取鎖
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判斷當(dāng)前線(xiàn)程是否是占用鎖的線(xiàn)程
else if (current == getExclusiveOwnerThread()) {
//是當(dāng)前持有鎖的線(xiàn)程,計(jì)數(shù)加1
//TODO 這里我推測(cè)是可重入鎖計(jì)數(shù)的實(shí)現(xiàn),后面去驗(yàn)證
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
添加當(dāng)前線(xiàn)程到同步隊(duì)列
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 將尾節(jié)點(diǎn)設(shè)置為當(dāng)前新節(jié)點(diǎn)的前繼節(jié)點(diǎn)
Node pred = tail;
if (pred != null) {
node.prev = pred;
//CAS設(shè)置當(dāng)前節(jié)點(diǎn)為tail
if (compareAndSetTail(pred, node)) {
//將當(dāng)前節(jié)點(diǎn)設(shè)置為上一節(jié)點(diǎn)的下一節(jié)點(diǎn),有點(diǎn)繞
pred.next = node;
return node;
}
}
//尾節(jié)點(diǎn)是null
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//尾節(jié)點(diǎn)是null,初始化頭尾節(jié)點(diǎn)
if (compareAndSetHead(new Node()))
tail = head;
} else {
//將node 設(shè)置為tail,設(shè)置前后節(jié)點(diǎn)的prev和next
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued():
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//獲取當(dāng)前節(jié)點(diǎn)的前置節(jié)點(diǎn)
final Node p = node.predecessor();
//如果前置節(jié)點(diǎn)是頭并且能重新獲取到鎖,應(yīng)該是防止入隊(duì)列時(shí)頭結(jié)點(diǎn)被釋放
if (p == head && tryAcquire(arg)) {
//設(shè)置當(dāng)前節(jié)點(diǎn)為頭
setHead(node);
p.next = null; // help GC
failed = false;
//返回中斷失敗
return interrupted;
}
//如果前置節(jié)點(diǎn)不是head,也未獲取到鎖,立即執(zhí)行中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
接下來(lái)分析unlock()方法:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
//嘗試釋放
if (tryRelease(arg)) {
Node h = head;
//head不是null,不是活躍狀態(tài)
if (h != null && h.waitStatus != 0)
//釋放鎖成功
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//計(jì)數(shù)減1
int c = getState() - releases;
//當(dāng)前線(xiàn)程是否是持有鎖的線(xiàn)程,不是則拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//沒(méi)有線(xiàn)程持有鎖
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
而公平鎖獲取比非公平鎖多了一個(gè)判斷
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//此處增加了判斷,是否有前驅(qū)節(jié)點(diǎn)在等待
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//判斷是否有前驅(qū)節(jié)點(diǎn)在等待
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
三、Condition條件等待與通知
java的Object類(lèi)型實(shí)現(xiàn)線(xiàn)程等待與通知: 應(yīng)用Object的wait(),wait(long timeout),wait(long timeout, int nanos)與notify(),notifyAll()。整體上看是通過(guò)對(duì)象監(jiān)視器配合完成線(xiàn)程間的等待/通知機(jī)制。
Condition與Lock配合完成等待通知機(jī)制:針對(duì)Object類(lèi)型的等待與通知,Condition也提供了對(duì)應(yīng)的方式。
針對(duì)Object的wait(),wait(long timeout),wait(long timeout, int nanos),Condition提供了以下幾個(gè)方法:
void await() throws InterruptedException:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài),如果其他線(xiàn)程調(diào)用condition的signal或者signalAll方法并且當(dāng)前線(xiàn)程獲取Lock從await方法返回,如果在等待狀態(tài)中被中斷會(huì)拋出被中斷異常;
long awaitNanos(long nanosTimeout):當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知,中斷或者超時(shí);
boolean await(long time, TimeUnit unit)throws InterruptedException:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知,支持自定義時(shí)間單位
boolean awaitUntil(Date deadline) throws InterruptedException:當(dāng)前線(xiàn)程進(jìn)入等待狀態(tài)直到被通知,中斷或者到了某個(gè)指定時(shí)間
還額外提供個(gè)
void awaitUninterruptibly(); 與await()區(qū)別在于,使用await方法,調(diào)用interrupt()中斷后會(huì)報(bào)錯(cuò),而該方法不會(huì)報(bào)錯(cuò)。
針對(duì)Object的notify(),notifyAll(),Condition提供了以下幾個(gè)方法:
void signal():?jiǎn)拘岩粋€(gè)等待在condition上的線(xiàn)程,將該線(xiàn)程從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,如果在同步隊(duì)列中能夠競(jìng)爭(zhēng)到Lock則可以從等待方法中返回。
void signalAll():夠喚醒所有等待在condition上的線(xiàn)程,將全部線(xiàn)程從等待隊(duì)列中轉(zhuǎn)移到同步隊(duì)列中,如果在同步隊(duì)列中能夠競(jìng)爭(zhēng)到Lock則可以從等待方法中返回。
以上鎖的方式實(shí)際是在AQS中實(shí)現(xiàn)的,源碼請(qǐng)看上一章節(jié)的AQS分析。
Condition與Object方式的不同:
Condition能夠支持不響應(yīng)中斷,而通過(guò)使用Object方式不支持;
Condition能夠支持多個(gè)等待隊(duì)列(new 多個(gè)Condition對(duì)象),而Object方式只能支持一個(gè);
Condition能夠支持超時(shí)時(shí)間的設(shè)置,而Object不支持
Condition結(jié)合ReentrantLock的使用:
/**
* 庫(kù)存
*/
static class Inventory {
//初始化ReentrantLock實(shí)例
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
//庫(kù)存數(shù)量
private int num = 100;
//增加庫(kù)存
public void add(int n) throws InterruptedException {
//加鎖
lock.lock();
try {
//先等待sub的通知
condition.await();
num += n;
System.out.println("增加庫(kù)存后的數(shù)量=" + num);
} finally {
//釋放鎖
lock.unlock();
}
}
//減少庫(kù)存
public void sub(int n) throws InterruptedException {
//加鎖
lock.lock();
try {
num -= n;
System.out.println("減少庫(kù)存后的數(shù)量=" + num);
//睡1s,為了看add方法接收通知的效果
Thread.sleep(1000);
condition.signal();
} finally {
//釋放鎖
lock.unlock();
}
}
}
結(jié)果分析:按照代碼邏輯,先走增加方法,但是被await方法阻塞了,1s后執(zhí)行sub方法,減少數(shù)量后并sleep1s,使用signal方法通知add方法,最終看到sub先輸出,add后輸出。
減少庫(kù)存后的數(shù)量=99
增加庫(kù)存后的數(shù)量=100
在代碼中看到,condition對(duì)象實(shí)際是調(diào)用lock的new ConditionObject()方法,new了一個(gè)ConditionObject對(duì)象,ReentrantLock的內(nèi)部Sync繼承了AQS,而ConditionObject是AQS的一個(gè)內(nèi)部類(lèi),實(shí)現(xiàn)了Condition接口。接口內(nèi)提供了諸多通信機(jī)制的方法,可見(jiàn)ReentrantLock、AQS與Condition的緊密關(guān)聯(lián)。相互關(guān)系請(qǐng)見(jiàn)本章節(jié)開(kāi)頭的圖。
有點(diǎn)結(jié)論可以提出一下,了解過(guò)lock和synchronized之后,發(fā)現(xiàn)兩種鎖前者是基于jvm內(nèi)存模型的,后者基于代碼實(shí)現(xiàn),不知道同學(xué)們有沒(méi)有相同感受。
四、Latch門(mén)閂
首先我們寫(xiě)個(gè)例子,來(lái)理解下門(mén)栓的含義:
public static void main(String[] args) throws InterruptedException {
// 使用倒計(jì)數(shù)門(mén)閂器 ,迫使主線(xiàn)程進(jìn)入等待 ;設(shè)置門(mén)栓的值為10
CountDownLatch latch = new CountDownLatch(10);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
//門(mén)栓值減1
latch.countDown();
System.out.println("當(dāng)前門(mén)栓值:" + latch.getCount());
}
}).start();
//阻塞主線(xiàn)程,等門(mén)栓值為0,主線(xiàn)程執(zhí)行
latch.await();
System.out.println("主線(xiàn)程執(zhí)行。。。");
}
結(jié)果:從以下結(jié)果可以看到,當(dāng)門(mén)栓的值降到0之后,主線(xiàn)程執(zhí)行了。
當(dāng)前門(mén)栓值:9
當(dāng)前門(mén)栓值:8
當(dāng)前門(mén)栓值:7
當(dāng)前門(mén)栓值:6
當(dāng)前門(mén)栓值:5
當(dāng)前門(mén)栓值:4
當(dāng)前門(mén)栓值:3
當(dāng)前門(mén)栓值:2
當(dāng)前門(mén)栓值:1
當(dāng)前門(mén)栓值:0
主線(xiàn)程執(zhí)行。。。
接下來(lái)我們分析下原理,其中有個(gè)內(nèi)部類(lèi)Sync,同樣繼承了AQS

private static final class Sync extends AbstractQueuedSynchronizer
結(jié)合上面的例子逐步分析源碼,首先初始化了一個(gè)CountDownLatch對(duì)象:
// 使用倒計(jì)數(shù)門(mén)閂器 ,迫使主線(xiàn)程進(jìn)入等待 ;設(shè)置門(mén)栓的值為10
CountDownLatch latch = new CountDownLatch(10);
//構(gòu)造
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
//同步代碼塊
Sync(int count) {
//設(shè)置AQS的state計(jì)數(shù)
setState(count);
}
用await阻塞主線(xiàn)程:
public void await() throws InterruptedException {
//AQS的獲取中斷共享鎖
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//獲取當(dāng)前值是多少
if (tryAcquireShared(arg) < 0)
//獲取共享鎖
doAcquireSharedInterruptibly(arg);
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//填加獲取共享鎖類(lèi)型到同步隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取前驅(qū)節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
//前驅(qū)節(jié)點(diǎn)等于head,嘗試獲取共享鎖,就是獲取state的值
int r = tryAcquireShared(arg);
if (r >= 0) {
//獲取共享鎖成功,設(shè)置當(dāng)前節(jié)點(diǎn)為head,釋放原h(huán)ead共享鎖
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//阻塞和中斷
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
countDown()減數(shù)量,釋放共享鎖
public void countDown() {
//釋放共享鎖
sync.releaseShared(1);
}
//AQS釋放共享鎖
public final boolean releaseShared(int arg) {
//獲取state并減1
if (tryReleaseShared(arg)) {
//無(wú)線(xiàn)循環(huán)并通過(guò)CAS釋放所有共享鎖
doReleaseShared();
return true;
}
return false;
}
五、CyclicBarrier線(xiàn)程?hào)艡?/h1>
先看一個(gè)使用例子
public static void main(String[] args) throws BrokenBarrierException, InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(6);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + "準(zhǔn)備就緒");
cyclicBarrier.await();
System.out.println(Thread.currentThread().getName() + "到達(dá)");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(1000);
System.out.println(Thread.currentThread().getName() + "準(zhǔn)備開(kāi)始");
cyclicBarrier.await();
}
結(jié)果:5個(gè)線(xiàn)程和main函數(shù)進(jìn)行await,當(dāng)總數(shù)達(dá)到6后,開(kāi)始執(zhí)行。是不是很簡(jiǎn)單。
Thread-1準(zhǔn)備就緒
Thread-4準(zhǔn)備就緒
Thread-0準(zhǔn)備就緒
Thread-2準(zhǔn)備就緒
Thread-3準(zhǔn)備就緒
main準(zhǔn)備開(kāi)始
Thread-1到達(dá)
Thread-0到達(dá)
Thread-3到達(dá)
Thread-2到達(dá)
Thread-4到達(dá)
看看源碼實(shí)現(xiàn):
//構(gòu)造函數(shù),parties為線(xiàn)程數(shù)量
public CyclicBarrier(int parties) {
this(parties, null);
}
//Runnable 參數(shù),這個(gè)參數(shù)的意思是最后一個(gè)到達(dá)線(xiàn)程要做的任務(wù)
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
//阻塞方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
//引入了Condition等待隊(duì)列,使用await()方法與signalAll()方法,通過(guò)counnt計(jì)數(shù)
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
六、Semaphore信號(hào)量
Semaphore 通常我們叫它信號(hào)量, 可以用來(lái)控制同時(shí)訪問(wèn)特定資源的線(xiàn)程數(shù)量,通過(guò)協(xié)調(diào)各個(gè)線(xiàn)程,以保證合理的使用資源。
官方解釋是Semaphore用于限制可以訪問(wèn)某些資源(物理或邏輯的)的線(xiàn)程數(shù)目,他維護(hù)了一個(gè)許可證集合,有多少資源需要限制就維護(hù)多少許可證集合,假如這里有N個(gè)資源,那就對(duì)應(yīng)于N個(gè)許可證,同一時(shí)刻也只能有N個(gè)線(xiàn)程訪問(wèn)。一個(gè)線(xiàn)程獲取許可證就調(diào)用acquire方法,用完了釋放資源就調(diào)用release方法。
舉個(gè)例子:
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 0; i < 10; i++) {
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "占用時(shí)間:" + LocalDateTime.now());
Thread.sleep(2000);
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
結(jié)果:每次只通過(guò)兩個(gè)線(xiàn)程,等待兩秒。
線(xiàn)程Thread-0占用時(shí)間:2020-08-24T09:45:31.738
線(xiàn)程Thread-1占用時(shí)間:2020-08-24T09:45:31.738
線(xiàn)程Thread-2占用時(shí)間:2020-08-24T09:45:33.740
線(xiàn)程Thread-3占用時(shí)間:2020-08-24T09:45:33.740
線(xiàn)程Thread-4占用時(shí)間:2020-08-24T09:45:35.740
線(xiàn)程Thread-5占用時(shí)間:2020-08-24T09:45:35.740
線(xiàn)程Thread-6占用時(shí)間:2020-08-24T09:45:37.741
線(xiàn)程Thread-7占用時(shí)間:2020-08-24T09:45:37.741
線(xiàn)程Thread-8占用時(shí)間:2020-08-24T09:45:39.741
線(xiàn)程Thread-9占用時(shí)間:2020-08-24T09:45:39.742
針對(duì)上面的例子,我們看下具體的實(shí)現(xiàn)原理:

實(shí)現(xiàn)了三個(gè)內(nèi)部類(lèi),與ReentrantLock是相同的,Syn繼承的AQS,公平鎖與非公平鎖分別繼承Sync實(shí)現(xiàn)同步。
初始化方法:默認(rèn)非公平鎖,同時(shí)定義下通行證的數(shù)量。將通行證數(shù)量設(shè)置到AQS的state。
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
protected final void setState(int newState) {
state = newState;
}
獲取鎖方法:semaphore.acquire();
public void acquire() throws InterruptedException {
//獲取共享可中斷鎖
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//嘗試獲取共享鎖,小于0,則表示當(dāng)前通行證不足
if (tryAcquireShared(arg) < 0)
//通行證數(shù)量不足,創(chuàng)建阻塞隊(duì)列
doAcquireSharedInterruptibly(arg);
}
跟蹤tryAcquireShared(arg)到底層:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
//獲取通行證數(shù)量
int available = getState();
//減去需要或取得數(shù)量
int remaining = available - acquires;
//獲取后數(shù)量小于0,直接返回獲取后數(shù)量,大于0,CAS設(shè)置state
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
跟蹤doAcquireSharedInterruptibly(int arg)方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//添加共享鎖節(jié)點(diǎn)到同步隊(duì)列的尾部
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲得當(dāng)前節(jié)點(diǎn)pre節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
//再次嘗試獲取共享鎖
int r = tryAcquireShared(arg);
if (r >= 0) {
//獲取共享鎖成功,設(shè)置當(dāng)前節(jié)點(diǎn)為head,釋放原h(huán)ead共享鎖
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//重組雙向鏈表,清空無(wú)效節(jié)點(diǎn),掛起當(dāng)前線(xiàn)程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
釋放鎖方法semaphore.release(),跟蹤到底層
public final boolean releaseShared(int arg) {
//釋放鎖
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int releases) {
for (;;) {
//獲取當(dāng)前狀態(tài)
int current = getState();
//加上要釋放的值得到最新的值
int next = current + releases;
//加完后小于當(dāng)前值,【】拋出異常
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS設(shè)置state
if (compareAndSetState(current, next))
return true;
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//是否需要喚醒后繼節(jié)點(diǎn)
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//喚醒h.nex節(jié)點(diǎn)線(xiàn)程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
七、Semaphore與Lock的區(qū)別(高頻面試)
最主要的區(qū)別在于,Semaphore可以進(jìn)行死鎖恢復(fù)。
我們看下Lock的釋放鎖源碼,以ReentrantLock為例。如果當(dāng)前線(xiàn)程不是持有鎖的線(xiàn)程,則拋出IllegalMonitorStateException異常,也就是說(shuō),Lock在unlock前,必須先lock,持有鎖。
protected final boolean tryRelease(int releases) {
//計(jì)數(shù)減1
int c = getState() - releases;
//當(dāng)前線(xiàn)程是否是持有鎖的線(xiàn)程,不是則拋出異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//沒(méi)有線(xiàn)程持有鎖
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
而Semaphore則沒(méi)有這個(gè)判斷,會(huì)直接將設(shè)置state的值,增加通行證的數(shù)量。分別舉兩個(gè)例子看下。
public static void main(String[] args) {
Lock lock = new ReentrantLock();
//Semaphore semaphore = new Semaphore(1);
new Thread(()->{
lock.unlock();
}).start();
}
結(jié)果拋出異常:
Exception in thread "Thread-0" java.lang.IllegalMonitorStateException
at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
at com.cloud.bssp.thread.SemaphoreAndLock.lambda$main$0(SemaphoreAndLock.java:26)
at java.lang.Thread.run(Thread.java:748)
下面來(lái)看下semaphore的例子
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore = new Semaphore(1);
System.out.println("當(dāng)前通行證數(shù)量:" + semaphore.availablePermits());
new Thread(()->{
semaphore.release();
}).start();
Thread.sleep(1000);
System.out.println("當(dāng)前通行證數(shù)量:" + semaphore.availablePermits());
}
結(jié)果:發(fā)現(xiàn)在release之后,數(shù)量增加的一個(gè)。我們可以利用這個(gè)特性去做死鎖恢復(fù)。
簡(jiǎn)單模仿下死鎖恢復(fù)的例子,兩個(gè)線(xiàn)程一個(gè)先占用semaphore1,一個(gè)先占用semaphore2,分別sleep5秒,這時(shí)候沒(méi)有釋放,在去占用另外一個(gè),發(fā)現(xiàn)產(chǎn)生了死鎖,線(xiàn)程卡在這里不動(dòng)了。main方法主線(xiàn)程會(huì)在10秒后去判斷是否釋放鎖,沒(méi)有的話(huà)由主線(xiàn)程去釋放,這時(shí)候發(fā)現(xiàn)兩個(gè)線(xiàn)程分別獲取到了鎖。
/**
* 死鎖恢復(fù)
*/
public static void main(String[] args) throws InterruptedException {
Semaphore semaphore1 = new Semaphore(1);
Semaphore semaphore2 = new Semaphore(1);
new Thread(() -> {
try {
semaphore1.acquire();
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "獲取semaphore1");
Thread.sleep(5000);
semaphore2.acquire();
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "獲取semaphore2");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
semaphore2.acquire();
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "獲取semaphore2");
Thread.sleep(5000);
semaphore1.acquire();
System.out.println("線(xiàn)程" + Thread.currentThread().getName() + "獲取semaphore1");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
Thread.sleep(10000);
//主線(xiàn)程等待十秒,判斷兩個(gè)線(xiàn)程是否執(zhí)行完畢,是否釋放鎖
if (semaphore1.availablePermits() != 1) {
System.out.println("發(fā)生死鎖了,釋放semaphore1");
semaphore1.release();
}
if (semaphore2.availablePermits() != 1) {
System.out.println("發(fā)生死鎖了,釋放semaphore2");
semaphore2.release();
}
}
結(jié)果:
線(xiàn)程Thread-0獲取semaphore1
線(xiàn)程Thread-1獲取semaphore2
發(fā)生死鎖了,釋放semaphore1
發(fā)生死鎖了,釋放semaphore2
線(xiàn)程Thread-1獲取semaphore1
線(xiàn)程Thread-0獲取semaphore2
八、ThreadLocal線(xiàn)程本地變量(高頻面試)
顧名思義,ThreadLocal可以理解為線(xiàn)程本地變量,當(dāng)創(chuàng)建了ThreadLocal變量,那么線(xiàn)程對(duì)于ThreadLocal的讀取就是相互隔離的,不會(huì)產(chǎn)生影響。
8.1 使用實(shí)例
先拋個(gè)實(shí)際使用的例子扔在這,10個(gè)線(xiàn)程分別對(duì)ThreadLocal進(jìn)行加1,最終結(jié)果都是101,每個(gè)線(xiàn)程修改了各自的本地變量。如果是int類(lèi)型的,結(jié)果應(yīng)該為110,體現(xiàn)了線(xiàn)程本地變量的特性。
/**
* 庫(kù)存
*/
static class Inventory {
private ThreadLocal<Integer> num = ThreadLocal.withInitial(() -> 100);
//增加庫(kù)存
public synchronized void add(int n, String threadName) {
//增加庫(kù)存
num.set(num.get() + n);
System.out.println("線(xiàn)程:" + threadName + ",增加庫(kù)存后的數(shù)量=" + num.get());
}
}
public static void main(String[] args) {
Inventory inventory = new Inventory();
for (int i = 0; i < 10; i++) {
new Thread(() -> {
inventory.add(1, Thread.currentThread().getName());
}).start();
}
}
結(jié)果:
線(xiàn)程:Thread-0,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-2,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-1,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-3,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-7,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-9,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-8,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-6,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-4,增加庫(kù)存后的數(shù)量=101
線(xiàn)程:Thread-5,增加庫(kù)存后的數(shù)量=101
8.2 源碼解讀
我很難寫(xiě)出比這篇文章更好的了,所以直接上連接了,不在寫(xiě)了,這篇文章絕對(duì)是當(dāng)前百度能找到最詳細(xì)的了。
https://www.cnblogs.com/micrari/p/6790229.html
九、Phaser 線(xiàn)程階段器(本文只介紹簡(jiǎn)單使用)
在jdk1.7中被引入,能夠完成多階段的任務(wù),并且每個(gè)階段可以多線(xiàn)程并發(fā)執(zhí)行,但是需要當(dāng)前階段全部完成才能進(jìn)入下一階段,相比于CyclicBarrier或者CountryDownLatch,功能更加強(qiáng)大和靈活。
用法
/**
* 線(xiàn)程數(shù),即學(xué)生數(shù)量
*/
private static int PARTIES = 5;
static Phaser p = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("第一題完成");
return false;
case 1:
System.out.println("第二題完成");
return false;
case 2:
System.out.println("第三題完成");
return false;
default:
return true;
}
}
};
private static void firstQuestion() {
System.out.println("線(xiàn)程:" + Thread.currentThread().getName() + ",第一題");
p.arriveAndAwaitAdvance();
}
private static void secondQuestion() {
System.out.println("線(xiàn)程:" + Thread.currentThread().getName() + ",第二題");
p.arriveAndAwaitAdvance();
}
private static void thirdQuestion() {
System.out.println("線(xiàn)程:" + Thread.currentThread().getName() + ",第三題");
p.arriveAndAwaitAdvance();
}
public static void main(String[] args) {
for (int i = 0; i < PARTIES; i++) {
new Thread(() -> {
//線(xiàn)程注冊(cè)
p.register();
firstQuestion();
secondQuestion();
thirdQuestion();
}).start();
}
}
結(jié)果:五個(gè)線(xiàn)程分階段完成了每個(gè)題目
線(xiàn)程:Thread-1,第一題
線(xiàn)程:Thread-3,第一題
線(xiàn)程:Thread-2,第一題
線(xiàn)程:Thread-0,第一題
線(xiàn)程:Thread-4,第一題
第一題完成
線(xiàn)程:Thread-4,第二題
線(xiàn)程:Thread-3,第二題
線(xiàn)程:Thread-2,第二題
線(xiàn)程:Thread-1,第二題
線(xiàn)程:Thread-0,第二題
第二題完成
線(xiàn)程:Thread-0,第三題
線(xiàn)程:Thread-4,第三題
線(xiàn)程:Thread-1,第三題
線(xiàn)程:Thread-3,第三題
線(xiàn)程:Thread-2,第三題
第三題完成
十、Exchanger 線(xiàn)程數(shù)據(jù)交換器 (本文只介紹簡(jiǎn)單使用)
Exchanger 是 JDK 1.5 開(kāi)始提供的一個(gè)用于兩個(gè)工作線(xiàn)程之間交換數(shù)據(jù)的封裝工具類(lèi),當(dāng)?shù)谝粋€(gè)線(xiàn)程調(diào)用了exchange()方法后,當(dāng)前線(xiàn)程會(huì)進(jìn)入阻塞狀態(tài),直到第二個(gè)線(xiàn)程也執(zhí)行了exchange()方法,交換數(shù)據(jù),繼續(xù)執(zhí)行。
使用實(shí)例
/**
* 初始化string類(lèi)型的Exchanger
*/
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
String flag1 = "111";
System.out.println(Thread.currentThread().getName() + "交換前flag1=" + flag1);
try {
//交換數(shù)據(jù),并進(jìn)入阻塞
flag1 = exchanger.exchange(flag1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "交換后flag1=" + flag1);
}).start();
Thread.sleep(1000);
new Thread(() -> {
String flag2 = "222";
System.out.println(Thread.currentThread().getName()+ "交換后flag2=" + flag2);
try {
//交換數(shù)據(jù),喚醒上一個(gè)線(xiàn)程
flag2 = exchanger.exchange(flag2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+ "交換前flag2=" + flag2);
}).start();
}
結(jié)果:在交換過(guò)后,flag1和flag2的值發(fā)生了互換。
Thread-0交換前flag1=111
Thread-1交換后flag2=222
Thread-1交換前flag2=111
Thread-0交換后flag1=222