并發(fā)編程的原理
目標(biāo):
- Lock 的使用
- AQS 原理分析
Condition-
CountDownLatch、Semaphore - 線程池分析
J.U.C = java.util.concurrent
Lock 的使用
-
volatile去解決可見性問題,防止指令重排序 -
synchronized是保證 可見性,有序性,原子性的一種手段
這是 JVM 層面提供的關(guān)鍵字。
**JDK** 層次有一個(gè) `java.util.concurrent` 的工具包,屬于 `并發(fā)` 在 JDK 層次的一種手段。這個(gè)手段也是對我們多線程在操作系統(tǒng)情況下一些控制的保證線程安全的方式。
同步鎖
我們知道,鎖是用來控制多個(gè)線程訪問共享資源的方式,一般來說,一個(gè)鎖能夠防止多個(gè)線程同時(shí)訪問共享資源,在 Lock 接口出現(xiàn)之前,JAVA 應(yīng)用程序只能依靠 `synchronized` 關(guān)鍵字來實(shí)現(xiàn)同步鎖的功能,在 JAVA5 以后,增加了 JUC 的并發(fā)包且提供了 `Lock` 接口用來實(shí)現(xiàn)鎖的功能,它提供了與 `synchroinzed` 關(guān)鍵字類似的同步功能,只是它比 `synchronized` 更靈活,能夠顯示的獲取和釋放鎖。
Lock的初步使用
`Lock` 是一個(gè)接口,核心的兩個(gè)方法 Lock 和 unlock ,它有很多的實(shí)現(xiàn),比如 `ReentrantLock` 、 `ReentrantReadWriteLock`;
public interface Lock {
void lock();
boolean tryLock();
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
Condition newCondition();
}


