異步任務(wù)執(zhí)行的設(shè)計(jì)模式

參考:java的設(shè)計(jì)模式

異步執(zhí)行方法回調(diào)的設(shè)計(jì)模式:異步方法調(diào)用是在等待任務(wù)結(jié)果時(shí)不阻塞調(diào)用線程的模式。該模式提供了多個(gè)獨(dú)立的任務(wù)并行處理和取得任務(wù)結(jié)果或者等待所有任務(wù)結(jié)束。

  • 總覽圖如下


    image.png
  • 下面為代碼示例,首先是執(zhí)行器接口
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncExecutor.java
 * @Description: 執(zhí)行器executor的三個(gè)關(guān)聯(lián)的對(duì)象,1:傳入的參數(shù)線程task,2:傳入的保存結(jié)果狀態(tài)的callback
 * 3:返回值result。它也是整個(gè)模式的核心部分
 * @version: v1.0.0
 */
public interface AsyncExecutor {

//   開始執(zhí)行任務(wù),未持有callback則說(shuō)明客戶端不需要對(duì)返回結(jié)果做額外判斷。返回異步結(jié)果
      <T> AsyncResult<T> startProcess(Callable<T> task);

//       開始執(zhí)行任務(wù),持有callback則說(shuō)明客戶端自定義實(shí)現(xiàn)額外判斷。返回異步結(jié)果
      <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback);

//    結(jié)束異步任務(wù),如果必要時(shí)阻塞當(dāng)前的線程并返回結(jié)果結(jié)束任務(wù)
      <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException;
}
  • 異步執(zhí)行返回結(jié)果接口
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsyncResult.java
 * @Description: executor執(zhí)行器執(zhí)行的返回結(jié)果。它應(yīng)該提供執(zhí)行狀態(tài)、任務(wù)返回值、結(jié)果掛起
 * @version: v1.0.0
 */
public interface AsyncResult<T> {

//  線程任務(wù)是否完成
    boolean isCompleted();
//  獲取任務(wù)的返回值
    T getValue() throws ExecutionException;

//    阻塞當(dāng)前線程,直到異步任務(wù)完成,如果執(zhí)行中斷,拋出異常
    void await() throws InterruptedException;
}
  • 保存執(zhí)行器executor執(zhí)行結(jié)果(task任務(wù)狀態(tài),返回值),客戶端可以進(jìn)行自定義處理
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: AsynCallback.java
 * @Description: 保存執(zhí)行器executor執(zhí)行結(jié)果(task任務(wù)狀態(tài),返回值),可以由客戶端進(jìn)行自定義處理
 * @version: v1.0.0
 */
public interface AsynCallback<T> {

    //客戶端實(shí)現(xiàn),對(duì)executor執(zhí)行結(jié)果后做自定義處理
    void onComplete(T val,Optional<Exception> ex);
}
  • 執(zhí)行器的具體實(shí)現(xiàn)
/**
 * Copyright: Copyright (c) 2017 LanRu-Caifu
 * @author xzg
 * 2017年9月8日
 * @ClassName: ThreadAsyncExecutor.java
 * @Description: 
 * @version: v1.0.0
 */
public class ThreadAsyncExecutor implements AsyncExecutor {

//      為區(qū)別線程,為每個(gè)線程命名
      private final AtomicInteger idx = new AtomicInteger(0);

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task) {
        return startProcess(task, null);
      }

      @Override
      public <T> AsyncResult<T> startProcess(Callable<T> task, AsyncCallback<T> callback) {
//      CompletableResult作為executor的返回結(jié)果,它會(huì)對(duì)callback傳遞參數(shù)讓callback自行處理
        CompletableResult<T> result = new CompletableResult<>(callback);
//      啟動(dòng)一個(gè)線程去處理任務(wù)線程,并將任務(wù)線程的返回結(jié)果設(shè)置到result中
        new Thread(() -> {
          try {
            result.setValue(task.call());
          } catch (Exception ex) {
            result.setException(ex);
          }
        } , "executor-" + idx.incrementAndGet()).start();
        return result;
      }
//      結(jié)束任務(wù),如果當(dāng)前任務(wù)沒(méi)有完成則讓出cpu讓其他任務(wù)使用。如果執(zhí)行結(jié)束返回結(jié)果
      @Override
      public <T> T endProcess(AsyncResult<T> asyncResult) throws ExecutionException, InterruptedException {
        if (!asyncResult.isCompleted()) {
          asyncResult.await();
        }
        return asyncResult.getValue();
      }

      /**
       * Simple implementation of async result that allows completing it successfully with a value or exceptionally with an
       * exception. A really simplified version from its real life cousins FutureTask and CompletableFuture.
       *
       * @see java.util.concurrent.FutureTask
       * @see java.util.concurrent.CompletableFuture
       */
