并發(fā)系列之 Future 框架詳解

本文將主要講解 J.U.C 中的 Future 框架,并分析結(jié)合源碼分析其內(nèi)部結(jié)構(gòu)邏輯;

一、Future 框架概述

JDK 中的 Future 框架實(shí)際就是 Future 模式的實(shí)現(xiàn),通常情況下我們會(huì)配合線程池使用,但也可以單獨(dú)使用;下面我們就單獨(dú)使用簡單舉例;

1. 應(yīng)用實(shí)例

FutureTask future =newFutureTask<>(() -> {? log.info("異步任務(wù)執(zhí)行...");? Thread.sleep(2000);? log.info("過了很久很久...");return"異步任務(wù)完成";});log.info("啟動(dòng)異步任務(wù)...");newThread(future).start();log.info("繼續(xù)其他任務(wù)...");Thread.sleep(1000);log.info("獲取異步任務(wù)結(jié)果:{}", future.get());

打?。?/p>

[15:38:03,231INFO ] [main]? ? - 啟動(dòng)異步任務(wù)...

[15:38:03,231INFO ] [main]? ? - 繼續(xù)其他任務(wù)...

[15:38:03,231INFO ] [Thread-0] - 異步任務(wù)執(zhí)行...

[15:38:05,232INFO ] [Thread-0] - 過了很久很久...

[15:38:05,236INFO ] [main]? ? - 獲取異步任務(wù)結(jié)果:異步任務(wù)完成

如上面代碼所示,首先我們將要執(zhí)行的任務(wù)包裝成?Callable,這里如果不需要返回值也可以使用?Runnable;然后構(gòu)建?FutureTask?由一個(gè)線程啟動(dòng),最后使用?Future.get()?獲取異步任務(wù)結(jié)果;

2. Future 運(yùn)行邏輯

對于 Future 模式的流程圖如下:

對比上面的實(shí)例代碼,大家可能會(huì)發(fā)現(xiàn)有些不一樣,因?yàn)樵?FutureTask 同時(shí)繼承了 Runnable 和 Future 接口,所以再提交任務(wù)后沒有返回Future,而是直接使用自身調(diào)用 get;下面我們就對源碼進(jìn)行實(shí)際分析;

二、源碼分析

1. FutureTask 主體結(jié)構(gòu)

publicinterface RunnableFuture<V> extends Runnable, Future<V> {}publicclass FutureTask<V> implements RunnableFuture<V> {privatevolatileintstate;// 任務(wù)運(yùn)行狀態(tài)privateCallable callable;// 異步任務(wù)privateObject outcome;// 返回結(jié)果privatevolatileThread runner;// 異步任務(wù)執(zhí)行線程privatevolatileWaitNode waiters;// 等待異步結(jié)果的線程棧(通過Treiber stack算法實(shí)現(xiàn))public FutureTask(Callable<V> callable) {// 需要返回值if(callable ==null)thrownewNullPointerException();this.callable = callable;this.state = NEW;// ensure visibility of callable}public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;// ensure visibility of callable}? ...}

另外在代碼中還可以看見有很多地方都是用了?CAS?來更新變量,而 JDK1.6 中甚至使用了?AQS?來實(shí)現(xiàn);其原因就是同一個(gè)?FutureTask?可以多個(gè)線程同時(shí)提交,也可以多個(gè)線程同時(shí)獲??;?所以代碼中有很多的狀態(tài)變量:

// FutureTask.state 取值privatestaticfinalintNEW? ? ? ? ? =0;// 初始化到結(jié)果返回前privatestaticfinalintCOMPLETING? =1;// 結(jié)果賦值privatestaticfinalintNORMAL? ? ? =2;// 執(zhí)行完畢privatestaticfinalintEXCEPTIONAL? =3;// 執(zhí)行異常privatestaticfinalintCANCELLED? ? =4;// 任務(wù)取消privatestaticfinalintINTERRUPTING =5;// 設(shè)置中斷狀態(tài)privatestaticfinalintINTERRUPTED? =6;// 任務(wù)中斷

同時(shí)源碼的注釋中也詳細(xì)給出了可能出現(xiàn)的狀態(tài)轉(zhuǎn)換:

NEW -> COMPLETING -> NORMAL // 任務(wù)正常執(zhí)行

NEW -> COMPLETING -> EXCEPTION // 任務(wù)執(zhí)行異常

NEW ->CANCELLED // 任務(wù)取消

NEW -> INITERRUPTING -> INTERRUPTED // 任務(wù)中斷

注意這里的?COMPLETING?狀態(tài)是一個(gè)很微妙的狀態(tài),正因?yàn)橛兴拇嬖诓拍軐?shí)現(xiàn)無鎖賦值;大家先留意這個(gè)狀態(tài),然后在代碼中應(yīng)該能體會(huì)到;另外這里還有一個(gè)變量需要注意,WaitNode?;使用?Treiber stack?算法實(shí)現(xiàn)的無鎖棧;其原理說明可以參考下面第三節(jié);

2. 任務(wù)執(zhí)行