fair 公平
ReentrantLock
重入鎖,表示支持重新進(jìn)入的鎖,也就是說,如果當(dāng)前線程 t1 通過調(diào)用 `#lock` 方法獲取了鎖之后,再次調(diào)用lock,是不會(huì)再阻塞去獲取鎖的,直接增加重試次數(shù)就行了。
public class AtomicDemo {
private static int count=0;
static Lock lock=new ReentrantLock();
public static void inc(){
lock.lock();
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count++;
lock.unlock();
}
public static void main(String[] args) throws InterruptedException {
for(int i=0;i<1000;i++){
new Thread(()->{AtomicDemo.inc();}).start();;
}
Thread.sleep(3000);
}
}
ReentrantReadWriteLock
我們以前理解的鎖,基本都是排他鎖,也就是這些鎖在同一時(shí)刻只允許一個(gè)線程進(jìn)行訪問,而讀寫所在同一時(shí)刻可以允許多個(gè)線程訪問,但是在寫線程訪問時(shí),所有的讀線程和其他寫線程都會(huì)被阻塞。讀寫鎖維護(hù)了一對鎖,一個(gè)讀鎖、一個(gè)寫鎖;一般情況下,讀寫鎖的性能都會(huì)比排它鎖好,因?yàn)榇蠖鄶?shù)場景 **讀是多于寫的** 。在讀多于寫的情況下,讀寫鎖能夠提供比排它鎖更好的并發(fā)性和吞吐量。
public class RWLockDemo {
// 排他鎖
// 共享鎖,在同一時(shí)刻可以有多個(gè)線程獲得鎖
// 讀鎖, 寫鎖
static Map<String, Object> cacheMap = new HashMap<>();
// 重入讀寫鎖
static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock read = readWriteLock.readLock(); // 讀鎖
static Lock write = readWriteLock.writeLock(); // 寫鎖
// 緩存的更新和讀取的時(shí)候
public static final Object get(String key) {
read.lock(); // 讀取的時(shí)候加上讀鎖
out.println("開始讀取數(shù)據(jù)");
try {
return cacheMap.get(key);
} finally {
read.unlock();
}
}
public static final Object set(String key, Object value) {
write.lock(); // 每一次寫數(shù)據(jù),都需要先加上寫鎖
out.println("開始寫數(shù)據(jù)");
try {
return cacheMap.put(key, value);
} finally {
write.unlock();
}
}
}
在這個(gè)案例中,通過hashmap來模擬了一個(gè)內(nèi)存緩存,然后使用讀寫鎖來保證這個(gè)內(nèi)存緩存的線程安全性。當(dāng)執(zhí)行讀操作的時(shí)候,需要獲取讀鎖,在并發(fā)訪問的時(shí)候,讀鎖不會(huì)被阻塞,因?yàn)樽x操作不會(huì)影響執(zhí)行結(jié)果。
在執(zhí)行寫操作時(shí),線程必須要獲取寫鎖,當(dāng)已經(jīng)有線程持有寫鎖的情況下,當(dāng)前線程會(huì)被阻塞,只有當(dāng)寫鎖釋放以后,其他讀寫操作才能繼續(xù)執(zhí)行。使用讀寫鎖提升讀操作的并發(fā)性。以保證每次寫操作對所有的讀寫操作的可見性。
- 讀鎖與讀鎖可以共享
- 讀鎖與寫鎖不可以共享(排他)
- 寫鎖與寫鎖不可以共享(排他)
Lock 和 synchronized 的簡單對比
通過我們對 `Lock` 的使用以及對 `synchronized` 的了解,基本上可以對比出這兩種鎖的區(qū)別了。因?yàn)檫@個(gè)也是在面試過程中比較常見的問題。
- 從層次上,一個(gè)是關(guān)鍵字、一個(gè)是類, 這是最直觀的差異
- 從使用上,
Lock具備更大的靈活性,可以控制鎖的釋放和獲取; 而synchronized的鎖的釋放是被動(dòng)的,當(dāng)出現(xiàn)異常或者同步代碼塊執(zhí)行完以后,才會(huì)釋放鎖 -
Lock可以判斷鎖的狀態(tài)、而synchronized無法做到
Lock 可以實(shí)現(xiàn)公平鎖、非公平鎖; 而 synchronized 只有非公平鎖
AQS
`Lock` 之所以能實(shí)現(xiàn)線程安全的鎖,主要的核心是 **AQS** ( `AbstractQueuedSynchronizer` ) , `AbstractQueuedSynchronizer` 提供了一個(gè) **FIFO** 隊(duì)列,可以看作是一個(gè)用來實(shí)現(xiàn)鎖以及其他需要同步功能的框架。這里簡稱該類為 **AQS** 。 **AQS** 的使用依靠繼承來完成,子類通過繼承 **AQS** 并實(shí)現(xiàn)所需的方法來管理同步狀態(tài)。例如常見的 `ReentrantLock` ,`CountDownLatch` 等 **AQS** 的兩種功能。
從使用上來說, AQS 的功能可以分為兩種:獨(dú)占和共享。
-
獨(dú)占鎖模式下,每次只能有一個(gè)線程持有鎖,比如前面給大家演示的
ReentrantLock就是以獨(dú)占方式實(shí)現(xiàn)的互斥鎖 -
共享鎖模式下,允許多個(gè)線程同時(shí)獲取鎖,并發(fā)訪問共享資源,比如
ReentrantReadWriteLock。
很顯然,獨(dú)占鎖是一種悲觀保守的加鎖策略,它限制了 讀/讀 沖突,如果某個(gè)只讀線程獲取鎖,則其他讀線程都只能等待,這種情況下就限制了不必要的并發(fā)性,因?yàn)樽x操作并不會(huì)影響數(shù)據(jù)的一致性。共享鎖則是一種樂觀鎖,它放寬了加鎖策略,允許多個(gè)執(zhí)行讀操作的線程同時(shí)訪問共享資源。
public class LockDemo {
static Lock lock = new ReentrantLock();// 有公平重入鎖和非公平重入鎖
private static int count = 0;
public static void incr(){
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.lock(); // 獲得鎖
count ++;
lock.unlock();
}
}
通過一個(gè)重入鎖的方式實(shí)現(xiàn)一個(gè)鎖的等待。JVM 層面是如何讓一個(gè)鎖等待的。
`lock.lock` 用到了 **CXQ** 以及我們的 `EntryList` ,通過隊(duì)列的方式,讓線程等待,等待之前,它用到了 **CAS** ,通過自旋的方式去嘗試獲得鎖。如果在指定時(shí)間內(nèi)獲得鎖失敗的話,它會(huì)去 `park`,最后當(dāng)我們這個(gè)鎖被釋放的時(shí)候,他會(huì)從 `EntryList` 里邊取出一個(gè)線程。再次去爭奪鎖。取出線程,喚醒一個(gè)線程 叫做 `unpark` 。
synchronized 來到了......
AQS的內(nèi)部實(shí)現(xiàn)
同步器依賴內(nèi)部的同步隊(duì)列(一個(gè) **FIFO雙向隊(duì)列** )來完成同步狀態(tài)的管理,當(dāng)前線程獲取同步狀態(tài)失敗時(shí),同步器會(huì)將當(dāng)前線程以及等待狀態(tài)等信息構(gòu)造成為一個(gè)節(jié)點(diǎn)( `Node` )并將其加入同步隊(duì)列,同時(shí)會(huì)阻塞當(dāng)前線程,當(dāng)同步狀態(tài)釋放時(shí),會(huì)把首節(jié)點(diǎn)中的線程喚醒,使其再次嘗試獲取同步狀態(tài)。
Node 的主要屬性如下
static final class Node {
int waitStatus; //表示節(jié)點(diǎn)的狀態(tài),包含cancelled(取消);condition 表示節(jié)點(diǎn)在等待condition也就是在condition隊(duì)列中
Node prev; //前繼節(jié)點(diǎn)
Node next; //后繼節(jié)點(diǎn)
Node nextWaiter; //存儲(chǔ)在condition隊(duì)列中的后繼節(jié)點(diǎn)
Thread thread; //當(dāng)前線程
}
**AQS** 類底層的數(shù)據(jù)結(jié)構(gòu)是使用雙向鏈表,是隊(duì)列的一種實(shí)現(xiàn)。包括一個(gè) head 節(jié)點(diǎn)和一個(gè) tail 節(jié)點(diǎn),分別表示頭結(jié)點(diǎn)和尾節(jié)點(diǎn),其中頭結(jié)點(diǎn)不存儲(chǔ) Thread ,僅保存 next 結(jié)點(diǎn)的引用。


當(dāng)一個(gè)線程成功地獲取了同步狀態(tài)(或者鎖),其他線程將無法獲取到同步狀態(tài),轉(zhuǎn)而被構(gòu)造成為節(jié)點(diǎn)并加入到同步隊(duì)列中,而這個(gè)加入隊(duì)列的過程必須要保證線程安全,因此同步器提供了一個(gè)基于 CAS 的設(shè)置尾節(jié)點(diǎn)的方法: `compareAndSetTail ( Node expect , Nodeupdate )` ,它需要傳遞當(dāng)前線程“認(rèn)為”的尾節(jié)點(diǎn)和當(dāng)前節(jié)點(diǎn),只有設(shè)置成功后,當(dāng)前節(jié)點(diǎn)才正式與之前的尾節(jié)點(diǎn)建立關(guān)聯(lián)。

同步隊(duì)列遵循 **FIFO** ,首節(jié)點(diǎn)是獲取同步狀態(tài)成功的節(jié)點(diǎn),首節(jié)點(diǎn)的線程在釋放同步狀態(tài)時(shí),將會(huì)喚醒后繼節(jié)點(diǎn),而后繼節(jié)點(diǎn)將會(huì)在獲取同步狀態(tài)成功時(shí)將自己設(shè)置為首節(jié)點(diǎn)。

