業(yè)務(wù)場(chǎng)景
在web應(yīng)用開(kāi)發(fā)中我們經(jīng)常會(huì)遇到這樣的場(chǎng)景:一個(gè)請(qǐng)求任務(wù),我們需要去查多個(gè)庫(kù),并對(duì)查詢到的數(shù)據(jù)做處理,此時(shí)如果采用同步的方式去查,往往會(huì)導(dǎo)致請(qǐng)求響應(yīng)時(shí)間過(guò)慢。比如:兩個(gè)查詢?nèi)蝿?wù)task1,task2,task1查詢數(shù)據(jù)要花2s,處理數(shù)據(jù)要花1s;task2查詢數(shù)據(jù)花5s,處理數(shù)據(jù)花2s,那一次請(qǐng)求的時(shí)間是2+1+5+2=10s。而如果我們用異步的方式,則能減少請(qǐng)求響應(yīng)的時(shí)間。
而利用異步的方式,常常子任務(wù)還未執(zhí)行完,主線程就已經(jīng)結(jié)束了,導(dǎo)致數(shù)據(jù)不能很好的返回到前端,所以主線程必須保證所有的子任務(wù)執(zhí)行結(jié)束后才能退出。
接下來(lái)我講討論各種異步方式來(lái)處理這種業(yè)務(wù)場(chǎng)景的方式。
方式一:利用java多線程工具Future.get()獲取數(shù)據(jù)
public class TestFuture {
// 任務(wù)一執(zhí)行2s
public static class Task1 implements Callable {
public Object call() throws Exception {
System.out.println("task1 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task1");
Thread.sleep(2000);
System.out.println("task1 ending ...");
return lists;
}
}
// 任務(wù)一執(zhí)行5s
public static class Task2 implements Callable {
public Object call() throws Exception {
System.out.println("task2 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task2");
Thread.sleep(5000);
System.out.println("task2 ending ...");
return lists;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Long start = System.currentTimeMillis();
int cpuNum = Runtime.getRuntime().availableProcessors();
ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
Future<List> future1 = executor.submit(new Task1());
Future<List> future2 = executor.submit(new Task2());
// 獲取任務(wù)一和任務(wù)二的數(shù)據(jù) 進(jìn)行處理
List<String> lists1 = future1.get();
List<String> lists2 = future2.get();
// ===》分析點(diǎn):
dealTask1Data(lists1);
dealTask2Data(lists2);
System.out.println("task ending");
Long time = System.currentTimeMillis() - start;
System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
}
// 處理任務(wù)1數(shù)據(jù) 處理1s
public static void dealTask1Data(List<String> lists) throws InterruptedException {
System.out.println("deal task1 data ...");
Thread.sleep(1000);
}
// 處理任務(wù)2數(shù)據(jù) 處理2s
public static void dealTask2Data(List<String> lists) throws InterruptedException {
System.out.println("deal task2 data ...");
Thread.sleep(2000);
}
}
執(zhí)行結(jié)果
task1 starting ...
task2 starting ...
task1 ending ...
task2 ending ...
deal task1 data ...
deal task2 data ...
task ending
執(zhí)行任務(wù)所花的時(shí)間:8009s
結(jié)果分析:
查看源碼===》標(biāo)注處,利用future1.get(),future2.get()獲取數(shù)據(jù),需要等到future1和future2所有的數(shù)據(jù)返回后,主線程才能繼續(xù)往下執(zhí)行,所以執(zhí)行到future2.get()的時(shí)間需要5s,而后處理task1數(shù)據(jù)1s,處理task2數(shù)據(jù)2s,執(zhí)行時(shí)間為5+1+2 = 8s。
方式二: 利用CountDownLatch讓主線程等待子線程任務(wù)結(jié)束
public class TestCountDownLatch {
// 任務(wù)一執(zhí)行2s
public static class Task1 implements Callable {
private CountDownLatch countDownLatch;
public Task1(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public Object call() throws Exception {
System.out.println("task1 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task1");
Thread.sleep(2000);
System.out.println("task1 ending ...");
// 對(duì)任務(wù)一的數(shù)據(jù)進(jìn)行處理
dealTask1Data(lists);
// 任務(wù)一結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
countDownLatch.countDown();
return lists;
}
// 處理任務(wù)1數(shù)據(jù)
public void dealTask1Data(List<String> lists) throws InterruptedException {
System.out.println("deal task1 data ...");
Thread.sleep(1000);
}
}
// 任務(wù)一執(zhí)行5s
public static class Task2 implements Callable {
private CountDownLatch countDownLatch;
public Task2(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
public Object call() throws Exception {
System.out.println("task2 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task2");
Thread.sleep(5000);
System.out.println("task2 ending ...");
// 對(duì)任務(wù)二的數(shù)據(jù)進(jìn)行處理
dealTask2Data(lists);
// 任務(wù)二結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
countDownLatch.countDown();
return lists;
}
// 處理任務(wù)2數(shù)據(jù)
public static void dealTask2Data(List<String> lists) throws InterruptedException {
System.out.println("deal task2 data ...");
Thread.sleep(2000);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(2);
int cpuNum = Runtime.getRuntime().availableProcessors();
ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
executor.submit(new Task1(countDownLatch));
executor.submit(new Task2(countDownLatch));
// 等countDownLatch == 0時(shí),主線程結(jié)束 10s超時(shí),自動(dòng)結(jié)束,如果任務(wù)沒(méi)超過(guò)10s,也得等10s
// countDownLatch.await(10000, TimeUnit.MILLISECONDS);
// ===> 等到countDownLatch計(jì)數(shù)器為0,才往下執(zhí)行
countDownLatch.await();
System.out.println("task ending ...");
long time = System.currentTimeMillis() - start;
System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
}
}
執(zhí)行結(jié)果:
task1 starting ...
task2 starting ...
task1 ending ...
deal task1 data ...
task2 ending ...
deal task2 data ...
task ending ...
執(zhí)行任務(wù)所花的時(shí)間:7031s
結(jié)果分析:
將任務(wù)查詢到的數(shù)據(jù)處理放到每個(gè)線程里處理,然后利用CountDownLatch作為計(jì)數(shù)器,開(kāi)始給CountDownLatch設(shè)置任務(wù)數(shù),在每個(gè)線程執(zhí)行完畢之后,計(jì)數(shù)器減一,在===》標(biāo)注點(diǎn),主線程會(huì)等countDownLatch計(jì)數(shù)器為0的時(shí)候才會(huì)繼續(xù)往下執(zhí)行。因?yàn)樯厦娲a將數(shù)據(jù)處理放到了每個(gè)線程中,每個(gè)線程是并發(fā)執(zhí)行的,所以任務(wù)執(zhí)行時(shí)間是5+2=7s。
方式三:利用CyclicBarrier讓主線程等待子線程
public class TestCyclicBarrier {
// 任務(wù)一執(zhí)行2s
public static class Task1 implements Callable {
private CyclicBarrier cyclicBarrier;
public Task1(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public Object call() throws Exception {
System.out.println("task1 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task1");
Thread.sleep(2000);
System.out.println("task1 ending ...");
// 對(duì)任務(wù)一的數(shù)據(jù)進(jìn)行處理
dealTask1Data(lists);
// 任務(wù)一結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
cyclicBarrier.await();
return lists;
}
// 處理任務(wù)1數(shù)據(jù)
public void dealTask1Data(List<String> lists) throws InterruptedException {
System.out.println("deal task1 data ...");
Thread.sleep(1000);
}
}
// 任務(wù)一執(zhí)行2s
public static class Task2 implements Callable {
private CyclicBarrier cyclicBarrier;
public Task2(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
public Object call() throws Exception {
System.out.println("task2 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task2");
Thread.sleep(5000);
System.out.println("task2 ending ...");
// 對(duì)任務(wù)二的數(shù)據(jù)進(jìn)行處理
dealTask2Data(lists);
// 任務(wù)二結(jié)束 對(duì)countDownLatch計(jì)數(shù)器--
cyclicBarrier.await();
return lists;
}
// 處理任務(wù)2數(shù)據(jù)
public static void dealTask2Data(List<String> lists) throws InterruptedException {
System.out.println("deal task2 data ...");
Thread.sleep(2000);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, BrokenBarrierException, TimeoutException {
long start = System.currentTimeMillis();
CyclicBarrier cyclicBarrier = new CyclicBarrier(3);
int cpuNum = Runtime.getRuntime().availableProcessors();
ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
executor.submit(new Task1(cyclicBarrier));
executor.submit(new Task2(cyclicBarrier));
// 等countDownLatch == 0時(shí),主線程結(jié)束 3s超時(shí),超時(shí)會(huì)報(bào)異常
// cyclicBarrier.await(3000, TimeUnit.MILLISECONDS);
// ===》
cyclicBarrier.await();
System.out.println("task ending ...");
long time = System.currentTimeMillis() - start;
System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
}
}
執(zhí)行結(jié)果:
task1 starting ...
task2 starting ...
task1 ending ...
deal task1 data ...
task2 ending ...
deal task2 data ...
task ending ...
執(zhí)行任務(wù)所花的時(shí)間:7022s
結(jié)果分析:
當(dāng)代碼執(zhí)行到===》標(biāo)注點(diǎn)的時(shí)候,cyclicBarrier.await()會(huì)看task1和task2的代碼是否也執(zhí)行到了cyclicBarrier.await(),如果有任務(wù)沒(méi)有執(zhí)行到,則會(huì)繼續(xù)等待,只有3個(gè)任務(wù)同時(shí)執(zhí)行到了cyclicBarrier.await()任務(wù)才會(huì)繼續(xù)往下執(zhí)行。
CountDownLatch與CyclicBarrier的區(qū)別
javadoc的解釋?zhuān)?/p>
- CountDownLatch:
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
一個(gè)線程(或者多個(gè)), 只有另外N個(gè)線程完成某個(gè)事情之后才能繼續(xù)往下執(zhí)行。(即只有計(jì)數(shù)器為0的時(shí)候,才能繼續(xù)往下執(zhí)行) - CyclicBarrier :
A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point.
N個(gè)線程相互等待,只有所有的線程都執(zhí)行到了barrier點(diǎn),所有線程才能繼續(xù)往下執(zhí)行,否則所有線程都必須等待。
方式四:利用CompletionService
public class TestCompletion {
// 任務(wù)一執(zhí)行2s
public static class Task1 implements Callable {
public Object call() throws Exception {
System.out.println("task1 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task1");
Thread.sleep(5000);
System.out.println("task1 ending ...");
return lists;
}
}
// 任務(wù)一執(zhí)行3s
public static class Task2 implements Callable {
public Object call() throws Exception {
System.out.println("task2 starting ...");
List<String> lists = new ArrayList<String>();
lists.add("task2");
Thread.sleep(3000);
System.out.println("task2 ending ...");
return lists;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException, BrokenBarrierException, TimeoutException {
long start = System.currentTimeMillis();
int cpuNum = Runtime.getRuntime().availableProcessors();
ExecutorService executor = new ThreadPoolExecutor(cpuNum, cpuNum * 2, 60000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
CompletionService<List<String>> completionService = new ExecutorCompletionService<List<String>>(executor);
completionService.submit(new Task1());
completionService.submit(new Task2());
// 能做到先返回任務(wù),結(jié)果就先輸出
try {
for (int i = 0; i < 2; i++) {
// Future<List<String>> result = completionService.take();
// System.out.println("hello world");
// System.out.println(result.get());
Future<List<String>> result2 = completionService.poll(5000, TimeUnit.MILLISECONDS);
System.out.println(result2.get());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
throw e;
}
System.out.println("task ending ...");
long time = System.currentTimeMillis() - start;
System.out.println("執(zhí)行任務(wù)所花的時(shí)間:" + time + "s");
}
}
CompletionService將Executor和BlockingQueue的功能融合在一起.可以將Callable任務(wù)提交給它來(lái)執(zhí)行,然后使用類(lèi)似與隊(duì)列操作的take和poll等方法來(lái)獲得已完成的結(jié)果,而這些結(jié)果會(huì)在完成時(shí)將被封裝為Future.ExecutorCompletionService實(shí)現(xiàn)了CompletionService,并將計(jì)算部分委托給一個(gè)Executor.
ExecutorCompletionService的實(shí)現(xiàn)非常簡(jiǎn)單.在構(gòu)造函數(shù)中創(chuàng)建一個(gè)BlockingQueue來(lái)保存計(jì)算完成的結(jié)果.當(dāng)計(jì)算完成時(shí),調(diào)用Future-Task中的done方法.當(dāng)提交某個(gè)任務(wù)時(shí),該任務(wù)將首先包裝為一個(gè)QueueingFuture,這是FutureTask的一個(gè)子類(lèi),然后再改寫(xiě)子類(lèi)的done方法,并將結(jié)果放入BlockingQueue中.take和poll方法委托給了BlockingQueue,這些方法會(huì)在得出結(jié)果之前阻塞.
private class QueueingFuture<V> extends FutureTask<V> {
QueueingFuture(Callable<V> c){super(c);}
QueueingFuture(Runnable t, V r) {super(t, r);}
protected void done() {
completionQueue.add(this);
}
}
結(jié)果:
task1 starting ...
task2 starting ...
task2 ending ...
[task2]
task1 ending ...
[task1]
task ending ...
執(zhí)行任務(wù)所花的時(shí)間:5015s
多個(gè)ExecutorCompletionservice可以共享一個(gè)Executor,因此可以創(chuàng)建一個(gè)特定計(jì)算私有,又能共享一個(gè)公共Executor的ExecutorCompletionService.因此,CompletionService的作用就相當(dāng)于一組計(jì)算的句柄.這與Future作為單個(gè)計(jì)算句柄是非常類(lèi)似的.通過(guò)記錄提交CompletionService的任務(wù)數(shù)量,并計(jì)算出已經(jīng)獲得的已完成結(jié)果的數(shù)量.通過(guò)記錄提交給CompletionService的任務(wù)數(shù)量,并計(jì)算出已經(jīng)獲得的已完成結(jié)果的數(shù)量,即使使用一個(gè)共享的Executor,也能知道已經(jīng)獲得了所有任務(wù)結(jié)果的時(shí)間.