//   執(zhí)行器executor的三個(gè)關(guān)聯(lián)的對(duì)象,1:傳入的參數(shù)線程task,2:傳入的保存結(jié)果狀態(tài)的callback,3:返回值result
//    異步執(zhí)行的結(jié)果封裝,持有callback對(duì)象(該對(duì)象可由客戶端重寫),這里是將執(zhí)行的結(jié)果保存到callback中的value|exception
      private static class CompletableResult<T> implements AsyncResult<T> {
//      幾種執(zhí)行的狀態(tài)
        static final int RUNNING = 1;
        static final int FAILED = 2;
        static final int COMPLETED = 3;
//      對(duì)象鎖
        final Object lock;
//      Optional封裝callback
        final Optional<AsyncCallback<T>> callback;
//      初始狀態(tài)
        volatile int state = RUNNING;
//             執(zhí)行結(jié)果
        T value;
//             執(zhí)行異常情況
        Exception exception;

        CompletableResult(AsyncCallback<T> callback) {
          this.lock = new Object();
          this.callback = Optional.ofNullable(callback);
        }

        /**
         * Sets the value from successful execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 封裝任務(wù)的返回結(jié)果
         * @param value
         *          value of the evaluated task
         */
        void setValue(T value) {
          this.value = value;
          this.state = COMPLETED;
          this.callback.ifPresent(ac -> ac.onComplete(value, Optional.<Exception>empty()));
          synchronized (lock) {
            lock.notifyAll();
          }
        }

        /**
         * Sets the exception from failed execution and executes callback if available. Notifies any thread waiting for
         * completion.
         * 設(shè)置異常
         * @param exception
         *          exception of the failed task
         */
        void setException(Exception exception) {
          this.exception = exception;
          this.state = FAILED;
          this.callback.ifPresent(ac -> ac.onComplete(null, Optional.of(exception)));
          synchronized (lock) {
            lock.notifyAll();
          }
        }
//      是否運(yùn)行狀態(tài)
        @Override
        public boolean isCompleted() {
          return state > RUNNING;
        }
//      取得任務(wù)結(jié)果
        @Override
        public T getValue() throws ExecutionException {
          if (state == COMPLETED) {
            return value;
          } else if (state == FAILED) {
            throw new ExecutionException(exception);
          } else {
            throw new IllegalStateException("Execution not completed yet");
          }
        }
//      未完成時(shí)不參與競(jìng)爭(zhēng)
        @Override
        public void await() throws InterruptedException {
          synchronized (lock) {
            while (!isCompleted()) {
              lock.wait();
            }
          }
        }
      }
    }
  • 測(cè)試部分
public class App {
      public static void main(String[] args) throws Exception {
        // 新建一個(gè)executor執(zhí)行器
        AsyncExecutor executor = new ThreadAsyncExecutor();

        // 開始執(zhí)行一些任務(wù)
        AsyncResult<Integer> asyncResult1 = executor.startProcess(lazyval(10, 500));
        AsyncResult<String> asyncResult2 = executor.startProcess(lazyval("test", 300));
        AsyncResult<Long> asyncResult3 = executor.startProcess(lazyval(50L, 700));
        AsyncResult<Integer> asyncResult4 = executor.startProcess(lazyval(20, 400), callback("Callback result 4"));
        AsyncResult<String> asyncResult5 = executor.startProcess(lazyval("callback", 600), callback("Callback result 5"));

        // emulate processing in the current thread while async tasks are running in their own threads
        Thread.sleep(350); // Oh boy I'm working hard here
        log("Some hard work done");

        // wait for completion of the tasks
        Integer result1 = executor.endProcess(asyncResult1);
        String result2 = executor.endProcess(asyncResult2);
        Long result3 = executor.endProcess(asyncResult3);
//      下面的執(zhí)行結(jié)果掛起
        asyncResult4.await();
        asyncResult5.await();

        // 打印線程結(jié)果
        log("Result 1: " + result1);
        log("Result 2: " + result2);
        log("Result 3: " + result3);
      }

      /**
       * Creates a callable that lazily evaluates to given value with artificial delay.
       * 創(chuàng)建一個(gè)任務(wù)
       * @param value
       *          value to evaluate
       * @param delayMillis
       *          artificial delay in milliseconds
       * @return new callable for lazy evaluation
       */
      private static <T> Callable<T> lazyval(T value, long delayMillis) {
        return () -> {
          Thread.sleep(delayMillis);
          log("Task completed with: " + value);
          return value;
        };
      }

      /**
       * 客戶端自定義callback
       */
      private static <T> AsyncCallback<T> callback(String name) {
//        返回一個(gè)callback重寫 void onComplete(T value, Optional<Exception> ex)的實(shí)現(xiàn)類對(duì)象
        return (value, ex) -> {
          if (ex.isPresent()) {
            log(name + " failed: " + ex.map(Exception::getMessage).orElse(""));
          } else {
            log(name + ": " + value);
          }
        };
      }
//      日志方法
      private static void log(String msg) {
        System.out.println(msg);
      }
    }
Task completed with: test
Some hard work done
Task completed with: 20
Callback result 4: 20
Task completed with: 10
Task completed with: callback
Callback result 5: callback
Task completed with: 50
Result 1: 10
Result 2: test
Result 3: 50
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容