設(shè)置首節(jié)點(diǎn)是通過獲取同步狀態(tài)成功的線程來完成的,由于只有一個(gè)線程能夠成功獲取到同步狀態(tài),因此設(shè)置頭節(jié)點(diǎn)的方法并不需要使用CAS來保證,它只需要將首節(jié)點(diǎn)設(shè)置成為原首節(jié)點(diǎn)的后繼節(jié)點(diǎn)并斷開原首節(jié)點(diǎn)的 `next` 引用即可
compareAndSet

java.util.concurrent.locks.AbstractQueuedSynchronizer
**AQS** 中,除了本身的鏈表結(jié)構(gòu)以外,還有一個(gè)很關(guān)鍵的功能,就是 **CAS** ,這個(gè)是保證在多線程并發(fā)的情況下保證線程安全的前提下去把線程加入到 **AQS** 中的方法,可以簡單理解為樂觀鎖
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
這個(gè)方法里面,首先,用到了 `unsafe` 類,(Unsafe類是在 `sun.misc` 包下,不屬于 JAVA 標(biāo)準(zhǔn)。但是很多 **Java** 的基礎(chǔ)類庫,包括一些被廣泛使用的高性能開發(fā)庫都是基于 `Unsafe` 類開發(fā)的,比如 Netty 、 Hadoop 、 Kafka 等; Unsafe 可認(rèn)為是 JAVA 中留下的后門,提供了一些低層次操作,如直接內(nèi)存訪問、線程調(diào)度等) 。然后調(diào)用了 `#compareAndSwapObject` 這個(gè)方法。
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4,
Object var5);
這個(gè)是一個(gè) `native` 方法,
第一個(gè)參數(shù)為需要改變的對象,第二個(gè)為偏移量(即之前求出來的 headOffset 的值),第三個(gè)參數(shù)為期待的值,第四個(gè)為更新后的值
整個(gè)方法的作用是如果當(dāng)前時(shí)刻的值等于預(yù)期值 var4 相等,則更新為新的期望值 var5,如果更新成功,則返回 **true** ,否則返回 **false** ;
這里傳入了一個(gè) headOffset ,這個(gè) headOffset 是什么呢?在下面的代碼中,通過 `unsafe.objectFieldOffset`

然后通過反射獲取了 AQS 類中的成員變量,并且這個(gè)成員變量被 volatile 修飾的

unsafe.objectFieldOffset
`headOffset` 這個(gè)是指類中相應(yīng)字段在該類的偏移量,在這里具體即是指 head 這個(gè)字段在 **AQS** 類的內(nèi)存中相對于該類首地址的偏移量。
一個(gè) JAVA 對象可以看成是一段內(nèi)存,每個(gè)字段都得按照一定的順序放在這段內(nèi)存里,通過這個(gè)方法可以準(zhǔn)確地告訴你某個(gè)字段相對于對象的起始內(nèi)存地址的字節(jié)偏移。用于在后面的 `compareAndSwapObject` 中,去根據(jù)偏移量找到對象在內(nèi)存中的具體位置。
這個(gè)方法在 `unsafe.cpp` 文件中,代碼如下:
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapObject(JNIEnv *env, jobject unsafe, jobject
obj, jlong offset, jobject e_h, jobject x_h))
UnsafeWrapper("Unsafe_CompareAndSwapObject");
oop x = JNIHandles::resolve(x_h); // 新值
oop e = JNIHandles::resolve(e_h); // 預(yù)期值
oop p = JNIHandles::resolve(obj);
HeapWord* addr = (HeapWord *)index_oop_from_field_offset_long(p, offset);// 在內(nèi)存中的具體位置
oop res = oopDesc::atomic_compare_exchange_oop(x, addr, e, true);// 調(diào)用了另一個(gè)方法,實(shí)際上就是通過cas操作來替換內(nèi)存中的值是否成功
jboolean success = (res == e); // 如果返回的res等于e,則判定滿足compare條件(說明res應(yīng)該為內(nèi)存中的當(dāng)前值),但實(shí)際上會(huì)有ABA的問題
if (success) // success為true時(shí),說明此時(shí)已經(jīng)交換成功(調(diào)用的是最底層的cmpxchg指令)
update_barrier_set((void*)addr, x); // 每次Reference類型數(shù)據(jù)寫操作時(shí),都會(huì)產(chǎn)生一個(gè)WriteBarrier暫時(shí)中斷操作,配合垃圾收集器
return success;
UNSAFE_END
所以其實(shí) #compareAndSet 這個(gè)方法,最終調(diào)用的是 unsafe 類的 #compareAndSwap ,這個(gè)指令會(huì)對內(nèi)存中的共享數(shù)據(jù)做原子的讀寫操作。
- 首先, cpu會(huì)把內(nèi)存中將要被更改的數(shù)據(jù)與期望值作比較
- 然后,當(dāng)兩個(gè)值相等時(shí),cpu才會(huì)將內(nèi)存中的對象替換為新的值。否則,不做變更操作。
- 最后,返回操作執(zhí)行結(jié)果
很顯然,這是一種樂觀鎖的實(shí)現(xiàn)思路。
ReentrantLock的實(shí)現(xiàn)原理分析
之所以叫重入鎖是因?yàn)橥粋€(gè)線程如果已經(jīng)獲得了鎖,那么后續(xù)該線程調(diào)用lock方法時(shí)不需要再次獲取鎖,也就是不會(huì)阻塞;重入鎖提供了兩種實(shí)現(xiàn),一種是非公平的重入鎖,另一種是公平的重入鎖。怎么理解公平和非公平呢?
如果在絕對時(shí)間上,先對鎖進(jìn)行獲取的請求一定先被滿足獲得鎖,那么這個(gè)鎖就是公平鎖,反之,就是不公平的。簡單來說公平鎖就是等待時(shí)間最長的線程最優(yōu)先獲取鎖。
默認(rèn)的情況下就是非公平鎖。
非公平鎖的實(shí)現(xiàn)流程時(shí)序圖

