flink使用08-在dataStream中使用AsyncFunction

在流式處理的過程中, 在中間步驟的處理中, 如果涉及到一些費(fèi)事的操作或者是外部系統(tǒng)的數(shù)據(jù)交互, 那么就會(huì)給整個(gè)流造成一定的延遲. 在 flink 的 1.2 版本中引入了 Asynchronous I/O, 能夠支持異步的操作, 以提高 flink 系統(tǒng)與外部數(shù)據(jù)系統(tǒng)交互的性能及吞吐量.

在使用 Flink 的異步 IO 時(shí), 主要有兩個(gè) API可以使用, 一個(gè)是AsyncDataStream.unorderedWait( ), 另一個(gè)AsyncDataStream.orderedWait( ).在異步處理過程中,原本數(shù)據(jù)的順序可能會(huì)發(fā)生變化, 使用unorderWait的方法, 不會(huì)考慮順序的問題, 一旦處理完成就會(huì)直接返回結(jié)果, 這種方法具有較低的延遲和負(fù)載. 那么orderWait的方法就是想對(duì)應(yīng)的, 嚴(yán)格按照原本流中的數(shù)據(jù)順序做返回, 會(huì)對(duì)系統(tǒng)造成一定的延遲. 實(shí)際中應(yīng)該根據(jù)具體的業(yè)務(wù)情況做選擇.unorderedWait或orderedWait有兩個(gè)關(guān)于async operation的參數(shù),一個(gè)是timeout參數(shù)用于設(shè)置async的超時(shí)時(shí)間,一個(gè)是capacity參數(shù)用于指定同一時(shí)刻最大允許多少個(gè)(并發(fā))async request在執(zhí)行;

在使用異步IO時(shí),需要自己去繼承AsyncFunction,AsyncFunction接口繼承了Function,它定義了asyncInvoke方法以及一個(gè)default的timeout方法;asyncInvoke方法執(zhí)行異步邏輯,然后通過ResultFuture.complete將結(jié)果或異常設(shè)置到ResultFuture,如果異常則通過ResultFuture.completeExceptionally(Throwable)來傳遞 ResultFuture;RichAsyncFunction繼承了AbstractRichFunction,同時(shí)聲明實(shí)現(xiàn)AsyncFunction接口,它不沒有實(shí)現(xiàn)asyncInvoke,交由子類實(shí)現(xiàn);它覆蓋了setRuntimeContext方法,這里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext進(jìn)行包裝.

下面是一個(gè)驗(yàn)證 Async I/O 的demo, 具體代碼見倉(cāng)庫 -> code link

public class AsyncIOExample {
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> inp = env.fromElements(AsyncIOData.WORDS);
        // 接收數(shù)據(jù)
        SingleOutputStreamOperator<String> out = inp.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println("讀取數(shù)據(jù):" + s + "  當(dāng)前時(shí)間:" + System.currentTimeMillis());
                return s;
            }
        });
        // 使用 AsyncFunction 對(duì)函數(shù)做一個(gè)簡(jiǎn)單的處理, 中間隨機(jī)睡眠 1-10s
        DataStream<String> asyncStream = AsyncDataStream.unorderedWait(out, new SimpleAsyncFunction(), 20_000L, TimeUnit.MILLISECONDS);
        // 對(duì)已經(jīng)被 AsyncFunction 處理過的數(shù)據(jù)再輸出一次
        asyncStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                System.out.println("數(shù)據(jù)處理完畢:" + s + "  當(dāng)前時(shí)間:" + System.currentTimeMillis());
                return s;
            }
        });


        env.execute("AsyncFunction Demo");
    }

    public static class SimpleAsyncFunction extends RichAsyncFunction<String, String>{

        private long waitTime;
        private final Random rnd = new Random(hashCode());

        @Override
        public void asyncInvoke(String input, ResultFuture<String> resultFuture) throws Exception {
            // 隨機(jī)睡眠 1 - 10s
            System.out.println("開始 AsyncFunction  target -> " + input);
            waitTime = rnd.nextInt(10);
            Thread.sleep(waitTime * 1000);
            String out = input + input;
            resultFuture.complete(Collections.singletonList(out));
            System.out.println("結(jié)束 AsyncFunction  target -> " + input + "  Sleep time = " + waitTime + "s");
        }
    }
}

以上代碼的輸出結(jié)果為:

讀取數(shù)據(jù):D  當(dāng)前時(shí)間:1569574233046
讀取數(shù)據(jù):C  當(dāng)前時(shí)間:1569574233047
讀取數(shù)據(jù):A  當(dāng)前時(shí)間:1569574233048
讀取數(shù)據(jù):B  當(dāng)前時(shí)間:1569574233049
開始 AsyncFunction  target -> D
開始 AsyncFunction  target -> C
開始 AsyncFunction  target -> A
開始 AsyncFunction  target -> B
結(jié)束 AsyncFunction  target -> DSleep time = 6s
數(shù)據(jù)處理完畢:DD  當(dāng)前時(shí)間:1569574239065
結(jié)束 AsyncFunction  target -> CSleep time = 6s
數(shù)據(jù)處理完畢:CC  當(dāng)前時(shí)間:1569574239069
結(jié)束 AsyncFunction  target -> ASleep time = 6s
數(shù)據(jù)處理完畢:AA  當(dāng)前時(shí)間:1569574239072
結(jié)束 AsyncFunction  target -> BSleep time = 6s
數(shù)據(jù)處理完畢:BB  當(dāng)前時(shí)間:1569574239076
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,656評(píng)論 1 32
  • 序 本文主要研究一下flink的Async I/O 實(shí)例 本實(shí)例展示了flink Async I/O的基本用法,首...
    go4it閱讀 2,503評(píng)論 0 2
  • iPhone的標(biāo)準(zhǔn)推薦是CFNetwork 庫編程,其封裝好的開源庫是 cocoa AsyncSocket庫,用它...
    Ethan_Struggle閱讀 2,359評(píng)論 2 12
  • 序 本文主要研究一下flink的AsyncWaitOperator AsyncWaitOperator flink...
    go4it閱讀 1,384評(píng)論 0 0
  • 這是一個(gè)深刻的主題,探索人生意義。我們的行為由信念由思維決定,今天講到生命的意義,人為什么而活著,活著應(yīng)該擁有的思...
    英子Lucy閱讀 474評(píng)論 0 1

友情鏈接更多精彩內(nèi)容