Java高并發(fā)--AQS
主要是學(xué)習(xí)慕課網(wǎng)實(shí)戰(zhàn)視頻《Java并發(fā)編程入門(mén)與高并發(fā)面試》的筆記
AQS是AbstractQueuedSynchronizer的簡(jiǎn)稱(chēng),直譯過(guò)來(lái)是抽象隊(duì)列同步器。AQS的底層數(shù)據(jù)結(jié)構(gòu)是隊(duì)列,如下所示

AQS使用Node實(shí)現(xiàn)FIFO隊(duì)列,可以用于構(gòu)建鎖或者其他同步裝置的基礎(chǔ)框架,利用一個(gè)int類(lèi)型表示狀態(tài)(state)
使用該框架的功能需要讓子類(lèi)繼承,并重寫(xiě)相關(guān)方法。
子類(lèi)通過(guò)繼承并通過(guò)重寫(xiě)方法管理其狀態(tài)acquire和release的方法操縱狀態(tài)
可以同時(shí)實(shí)現(xiàn)排他鎖和共享鎖模式(獨(dú)占、共享),要么使用獨(dú)占鎖要么使用共享鎖,而不會(huì)同時(shí)使用兩者
閉鎖- CountdownLatch
常稱(chēng)為"閉鎖",是一個(gè)倒計(jì)數(shù)器,如下,設(shè)定了倒計(jì)數(shù)值為3.當(dāng)前線(xiàn)程調(diào)用await()被阻塞,其他線(xiàn)程每調(diào)用一次countDown()計(jì)數(shù)器減1,一直到計(jì)數(shù)值cnt等于0時(shí),當(dāng)前線(xiàn)程才可以繼續(xù)(恢復(fù))執(zhí)行??梢岳斫鉃橐粋€(gè)線(xiàn)程等待多個(gè)線(xiàn)程執(zhí)行完畢,這樣該線(xiàn)程可以利用其他多個(gè)線(xiàn)程的執(zhí)行結(jié)果。

下面是CountdownLatch的簡(jiǎn)單使用,當(dāng)計(jì)數(shù)器從60減到0時(shí)候,當(dāng)前線(xiàn)程會(huì)打印“Go!”
由于當(dāng)前線(xiàn)程被阻塞,需等到其余60個(gè)線(xiàn)程執(zhí)行完畢,因此當(dāng)前線(xiàn)程的"Go!"會(huì)最后打印。
package com.shy.concurrency.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author Haiyu
* @date 2019/1/3 11:00
*/
@Slf4j
public class CountdownLatchExample {
private static CountDownLatch cdl = new CountDownLatch(60);
public static void main(String[] args) throws InterruptedException {
final int num = 60;
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = num; i >= 0; i--) {
final int cur = i;
executorService.execute(() -> {
try {
run(cur);
} catch (InterruptedException e) {
log.error("exception", e);
} finally {
cdl.countDown();
}
});
}
cdl.await();
System.out.println("Go!");
executorService.shutdown();
}
public static void run(int cur) throws InterruptedException {
Thread.sleep(100);
System.out.println(cur);
}
}
使用CountdownLatch還可以實(shí)現(xiàn)限時(shí)任務(wù),在await()方法中可以傳入?yún)?shù),如下表示當(dāng)前線(xiàn)程只會(huì)等待10毫秒,超過(guò)這個(gè)時(shí)間就不再等待而是恢復(fù)執(zhí)行。
cdl.await(10, TimeUnit.MICROSECONDS);
將上面的程序cdl.await();修改成cdl.await(10, TimeUnit.MICROSECONDS);,將最先打印"Go!"因?yàn)樗坏却?0毫秒,而其他線(xiàn)程每次執(zhí)行前都會(huì)sleep100毫秒。
信號(hào)量 - Semaphore
可以控制某個(gè)資源可以有多少個(gè)線(xiàn)程同時(shí)訪(fǎng)問(wèn),即可以控制并發(fā)量。主要通過(guò)acquire和release方法,
-
acquire():獲得一個(gè)許可,若無(wú)法獲得會(huì)一直等待直到有線(xiàn)程釋放了許可或者被中斷 -
tryAcquire():嘗試獲得一個(gè)許可,獲取成功返回true,失敗返回false,不會(huì)等待立即返回 -
release():釋放一個(gè)許可
acquire()和release()還可以傳入?yún)?shù),指定一次獲得/釋放的許可數(shù);tryAcquire()還可以指定時(shí)間,在指定時(shí)間內(nèi)沒(méi)有獲得到許可就返回false。
循環(huán)柵欄 - CyclicBarrier
cyclicBarrier強(qiáng)調(diào)線(xiàn)程之間互相等待

