背景
Java異步編程離不開Future接口,但是Future接口提供的方法使用起來不夠靈活。為了判斷一個Future是否已經(jīng)完成,我們可以:
- 通過調(diào)用
get方法,以阻塞的形式獲取執(zhí)行結(jié)果。 - 調(diào)用有最大等待時間的
get方法,以阻塞的形式獲取執(zhí)行結(jié)果。 - 反復(fù)輪詢
isDone方法,直到任務(wù)完成,獲取結(jié)果。
以上3種方法都不夠靈活,會造成線程阻塞或耗費CPU資源。需要用戶過多的參與異步編程邏輯,對業(yè)務(wù)代碼的侵入性較強。
另外用戶如果需要異步執(zhí)行多個任務(wù),并且這些任務(wù)具有先后依賴關(guān)系?;趥鹘y(tǒng)的方式我們需要大量使用鎖,CountDownLatch和CyclicBarrier和阻塞隊列等,編程十分復(fù)雜。
CompletableFuture是Future的增強版,提供了一系列的同步或異步任務(wù)執(zhí)行操作。除此之外還能夠?qū)Ξ惒饺蝿?wù)多個階段的前后依賴關(guān)系進行控制。使用起來十分方便。
注意事項
所有的以async結(jié)尾的方法都為異步執(zhí)行。它們都可以傳入一個Executor線程池,用于異步執(zhí)行。如果不指定線程池,默認使用ForkJoinPool.commonPool()
下面以所有的async異步方法為例,說明下CompletableFuture的使用方法。
靜態(tài)方法
supplyAsync
提供初始數(shù)據(jù)。接收一個Supplier類型參數(shù)。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 1);
completableFuture.thenApplyAsync((i) -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return i + 1;
// 這里打印出2
}).thenAcceptAsync((i) -> System.out.println(i));
runAsync
異步執(zhí)行任務(wù),接收Runnable類型參數(shù)。
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync");
});
completedFuture
直接返回一個已經(jīng)完成的CompletableFuture。接收一個任意類型數(shù)據(jù)為參數(shù)。
CompletableFuture<Integer> completableFuture = CompletableFuture.completedFuture(500);
// 此處可以立刻返回500
System.out.println(completableFuture.get());
allOf
參數(shù)接收任意多個CompletableFuture。該方法返回一個新的CompletableFuture,只有在參數(shù)所有的CompletableFuture完成的時候它才會完成。
long start = System.currentTimeMillis();
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync1");
});
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync2");
});
// 等待兩個CompletableFuture都完成的時候,future才會完成
CompletableFuture<Void> future = CompletableFuture.allOf(completableFuture1, completableFuture2);
// 阻塞到CompletableFuture1完成
future.get();
// 耗時大于3000毫秒
System.out.println(System.currentTimeMillis() - start);
anyOf
和allOf一樣,返回一個新的CompletableFuture,參數(shù)中任意一個CompletableFuture完成時它就能完成。
long start = System.currentTimeMillis();
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync1");
});
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync2");
});
// 兩個CompletableFuture有任何一個完成,future就可以完成
CompletableFuture<Object> future = CompletableFuture.anyOf(completableFuture1, completableFuture2);
// 阻塞到CompletableFuture2完成
future.get();
// 耗時略大于1000毫秒
System.out.println(System.currentTimeMillis() - start);
普通方法
thenApplyAsync
進行數(shù)據(jù)處理,接收前一步驟傳遞的數(shù)據(jù),處理加工后返回。返回數(shù)據(jù)類型可以和前一步驟返回的數(shù)據(jù)類型不同。
接收參數(shù)為Function類型。
CompletableFuture.supplyAsync(() -> 1).thenApplyAsync((i) -> "Hi: " + i).whenCompleteAsync(((s, throwable) -> {
// 返回 Hi: 1
System.out.println(s);
}));
thenAcceptAsync
接收上游傳遞過來的數(shù)據(jù)并消費。接收一個Consumer類型參數(shù)。
CompletableFuture.supplyAsync(() -> 1).thenAcceptAsync(System.out::println);
acceptEitherAsync
接收另一個CompletableFuture和一個Consumer。含義為2個CompletableFuture哪個先運行完成,就采用誰的執(zhí)行結(jié)果。
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).acceptEitherAsync(completableFuture1, integer -> {
// completableFuture1最早返回,所以integer的值為1
System.out.println(integer);
});
這個例子中第一個CompletableFuture比第二個先執(zhí)行完畢,因此acceptEitherAsync輸出第一個CompletableFuture的結(jié)果。
applyToEitherAsync
接收一個CompletableFuture和一個Function。和acceptEitherAsync類似,只不過第二個參數(shù)從Consumer變成Function。
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).applyToEitherAsync(completableFuture1, (integer -> {
// 這里CompletableFuture1首先完成,所以integer為1
return integer;
})).whenCompleteAsync((integer, throwable) -> {
System.out.println(integer);
});
thenAcceptBothAsync
接收另一個CompletableFuture和BiConsumer,用于在兩個CompletableFuture都執(zhí)行完的時候,獲取他們的執(zhí)行結(jié)果并處理。
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).thenAcceptBothAsync(completableFuture1, (integer, integer2) -> {
// 等到兩個CompletableFuture都完成之后回調(diào)
// 在這個例子中需要等待5秒鐘
回調(diào)的兩個參數(shù)分別對應(yīng)兩個CompletableFuture的執(zhí)行結(jié)果
System.out.println(integer + integer2);
});
runAfterEitherAsync
接收另一個CompletableFuture和Runnable類型參數(shù)。兩個CompletableFuture有任意一個完成的時候,執(zhí)行Runnable。
long start = System.currentTimeMillis();
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).runAfterEitherAsync(completableFuture1, () -> {
// 任何一個CompletableFuture完成的時候都會執(zhí)行Runnable
// 這個例子等待1秒后會執(zhí)行Runnable,此處打印的值略大于1000
System.out.println(System.currentTimeMillis() - start);
});
runAfterBothAsync
接收另一個CompletableFuture和Runnable類型參數(shù)。兩個CompletableFuture都執(zhí)行完成的時候,執(zhí)行Runnable。
long start = System.currentTimeMillis();
CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}).runAfterBothAsync(completableFuture1, () -> {
// 和runAfterEitherAsync不同的是,這里必須等到兩個CompletableFuture都完成的時候才會執(zhí)行Runnable
// 這里需要等待5秒鐘后才會執(zhí)行,打印的值略大于5000
System.out.println(System.currentTimeMillis() - start);
});
complete
接收任意類型數(shù)據(jù)作為參數(shù)。如果CompletableFuture沒有完成的時候調(diào)用complete()方法,后續(xù)再調(diào)用這個future的get()方法時返回這個值。
// 睡眠2秒再返回結(jié)果
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
// 注意,此處如果睡眠3秒,調(diào)用complete時completableFuture已經(jīng)執(zhí)行完畢返回1,complete方法不會修改返回值。此時調(diào)用get方法返回1
// 如果此處沒有睡眠3秒,調(diào)用complete時completableFuture尚未執(zhí)行完畢,下面調(diào)用get的時候方法返回1000
Thread.sleep(3000);
completableFuture.complete(1000);
Integer integer = completableFuture.get();
System.out.println(integer);
completeExceptionally
接收一個Throwable類型參數(shù)。如果CompletableFuture沒有完成的時候調(diào)用completeExceptionally()方法,后續(xù)再調(diào)用這個future的get()方法時會拋出這個異常。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
// 如果調(diào)用completeExceptionally的時候completableFuture沒有執(zhí)行完成,那么下面調(diào)用get的時候會拋出IllegalArgumentException
Thread.sleep(3000);
completableFuture.completeExceptionally(new IllegalArgumentException());
Integer integer = completableFuture.get();
System.out.println(integer);
cancel
相當于completeExceptionally(new CancellationException()),不再贅述。接收的參數(shù)對行為沒有影響。
如果CompletableFuture沒有執(zhí)行完成的時候調(diào)用了cancel,cancel方法返回true。
如果CompletableFuture執(zhí)行完成的時候調(diào)用了cancel,cancel方法返回false。
exceptionally
設(shè)置當CompletableFuture執(zhí)行拋出異常時候的返回值。用于處理異常情況。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
// 拋出一個異常
if (true) {
throw new RuntimeException("Error Message");
}
return 1;
});
// 如果遇到異常,返回500
completableFuture.exceptionally((t) -> 500)
.whenCompleteAsync(((integer, throwable) -> {
// 打印出500
System.out.println(integer);
// 異常已被處理,返回null
System.out.println(throwable.getMessage());
}));
handleAsync
相比exceptionally更為復(fù)雜的處理exception方式。接收一個BiFunction類型的參數(shù)。
handleAsync也可以視為回調(diào)方法具有返回值的whenCompleteAsync。
注意和exceptionally的區(qū)別,handleAsync沒有自動發(fā)現(xiàn)是否拋出exception的能力,需要手工編寫相關(guān)邏輯。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
if (true) {
throw new RuntimeException("Error Message");
}
return 1;
});
completableFuture
.handleAsync((integer, throwable) -> {
System.out.println(integer);
System.out.println(throwable);
return null == throwable ? 1 : 500;
})
.whenCompleteAsync(((integer, throwable) -> {
System.out.println(integer);
System.out.println(throwable.getMessage());
}));
whenCompleteAsync
完成時異步回調(diào),接收一個BiConsumer<? super T, ? super Throwable>類型參數(shù)。
例子在前面的代碼中已有體現(xiàn),不再贅述。
thenCombineAsync
含義相當于"thenApplyBothAsync",但是JDK不叫這個名字。
接收另一個CompletableFuture和BiFunction作為參數(shù)。目的為等待兩個CompletableFuture都完成的時候,執(zhí)行BiFunction對兩個CompletableFuture的返回值進行處理。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
});
completableFuture.thenCombineAsync(completableFuture2, (integer, integer2) -> {
// 等兩個CompletableFuture都執(zhí)行完畢后,返回他們兩個返回值的和
return integer + integer2;
}).whenCompleteAsync(((integer, throwable) -> {
System.out.println(integer);
}));
thenComposeAsync
將兩個CompletableFuture組合起來。接收的參數(shù)類型為Function。
CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> {
return 1;
});
completableFuture.thenComposeAsync(integer -> {
// 這里返回另一個CompletableFuture,該CompletableFuture可以使用前面CompletableFuture的返回值進行計算
return CompletableFuture.supplyAsync(() -> {
return 10 + integer;
});
}).whenCompleteAsync(((integer, throwable) -> {
// 返回11
System.out.println(integer);
}));