源碼分析
ReentrantLock.lock
public void lock() {
sync.lock();
}
這個(gè)是獲取鎖的入口,調(diào)用了 `sync.lock` ; `sync` 是一個(gè)實(shí)現(xiàn)了 **AQS** 的抽象類,這個(gè)類的主要作用是用來實(shí)現(xiàn)同步控制的,并且 sync 有兩個(gè)實(shí)現(xiàn),一個(gè)是 `NonfairSync` (非公平鎖)、另一個(gè)是 `FailSync` (公平鎖); 我們先來分析一下非公平鎖的實(shí)現(xiàn)
NonfairSync.lock
final void lock() {
if (compareAndSetState(0, 1)) //這是跟公平鎖的主要區(qū)別,一上來就試探鎖是否空閑,如果可以插隊(duì),則設(shè)置獲得鎖的線程為當(dāng)前線程
//exclusiveOwnerThread屬性是AQS從父類AbstractOwnableSynchronizer中繼承的屬性,用來保存當(dāng)前占用同步狀態(tài)的線程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //嘗試去獲取鎖
}
`compareAndSetState`,這個(gè)方法在前面提到過了,再簡單講解一下,通過 **CAS** 算法去改變 state 的值,而這個(gè) state 是什么呢? 在 **AQS** 中存在一個(gè)變量 state ,對于`ReentrantLock` 來說,如果 `state = 0` 表示無鎖狀態(tài)、如果 `state > 0` 表示有鎖狀態(tài)。 ( state 在不同的鎖里邊表達(dá)的意思是不一樣的 )
所以在這里,是表示當(dāng)前的 state 如果等于 0 ,則替換為 1 ,如果替換成功表示獲取鎖成功了由于 `ReentrantLock` 是可重入鎖,所以持有鎖的線程可以多次加鎖,經(jīng)過判斷加鎖線程就是當(dāng)前持有鎖的線程時(shí)
(即 exclusiveOwnerThread==Thread.currentThread() ),即可加鎖,每次加鎖都會(huì)將 state 的值 +1 , state 等于幾,就代表當(dāng)前持有鎖的線程加了幾次鎖;
解鎖時(shí)每解一次鎖就會(huì)將 state 減 1 , state 減到 0 后,鎖就被釋放掉,這時(shí)其他線程可以加鎖;
AbstractQueuedSynchronizer.acquire
如果 **CAS** 操作未能成功,說明 `state` 已經(jīng)不為 0 ,此時(shí)繼續(xù) `acquire(1)` 操作, `acquire` 是AQS中的方法 當(dāng)多個(gè)線程同時(shí)進(jìn)入這個(gè)方法時(shí),首先通過 **CAS** 去修改 **state** 的狀態(tài),如果修改成功表示競爭鎖成功,競爭失敗的, `tryAcquire` 會(huì)返回 `false`
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
這個(gè)方法的主要作用是
- 嘗試獲取獨(dú)占鎖,獲取成功則返回,否則
- 自旋獲取鎖,并且判斷中斷標(biāo)識,如果中斷標(biāo)識為
true,則設(shè)置線程中斷 -
addWaiter方法把當(dāng)前線程封裝成Node,并添加到隊(duì)列的尾部
NonfairSync.tryAcquire
`tryAcquire` 方法嘗試獲取鎖,如果成功就返回,如果不成功,則把當(dāng)前線程和等待狀態(tài)信息構(gòu)造成一個(gè)Node節(jié)點(diǎn),并將結(jié)點(diǎn)放入同步隊(duì)列的尾部。然后為同步隊(duì)列中的當(dāng)前節(jié)點(diǎn)循環(huán)等待獲取鎖,直到成功。
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
nofairTryAcquire
這里可以看出非公平鎖的含義,即獲取鎖并不會(huì)嚴(yán)格根據(jù)爭用鎖的先后順序決定。這里的實(shí)現(xiàn)邏輯類似 `synchroized` 關(guān)鍵字的偏向鎖的做法,即可重入而不用進(jìn)一步進(jìn)行鎖的競爭,也解釋了 `ReentrantLock` 中 `Reentrant` 的意義。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); //獲取當(dāng)前的狀態(tài),前面講過,默認(rèn)情況下是0表示無鎖狀態(tài)
if (c == 0) {
if (compareAndSetState(0, acquires)) { //通過cas來改變state狀態(tài)的值,如果更新成功,表示獲取鎖成功,這個(gè)操作外部方法lock()就做過一次,這里再做只是為了再嘗試一次,盡量以最簡單的方式獲取鎖。
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果當(dāng)前線程等于獲取鎖的線程,表示重入,直接累加重入次數(shù)
int nextc = c + acquires;
if (nextc < 0) // overflow 如果這個(gè)狀態(tài)值越界,拋出異常;如果沒有越界,則設(shè)置后返回true
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果狀態(tài)不為0,且當(dāng)前線程不是owner,則返回false。
return false; //獲取鎖失敗,返回false
}
addWaiter
當(dāng)前鎖如果已經(jīng)被其他線程鎖持有,那么當(dāng)前線程來去請求鎖的時(shí)候,會(huì)進(jìn)入這個(gè)方法,這個(gè)方法主要是把當(dāng)前線程封裝成 node ,添加到 **AQS** 的鏈表中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); //創(chuàng)建一個(gè)獨(dú)占的Node節(jié)點(diǎn),mode為排他模式
// 嘗試快速入隊(duì),如果失敗則降級至full enq
Node pred = tail; // tail是AQS中表示同步隊(duì)列隊(duì)尾的屬性,剛開始為null,所以進(jìn)行enq(node)方法
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 防止有其他線程修改tail,使用CAS進(jìn)行修改,如果失敗則降級至full enq
pred.next = node; // 如果成功之后舊的tail的next指針再指向新的tail,成為雙向鏈表
return node;
}
}
enq(node); // 如果隊(duì)列為null或者CAS設(shè)置新的tail失敗
return node;
}
enq
enq就是通過自旋操作把當(dāng)前節(jié)點(diǎn)加入到隊(duì)列中
private Node enq(final Node node) {
for (;;) { //無效的循環(huán),為什么采用for(;;),是因?yàn)樗鼒?zhí)行的指令少,不占用寄存器
Node t = tail;// 此時(shí)head, tail都為null
if (t == null) { // Must initialize// 如果tail為null則說明隊(duì)列首次使用,需要進(jìn)行初始化
if (compareAndSetHead(new Node()))// 設(shè)置頭節(jié)點(diǎn),如果失敗則存在競爭,留至下一輪循環(huán)
tail = head; // 用CAS的方式創(chuàng)建一個(gè)空的Node作為頭結(jié)點(diǎn),因?yàn)榇藭r(shí)隊(duì)列中只一個(gè)頭結(jié)點(diǎn),所以tail也指向head,第一次循環(huán)執(zhí)行結(jié)束
} else {
//進(jìn)行第二次循環(huán)時(shí),tail不為null,進(jìn)入else區(qū)域。將當(dāng)前線程的Node結(jié)點(diǎn)的prev指向tail,然后使用CAS將tail指向Node
//這部分代碼和addWaiter代碼一樣,將當(dāng)前節(jié)點(diǎn)添加到隊(duì)列
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node; //t此時(shí)指向tail,所以可以CAS成功,將tail重新指向CNode。此時(shí)t為更新前的tail的值,即指向空的頭結(jié)點(diǎn),t.next=node,就將頭結(jié)點(diǎn)的后續(xù)結(jié)點(diǎn)指向Node,返回頭結(jié)點(diǎn)。
return t;
}
}
}
}
代碼運(yùn)行到這里, AQS 隊(duì)列的結(jié)構(gòu)就是這樣一個(gè)表現(xiàn)。

