Flink Sql教程(5)

Flink 雙流Join

概述

  • 在之前的Flink教程03里面給大家講過了維表Join,今天來和大家分享一下雙流Join
  • 目前Flink雙流Join分成兩類:UnBounded Join 和 Time Interval Join
  • 在有些場景下,用哪個都行,不過后者的性能會優(yōu)于前者,而且如果在雙流Join之后想要再進行窗口計算,那么只能使用Time Interval Join,目前的UnBounded Join后面是沒有辦法再進行Event Time的窗口計算

UnBounded Join

  • 分為兩種Join,一種是Inner Join,另一種是Outer Join
    • Inner Join:雙流Join最大的問題是兩邊的數(shù)據(jù)量是不一樣的,會存在一條流中的數(shù)據(jù)已經(jīng)到達,而另一條流中與之匹配的數(shù)據(jù)還未到達的情況,那么Flink是如何解決這個問題的呢?舉個例子:假設左表先來了3條數(shù)據(jù),Join 的key分別是1、2、3,右表中尚未有數(shù)據(jù)到達,那么Flink 會將左表的那三條數(shù)據(jù)緩存在Join節(jié)點的state中,同時不會有數(shù)據(jù)下發(fā)。此時,右表來了一條key是4的數(shù)據(jù),未能與左表中的key關聯(lián)上,那么這條數(shù)據(jù)同樣也會被緩存在Join節(jié)點的state中。而當右表來了一條key為1的數(shù)據(jù)時,與左表中key為1的數(shù)據(jù)成功關聯(lián),那么此時,會將這兩條數(shù)據(jù)Join起來之后的數(shù)據(jù)下發(fā),而其他尚未匹配上的數(shù)據(jù)會在state中繼續(xù)等待,直到他們的有緣人出現(xiàn),才能夠繼續(xù)前進。
    • Outer Join:支持LEFT JOIN、RIGHT JOINFULL OUTER JOIN三種語法,此處我們以LEFT JOIN為例。還是左表先來三條數(shù)據(jù),key分別是1、2、3,不過此時的結果會和上面的不一樣,他們?nèi)齻€雖然還會在Join節(jié)點的state中緩存,但是會將數(shù)據(jù)下發(fā),那么大家會問了,右邊的數(shù)據(jù)怎么辦,此時并沒有Join成功啊,如果下發(fā)數(shù)據(jù)不就存在異常嗎?答:Flink會將右邊的數(shù)據(jù)補上NULL,當右表中key為1、2、3的數(shù)據(jù)出現(xiàn)時,會將剛才下發(fā)的三條數(shù)據(jù)撤回,將右表中的數(shù)據(jù)重新填充到下發(fā)的三條數(shù)據(jù)中,之后,再將這三條數(shù)據(jù)下發(fā);而如果右表先到了,左表尚未到達的話,會一直等待,不會先行下發(fā)再撤回。RIGHT JOIN與之相似,只是一個下發(fā)左邊,一個下發(fā)右邊;FULL OUTER JOIN是兩邊都會下發(fā)和撤回。
    • 缺點:
      • 因為要存放大量的數(shù)據(jù)在state中,如果左右表的數(shù)據(jù)一直無法匹配,那么久而久之,內(nèi)存很容易就被打爆。解決辦法有加機器和使用RocksDBStateBackend,同時需要配上合理的狀態(tài)清理配置,具體的寫法可以自行翻看官網(wǎng)文檔
      • Join之前最好先根據(jù)主鍵去重,不然會緩存大量無用數(shù)據(jù)在Join節(jié)點的state節(jié)點中。舉個栗子:key為1的數(shù)據(jù)因為各種原因出現(xiàn)了三條,而這三條實際上是同一條數(shù)據(jù)。那么,在Join時,如果右表只有一條key為1的數(shù)據(jù),那么只會有一條數(shù)據(jù)下發(fā)(Inner Join)另外兩條一直在死等;或者下發(fā)一條有右邊數(shù)據(jù)的和兩條右邊數(shù)據(jù)為NULL的數(shù)據(jù)(Left Outer Join),同時,這兩條數(shù)據(jù)也會在Join節(jié)點的state中緩存,等待右表的數(shù)據(jù)到達。同樣也會打爆我們的內(nèi)存。去重可以很好的減少Join節(jié)點內(nèi)存的壓力
      • 假設現(xiàn)在有A、B、C三條流要進行JOIN,SQL寫法為:A LEFT JOIN B ON A.KEY1 = B.KEY1 LEFT JOIN C ON B.KEY2 = C.KEY2,如果A與B Join的結果產(chǎn)生了大量B.KEY2為NULL的數(shù)據(jù),那么在與C Join時,必然會出現(xiàn)熱點問題。那么如何解決呢?我們可以交換Join的順序,讓B、C先行Join,產(chǎn)生的結果再與A流進行Join,這樣就能很好的解決熱點問題
    • 下面我們通過代碼和運行結果,來看看UnBounded Join的寫法和產(chǎn)生結果
    package FlinkSql;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    
    public class FlinkSql05 {
    
        public static final String KAFKA_TABLE_SOURCE_DDL_01 = ""+
                "CREATE TABLE t1 (\n" +
                "    user_id BIGINT,\n" +
                "    order_id BIGINT,\n" +
                "    ts BIGINT\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定連接類型是kafka\n" +
                "    'connector.version' = '0.11',  -- 與我們之前Docker安裝的kafka版本要一致\n" +
                "    'connector.topic' = 'unBoundedJoin01_t1', -- 之前創(chuàng)建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定從最早消費\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
    
        public static final String KAFKA_TABLE_SOURCE_DDL_02 = ""+
                "CREATE TABLE t2 (\n" +
                "    order_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    ts BIGINT\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定連接類型是kafka\n" +
                "    'connector.version' = '0.11',  -- 與我們之前Docker安裝的kafka版本要一致\n" +
                "    'connector.topic' = 'unBoundedJoin01_t2', -- 之前創(chuàng)建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定從最早消費\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
        public static final String KAFKA_TABLE_SOURCE_DDL_03 = ""+
                "CREATE TABLE t3 (\n" +
                "    user_id BIGINT,\n" +
                "    order_id BIGINT,\n" +
                "    ts BIGINT,\n" +
                "    r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 計算列,因為ts是bigint,沒法作為水印,所以用UDF轉(zhuǎn)成TimeStamp\n"+
                "    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定連接類型是kafka\n" +
                "    'connector.version' = '0.11',  -- 與我們之前Docker安裝的kafka版本要一致\n" +
                "    'connector.topic' = 'timeIntervalJoin_01', -- 之前創(chuàng)建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定從最早消費\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
    
        public static final String KAFKA_TABLE_SOURCE_DDL_04 = ""+
                "CREATE TABLE t4 (\n" +
                "    order_id BIGINT,\n" +
                "    item_id BIGINT,\n" +
                "    ts BIGINT,\n" +
                "    r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 計算列,因為ts是bigint,沒法作為水印,所以用UDF轉(zhuǎn)成TimeStamp\n"+
                "    WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
                ") WITH (\n" +
                "    'connector.type' = 'kafka',  -- 指定連接類型是kafka\n" +
                "    'connector.version' = '0.11',  -- 與我們之前Docker安裝的kafka版本要一致\n" +
                "    'connector.topic' = 'timeIntervalJoin_02', -- 之前創(chuàng)建的topic \n" +
                "    'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
                "    'connector.startup-mode' = 'latest-offset',  --指定從最早消費\n" +
                "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
                "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
                "    'format.type' = 'csv'  -- csv格式,和topic中的消息格式保持一致\n" +
                ")";
    
    
    //    public static final String MYSQL_TABLE_SINK = "";
    
        public static void main(String argv[]) throws Exception {
    
            //構建StreamExecutionEnvironment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //構建EnvironmentSettings 并指定Blink Planner
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    
            //構建StreamTableEnvironment
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
    
            //注冊kafka 數(shù)據(jù)源表
            tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_01);
    
            tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_02);
    
            //左表數(shù)據(jù)  543462,1001,1511658000
            //右表數(shù)據(jù)  1001,4238,1511658001
            //不用一開始就給kafka灌入數(shù)據(jù),可以等任務正常啟動沒有數(shù)據(jù)后再輸入數(shù)據(jù),方便觀察現(xiàn)象
    
            //UnBounded 雙流Join 之 Inner Join
            Table unBoundedJoin_inner_join = tEnv.sqlQuery("select a.*,b.* from t1 a inner join t2 b on a.order_id = b.order_id");
    
            DataStream<Tuple2<Boolean, Row>> unBoundedJoin_inner_join_DS = tEnv.toRetractStream(unBoundedJoin_inner_join, Row.class);
    
            //在一開始沒有數(shù)據(jù)時,沒有輸出;當我們往左表的kafka中輸入543462,1001,1511658000時,依舊沒有數(shù)據(jù)下發(fā),符合我們之前所說的言論
            //之后再往右表灌入數(shù)據(jù),此時會有數(shù)據(jù)輸出
            //(true,543462,1001,1511658000,1001,4238,1511658001)
    //        unBoundedJoin_inner_join_DS.print().setParallelism(1).name("unBoundedJoin_inner_join");
    
            //UnBounded 雙流Join 之 Left Join
            //再準備幾條kafka數(shù)據(jù)
            //左表    223813,2042400,1511658002
            //右表    2042400,4104826,1511658001
            //同樣也是先別灌入
    
            Table unBoundedJoin_left_join = tEnv.sqlQuery("select a.*,b.* from t1 a left join t2 b on a.order_id = b.order_id");
    
            DataStream<Tuple2<Boolean, Row>> unBoundedJoin_left_join_DS = tEnv.toRetractStream(unBoundedJoin_left_join, Row.class);
    
    //        unBoundedJoin_left_join_DS.print().setParallelism(1).name("unBoundedJoin_left_join");
            //此時左表輸入223813,2042400,1511658002,發(fā)現(xiàn)數(shù)據(jù)下發(fā),右邊都為NULL
            //輸出:(true,223813,2042400,1511658002,null,null,null)
            //然后再將2042400,4104826,1511658001插入右表中
            //(false,223813,2042400,1511658002,null,null,null)
            //(true,223813,2042400,1511658002,2042400,4104826,1511658001)
            //與我們前面所說一致!先是輸出右邊補齊為NULL的數(shù)據(jù),等能夠Join上了,再撤回剛才的數(shù)據(jù),重新將Join之后的數(shù)據(jù)下發(fā)
            //我們測試的都是左表先到,而右表在等待的情況,那么如果右表先到,左表后到,數(shù)據(jù)結果又是什么樣呢?大家自行嘗試吧
        
            //執(zhí)行任務,必不可少一句話!
            env.execute("雙流join");
        }
    }

