1.概述
同步工具類可以是任何一個類,只要它根據(jù)其自身的狀態(tài)來協(xié)調(diào)線程的控制流。阻塞隊列可以作為同步工具類,其他類型的同步工具還包括信號量(Semaphore),柵欄(Latch),閉鎖(Latch).
2.閉鎖
閉鎖可以延遲線程的進度直到到達終止狀態(tài),閉鎖的作用相當于一扇門:在閉鎖到達結束狀態(tài)之前,這扇門一直是關閉的,并且沒有任何線程能通過,當?shù)竭_結束狀態(tài)時,這扇門會打開并允許所有的線程通過。
CountDownLatch
構造器:
CountDownLatch的構造函數(shù)接受一個int類型的參數(shù)作為計數(shù)器,如果你想要等待N個點完成,這里就傳入N。
public CountDownLatch(int var1) {
if (var1 < 0) {
throw new IllegalArgumentException("count < 0");
} else {
this.sync = new CountDownLatch.Sync(var1);
}
}
| 方法 | 說明 |
|---|---|
| countDown() | 每次調(diào)用該方法N就會減一 |
| await() | 在計數(shù)到達零之前,該方法會阻塞當前線程。之后會釋放所有等待的線程,所有的后續(xù)調(diào)用都將立即返回。 |
| await(long time,TimeUnit unit) | 這個方法等待特定時間后,就不會阻塞當前線程。 |
CountDownLatch計數(shù)無法被重置。如果需要重置計數(shù),請考慮使用CyclicBarria。 由于countDown()方法可以用在任何地方,所以這里說的N個點可以是N個線程,也可以是1個線程里的N個執(zhí)行步驟。
簡單入門:
public class CountDownLatchTest {
private static CountDownLatch c = new CountDownLatch(2);
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
System.out.println(1);
c.countDown();
System.out.println(2);
c.countDown();
}).start();
c.await();
System.out.println(3);
}
}
輸出
1
2
3
進階用法:
下面給出了兩個類,其中一組 worker 線程使用了兩個倒計數(shù)鎖存器:
- 第一個類是一個啟動信號,在 driver 為繼續(xù)執(zhí)行 worker 做好準備之前,它會阻止所有的 worker 繼續(xù)執(zhí)行。
- 第二個類是一個完成信號,它允許 driver 在完成所有 worker 之前一直等待。
public class Worker implements Runnable {
private CountDownLatch startSignal;
private CountDownLatch doneSignal;
Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
}
public void run() {
try {
startSignal.await();
work();
doneSignal.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void work() {
}
}
public class Driver {
public static void main(String[] args) throws InterruptedException {
CountDownLatch startSignal = new CountDownLatch(1);//開始信號
CountDownLatch doneSignal = new CountDownLatch(5);//結束信號
for (int i = 0; i < 5; i++) {
new Thread(
new Worker(startSignal, doneSignal)
).start();
}
doSomethingElse("before startSignal");
startSignal.countDown();//開始所有工作
doSomethingElse("before doneSignal");
doneSignal.await();//等待所有的工作結束
doSomethingElse("done!");
}
public static void doSomethingElse(String str) {
System.out.println(str);
}
}
輸出:
before startSignal
before doneSignal
done!
3.信號量
Semaphore
一個計數(shù)信號量。從概念上講,信號量維護了一個許可集。如有必要,在許可可用前會阻塞每一個 acquire(),然后再獲取該許可。每個 release() 添加一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可對象,Semaphore 只對可用許可的號碼進行計數(shù),并采取相應的行動。
計算信號量的一種簡化形式是二值信號量,即初始值為1的Semaphore,二值信號量可以用做互斥體,并具備不可重入的加鎖語義。
Semaphore 通常用于限制可以訪問某些資源(物理或邏輯的)的線程數(shù)目。例如:
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
executorService.execute(() -> {
try {
s.acquire();
System.out.println("save data thread=" + Thread.currentThread().getName());
s.release();
} catch (InterruptedException e) {
}
});
}
}
}
代碼中,雖然有30個線程在執(zhí)行,但是只允許10個并發(fā)執(zhí)行。Semaphore的構造方法 Semaphore(int permits)接受一個整型的數(shù)字,表示可用的許可證數(shù)量。Semaphore(10)表示允 許10個線程獲取許可證,也就是最大并發(fā)數(shù)是10。Semaphore的用法也很簡單,首先線程使用 Semaphore的acquire()方法獲取一個許可證,使用完之后調(diào)用release()方法歸還許可證。還可以 用tryAcquire()方法嘗試獲取許可證。
4.柵欄(屏障)
CyclicBarrier
字面意思是可循環(huán)使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫做同步點)時阻塞,直到最后一個線程到達屏障時,屏障才會打開,所有被屏障攔截的線程才會繼續(xù)執(zhí)行。同時,Barrier在釋放等待線程之后可以重用。
CyclicBarrier支持一個可選的Runnable命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續(xù)所有參與線程之前更新共享狀態(tài),此屏障操作 很有用。
柵欄與閉鎖的關鍵區(qū)別:
- 閉鎖是一次性對象,柵欄可以重用,也可以reset()重置。
- 所有線程必須都到達柵欄位置,才能繼續(xù)執(zhí)行。閉鎖用于等待事件,而柵欄用于等待其他線程。
示例用法:
用戶多線程計算數(shù)據(jù),最后合并計算結果的場景。
一個Excel保存了用戶所有的銀行流水,每個Sheet保存了一個賬戶近一年的每筆銀行流水,現(xiàn)在需要統(tǒng)計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執(zhí)行完了之后,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些線程的計算結果,計算整個Excel的日均銀行流水。
import java.util.Map;
import java.util.concurrent.*;
/**
* Created by dachao on 16-7-28.
*/
public class BankWaterService implements Runnable {
/*
創(chuàng)建4個屏障,處理完之后執(zhí)行當前類的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/*
假設只有4個sheet,所以只啟動4個線程
*/
private Executor executor = Executors.newFixedThreadPool(4);
/*
保存每個sheet計算出的銀流結果
*/
private ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<>();
private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(() -> {
//計算當前sheet的銀行數(shù)據(jù),代碼省略
count.put(Thread.currentThread().getName(), 1);
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
}
@Override
public void run() {
int result = 0;
//匯總每個sheet計算出的結果
for (Map.Entry<String, Integer> sheet : count.entrySet()) {
result += sheet.getValue();
}
//將結果輸出
count.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService service = new BankWaterService();
service.count();
}
}
輸出結果:
4
5.線程間交換數(shù)據(jù)的Exchanger
Exchanger(交換者)
是一個用于線程間協(xié)作的工具類。Exchanger用于進行線程間的數(shù)據(jù)交 換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數(shù)據(jù)。這兩個線程通過 exchange方法交換數(shù)據(jù),如果第一個線程先執(zhí)行exchange()方法,它會一直等待第二個線程也 執(zhí)行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數(shù)據(jù),將本線程生產(chǎn) 出來的數(shù)據(jù)傳遞給對方。
Exchanger可以用于遺傳算法,遺傳算法里需要選出兩個人作為交配對象,這時候會交換 兩人的數(shù)據(jù),并使用交叉規(guī)則得出2個交配結果。Exchanger也可以用于校對工作,比如我們需 要將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行 錄入,錄入到Excel之后,系統(tǒng)需要加載這兩個Excel,并對兩個Excel數(shù)據(jù)進行校對,看看是否 錄入一致。
import java.util.Map;
import java.util.concurrent.*;
/**
* Created by dachao on 16-7-28.
*/
public class BankWaterService implements Runnable {
/*
創(chuàng)建4個屏障,處理完之后執(zhí)行當前類的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/*
假設只有4個sheet,所以只啟動4個線程
*/
private ExecutorService executor = Executors.newFixedThreadPool(4);
/*
保存每個sheet計算出的銀流結果
*/
private ConcurrentHashMap<String, Integer> count = new ConcurrentHashMap<>();
private void count() {
for (int i = 0; i < 4; i++) {
executor.execute(() -> {
//計算當前sheet的銀行數(shù)據(jù),代碼省略
count.put(Thread.currentThread().getName(), 1);
try {
c.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
});
}
executor.shutdown();
}
@Override
public void run() {
int result = 0;
//匯總每個sheet計算出的結果
for (Map.Entry<String, Integer> sheet : count.entrySet()) {
result += sheet.getValue();
}
//將結果輸出
count.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService service = new BankWaterService();
service.count();
}
}
輸出:
4