acquireQueued
`addWaiter` 返回了插入的節(jié)點(diǎn),作為 `acquireQueued` 方法的入?yún)?,這個(gè)方法主要用于爭搶鎖。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();// 獲取prev節(jié)點(diǎn),若為null即刻拋出NullPointException
if (p == head && tryAcquire(arg)) {// 如果前驅(qū)為head才有資格進(jìn)行鎖的搶奪
setHead(node); // 獲取鎖成功后就不需要再進(jìn)行同步操作了,獲取鎖成功的線程作為新的head節(jié)點(diǎn)
//凡是head節(jié)點(diǎn),head.thread與head.prev永遠(yuǎn)為null,但是head.next不為null
p.next = null; // help GC
failed = false; //獲取鎖成功
return interrupted;
}
//如果獲取鎖失敗,則根據(jù)節(jié)點(diǎn)的waitStatus決定是否需要掛起線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())// 若前面為true,則執(zhí)行掛起,待下次喚醒的時(shí)候檢測中斷的標(biāo)志
interrupted = true;
}
} finally {
if (failed) // 如果拋出異常則取消鎖的獲取,進(jìn)行出隊(duì)(sync queue)操作
cancelAcquire(node);
}
}
原來的 head 節(jié)點(diǎn)釋放鎖以后,會(huì)從隊(duì)列中移除,原來 head 節(jié)點(diǎn)的 next 節(jié)點(diǎn)會(huì)成為 head 節(jié)點(diǎn)