public void run() {if(state != NEW ||// 確保任務(wù)執(zhí)行完成后,不再重復(fù)執(zhí)行!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))// 確保只有一個(gè)線程執(zhí)行return;try{? ? Callable c = callable;if(c !=null&& state == NEW) {? ? ? V result;booleanran;try{? ? ? ? result = c.call();? ? ? ? ran =true;? ? ? }catch(Throwable ex) {? ? ? ? result =null;? ? ? ? ran =false;? ? ? ? setException(ex);// 設(shè)置異常結(jié)果}if(ran) set(result);// 設(shè)置結(jié)果}? }finally{? ? runner =null;ints = state;if(s >= INTERRUPTING) handlePossibleCancellationInterrupt(s);// 確保中斷狀態(tài)已經(jīng)設(shè)置}}

// 設(shè)置異步任務(wù)結(jié)果protected void set(V v) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保證結(jié)果只能設(shè)置一次outcome = v;? ? UNSAFE.putOrderedInt(this, stateOffset, NORMAL);// final statefinishCompletion();// 喚醒等待線程}}

protected void setException(Throwable t) {if(UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {// 保證結(jié)果只能設(shè)置一次outcome = t;? ? UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL);// final statefinishCompletion();? }}

3. 任務(wù)取消

public boolean cancel(boolean mayInterruptIfRunning) {if(!(state == NEW &&// 只有在任務(wù)執(zhí)行階段才能取消UNSAFE.compareAndSwapInt(this, stateOffset, NEW,// 設(shè)置取消狀態(tài)mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))returnfalse;try{// in case call to interrupt throws exceptionif(mayInterruptIfRunning) {try{? ? ? ? Thread t = runner;if(t !=null)? ? ? ? ? t.interrupt();? ? ? }finally{// final stateUNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);? ? ? }? ? }? }finally{? ? finishCompletion();? }returntrue;}

注意?cancel(false)?也就是僅取消,并沒有打斷;異步任務(wù)會(huì)繼續(xù)執(zhí)行,只是這里首先設(shè)置了?FutureTask.state = CANCELLED?,所以最后在設(shè)置結(jié)果的時(shí)候會(huì)失敗,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)?;

4. 獲取結(jié)果

public V get() throws InterruptedException, ExecutionException {ints = state;if(s <= COMPLETING)? ? s = awaitDone(false,0L);// 阻塞等待returnreport(s);}private V report(int s) throws ExecutionException {// 根據(jù)最后的狀態(tài)返回結(jié)果Object x = outcome;if(s == NORMAL)return(V)x;if(s >= CANCELLED)thrownewCancellationException();thrownewExecutionException((Throwable)x);}

private int awaitDone(boolean timed, long nanos)? throws InterruptedException {finallongdeadline = timed ? System.nanoTime() + nanos :0L;? WaitNode q =null;booleanqueued =false;for(;;) {if(Thread.interrupted()) {? ? ? removeWaiter(q);// 移除等待節(jié)點(diǎn)thrownewInterruptedException();? ? }ints = state;if(s > COMPLETING) {// 任務(wù)已完成if(q !=null)? ? ? ? q.thread =null;returns;? ? }elseif(s == COMPLETING)// 正在賦值,直接先出讓線程Thread.yield();elseif(q ==null)// 任務(wù)還未完成需要等待q =newWaitNode();elseif(!queued)? ? ? queued = UNSAFE.compareAndSwapObject(this, waitersOffset,? ? ? ? ? ? ? ? ? ? ? ? q.next = waiters, q);// 使用 Treiber stack 算法elseif(timed) {? ? ? nanos = deadline - System.nanoTime();if(nanos <=0L) {? ? ? ? removeWaiter(q);returnstate;? ? ? }? ? ? LockSupport.parkNanos(this, nanos);? ? }elseLockSupport.park(this);? }}

三、Treiber stack

在《Java 并發(fā)編程實(shí)戰(zhàn)》中講了,?創(chuàng)建非阻塞算法的關(guān)鍵在于,找出如何將原子修改的范圍縮小到單個(gè)變量上,同時(shí)還要維護(hù)數(shù)據(jù)的一致性 。

@ThreadSafepublicclass ConcurrentStack <E> {? AtomicReference> top =newAtomicReference<>();privatestaticclass Node <E> {publicfinalE item;publicNode next;public Node(E item) {this.item = item;? ? }? }public void push(E item) {? ? Node newHead =newNode<>(item);? ? Node oldHead;do{? ? ? oldHead = top.get();? ? ? newHead.next = oldHead;? ? }while(!top.compareAndSet(oldHead, newHead));? }public E pop() {? ? Node oldHead;? ? Node newHead;do{? ? ? oldHead = top.get();if(oldHead ==null)returnnull;? ? ? newHead = oldHead.next;? ? }while(!top.compareAndSet(oldHead, newHead));returnoldHead.item;? }}

總結(jié)

總體來講源碼比較簡單,因?yàn)槠浔旧碇皇且粋€(gè) Future 模式的實(shí)現(xiàn)

但是其中的狀態(tài)量的設(shè)置,還有里面很多無鎖的處理方式,才是 FutureTask 帶給我們的精華!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容