Time Interval Join

  • 寫法:
    • ltime = rtime
    • ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
    • ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
  • 目前只支持Inner Join,如果想讓Join不上的數(shù)據(jù)最終也下發(fā),只能使用UnBounded Join
  • 要么都是Event Time 要么都是Process Time,不能混用
  • 同樣,我們也通過代碼來學習如何使用
    //將下面代碼嵌入上面的 env.execute("雙流join") 前面
    //Time Interval 雙流JOIN

    tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_03);

    tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_04);

    //左表數(shù)據(jù)  543462,1001,1511658000
    //右表數(shù)據(jù)  1001,4238,1511658011

    //使用time interval join,并且指定時間范圍為t3.r_t的上下10秒內(nèi)
    Table timeIntervalJoin = tEnv.sqlQuery(""+
            "select t3.*,t4.item_id,t4.ts from t3 join t4 on t3.order_id = t4.order_id " +
            "and t4.r_t between t3.r_t - interval '10' second and t3.r_t + interval '10' second ");

    //因為是time interval join,所以不會有撤回事件發(fā)生,所以使用append流
    DataStream<Row> tiemIntervalJoinDs = tEnv.toAppendStream(timeIntervalJoin, Row.class);

    tiemIntervalJoinDs.print().setParallelism(1).name("timeIntervalJoin");
    //當我們將數(shù)據(jù)輸入各自的kafka topic中后,發(fā)現(xiàn)并沒有數(shù)據(jù)輸出,因為t3.r_t - t4.r_t = -11,已經(jīng)超過了我們指定的時間范圍
    //右表再輸入1001,4238,1511658010
    //輸出:543462,1001,1511658000,2017-11-26T09:00,4238,1511658010
    //time interval join之后可以再接窗口計算,這里就不給大家實際演示了,大家自行操作吧

附錄

  • 因為這次使用的是csv格式的數(shù)據(jù),所以大家記得在pom.xml里面加上依賴
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.10.0</version>
</dependency>
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容