shouldParkAfterFailedAcquire
從上面的分析可以看出,只有隊(duì)列的第二個(gè)節(jié)點(diǎn)可以有機(jī)會(huì)爭用鎖,如果成功獲取鎖,則此節(jié)點(diǎn)晉升為頭節(jié)點(diǎn)。對于第三個(gè)及以后的節(jié)點(diǎn), `if (p == head)` 條件不成立,首先進(jìn)行 `shouldParkAfterFailedAcquire(p, node)` 操作。
`#shouldParkAfterFailedAcquire` 方法是判斷一個(gè)爭用鎖的線程是否應(yīng)該被阻塞。它首先判斷一個(gè)節(jié)點(diǎn)的前置節(jié)點(diǎn)的狀態(tài)是否為 `Node.SIGNAL` ,如果是,是說明此節(jié)點(diǎn)已經(jīng)將狀態(tài)設(shè)置-如果鎖釋放,則應(yīng)當(dāng)通知它,所以它可以安全地阻塞了,返回 `true` 。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 前繼節(jié)點(diǎn)的狀態(tài)
if (ws == Node.SIGNAL)//如果是SIGNAL狀態(tài),意味著當(dāng)前線程需要被unpark喚醒
return true;
// 如果前節(jié)點(diǎn)的狀態(tài)大于0,即為CANCELLED狀態(tài)時(shí),則會(huì)從前節(jié)點(diǎn)開始逐步循環(huán)找到一個(gè)沒有被“CANCELLED”節(jié)點(diǎn)設(shè)置為當(dāng)前節(jié)點(diǎn)的前節(jié)點(diǎn),返回false。在下次循環(huán)執(zhí)行shouldParkAfterFailedAcquire時(shí),返回true。這個(gè)操作實(shí)際是把隊(duì)列中CANCELLED的節(jié)點(diǎn)剔除掉。
if (ws > 0) {// 如果前繼節(jié)點(diǎn)是“取消”狀態(tài),則設(shè)置 “當(dāng)前節(jié)點(diǎn)”的 “當(dāng)前前繼節(jié)點(diǎn)” 為 “‘原前繼節(jié)點(diǎn)'的前繼節(jié)點(diǎn)”。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 如果前繼節(jié)點(diǎn)為“0”或者“共享鎖”狀態(tài),則設(shè)置前繼節(jié)點(diǎn)為SIGNAL狀態(tài)。
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
解讀:假如有t1,t2兩個(gè)線程都加入到了鏈表中
img
如果head節(jié)點(diǎn)位置的線程一直持有鎖,那么t1和t2就是掛起狀態(tài),而HEAD以及Thread1的awaitStatus都是 SIGNAL ,在多次嘗試獲取鎖失敗以后,就會(huì)通過下面的方法進(jìn)行掛起(這個(gè)地方就是避免了驚群效應(yīng),每個(gè)節(jié)點(diǎn)只需要關(guān)心上一個(gè)節(jié)點(diǎn)的狀態(tài)即可)

-
SIGNAL:值為 -1 ,表示當(dāng)前節(jié)點(diǎn)的后繼節(jié)點(diǎn)將要或者已經(jīng)被阻塞,在當(dāng)前節(jié)點(diǎn)釋放的時(shí)候需要 unpark 后繼節(jié)點(diǎn); -
CONDITION:值為 -2 ,表示當(dāng)前節(jié)點(diǎn)在等待 condition ,即在 condition 隊(duì)列中; -
PROPAGATE:值為 -3 ,表示 releaseShared 需要被傳遞給后續(xù)節(jié)點(diǎn)(僅在共享模式下使用);
parkAndCheckInterrupt
如果 shouldParkAfterFailedAcquire 返回了 true ,則執(zhí)行:“ parkAndCheckInterrupt() ”方法,它是通過 LockSupport.park(this)將當(dāng)前線程掛起到WATING狀態(tài),它需要等待一個(gè)中斷、unpark方法來喚醒它,通過這樣一種FIFO的機(jī)制的等待,來實(shí)現(xiàn)了Lock的操作
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);// LockSupport提供park()和unpark()方法實(shí)現(xiàn)阻塞線程和解除線程阻塞
return Thread.interrupted();
}
ReentrantLock.unlock
加鎖的過程分析完以后,再來分析一下釋放鎖的過程,調(diào)用 release 方法,這個(gè)方法里面做兩件事。
- 釋放鎖 ;
- 喚醒 park 的線程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease
這個(gè)動(dòng)作可以認(rèn)為就是一個(gè)設(shè)置鎖狀態(tài)的操作,而且是將狀態(tài)減掉傳入的參數(shù)值(參數(shù)是 1 ),如果結(jié)果狀態(tài)為 0 ,就將排它鎖的 Owner 設(shè)置為 null ,以使得其他的線程有機(jī)會(huì)進(jìn)行執(zhí)行。 在排它鎖中,加鎖的時(shí)候狀態(tài)會(huì)增加 1 (當(dāng)然可以自己修改這個(gè)值),在解鎖的時(shí)候減掉 1 ,同一個(gè)鎖,在可以重入后,可能會(huì)被疊加為 2、3、4這些值,只有 `unlock()` 的次數(shù)與 `lock()` 的次數(shù)對應(yīng)才會(huì)將 Owner 線程設(shè)置為空,而且也只有這種情況下才會(huì)返回 true 。
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 這里是將鎖的數(shù)量減1
if (Thread.currentThread() != getExclusiveOwnerThread())// 如果釋放的線程和獲取鎖的線程不是同一個(gè),拋出非法監(jiān)視器狀態(tài)異常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 由于重入的關(guān)系,不是每次釋放鎖c都等于0,
// 直到最后一次釋放鎖時(shí),才會(huì)把當(dāng)前線程釋放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
LockSupport
`LockSupport` 類是 Java6 引入的一個(gè)類,提供了基本的線程同步原語。LockSupport 實(shí)際上是調(diào)用了 Unsafe 類里的函數(shù),歸結(jié)到 Unsafe 里,只有兩個(gè)函數(shù):
public native void unpark(Thread jthread);
public native void park(boolean isAbsolute, long time);
`unpark` 函數(shù)為線程提供“許可(permit)”,線程調(diào)用park函數(shù)則等待“許可”。這個(gè)有點(diǎn)像信號量,但是這個(gè)“許可”是不能疊加的,“許可”是一次性的。
permit相當(dāng)于0/1的開關(guān),默認(rèn)是0,調(diào)用一次 unpark 就加 1 變成了 1 .調(diào)用一次 park 會(huì)消費(fèi) permit ,又會(huì)變成 0 。 如果再調(diào)用一次 park 會(huì)阻塞,因?yàn)?permit 已經(jīng)是 0 了。直到 permit 變成 1 .這時(shí)調(diào)用 unpark 會(huì)把permit 設(shè)置為 1 .每個(gè)線程都有一個(gè)相關(guān)的 permit , permit 最多只有一個(gè),重復(fù)調(diào)用unpark不會(huì)累積。
在使用 LockSupport 之前,我們對線程做同步,只能使用 wait 和 notify ,但是 wait 和 notify 其實(shí)不是很靈活,并且耦合性很高,調(diào)用notify必須要確保某個(gè)線程處于 wait 狀態(tài),而 park/unpark 模型真正解耦了線程之間的同步,先后順序沒有沒有直接關(guān)聯(lián),同時(shí)線程之間不再需要一個(gè)Object或者其他變量來存儲(chǔ)狀態(tài),不再需要關(guān)心對方的狀態(tài)。
總結(jié)
分析了獨(dú)占式同步狀態(tài)獲取和釋放過程后,做個(gè)簡單的總結(jié):在獲取同步狀態(tài)時(shí),同步器維護(hù)一個(gè)同步隊(duì)列,獲取狀態(tài)失敗的線程都會(huì)被加入到隊(duì)列中并在隊(duì)列中進(jìn)行自旋;移出隊(duì)列(或停止自旋)的條件是前驅(qū)節(jié)點(diǎn)為頭節(jié)點(diǎn)且成功獲取了同步狀態(tài)。在釋放同步狀態(tài)時(shí),同步器調(diào)用 `tryRelease(int arg)` 方法釋放同步狀態(tài),然后喚醒頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)。
公平鎖和非公平鎖的區(qū)別
鎖的公平性是相對于獲取鎖的順序而言的,如果是一個(gè)公平鎖,那么鎖的獲取順序就應(yīng)該符合請求的絕對時(shí)間順序,也就是 **FIFO** 。 在上面分析的例子來說,只要 **CAS** 設(shè)置同步狀態(tài)成功,則表示當(dāng)前線程獲取了鎖,而公平鎖則不一樣,差異點(diǎn)有兩個(gè)。
FairSync.tryAcquire
final void** lock() {
acquire(1);
}
非公平鎖在獲取鎖的時(shí)候,會(huì)先通過CAS進(jìn)行搶占,而公平鎖則不會(huì)
FairSync.tryAcquire
protected final boolean* tryAcquire(int acquires) {
final Thread current = Thread.currentThread*();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
這個(gè)方法與 `nonfairTryAcquire(int acquires)` 比較,不同的地方在于判斷條件多了 `hasQueuedPredecessors()` 方法,也就是加入了[同步隊(duì)列中當(dāng)前節(jié)點(diǎn)是否有前驅(qū)節(jié)點(diǎn)]的判斷,如果該方法返回 `true` ,則表示有線程比當(dāng)前線程更早地請求獲取鎖,因此需要等待前驅(qū)線程獲取并釋放鎖之后才能繼續(xù)獲取鎖。
Condition
通過前面的課程學(xué)習(xí),我們知道任意一個(gè)Java對象,都擁有一組監(jiān)視器方法(定義在 `java.lang.Object`上),主要包括 `wait()` 、 `notify()` 以及 `notifyAll()` 方法,這些方法與同步關(guān)鍵字配合,可以實(shí)現(xiàn)等待/通知模式 `J.U.C` 包提供了 `Condition` 來對鎖進(jìn)行精準(zhǔn)控制, `Condition` 是一個(gè)多線程協(xié)調(diào)通信的工具類,可以讓某些線程一起等待某個(gè)條件( `Condition` ),只有滿足條件時(shí),線程才會(huì)被喚醒。
condition使用案例
ConditionWait
@RequiredArgsConstructor
public class ConditionWait extends Thread {
private final Lock lock;
private final Condition condition;
@Override
public void run() {
lock.lock();
try {
System.out.println("【" + Thread.currentThread().getName() + "】開始執(zhí)行 condition.await()");
condition.await(); //
System.out.println("【" + Thread.currentThread().getName() + "】執(zhí)行結(jié)束 condition.await()");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
ConditionSignal
@RequiredArgsConstructor
public class ConditionNotify extends Thread {
private final Lock lock;
private final Condition condition;
@Override
public void run() {
lock.lock();
try {
System.out.println("【" + Thread.currentThread().getName() + "】開始執(zhí)行 condition.signal()");
condition.signal(); // signal 和 signalAll
System.out.println("【" + Thread.currentThread().getName() + "】執(zhí)行結(jié)束 condition.signal()");
} finally {
lock.unlock();
}
}
}
通過這個(gè)案例簡單實(shí)現(xiàn)了 `wait` 和 `notify` 的功能,當(dāng)調(diào)用 `await` 方法后,當(dāng)前線程會(huì)釋放鎖并等待,而其他線程調(diào)用 `condition` 對象的 `signal` 或者 `signalall` 方法通知被阻塞的線程,然后自己執(zhí)行 `unlock` 釋放鎖,被喚醒的線程獲得之前的鎖繼續(xù)執(zhí)行,最后釋放鎖。
所以,condition 中兩個(gè)最重要的方法,一個(gè)是 await ,一個(gè)是 signal 方法
-
await: 把當(dāng)前線程阻塞掛起 -
signal: 喚醒阻塞的線程
public class ConnditionDemo {
public static void main(String[] args) {
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
ConditionWait conditionWait = new ConditionWait(lock, condition);
conditionWait.start();
ConditionNotify conditionNotify = new ConditionNotify(lock, condition);
conditionNotify.start();
}
}
【Thread-0】開始執(zhí)行 condition.await()
【Thread-1】開始執(zhí)行 condition.signal()
【Thread-1】執(zhí)行結(jié)束 condition.signal()
【Thread-0】執(zhí)行結(jié)束 condition.await()
await 方法
調(diào)用 `Condition` 的 `await()` 方法(或者以 `await` 開頭的方法),會(huì)使當(dāng)前線程進(jìn)入等待隊(duì)列并釋放鎖,同時(shí)線程狀態(tài)變?yōu)榈却隣顟B(tài)。當(dāng)從 `await()`方法返回時(shí),當(dāng)前線程一定獲取了 `Condition` 相關(guān)聯(lián)的鎖。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //創(chuàng)建一個(gè)新的節(jié)點(diǎn),節(jié)點(diǎn)狀態(tài)為condition,采用的數(shù)據(jù)結(jié)構(gòu)仍然是鏈表
int savedState = fullyRelease(node); //釋放當(dāng)前的鎖,得到鎖的狀態(tài),并喚醒AQS隊(duì)列中的一個(gè)線程
int interruptMode = 0;
//如果當(dāng)前節(jié)點(diǎn)沒有在同步隊(duì)列上,即還沒有被signal,則將當(dāng)前線程阻塞
//isOnSyncQueue 判斷當(dāng)前 node 狀態(tài),如果是 CONDITION 狀態(tài),或者不在隊(duì)列上了,就繼續(xù)阻塞,還在隊(duì)列上且不是 CONDITION 狀態(tài)了,就結(jié)束循環(huán)和阻塞
while (!isOnSyncQueue(node)) {//第一次判斷的是false,因?yàn)榍懊嬉呀?jīng)釋放鎖了
LockSupport.park(this); // 第一次總是 park 自己,開始阻塞等待
// 線程判斷自己在等待過程中是否被中斷了,如果沒有中斷,則再次循環(huán),會(huì)在 isOnSyncQueue 中判斷自己是否在隊(duì)列上.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 當(dāng)這個(gè)線程醒來,會(huì)嘗試拿鎖, 當(dāng) acquireQueued 返回 false 就是拿到鎖了.
// interruptMode != THROW_IE -> 表示這個(gè)線程沒有成功將 node 入隊(duì),但 signal 執(zhí)行了 enq 方法讓其入隊(duì)了.
// 將這個(gè)變量設(shè)置成 REINTERRUPT.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果 node 的下一個(gè)等待者不是 null, 則進(jìn)行清理,清理 Condition 隊(duì)列上的節(jié)點(diǎn).
// 如果是 null ,就沒有什么好清理的了.
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 如果線程被中斷了,需要拋出異常.或者什么都不做
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal
調(diào)用 `Condition` 的 `signal()` 方法,將會(huì)喚醒在等待隊(duì)列中等待時(shí)間最長的節(jié)點(diǎn)(首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)之前,會(huì)將節(jié)點(diǎn)移到同步隊(duì)列中
public final void signal() {
if (!isHeldExclusively()) //先判斷當(dāng)前線程是否獲得了鎖
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 拿到 Condition 隊(duì)列上第一個(gè)節(jié)點(diǎn)
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)// 如果第一個(gè)節(jié)點(diǎn)的下一個(gè)節(jié)點(diǎn)是 null,那么, 最后一個(gè)節(jié)點(diǎn)也是 null.
lastWaiter = null; // 將 next 節(jié)點(diǎn)設(shè)置成 null
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
該方法先是 CAS 修改了節(jié)點(diǎn)狀態(tài),如果成功,就將這個(gè)節(jié)點(diǎn)放到 AQS 隊(duì)列中,然后喚醒這個(gè)節(jié)點(diǎn)上的線程。此時(shí),那個(gè)節(jié)點(diǎn)就會(huì)在 await 方法中蘇醒。
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
// 如果上一個(gè)節(jié)點(diǎn)的狀態(tài)被取消了, 或者嘗試設(shè)置上一個(gè)節(jié)點(diǎn)的狀態(tài)為 SIGNAL 失敗了(SIGNAL 表示: 他的
next 節(jié)點(diǎn)需要停止阻塞),
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); // 喚醒輸入節(jié)點(diǎn)上的線程.
return true;
}
讀寫鎖與 synchronized 在只讀的線程中需要耗費(fèi)大量的時(shí)間的時(shí)候的性能對比,本質(zhì)在于當(dāng)讀操作時(shí),不進(jìn)行阻塞
// [readWrite] 9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:665
// [synchronized] 9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:19007
public class TestperformanceDemo {
static Integer demoInteger = 0;
// 重入讀寫鎖
static ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock read = readWriteLock.readLock(); // 讀鎖
static Lock write = readWriteLock.writeLock(); // 寫鎖
public static void main(String[] args) {
int countSum = 10000;
add1000Seconds(countSum);
add1000synchronized(countSum);
// [readWrite] 9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:665
// [synchronized] 9999次讀操作{每次讀操作 睡眠 1 ms},1次 " +1" 操作以后, 結(jié)果為:1[耗時(shí)]:19007
}
/***
* 第二種方式
*/
public static void add1000Seconds(int countSum) {
demoInteger = 0;
ExecutorService executorService = Executors.newFixedThreadPool(countSum);
long start = System.currentTimeMillis();
CompletionService completionService = new ExecutorCompletionService(executorService);
for (int i = 0; i < countSum; i++) {
int finalI = i;
completionService.submit(() -> {
if (finalI == 0) {
write.lock();
try {
demoInteger++;
} finally {
write.unlock();
}
} else {
read.lock();
try {
Integer value = demoInteger;
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
read.unlock();
}
}
}, null);
}
int count = 0;
while (count < countSum) { // 等待任務(wù)完成全部
if (completionService.poll() != null) {
count++;
}
}
long end = System.currentTimeMillis();
System.out.println("[readWrite] " + (countSum - 1) + "次讀操作{每次讀操作 睡眠 1 ms},"
+ 1 + "次 \" +1\" 操作以后, 結(jié)果為:" + demoInteger
+ "[耗時(shí)]:" + (end - start));
}
/***
* synchronized
*/
public static void add1000synchronized(int countSum) {
demoInteger = 0;
ExecutorService executorService = Executors.newFixedThreadPool(countSum);
long start = System.currentTimeMillis();
CompletionService completionService = new ExecutorCompletionService(executorService);
for (int i = 0; i < countSum; i++) {
int finalI = i;
completionService.submit(() -> {
synchronized (RWLockDemo.class) {
if (finalI == 0) {
demoInteger++;
} else {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Integer value = demoInteger;
}
}
}, null);
}
int count = 0;
while (count < countSum) { // 等待任務(wù)完成全部
if (completionService.poll() != null) {
count++;
}
}
long end = System.currentTimeMillis();
System.out.println("[synchronized] " + (countSum - 1) + "次讀操作{每次讀操作 睡眠 1 ms},"
+ 1 + "次 \" +1\" 操作以后, 結(jié)果為:" + demoInteger
+ "[耗時(shí)]:" + (end - start));
}
}