Java 下多線程的開發(fā)我們可以自己?jiǎn)⒂枚嗑€程,線程池,除此之外,Java還為我們提供了Fork-Join、CountDownLatch、CyclicBarrier等并發(fā)工具類。掌握并使用它們,有助于我們?cè)谶M(jìn)行并發(fā)線程開發(fā)的過程中,更加得心應(yīng)手。
Fork-Join
Fork-Join是一個(gè)使用多線程的并發(fā)工具類,可以讓我們不去了解諸如Thread,Runnable 等相關(guān)的知識(shí),只要遵循forkjoin的開發(fā)模式,就可以寫出很好的多線程并發(fā)程序,
分而治之
forkjoin 在處理某一類問題時(shí)非常的有用,哪一類問題?分而治之的問題。分治法的設(shè)計(jì)思想是:將一個(gè)難以直接解決的大問題,分割成一些規(guī)模較小的相同問題,以便各個(gè)擊破,分而治之。
分治策略是:對(duì)于一個(gè)規(guī)模為n 的問題,若該問題可以容易地解決(比如說規(guī)模n 較小)則直接解決,否則將其分解為k 個(gè)規(guī)模較小的子問題,這些子問題互相獨(dú)立且與原問題形式相同(子問題相互之間有聯(lián)系就會(huì)變?yōu)閯?dòng)態(tài)規(guī)范算法),遞歸地解這些子問題,然后將各子問題的解合并得到原問題的解。這種算法設(shè)計(jì)策略叫做分治法。
歸并排序
歸并排序是建立在歸并操作上的一種有效的排序算法。該算法是采用分治法的一個(gè)非常典型的應(yīng)用。將已有序的子序列合并,得到完全有序的序列;即先使每個(gè)子序列有序,再使子序列段間有序。若將兩個(gè)有序表合并成一個(gè)有序表,稱為2-路歸并,與之對(duì)應(yīng)的還有多路歸并。
對(duì)于給定的一組數(shù)據(jù),利用遞歸與分治技術(shù)將數(shù)據(jù)序列劃分成為越來越小的半子表,在對(duì)半子表排序后,再用遞歸方法將排好序的半子表合并成為越來越大的有序序列。為了提升性能,有時(shí)我們?cè)诎胱颖淼膫€(gè)數(shù)小于某個(gè)數(shù)(比如15)的情況下,對(duì)半子表的排序采用其他排序算法,比如插入排序。
// 歸并排序,遞歸實(shí)現(xiàn)
public static void sortMergeRecursion(int[] nums) {
sortMergeRecursionHelper(nums, 0, nums.length - 1);
}
public static void sortMergeRecursionHelper(int[] nums,int left, int right) {
if(left == right) return; // 當(dāng)待排序的序列長(zhǎng)度為1時(shí),遞歸開始回溯,進(jìn)行merge
int middle = left + (right - left) / 2;
sortMergeRecursionHelper(nums, left, middle);
sortMergeRecursionHelper(nums, middle + 1, right);
mergeArr(nums, left, middle, right);
}
public static void mergeArr(int[] nums, int left, int middle, int right) {
int[] tem = new int[right - left + 1];
int i = left, j = middle + 1, k = 0;
while(i <= middle && j <= right) {
tem[k++] = nums[i] < nums[j]? nums[i++] : nums[j++];
}
while(i <= middle) {
tem[k++] = nums[i++];
}
while(j <= right) {
tem[k++] = nums[j++];
}
// 將輔助數(shù)組數(shù)據(jù)寫入原數(shù)組
int index = 0;
while(left <= right) {
nums[left++] = tem[index++];
}
}
Fork-Join 原理
Fork/Join框架,就是在必要的情況下,將一個(gè)大任務(wù),進(jìn)行拆分(fork)成若干個(gè)小任務(wù)(拆到不可以再拆時(shí)),再將一個(gè)個(gè)小任務(wù)運(yùn)算的結(jié)果進(jìn)行Join匯總。除了分而治之之外,F(xiàn)ork-Join還有一種重要的思想,那就是工作密取。
工作密取
即當(dāng)前線程的Task已經(jīng)全被執(zhí)行完畢,則自動(dòng)取到其他線程的Task池中取出Task 繼續(xù)執(zhí)行。ForkJoinPool 中維護(hù)著多個(gè)線程(一般為CPU 核數(shù))在不斷地執(zhí)行Task,每個(gè)線程除了執(zhí)行自己職務(wù)內(nèi)的Task之外,還會(huì)根據(jù)自己工作線程的閑置情況去獲取其他繁忙的工作線程的Task,如此一來就能能夠減少線程阻塞或是閑置的時(shí)間,提高CPU利用率。
Fork/Join 實(shí)戰(zhàn)
Fork/Join 使用的標(biāo)準(zhǔn)范式
我們要使用ForkJoin 框架,必須首先創(chuàng)建一個(gè)ForkJoin 任務(wù)。它提供在任務(wù)中執(zhí)行fork 和join 的操作機(jī)制,通常我們不直接繼承ForkjoinTask 類,只需要直接繼承其子類。
- RecursiveAction,用于沒有返回結(jié)果的任務(wù)
- RecursiveTask,用于有返回值的任務(wù)
task 要通過ForkJoinPool 來執(zhí)行,使用submit() 或invoke() 提交,兩者的區(qū)別是:invoke() 是同步執(zhí)行,調(diào)用之后需要等待任務(wù)完成,才能執(zhí)行后面的代碼;submit()是異步執(zhí)行。join()和get()方法當(dāng)任務(wù)完成的時(shí)候返回計(jì)算結(jié)果。
在我們自己實(shí)現(xiàn)的compute()方法里,首先需要判斷任務(wù)是否足夠小,如果足夠小就直接執(zhí)行任務(wù)。如果不足夠小,就必須分割成兩個(gè)子任務(wù),每個(gè)子任務(wù)在調(diào)用invokeAll()方法時(shí),又會(huì)進(jìn)入compute()方法,看看當(dāng)前子任務(wù)是否需要繼續(xù)分割成孫任務(wù),如果不需要繼續(xù)分割,則執(zhí)行當(dāng)前子任務(wù)并返回結(jié)果。使用join方法會(huì)等待子任務(wù)執(zhí)行完并得到其結(jié)果。
同步用法(有返回值)
在這里我們使用Fork/Join的同步用法統(tǒng)計(jì)整型數(shù)組中所有元素的和。
MakeArray是一個(gè)用于生成隨機(jī)數(shù)組的工具類。
先來看單線程計(jì)算:
public class SumNormal {
public static void main(String[] args) {
int count = 0;
int[] src = MakeArray.makeArray();
long start = System.currentTimeMillis();
for(int i= 0;i<src.length;i++){
// SleepTools.ms(1);
count = count + src[i];
}
System.out.println("The count is "+count
+" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
}
運(yùn)行打印結(jié)果:The count is 23995323 spend time:0ms。
我們?cè)賮砜词褂肍ork/Join來計(jì)算:
public class SumArray {
private static class SumTask extends RecursiveTask<Integer> {
//拆分的閾值
private final static int THRESHOLD = MakeArray.ARRAY_LENGTH / 10;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int[] src, int fromIndex, int toIndex) {
this.src = src;
this.fromIndex = fromIndex;
this.toIndex = toIndex;
}
@Override
protected Integer compute() {
//任務(wù)的大小是否合適
if (toIndex - fromIndex < THRESHOLD) {
System.out.println(" from index = " + fromIndex + " toIndex=" + toIndex);
int count = 0;
for (int i = fromIndex; i <= toIndex; i++) {
// SleepTools.ms(1);
count = count + src[i];
}
return count;
} else {
//fromIndex....mid.....toIndex
int mid = (fromIndex + toIndex) / 2;
SumTask left = new SumTask(src, fromIndex, mid);
SumTask right = new SumTask(src, mid + 1, toIndex);
invokeAll(left, right);
return left.join() + right.join();
}
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
int[] src = MakeArray.makeArray();
SumTask innerFind = new SumTask(src, 0, src.length - 1);
long start = System.currentTimeMillis();
pool.invoke(innerFind);
//System.out.println("Task is Running.....");
System.out.println("The count is " + innerFind.join()
+ " spend time:" + (System.currentTimeMillis() - start) + "ms");
}
}
運(yùn)行打印結(jié)果:The count is 23704295 spend time:2ms。
從結(jié)果來看,我們使用Fork/Join,還不如使用單線程。為什么會(huì)這樣,F(xiàn)ork/Join的Task是繼承自RecursiveTask的,Recursive是遞歸的意思,既然是遞歸,那么就涉及到方法的出棧和壓棧。另一方面由于是多線程任務(wù),那么就會(huì)有線程的上下文切換,由于這是CPU計(jì)算密集型任務(wù),自然而然就會(huì)比單線程的慢。
既然這樣,我們?cè)诖a中加上一句SleepTools.ms(1);,再來看結(jié)果如何呢。
單線程:The count is 23973773 spend time:6912ms
Fork/Join:The count is 24007429 spend time:920ms
這個(gè)時(shí)間比單線程快了將近7倍吧,對(duì)于單線程來說,每次相加休眠1ms,那么4000個(gè)數(shù)的累加就休眠將近4000ms。而對(duì)于Fork/Join來說,由于分而治之的思想,線程休眠1ms影響微小。當(dāng)單線程任務(wù)耗時(shí)變長(zhǎng)的情況下,F(xiàn)ork/Join的性能優(yōu)勢(shì)就體現(xiàn)出來了。
因此我們?cè)谑褂枚嗑€程開發(fā)的時(shí)候,要考慮清楚線程任務(wù)的性質(zhì),再做選擇。
異步用法(無返回值)
在這里我們使用Fork/Join的異步用法遍歷指定目錄,尋找指定類型文件
/**
* 類說明:遍歷指定目錄(含子目錄)找尋指定類型文件
*/
public class FindDirsFiles extends RecursiveAction {
private File path;
public FindDirsFiles(File path) {
this.path = path;
}
@Override
protected void compute() {
List<FindDirsFiles> subTasks = new ArrayList<>();
File[] files = path.listFiles();
if (files != null) {
for (File file : files) {
if (file.isDirectory()) {
// 對(duì)每個(gè)子目錄都新建一個(gè)子任務(wù)。
subTasks.add(new FindDirsFiles(file));
} else {
// 遇到文件,檢查。
if (file.getAbsolutePath().endsWith("txt")) {
System.out.println("文件:" + file.getAbsolutePath());
}
}
}
if (!subTasks.isEmpty()) {
// 在當(dāng)前的 ForkJoinPool 上調(diào)度所有的子任務(wù)。
for (FindDirsFiles subTask : invokeAll(subTasks)) {
subTask.join();
}
}
}
}
public static void main(String[] args) {
try {
// 用一個(gè) ForkJoinPool 實(shí)例調(diào)度總?cè)蝿?wù)
ForkJoinPool pool = new ForkJoinPool();
FindDirsFiles task = new FindDirsFiles(new File("F:/"));
//異步提交
pool.execute(task);
System.out.println("Task is Running......");
Thread.sleep(1);
//主線程做自己的工作
int otherWork = 0;
for (int i = 0; i < 100; i++) {
otherWork = otherWork + i;
}
System.out.println("Main Thread done sth......,otherWork=" + otherWork);
//阻塞方法
task.join();
System.out.println("Task end");
} catch (Exception e) {
e.printStackTrace();
}
}
}
CountDownLatch
CountDownLatch,也稱之為閉鎖,CountDownLatch 這個(gè)類能夠使一個(gè)線程等待其他線程完成各自的工作后再執(zhí)行。例如,應(yīng)用程序的主線程希望在負(fù)責(zé)啟動(dòng)框架服務(wù)的線程已經(jīng)啟動(dòng)所有的框架服務(wù)之后再執(zhí)行(初始化)。
CountDownLatch 是通過一個(gè)計(jì)數(shù)器來實(shí)現(xiàn)的,計(jì)數(shù)器的初始值為初始任務(wù)的數(shù)量。每當(dāng)完成了一個(gè)任務(wù)后,計(jì)數(shù)器的值就會(huì)減1(CountDownLatch.countDown()方法)。當(dāng)計(jì)數(shù)器值到達(dá)0 時(shí),它表示所有的已經(jīng)完成了任務(wù),然后在閉鎖上等待CountDownLatch.await()方法的線程就可以恢復(fù)執(zhí)行任務(wù)。
基本使用
構(gòu)造方法
- CountDownLatch(int count) 構(gòu)造一個(gè)以給定計(jì)數(shù) CountDownLatch CountDownLatch。
方法:
- await() 當(dāng)前線程等到鎖存器計(jì)數(shù)到零,除非線程是 interrupted 。
- await(long timeout, TimeUnit unit) 使當(dāng)前線程等待直到鎖存器計(jì)數(shù)到零為止,除非線程為 interrupted或指定的等待時(shí)間過去。
- countDown() 減少鎖存器的計(jì)數(shù),如果計(jì)數(shù)達(dá)到零,釋放所有等待的線程。
- getCount() 返回當(dāng)前計(jì)數(shù)。
實(shí)例演示:
/**
*類說明:演示CountDownLatch用法,
* 共5個(gè)初始化子線程,6個(gè)閉鎖扣除點(diǎn),扣除完畢后,主線程和業(yè)務(wù)線程才能繼續(xù)執(zhí)行
*/
public class UseCountDownLatch {
static CountDownLatch latch = new CountDownLatch(6);
/**
* 初始化線程
*/
private static class InitThread implements Runnable{
@Override
public void run() {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work......");
latch.countDown();
for(int i =0;i<2;i++) {
System.out.println("Thread_"+Thread.currentThread().getId()
+" ........continue do its work");
}
}
}
/**
* 業(yè)務(wù)線程等待latch的計(jì)數(shù)器為0 完成
*/
private static class BusiThread implements Runnable{
@Override
public void run() {
try{
latch.await();
for(int i =0;i<3;i++) {
System.out.println("BusiThread_"+Thread.currentThread().getId()
+" do business-----");
}
}catch (Exception e) {
}
}
}
public static void main(String[] args) throws InterruptedException {
new Thread(new Runnable() {
@Override
public void run() {
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 1st......");
latch.countDown();
System.out.println("begin step 2nd.......");
SleepTools.ms(1);
System.out.println("Thread_"+Thread.currentThread().getId()
+" ready init work step 2nd......");
latch.countDown();
}
}).start();
new Thread(new BusiThread()).start();
for(int i=0;i<=3;i++){
Thread thread = new Thread(new InitThread());
thread.start();
}
latch.await();
System.out.println("Main do ites work........");
}
}
從代碼來看,等待的線程可以是多個(gè),且可以在主線程和異步線程中使用latch.await();,而且同一個(gè)線程里面,也可以多次調(diào)用latch.countDown();。在使用的時(shí)候,我們也要注意,如果閉鎖扣除點(diǎn)大于實(shí)際扣減數(shù),那么等待線程就會(huì)一直等下去,所以一定要計(jì)算好閉鎖初始化值和扣減次數(shù)。
CountDownLatch的不足
CountDownLatch是一次性的,計(jì)數(shù)器的值只能在構(gòu)造方法中初始化一次,之后沒有任何機(jī)制再次對(duì)其設(shè)置值,當(dāng)CountDownLatch使用完畢后,它不能再次被使用。
CyclicBarrier
CyclicBarrier,也稱為循環(huán)阻塞,允許一組線程全部等待彼此達(dá)到共同屏障點(diǎn)的同步輔助。循環(huán)阻塞在涉及固定大小的線程的程序中很有用,這些線程必須偶爾等待彼此。阻塞之所以被稱為循環(huán),因?yàn)樗梢栽诘却木€程被釋放之后重新使用。
基本使用
構(gòu)造方法
- CyclicBarrier(int parties)
創(chuàng)建一個(gè)新的 CyclicBarrier ,當(dāng)給定數(shù)量的線程(線程)等待它時(shí),它將跳閘,并且當(dāng)屏障跳閘時(shí)不執(zhí)行預(yù)定義的動(dòng)作。 - CyclicBarrier(int parties, Runnable barrierAction)
創(chuàng)建一個(gè)新的 CyclicBarrier ,當(dāng)給定數(shù)量的線程(線程)等待時(shí),它將跳閘,當(dāng)屏障跳閘時(shí)執(zhí)行給定的屏障動(dòng)作,由最后一個(gè)進(jìn)入屏障的線程執(zhí)行。
方法
- int await() 等待所有 parties已經(jīng)在這個(gè)障礙上調(diào)用了 await 。
- int await(long timeout, TimeUnit unit) 等待所有 parties已經(jīng)在此屏障上調(diào)用 await ,或指定的等待時(shí)間過去。
- int getNumberWaiting() 返回目前正在等待障礙的各方的數(shù)量。
- int getParties() 返回旅行這個(gè)障礙所需的parties數(shù)量。
- boolean isBroken() 查詢這個(gè)障礙是否處于破碎狀態(tài)。
- void reset() 將屏障重置為初始狀態(tài)。
實(shí)例演示
/**
* 類說明:演示CyclicBarrier用法,共5個(gè)子線程,他們?nèi)客瓿晒ぷ骱螅怀鲎约航Y(jié)果,
* 再被統(tǒng)一釋放去做自己的事情,而交出的結(jié)果被另外的線程拿來拼接字符串
*/
public class UseCyclicBarrier {
static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new CollectRunnable());
private static ConcurrentHashMap<String, Long> resultMap
= new ConcurrentHashMap<>();//存放子線程工作結(jié)果的容器
public static void main(String[] args) {
for (int i = 0; i <= 4; i++) {
Thread thread = new Thread(new SubRunnable());
thread.start();
}
}
private static class CollectRunnable implements Runnable {
@Override
public void run() {
StringBuilder result = new StringBuilder();
for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {
result.append("[" + workResult.getValue() + "]");
}
System.out.println(" the result = " + result);
System.out.println("do other business........");
}
}
private static class SubRunnable implements Runnable {
@Override
public void run() {
long id = Thread.currentThread().getId();
try {
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " ....do something ");
resultMap.put(Thread.currentThread().getId() + "", id);
cyclicBarrier.await();
Thread.sleep(1000 + id);
System.out.println("Thread_" + id + " ....do its business ");
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
我們定義了兩個(gè)Runnable,SubRunnable和CollectRunnable,然后定義了CyclicBarrier對(duì)象,阻塞計(jì)數(shù)初始值為5,并設(shè)定屏障動(dòng)作為CollectRunnable。在SubRunnable中調(diào)用cyclicBarrier.await();使屏障點(diǎn)加1,在主線程中新啟5個(gè)線程,分別傳入SubRunnable任務(wù)。
運(yùn)行代碼,可以觀察到執(zhí)行SubRunnable的線程會(huì)在調(diào)用完await()后,進(jìn)入阻塞等待狀態(tài),直到所有線程都到達(dá)屏障點(diǎn),即到達(dá)屏障點(diǎn)的線程數(shù)等于5了,達(dá)到初始化設(shè)定的值,最后一個(gè)到達(dá)屏障點(diǎn)的線程就會(huì)執(zhí)行去執(zhí)行CollectRunnable中的邏輯,然后各個(gè)線程才會(huì)去執(zhí)行各自任務(wù)接下來的代碼。
Thread_9 ....do something
Thread_10 ....do something
Thread_11 ....do something
Thread_12 ....do something
Thread_13 ....do something
Thread_13 the result = [11][12][13][9][10]
Thread_13 do other business........
Thread_9 ....do its business
Thread_10 ....do its business
Thread_11 ....do its business
Thread_12 ....do its business
Thread_13 ....do its business
Process finished with exit code 0
CountDownLatch和CyclicBarrier的比較
- CountDownLatch是線程組之間的等待,即一個(gè)(或多個(gè))線程等待N個(gè)線程完成某件事情之后再執(zhí)行;而CyclicBarrier則是線程組內(nèi)的等待,即每個(gè)線程相互等待,即N個(gè)線程都被攔截之后,然后依次執(zhí)行。
- CountDownLatch是減計(jì)數(shù)方式,而CyclicBarrier是加計(jì)數(shù)方式。
- CountDownLatch計(jì)數(shù)為0無法重置,而CyclicBarrier計(jì)數(shù)達(dá)到初始值,則可以重置。
- CountDownLatch不可以復(fù)用,而CyclicBarrier可以復(fù)用。
Semaphore
基本使用
Semaphore,也稱之為信號(hào)量,信號(hào)量就相當(dāng)于一個(gè)計(jì)數(shù)器,通常用來限制線程的數(shù)量。它也是一個(gè)線程同步的輔助類,可以維護(hù)當(dāng)前訪問自身的線程個(gè)數(shù),并提供了同步機(jī)制。使用Semaphore可以控制同時(shí)訪問資源的線程個(gè)數(shù),例如,實(shí)現(xiàn)一個(gè)文件允許的并發(fā)訪問數(shù)。
構(gòu)造方法
- public Semaphore(int permits)
- public Semaphore(int permits, boolean fair) 可以提供了公平和非公平兩種策略
主要方法
- void acquire() 從此信號(hào)量獲取一個(gè)許可,在提供一個(gè)許可前一直將線程阻塞,否則線程被中斷。
- void release() 釋放一個(gè)許可,將其返回給信號(hào)量。
- int availablePermits() 返回此信號(hào)量中當(dāng)前可用的許可數(shù)。
- boolean hasQueuedThreads() 查詢是否有線程正在等待獲取。
實(shí)例演示
我們用一個(gè)連接池獲取連接的例子來演示Semaphore的使用:
DBPoolSemaphore.java
/**
* 類說明:演示Semaphore用法,一個(gè)數(shù)據(jù)庫(kù)連接池的實(shí)現(xiàn)
*/
public class DBPoolSemaphore {
private final static int POOL_SIZE = 10;
private final Semaphore sp;
//存放數(shù)據(jù)庫(kù)連接的容器
private static LinkedList<Connection> pool = new LinkedList<Connection>();
//初始化池
static {
for (int i = 0; i < POOL_SIZE; i++) {
pool.addLast(SqlConnectImpl.fetchConnection());
}
}
public DBPoolSemaphore() {
sp = new Semaphore(10);
}
/*歸還連接*/
public void returnConnect(Connection connection) throws InterruptedException {
if (connection != null) {
System.out.println("當(dāng)前有" + sp.getQueueLength() + "個(gè)線程等待數(shù)據(jù)庫(kù)連接!!"
+ "可用連接數(shù):" + sp.availablePermits());
synchronized (pool) {
pool.addLast(connection);
}
sp.release();
}
}
/*從池子拿連接*/
public Connection takeConnect() throws InterruptedException {
sp.acquire();
Connection connection;
synchronized (pool) {
connection = pool.removeFirst();
}
return connection;
}
}
SemaphoreTest.java
/**
* 類說明:測(cè)試數(shù)據(jù)庫(kù)連接池
*/
public class AppTest {
private static DBPoolSemaphore dbPool = new DBPoolSemaphore();
private static class BusinessThread extends Thread {
@Override
public void run() {
Random r = new Random();//讓每個(gè)線程持有連接的時(shí)間不一樣
long start = System.currentTimeMillis();
try {
Connection connect = dbPool.takeConnect();
System.out.println("Thread_" + Thread.currentThread().getId()
+ "_獲取數(shù)據(jù)庫(kù)連接共耗時(shí)【" + (System.currentTimeMillis() - start) + "】ms.");
SleepTools.ms(100 + r.nextInt(100));//模擬業(yè)務(wù)操作,線程持有連接查詢數(shù)據(jù)
System.out.println("查詢數(shù)據(jù)完成,歸還連接!");
dbPool.returnConnect(connect);
} catch (InterruptedException e) {
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 50; i++) {
Thread thread = new BusinessThread();
thread.start();
}
}
在這個(gè)例子,在DBPoolSemaphore類中,我們初始化了一個(gè)有10個(gè)連接的連接池,并new了一個(gè)Semaphore實(shí)例,在主線程中,啟動(dòng)了50個(gè)線程去獲取數(shù)據(jù)庫(kù)連接,獲取的過程需要通過Semaphore.acquire()來獲取,如果獲取不到,則阻塞,直到分配到信號(hào)量許可為止。當(dāng)使用完連接后,則釋放連接,并通過Semaphore.release()歸還信號(hào)量許可。
Semaphore注意事項(xiàng)
在使用Semaphore的過程中,有一點(diǎn)非常需要注意,即使創(chuàng)建信號(hào)量的時(shí)候,指定了信號(hào)量的大小,但是不正當(dāng)使用release()操作釋放信號(hào)量會(huì)使得信號(hào)量超過配置的大小,也就有可能同時(shí)執(zhí)行的線程數(shù)量比最開始設(shè)置的要大。因?yàn)闆]有任何線程獲取信號(hào)量的時(shí)候,依然能夠釋放并且釋放的有效。例如:
private void testRelease(){
Semaphore semaphore = new Semaphore(3);
Runnable runnable = () -> {
try {
System.out.println(Thread.currentThread().getName() + "try acquire");
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "acquired semaphore");
} catch (InterruptedException e) {
e.printStackTrace();
}
};
semaphore.release();
// 4個(gè)線程都能夠獲取到信號(hào)量
for (int i = 0; i < 4; i++) {
new Thread(runnable).start();
}
}
因此,推薦的做法是,保證在使用時(shí),一個(gè)線程先acquire(),然后release()。如果釋放線程和獲取線程不是同一個(gè),那么最好保證這種對(duì)應(yīng)關(guān)系。不要釋放過多的許可證。
Exchange
Exchanger類可用于兩個(gè)線程之間交換信息??珊?jiǎn)單地將Exchanger對(duì)象理解為一個(gè)包含兩個(gè)格子的容器,通過exchanger方法可以向兩個(gè)格子中填充信息。當(dāng)兩個(gè)格子中的均被填充時(shí),該對(duì)象會(huì)自動(dòng)將兩個(gè)格子的信息交換,然后返回給線程,從而實(shí)現(xiàn)兩個(gè)線程的信息交換。
簡(jiǎn)單說就是一個(gè)線程在完成一定的事務(wù)后想與另一個(gè)線程交換數(shù)據(jù),則第一個(gè)先拿出數(shù)據(jù)的線程會(huì)一直等待第二個(gè)線程,直到第二個(gè)線程拿著數(shù)據(jù)到來時(shí)才能彼此交換對(duì)應(yīng)數(shù)據(jù)。其定義為 Exchanger<V> 泛型類型,其中 V 表示可交換的數(shù)據(jù)類型,
基本使用
構(gòu)造方法
- Exchanger():無參構(gòu)造方法。
方法
- V exchange(V v):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷),然后將給定的對(duì)象傳送給該線程,并接收該線程的對(duì)象。
- V exchange(V v, long timeout, TimeUnit unit):等待另一個(gè)線程到達(dá)此交換點(diǎn)(除非當(dāng)前線程被中斷或超出了指定的等待時(shí)間),然后將給定的對(duì)象傳送給該線程,并接收該線程的對(duì)象。
實(shí)例演示:
/**
* 類說明:演示CyclicExchange用法
*/
public class UseExchange {
private static final Exchanger<String> exchange = new Exchanger<String>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
long id = Thread.currentThread().getId();
String data = "A";
try {
System.out.println("Thread [" + id + "] 交換前 : " + data);
data = exchange.exchange(data);
System.out.println("Thread [" + id + "] 交換后 : " + data);
} catch (InterruptedException e) {
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
long id = Thread.currentThread().getId();
String data = "B";
try {
System.out.println("Thread [" + id + "] 交換前 : " + data);
Thread.sleep(2000);
data = exchange.exchange(data);
System.out.println("Thread [" + id + "] 交換后 : " + data);
} catch (InterruptedException e) {
}
}
}).start();
}
}
運(yùn)行結(jié)果如下:
Thread [9] 交換前 : A
Thread [10] 交換前 : B
Thread [10] 交換后 : A
Thread [9] 交換后 : B
可以看出,當(dāng)一個(gè)線程到達(dá) exchange 調(diào)用點(diǎn)時(shí),如果其他線程此前已經(jīng)調(diào)用了此方法,則其他線程會(huì)被調(diào)度喚醒并與之進(jìn)行對(duì)象交換,然后各自返回;如果其他線程還沒到達(dá)交換點(diǎn),則當(dāng)前線程會(huì)被掛起,直至其他線程到達(dá)才會(huì)完成交換并正常返回,或者當(dāng)前線程被中斷或超時(shí)返回。