在流式處理的過程中, 在中間步驟的處理中, 如果涉及到一些費(fèi)事的操作或者是外部系統(tǒng)的數(shù)據(jù)交互, 那么就會(huì)給整個(gè)流造成一定的延遲. 在 flink 的 1.2 版本中引入了 Asynchronous I/O, 能夠支持異步的操作, 以提高 flink 系統(tǒng)與外部數(shù)據(jù)系統(tǒng)交互的性能及吞吐量.
在使用 Flink 的異步 IO 時(shí), 主要有兩個(gè) API可以使用, 一個(gè)是AsyncDataStream.unorderedWait( ), 另一個(gè)AsyncDataStream.orderedWait( ).在異步處理過程中,原本數(shù)據(jù)的順序可能會(huì)發(fā)生變化, 使用unorderWait的方法, 不會(huì)考慮順序的問題, 一旦處理完成就會(huì)直接返回結(jié)果, 這種方法具有較低的延遲和負(fù)載. 那么orderWait的方法就是想對(duì)應(yīng)的, 嚴(yán)格按照原本流中的數(shù)據(jù)順序做返回, 會(huì)對(duì)系統(tǒng)造成一定的延遲. 實(shí)際中應(yīng)該根據(jù)具體的業(yè)務(wù)情況做選擇.unorderedWait或orderedWait有兩個(gè)關(guān)于async operation的參數(shù),一個(gè)是timeout參數(shù)用于設(shè)置async的超時(shí)時(shí)間,一個(gè)是capacity參數(shù)用于指定同一時(shí)刻最大允許多少個(gè)(并發(fā))async request在執(zhí)行;
在使用異步IO時(shí),需要自己去繼承AsyncFunction,AsyncFunction接口繼承了Function,它定義了asyncInvoke方法以及一個(gè)default的timeout方法;asyncInvoke方法執(zhí)行異步邏輯,然后通過ResultFuture.complete將結(jié)果或異常設(shè)置到ResultFuture,如果異常則通過ResultFuture.completeExceptionally(Throwable)來傳遞 ResultFuture;RichAsyncFunction繼承了AbstractRichFunction,同時(shí)聲明實(shí)現(xiàn)AsyncFunction接口,它不沒有實(shí)現(xiàn)asyncInvoke,交由子類實(shí)現(xiàn);它覆蓋了setRuntimeContext方法,這里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext進(jìn)行包裝.
下面是一個(gè)驗(yàn)證 Async I/O 的demo, 具體代碼見倉(cāng)庫 -> code link
public class AsyncIOExample {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> inp = env.fromElements(AsyncIOData.WORDS);
// 接收數(shù)據(jù)
SingleOutputStreamOperator<String> out = inp.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.println("讀取數(shù)據(jù):" + s + " 當(dāng)前時(shí)間:" + System.currentTimeMillis());
return s;
}
});
// 使用 AsyncFunction 對(duì)函數(shù)做一個(gè)簡(jiǎn)單的處理, 中間隨機(jī)睡眠 1-10s
DataStream<String> asyncStream = AsyncDataStream.unorderedWait(out, new SimpleAsyncFunction(), 20_000L, TimeUnit.MILLISECONDS);
// 對(duì)已經(jīng)被 AsyncFunction 處理過的數(shù)據(jù)再輸出一次
asyncStream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
System.out.println("數(shù)據(jù)處理完畢:" + s + " 當(dāng)前時(shí)間:" + System.currentTimeMillis());
return s;
}
});
env.execute("AsyncFunction Demo");
}
public static class SimpleAsyncFunction extends RichAsyncFunction<String, String>{
private long waitTime;
private final Random rnd = new Random(hashCode());
@Override
public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
// 隨機(jī)睡眠 1 - 10s
System.out.println("開始 AsyncFunction target -> " + input);
waitTime = rnd.nextInt(10);
Thread.sleep(waitTime * 1000);
String out = input + input;
resultFuture.complete(Collections.singletonList(out));
System.out.println("結(jié)束 AsyncFunction target -> " + input + " Sleep time = " + waitTime + "s");
}
}
}
以上代碼的輸出結(jié)果為:
讀取數(shù)據(jù):D 當(dāng)前時(shí)間:1569574233046
讀取數(shù)據(jù):C 當(dāng)前時(shí)間:1569574233047
讀取數(shù)據(jù):A 當(dāng)前時(shí)間:1569574233048
讀取數(shù)據(jù):B 當(dāng)前時(shí)間:1569574233049
開始 AsyncFunction target -> D
開始 AsyncFunction target -> C
開始 AsyncFunction target -> A
開始 AsyncFunction target -> B
結(jié)束 AsyncFunction target -> DSleep time = 6s
數(shù)據(jù)處理完畢:DD 當(dāng)前時(shí)間:1569574239065
結(jié)束 AsyncFunction target -> CSleep time = 6s
數(shù)據(jù)處理完畢:CC 當(dāng)前時(shí)間:1569574239069
結(jié)束 AsyncFunction target -> ASleep time = 6s
數(shù)據(jù)處理完畢:AA 當(dāng)前時(shí)間:1569574239072
結(jié)束 AsyncFunction target -> BSleep time = 6s
數(shù)據(jù)處理完畢:BB 當(dāng)前時(shí)間:1569574239076