CountdownLatch和CyclicBarrier的區(qū)別如下:
- CountDownLatch強(qiáng)調(diào)一個(gè)線(xiàn)程等待其他所有線(xiàn)程,通過(guò)cdl.await()讓當(dāng)前線(xiàn)程等待在倒計(jì)數(shù)器上,每有一個(gè)線(xiàn)程執(zhí)行完,cdl.countDown(),將計(jì)數(shù)減1,減到0時(shí)通知當(dāng)前線(xiàn)程執(zhí)行。簡(jiǎn)單的說(shuō)就是一個(gè)線(xiàn)程等待,直到他所等待的其他線(xiàn)程都執(zhí)行完成,當(dāng)前線(xiàn)程才可以繼續(xù)執(zhí)行。
- cyclicBarrier強(qiáng)調(diào)線(xiàn)程之間互相等待,只要有一個(gè)線(xiàn)程還沒(méi)到來(lái),所有線(xiàn)程會(huì)一起等待??梢詡魅胍粋€(gè)Runnable作為計(jì)數(shù)完成要執(zhí)行的任務(wù)。每有一個(gè)線(xiàn)程調(diào)用cyc.await()計(jì)數(shù)減1,減到0時(shí)會(huì)執(zhí)行一次該Runnable。簡(jiǎn)單地說(shuō)就是線(xiàn)程之間互相等待,等所有線(xiàn)程都準(zhǔn)備好,即調(diào)用await()方法之后,執(zhí)行一次Runnable,此時(shí)所有線(xiàn)程開(kāi)始同時(shí)執(zhí)行!
- CountDownLatch的計(jì)數(shù)器只能使用一次。而CyclicBarrier的計(jì)數(shù)器可以使用reset() 方法重置。
下面是一個(gè)例子,CyclicBarrier還可以傳入一個(gè)Runnable, 每次計(jì)數(shù)到的時(shí)候都會(huì)執(zhí)行一次。
package com.shy.concurrency.aqs;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;
/**
* @author Haiyu
* @date 2019/1/3 11:00
*/
@Slf4j
public class CyclicBarrierExample {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(10, () -> System.out.println("集合完畢"));
public static void main(String[] args) throws InterruptedException {
final int num = 10;
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0;i < num; i++) {
final int cur = i;
executorService.execute(() -> {
try {
run(cur);
} catch (Exception e) {
log.error("exception", e);
}
});
}
executorService.shutdown();
}
public static void run(int cur) throws InterruptedException, BrokenBarrierException {
log.info("{} is ready", cur);
cyclicBarrier.await();
log.info("{} is working", cur);
}
}
CyclicBarrier(10, () -> System.out.println("集合完畢"));該句表示10個(gè)線(xiàn)程相互等待,所有10個(gè)線(xiàn)程都調(diào)用了await()方法后,會(huì)執(zhí)行一次傳入的Runnable;之后10個(gè)線(xiàn)程被喚醒,計(jì)數(shù)重新開(kāi)始,這也是Cyclic的意義——可重復(fù)利用的計(jì)數(shù)器。
因此上面程序會(huì)先打印10個(gè)ready,然后打印一次“集合完畢”;之后會(huì)打印10個(gè)working。
重入鎖 - ReentrantLock
synchronized是JVM的內(nèi)置鎖,而重入鎖是Java代碼實(shí)現(xiàn)的。重入鎖是synchronized的擴(kuò)展,可以完全代替后者。重入鎖可以重入,允許同一個(gè)線(xiàn)程連續(xù)多次獲得同一把鎖。其次,重入鎖獨(dú)有的功能有:
- 可以相應(yīng)中斷,synchronized要么獲得鎖執(zhí)行,要么保持等待。而重入鎖可以響應(yīng)中斷,使得線(xiàn)程在遲遲得不到鎖的情況下,可以不再等待。主要由
lockInterruptibly()實(shí)現(xiàn),這是一個(gè)可以對(duì)中斷進(jìn)行響應(yīng)的鎖申請(qǐng)動(dòng)作,鎖中斷可以避免死鎖。 - 鎖的申請(qǐng)可以有等待時(shí)限,用
tryLock()可以實(shí)現(xiàn)限時(shí)等待,如果超時(shí)還未獲得鎖會(huì)返回false,也防止了線(xiàn)程遲遲得不到鎖時(shí)一直等待,可避免死鎖。 - 公平鎖,即鎖的獲得按照線(xiàn)程先來(lái)后到的順序依次獲得,不會(huì)產(chǎn)生饑餓現(xiàn)象。synchronized的鎖默認(rèn)是不公平的,重入鎖可通過(guò)傳入構(gòu)造方法的參數(shù)實(shí)現(xiàn)公平鎖。
- 重入鎖可以綁定多個(gè)Condition條件,這些condition通過(guò)調(diào)用await/singal實(shí)現(xiàn)線(xiàn)程間通信??梢詫?shí)現(xiàn)分組喚醒需要喚醒的線(xiàn)程,而不是像synchronized的wait/notify一樣要么隨機(jī)喚醒要給要么喚醒全部。
讀寫(xiě)鎖 - ReentrantReadWriteLock
ReadWriteLock即讀寫(xiě)鎖,它有兩個(gè)方法如下,分別返回一個(gè)讀鎖和寫(xiě)鎖,即讀寫(xiě)鎖分離。
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
在讀時(shí)使用readLock進(jìn)行加鎖,在寫(xiě)時(shí)使用writeLock進(jìn)行加鎖。使得讀-讀不阻塞,讀線(xiàn)程完全并行,適合讀多寫(xiě)少的場(chǎng)合。讀鎖會(huì)完全阻塞寫(xiě)鎖,它使用的依然是悲觀(guān)的鎖策略。如果有大量的讀線(xiàn)程,也有可能引起寫(xiě)線(xiàn)程的饑餓。(如果想獲得寫(xiě)鎖,不允許還有讀鎖、寫(xiě)鎖還保持著,即 在沒(méi)有任何讀鎖、寫(xiě)鎖的情況下才能獲得寫(xiě)鎖)