Flink 雙流Join
概述
- 在之前的Flink教程03里面給大家講過了維表Join,今天來和大家分享一下雙流Join
- 目前Flink雙流Join分成兩類:UnBounded Join 和 Time Interval Join
- 在有些場景下,用哪個都行,不過后者的性能會優(yōu)于前者,而且如果在雙流Join之后想要再進行窗口計算,那么只能使用Time Interval Join,目前的UnBounded Join后面是沒有辦法再進行Event Time的窗口計算
UnBounded Join
- 分為兩種Join,一種是Inner Join,另一種是Outer Join
- Inner Join:雙流Join最大的問題是兩邊的數(shù)據(jù)量是不一樣的,會存在一條流中的數(shù)據(jù)已經(jīng)到達,而另一條流中與之匹配的數(shù)據(jù)還未到達的情況,那么Flink是如何解決這個問題的呢?舉個例子:假設左表先來了3條數(shù)據(jù),Join 的key分別是1、2、3,右表中尚未有數(shù)據(jù)到達,那么Flink 會將左表的那三條數(shù)據(jù)緩存在Join節(jié)點的state中,同時不會有數(shù)據(jù)下發(fā)。此時,右表來了一條key是4的數(shù)據(jù),未能與左表中的key關聯(lián)上,那么這條數(shù)據(jù)同樣也會被緩存在Join節(jié)點的state中。而當右表來了一條key為1的數(shù)據(jù)時,與左表中key為1的數(shù)據(jù)成功關聯(lián),那么此時,會將這兩條數(shù)據(jù)Join起來之后的數(shù)據(jù)下發(fā),而其他尚未匹配上的數(shù)據(jù)會在state中繼續(xù)等待,直到他們的有緣人出現(xiàn),才能夠繼續(xù)前進。
- Outer Join:支持
LEFT JOIN、RIGHT JOIN、FULL OUTER JOIN三種語法,此處我們以LEFT JOIN為例。還是左表先來三條數(shù)據(jù),key分別是1、2、3,不過此時的結果會和上面的不一樣,他們?nèi)齻€雖然還會在Join節(jié)點的state中緩存,但是會將數(shù)據(jù)下發(fā),那么大家會問了,右邊的數(shù)據(jù)怎么辦,此時并沒有Join成功啊,如果下發(fā)數(shù)據(jù)不就存在異常嗎?答:Flink會將右邊的數(shù)據(jù)補上NULL,當右表中key為1、2、3的數(shù)據(jù)出現(xiàn)時,會將剛才下發(fā)的三條數(shù)據(jù)撤回,將右表中的數(shù)據(jù)重新填充到下發(fā)的三條數(shù)據(jù)中,之后,再將這三條數(shù)據(jù)下發(fā);而如果右表先到了,左表尚未到達的話,會一直等待,不會先行下發(fā)再撤回。RIGHT JOIN與之相似,只是一個下發(fā)左邊,一個下發(fā)右邊;FULL OUTER JOIN是兩邊都會下發(fā)和撤回。
- 缺點:
- 因為要存放大量的數(shù)據(jù)在state中,如果左右表的數(shù)據(jù)一直無法匹配,那么久而久之,內(nèi)存很容易就被打爆。解決辦法有加機器和使用RocksDBStateBackend,同時需要配上合理的狀態(tài)清理配置,具體的寫法可以自行翻看官網(wǎng)文檔
- Join之前最好先根據(jù)主鍵去重,不然會緩存大量無用數(shù)據(jù)在Join節(jié)點的state節(jié)點中。舉個栗子:key為1的數(shù)據(jù)因為各種原因出現(xiàn)了三條,而這三條實際上是同一條數(shù)據(jù)。那么,在Join時,如果右表只有一條key為1的數(shù)據(jù),那么只會有一條數(shù)據(jù)下發(fā)(Inner Join)另外兩條一直在死等;或者下發(fā)一條有右邊數(shù)據(jù)的和兩條右邊數(shù)據(jù)為NULL的數(shù)據(jù)(Left Outer Join),同時,這兩條數(shù)據(jù)也會在Join節(jié)點的state中緩存,等待右表的數(shù)據(jù)到達。同樣也會打爆我們的內(nèi)存。去重可以很好的減少Join節(jié)點內(nèi)存的壓力
- 假設現(xiàn)在有A、B、C三條流要進行JOIN,SQL寫法為:
A LEFT JOIN B ON A.KEY1 = B.KEY1 LEFT JOIN C ON B.KEY2 = C.KEY2,如果A與B Join的結果產(chǎn)生了大量B.KEY2為NULL的數(shù)據(jù),那么在與C Join時,必然會出現(xiàn)熱點問題。那么如何解決呢?我們可以交換Join的順序,讓B、C先行Join,產(chǎn)生的結果再與A流進行Join,這樣就能很好的解決熱點問題
- 下面我們通過代碼和運行結果,來看看UnBounded Join的寫法和產(chǎn)生結果
package FlinkSql;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class FlinkSql05 {
public static final String KAFKA_TABLE_SOURCE_DDL_01 = ""+
"CREATE TABLE t1 (\n" +
" user_id BIGINT,\n" +
" order_id BIGINT,\n" +
" ts BIGINT\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定連接類型是kafka\n" +
" 'connector.version' = '0.11', -- 與我們之前Docker安裝的kafka版本要一致\n" +
" 'connector.topic' = 'unBoundedJoin01_t1', -- 之前創(chuàng)建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定從最早消費\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_02 = ""+
"CREATE TABLE t2 (\n" +
" order_id BIGINT,\n" +
" item_id BIGINT,\n" +
" ts BIGINT\n" +
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定連接類型是kafka\n" +
" 'connector.version' = '0.11', -- 與我們之前Docker安裝的kafka版本要一致\n" +
" 'connector.topic' = 'unBoundedJoin01_t2', -- 之前創(chuàng)建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定從最早消費\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_03 = ""+
"CREATE TABLE t3 (\n" +
" user_id BIGINT,\n" +
" order_id BIGINT,\n" +
" ts BIGINT,\n" +
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 計算列,因為ts是bigint,沒法作為水印,所以用UDF轉(zhuǎn)成TimeStamp\n"+
" WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定連接類型是kafka\n" +
" 'connector.version' = '0.11', -- 與我們之前Docker安裝的kafka版本要一致\n" +
" 'connector.topic' = 'timeIntervalJoin_01', -- 之前創(chuàng)建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定從最早消費\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
public static final String KAFKA_TABLE_SOURCE_DDL_04 = ""+
"CREATE TABLE t4 (\n" +
" order_id BIGINT,\n" +
" item_id BIGINT,\n" +
" ts BIGINT,\n" +
" r_t AS TO_TIMESTAMP(FROM_UNIXTIME(ts,'yyyy-MM-dd HH:mm:ss'),'yyyy-MM-dd HH:mm:ss'),-- 計算列,因為ts是bigint,沒法作為水印,所以用UDF轉(zhuǎn)成TimeStamp\n"+
" WATERMARK FOR r_t AS r_t - INTERVAL '5' SECOND -- 指定水印生成方式\n"+
") WITH (\n" +
" 'connector.type' = 'kafka', -- 指定連接類型是kafka\n" +
" 'connector.version' = '0.11', -- 與我們之前Docker安裝的kafka版本要一致\n" +
" 'connector.topic' = 'timeIntervalJoin_02', -- 之前創(chuàng)建的topic \n" +
" 'connector.properties.group.id' = 'flink-test-0', -- 消費者組,相關概念可自行百度\n" +
" 'connector.startup-mode' = 'latest-offset', --指定從最早消費\n" +
" 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zk地址\n" +
" 'connector.properties.bootstrap.servers' = 'localhost:9092', -- broker地址\n" +
" 'format.type' = 'csv' -- csv格式,和topic中的消息格式保持一致\n" +
")";
// public static final String MYSQL_TABLE_SINK = "";
public static void main(String argv[]) throws Exception {
//構建StreamExecutionEnvironment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//構建EnvironmentSettings 并指定Blink Planner
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//構建StreamTableEnvironment
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
//注冊kafka 數(shù)據(jù)源表
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_01);
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_02);
//左表數(shù)據(jù) 543462,1001,1511658000
//右表數(shù)據(jù) 1001,4238,1511658001
//不用一開始就給kafka灌入數(shù)據(jù),可以等任務正常啟動沒有數(shù)據(jù)后再輸入數(shù)據(jù),方便觀察現(xiàn)象
//UnBounded 雙流Join 之 Inner Join
Table unBoundedJoin_inner_join = tEnv.sqlQuery("select a.*,b.* from t1 a inner join t2 b on a.order_id = b.order_id");
DataStream<Tuple2<Boolean, Row>> unBoundedJoin_inner_join_DS = tEnv.toRetractStream(unBoundedJoin_inner_join, Row.class);
//在一開始沒有數(shù)據(jù)時,沒有輸出;當我們往左表的kafka中輸入543462,1001,1511658000時,依舊沒有數(shù)據(jù)下發(fā),符合我們之前所說的言論
//之后再往右表灌入數(shù)據(jù),此時會有數(shù)據(jù)輸出
//(true,543462,1001,1511658000,1001,4238,1511658001)
// unBoundedJoin_inner_join_DS.print().setParallelism(1).name("unBoundedJoin_inner_join");
//UnBounded 雙流Join 之 Left Join
//再準備幾條kafka數(shù)據(jù)
//左表 223813,2042400,1511658002
//右表 2042400,4104826,1511658001
//同樣也是先別灌入
Table unBoundedJoin_left_join = tEnv.sqlQuery("select a.*,b.* from t1 a left join t2 b on a.order_id = b.order_id");
DataStream<Tuple2<Boolean, Row>> unBoundedJoin_left_join_DS = tEnv.toRetractStream(unBoundedJoin_left_join, Row.class);
// unBoundedJoin_left_join_DS.print().setParallelism(1).name("unBoundedJoin_left_join");
//此時左表輸入223813,2042400,1511658002,發(fā)現(xiàn)數(shù)據(jù)下發(fā),右邊都為NULL
//輸出:(true,223813,2042400,1511658002,null,null,null)
//然后再將2042400,4104826,1511658001插入右表中
//(false,223813,2042400,1511658002,null,null,null)
//(true,223813,2042400,1511658002,2042400,4104826,1511658001)
//與我們前面所說一致!先是輸出右邊補齊為NULL的數(shù)據(jù),等能夠Join上了,再撤回剛才的數(shù)據(jù),重新將Join之后的數(shù)據(jù)下發(fā)
//我們測試的都是左表先到,而右表在等待的情況,那么如果右表先到,左表后到,數(shù)據(jù)結果又是什么樣呢?大家自行嘗試吧
//執(zhí)行任務,必不可少一句話!
env.execute("雙流join");
}
}
Time Interval Join
- 寫法:
ltime = rtime
ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND
- 目前只支持Inner Join,如果想讓Join不上的數(shù)據(jù)最終也下發(fā),只能使用UnBounded Join
- 要么都是Event Time 要么都是Process Time,不能混用
- 同樣,我們也通過代碼來學習如何使用
//將下面代碼嵌入上面的 env.execute("雙流join") 前面
//Time Interval 雙流JOIN
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_03);
tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL_04);
//左表數(shù)據(jù) 543462,1001,1511658000
//右表數(shù)據(jù) 1001,4238,1511658011
//使用time interval join,并且指定時間范圍為t3.r_t的上下10秒內(nèi)
Table timeIntervalJoin = tEnv.sqlQuery(""+
"select t3.*,t4.item_id,t4.ts from t3 join t4 on t3.order_id = t4.order_id " +
"and t4.r_t between t3.r_t - interval '10' second and t3.r_t + interval '10' second ");
//因為是time interval join,所以不會有撤回事件發(fā)生,所以使用append流
DataStream<Row> tiemIntervalJoinDs = tEnv.toAppendStream(timeIntervalJoin, Row.class);
tiemIntervalJoinDs.print().setParallelism(1).name("timeIntervalJoin");
//當我們將數(shù)據(jù)輸入各自的kafka topic中后,發(fā)現(xiàn)并沒有數(shù)據(jù)輸出,因為t3.r_t - t4.r_t = -11,已經(jīng)超過了我們指定的時間范圍
//右表再輸入1001,4238,1511658010
//輸出:543462,1001,1511658000,2017-11-26T09:00,4238,1511658010
//time interval join之后可以再接窗口計算,這里就不給大家實際演示了,大家自行操作吧
附錄
- 因為這次使用的是csv格式的數(shù)據(jù),所以大家記得在
pom.xml里面加上依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.10.0</version>
</dependency>