異步執(zhí)行方法回調(diào)的設(shè)計(jì)模式:異步方法調(diào)用是在等待任務(wù)結(jié)果時(shí)不阻塞調(diào)用線程的模式。該模式提供了多個(gè)獨(dú)立的任務(wù)并行處理和取得任務(wù)結(jié)果或者等待所有任務(wù)結(jié)束。
-
總覽圖如下
image.png - 下面為代碼示例,首先是執(zhí)行器接口
/**
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: AsyncExecutor.java
* @Description: 執(zhí)行器executor的三個(gè)關(guān)聯(lián)的對(duì)象,1:傳入的參數(shù)線程task,2:傳入的保存結(jié)果狀態(tài)的callback
* 3:返回值result。它也是整個(gè)模式的核心部分
* @version: v1.0.0
*/
public interface AsyncExecutor {
// 開始執(zhí)行任務(wù),未持有callback則說(shuō)明客戶端不需要對(duì)返回結(jié)果做額外判斷。返回異步結(jié)果
<T> AsyncResult<T> startProcess(Callable<T> task);
// 開始執(zhí)行任務(wù),持有callback則說(shuō)明客戶端自定義實(shí)現(xiàn)額外判斷。返回異步結(jié)果
<T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);
// 結(jié)束異步任務(wù),如果必要時(shí)阻塞當(dāng)前的線程并返回結(jié)果結(jié)束任務(wù)
<T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
}
- 異步執(zhí)行返回結(jié)果接口
/**
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: AsyncResult.java
* @Description: executor執(zhí)行器執(zhí)行的返回結(jié)果。它應(yīng)該提供執(zhí)行狀態(tài)、任務(wù)返回值、結(jié)果掛起
* @version: v1.0.0
*/
public interface AsyncResult<T> {
// 線程任務(wù)是否完成
boolean isCompleted();
// 獲取任務(wù)的返回值
T getValue() throws ExecutionException;
// 阻塞當(dāng)前線程,直到異步任務(wù)完成,如果執(zhí)行中斷,拋出異常
void await() throws InterruptedException;
}
- 保存執(zhí)行器executor執(zhí)行結(jié)果(task任務(wù)狀態(tài),返回值),客戶端可以進(jìn)行自定義處理
/**
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: AsynCallback.java
* @Description: 保存執(zhí)行器executor執(zhí)行結(jié)果(task任務(wù)狀態(tài),返回值),可以由客戶端進(jìn)行自定義處理
* @version: v1.0.0
*/
public interface AsynCallback<T> {
//客戶端實(shí)現(xiàn),對(duì)executor執(zhí)行結(jié)果后做自定義處理
void onComplete(T val,Optional<Exception> ex);
}
- 執(zhí)行器的具體實(shí)現(xiàn)
/**
* Copyright: Copyright (c) 2017 LanRu-Caifu
* @author xzg
* 2017年9月8日
* @ClassName: ThreadAsyncExecutor.java
* @Description:
* @version: v1.0.0
*/
public class ThreadAsyncExecutor implements AsyncExecutor {
// 為區(qū)別線程,為每個(gè)線程命名
private final AtomicInteger idx = new AtomicInteger(0);
@Override
public <T> AsyncResult<T> startProcess(Callable<T> task) {
return startProcess(task, null);
}
@Override
public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
// CompletableResult作為executor的返回結(jié)果,它會(huì)對(duì)callback傳遞參數(shù)讓callback自行處理
CompletableResult<T> result = new CompletableResult<>(callback);
// 啟動(dòng)一個(gè)線程去處理任務(wù)線程,并將任務(wù)線程的返回結(jié)果設(shè)置到result中
new Thread(() -> {
try {
result.setValue(task.call());
} catch (Exception ex) {
result.setException(ex);
}
} , "executor-" + idx.incrementAndGet()).start();
return result;
}
// 結(jié)束任務(wù),如果當(dāng)前任務(wù)沒(méi)有完成則讓出cpu讓其他任務(wù)使用。如果執(zhí)行結(jié)束返回結(jié)果
@Override
public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
if (!asyncResult.isCompleted()) {
asyncResult.await();
}
return asyncResult.getValue();
}
/**
* Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
* exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
*
* @see java.util.concurrent.FutureTask
* @see java.util.concurrent.CompletableFuture
*/
// 執(zhí)行器executor的三個(gè)關(guān)聯(lián)的對(duì)象,1:傳入的參數(shù)線程task,2:傳入的保存結(jié)果狀態(tài)的callback,3:返回值result
// 異步執(zhí)行的結(jié)果封裝,持有callback對(duì)象(該對(duì)象可由客戶端重寫),這里是將執(zhí)行的結(jié)果保存到callback中的value|exception
private static class CompletableResult<T> implements AsyncResult<T> {
// 幾種執(zhí)行的狀態(tài)
static final int RUNNING = 1;
static final int FAILED = 2;
static final int COMPLETED = 3;
// 對(duì)象鎖
final Object lock;
// Optional封裝callback
final Optional<AsyncCallback<T>> callback;
// 初始狀態(tài)
volatile int state = RUNNING;
// 執(zhí)行結(jié)果
T value;
// 執(zhí)行異常情況
Exception exception;
CompletableResult(AsyncCallback<T> callback) {
this.lock = new Object();
this.callback = Optional.ofNullable(callback);
}
/**
* Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
* completion.
* 封裝任務(wù)的返回結(jié)果
* @param value
* value of the evaluated task
*/
void setValue(T value) {
this.value = value;
this.state = COMPLETED;
this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
synchronized (lock) {
lock.notifyAll();
}
}
/**
* Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
* completion.
* 設(shè)置異常
* @param exception
* exception of the failed task
*/
void setException(Exception exception) {
this.exception = exception;
this.state = FAILED;
this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
synchronized (lock) {
lock.notifyAll();
}
}
// 是否運(yùn)行狀態(tài)
@Override
public boolean isCompleted() {
return state > RUNNING;
}
// 取得任務(wù)結(jié)果
@Override
public T getValue() throws ExecutionException {
if (state == COMPLETED) {
return value;
} else if (state == FAILED) {
throw new ExecutionException(exception);
} else {
throw new IllegalStateException("Execution not completed yet");
}
}
// 未完成時(shí)不參與競(jìng)爭(zhēng)
@Override
public void await() throws InterruptedException {
synchronized (lock) {
while (!isCompleted()) {
lock.wait();
}
}
}
}
}
- 測(cè)試部分
public class App {
public static void main(String[] args) throws Exception {
// 新建一個(gè)executor執(zhí)行器
AsyncExecutor executor = new ThreadAsyncExecutor();
// 開始執(zhí)行一些任務(wù)
AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));
// emulate processing in the current thread while async tasks are running in their own threads
Thread.sleep(350); // Oh boy I'm working hard here
log("Some hard work done");
// wait for completion of the tasks
Integer result1 = executor.endProcess(asyncResult1);
String result2 = executor.endProcess(asyncResult2);
Long result3 = executor.endProcess(asyncResult3);
// 下面的執(zhí)行結(jié)果掛起
asyncResult4.await();
asyncResult5.await();
// 打印線程結(jié)果
log("Result 1: " + result1);
log("Result 2: " + result2);
log("Result 3: " + result3);
}
/**
* Creates a callable that lazily evaluates to given value with artificial delay.
* 創(chuàng)建一個(gè)任務(wù)
* @param value
* value to evaluate
* @param delayMillis
* artificial delay in milliseconds
* @return new callable for lazy evaluation
*/
private static <T> Callable<T> lazyval(T value, long delayMillis) {
return () -> {
Thread.sleep(delayMillis);
log("Task completed with: " + value);
return value;
};
}
/**
* 客戶端自定義callback
*/
private static <T> AsyncCallback<T> callback(String name) {
// 返回一個(gè)callback重寫 void onComplete(T value, Optional<Exception> ex)的實(shí)現(xiàn)類對(duì)象
return (value, ex) -> {
if (ex.isPresent()) {
log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
} else {
log(name + ": " + value);
}
};
}
// 日志方法
private static void log(String msg) {
System.out.println(msg);
}
}
Task completed with: test
Some hard work done
Task completed with: 20
Callback result 4: 20
Task completed with: 10
Task completed with: callback
Callback result 5: callback
Task completed with: 50
Result 1: 10
Result 2: test
Result 3: 50
