維表是數(shù)倉中的一個(gè)概念,維表中的維度屬性是觀察數(shù)據(jù)的角度,在建設(shè)離線數(shù)倉的時(shí)候,通常是將維表與事實(shí)表進(jìn)行關(guān)聯(lián)構(gòu)建星型模型。在實(shí)時(shí)數(shù)倉中,同樣也有維表與事實(shí)表的概念,其中事實(shí)表通常存儲(chǔ)在kafka中,維表通常存儲(chǔ)在外部設(shè)備中(比如MySQL,HBase)。對(duì)于每條流式數(shù)據(jù),可以關(guān)聯(lián)一個(gè)外部維表數(shù)據(jù)源,為實(shí)時(shí)計(jì)算提供數(shù)據(jù)關(guān)聯(lián)查詢。維表可能是會(huì)不斷變化的,在維表JOIN時(shí),需指明這條記錄關(guān)聯(lián)維表快照的時(shí)刻。需要注意是,目前Flink SQL的維表JOIN僅支持對(duì)當(dāng)前時(shí)刻維表快照的關(guān)聯(lián)(處理時(shí)間語義),而不支持事實(shí)表rowtime所對(duì)應(yīng)的的維表快照(事件時(shí)間語義)。通過本文你可以了解到:
- 如何使用Flink SQL創(chuàng)建表
- 如何定義Kafka數(shù)據(jù)源表
- 如何定義MySQL數(shù)據(jù)源表
- 什么是Temporal Table Join
- 維表join的案例
Flink SQL創(chuàng)建表
注意:本文的所有操作都是在Flink SQL cli中進(jìn)行的
創(chuàng)建表的語法
CREATE TABLE [catalog_name.][db_name.]table_name
(
{ <column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
-- 定義表字段
<column_definition>:
column_name column_type [COMMENT column_comment]
-- 定義計(jì)算列
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
-- 定義水位線
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
解釋
COMPUTED COLUMN(計(jì)算列)
計(jì)算列是一個(gè)通過column_name AS computed_column_expression生成的虛擬列,產(chǎn)生的計(jì)算列不是物理存儲(chǔ)在數(shù)據(jù)源表中。一個(gè)計(jì)算列可以通過原有數(shù)據(jù)源表中的某個(gè)字段、運(yùn)算符及內(nèi)置函數(shù)生成。比如,定義一個(gè)消費(fèi)金額的計(jì)算列(cost),可以使用表的價(jià)格(price)*數(shù)量(quantity)計(jì)算得到。
計(jì)算列常常被用在定義時(shí)間屬性(見另一篇文章Flink Table API&SQL編程指南之時(shí)間屬性(3),可以通過PROCTIME()函數(shù)定義處理時(shí)間屬性,語法為proc AS PROCTIME()。除此之外,計(jì)算列可以被用作提取事件時(shí)間列,因?yàn)樵嫉氖录r(shí)間可能不是TIMESTAMP(3)類型或者是存在JSON串中。
尖叫提示:
1.在源表上定義計(jì)算列,是在讀取數(shù)據(jù)源之后計(jì)算的,計(jì)算列需要跟在SELECT查詢語句之后;
2.計(jì)算列不能被INSERT語句插入數(shù)據(jù),在INSERT語句中,只能包括實(shí)際的目標(biāo)表的schema,不能包括計(jì)算列
水位線
水位線定義了表的事件時(shí)間屬性,其語法為:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
其中rowtime_column_name表示表中已經(jīng)存在的事件時(shí)間字段,值得注意的是,該事件時(shí)間字段必須是TIMESTAMP(3)類型,即形如yyyy-MM-dd HH:mm:ss,如果不是這種形式的數(shù)據(jù)類型,需要通過定義計(jì)算列進(jìn)行轉(zhuǎn)換。
watermark_strategy_expression定義了水位線生成的策略,該表達(dá)式的返回?cái)?shù)據(jù)類型必須是TIMESTAMP(3)類型。
Flink提供了許多常用的水位線生成策略:
-
嚴(yán)格單調(diào)遞增的水位線:語法為
WATERMARK FOR rowtime_column AS rowtime_column
即直接使用時(shí)間時(shí)間戳作為水位線
-
遞增水位線:語法為
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND -
亂序水位線:語法為
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit -- 比如,允許5秒的亂序 WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
分區(qū)
根據(jù)具體的字段創(chuàng)建分區(qū)表,每一個(gè)分區(qū)會(huì)對(duì)應(yīng)一個(gè)文件路徑
WITH 選項(xiàng)
創(chuàng)建Table source或者Table sink需要指定表的屬性,屬性是以key/value的形式配置的,具體參考其相對(duì)應(yīng)的connector
尖叫提示:
Note:創(chuàng)建表時(shí)指定的表名有三種形式:
(1)catalog_name.db_name.table_name
(2)db_name.table_name
(3)table_name
對(duì)于第一種形式:會(huì)將表注冊到一個(gè)名為‘catalog_name’的catalog以及一個(gè)名為'db_name'd的數(shù)據(jù)庫的元數(shù)據(jù)中;
對(duì)于第二種形式:會(huì)將表注冊到當(dāng)前執(zhí)行環(huán)境的catalog以及名為‘db_name’的數(shù)據(jù)庫的元數(shù)據(jù)中;
對(duì)于第三種形式:會(huì)將表注冊到當(dāng)前執(zhí)行環(huán)境的catalog與數(shù)據(jù)庫的元數(shù)據(jù)中
定義Kafka數(shù)據(jù)表
kafka是構(gòu)建實(shí)時(shí)數(shù)倉常用的數(shù)據(jù)存儲(chǔ)設(shè)備,使用Flink SQL創(chuàng)建kafka數(shù)據(jù)源表的語法如下:
CREATE TABLE MyKafkaTable (
...
) WITH (
'connector.type' = 'kafka', -- 連接類型
'connector.version' = '0.11',-- 必選: 可選的kafka版本有:0.8/0.9/0.10/0.11/universal
'connector.topic' = 'topic_name', -- 必選: 主題名稱
'connector.properties.zookeeper.connect' = 'localhost:2181', -- 必選: zk連接地址
'connector.properties.bootstrap.servers' = 'localhost:9092', -- 必選: Kafka連接地址
'connector.properties.group.id' = 'testGroup', --可選: 消費(fèi)者組
-- 可選:偏移量, earliest-offset/latest-offset/group-offsets/specific-offsets
'connector.startup-mode' = 'earliest-offset',
-- 可選: 當(dāng)偏移量指定為specific offsets,為指定每個(gè)分區(qū)指定具體位置
'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
'connector.sink-partitioner' = '...', -- 可選: sink分區(qū)器,fixed/round-robin/custom
-- 可選: 當(dāng)自定義分區(qū)器時(shí),指定分區(qū)器的類名
'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
'format.type' = '...', -- 必選: 指定格式,支持csv/json/avro
-- 指定update-mode,支持append/retract/upsert
'update-mode' = 'append',
)
尖叫提示:
- 指定具體的偏移量位置:默認(rèn)是從當(dāng)前消費(fèi)者組提交的偏移量開始消費(fèi)
- sink分區(qū):默認(rèn)是盡可能向更多的分區(qū)寫數(shù)據(jù)(每一個(gè)sink并行度實(shí)例只向一個(gè)分區(qū)寫數(shù)據(jù)),也可以自已分區(qū)策略。當(dāng)使用 round-robin分區(qū)器時(shí),可以避免分區(qū)不均衡,但是會(huì)造成Flink實(shí)例與kafka broker之間大量的網(wǎng)絡(luò)連接
- 一致性保證:默認(rèn)sink語義是at-least-once
- Kafka 0.10+ 是時(shí)間戳:從kafka0.10開始,kafka消息附帶一個(gè)時(shí)間戳作為消息的元數(shù)據(jù),表示記錄被寫入kafka主題的時(shí)間,這個(gè)時(shí)間戳可以作為事件時(shí)間屬性( rowtime attribute)
- Kafka 0.11+版本:Flink從1.7開始,支持使用universal版本作為kafka的連接器 ,可以兼容kafka0.11之后版本
定義MySQL數(shù)據(jù)表
CREATE TABLE MySQLTable (
...
) WITH (
'connector.type' = 'jdbc', -- 必選: jdbc方式
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test', -- 必選: JDBC url
'connector.table' = 'jdbc_table_name', -- 必選: 表名
-- 可選: JDBC driver,如果不配置,會(huì)自動(dòng)通過url提取
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'name', -- 可選: 數(shù)據(jù)庫用戶名
'connector.password' = 'password',-- 可選: 數(shù)據(jù)庫密碼
-- 可選, 將輸入進(jìn)行分區(qū)的字段名.
'connector.read.partition.column' = 'column_name',
-- 可選, 分區(qū)數(shù)量.
'connector.read.partition.num' = '50',
-- 可選, 第一個(gè)分區(qū)的最小值.
'connector.read.partition.lower-bound' = '500',
-- 可選, 最后一個(gè)分區(qū)的最大值
'connector.read.partition.upper-bound' = '1000',
-- 可選, 一次提取數(shù)據(jù)的行數(shù),默認(rèn)為0,表示忽略此配置
'connector.read.fetch-size' = '100',
-- 可選, lookup緩存數(shù)據(jù)的最大行數(shù),如果超過改配置,老的數(shù)據(jù)會(huì)被清除
'connector.lookup.cache.max-rows' = '5000',
-- 可選,lookup緩存存活的最大時(shí)間,超過該時(shí)間舊數(shù)據(jù)會(huì)過時(shí),注意cache.max-rows與cache.ttl必須同時(shí)配置
'connector.lookup.cache.ttl' = '10s',
-- 可選, 查詢數(shù)據(jù)最大重試次數(shù)
'connector.lookup.max-retries' = '3',
-- 可選,寫數(shù)據(jù)最大的flush行數(shù),默認(rèn)5000,超過改配置,會(huì)觸發(fā)刷數(shù)據(jù)
'connector.write.flush.max-rows' = '5000',
--可選,flush數(shù)據(jù)的間隔時(shí)間,超過該時(shí)間,會(huì)通過一個(gè)異步線程flush數(shù)據(jù),默認(rèn)是0s
'connector.write.flush.interval' = '2s',
-- 可選, 寫數(shù)據(jù)失敗最大重試次數(shù)
'connector.write.max-retries' = '3'
)
Temporal Table Join
使用語法
SELECT column-names
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.proctime [AS <alias2>]
ON table1.column-name1 = table2.key-name1
注意:目前,僅支持INNER JOIN與LEFT JOIN。在join的時(shí)候需要使用 FOR SYSTEM_TIME AS OF ,其中table1.proctime表示table1的proctime處理時(shí)間屬性(計(jì)算列)。使用FOR SYSTEM_TIME AS OF table1.proctime表示當(dāng)左邊表的記錄與右邊的維表join時(shí),只匹配當(dāng)前處理時(shí)間維表所對(duì)應(yīng)的的快照數(shù)據(jù)。
樣例
SELECT
o.amout, o.currency, r.rate, o.amount * r.rate
FROM
Orders AS o
JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
ON r.currency = o.currency
使用說明
- 僅支持Blink planner
- 僅支持SQL,目前不支持Table API
- 目前不支持基于事件時(shí)間(event time)的temporal table join
- 維表可能會(huì)不斷變化,JOIN行為發(fā)生后,維表中的數(shù)據(jù)發(fā)生了變化(新增、更新或刪除),則已關(guān)聯(lián)的維表數(shù)據(jù)不會(huì)被同步變化
- 維表和維表不能進(jìn)行JOIN
- 維表必須指定主鍵。維表JOIN時(shí),ON的條件必須包含所有主鍵的等值條件
維表Join案例
背景
Kafka中有一份用戶行為數(shù)據(jù),包括pv,buy,cart,fav行為;MySQL中有一份省份區(qū)域的維表數(shù)據(jù)?,F(xiàn)將兩種表進(jìn)行JOIN,統(tǒng)計(jì)每個(gè)區(qū)域的購買行為數(shù)量。
步驟
維表存儲(chǔ)在MySQL中,如下創(chuàng)建維表數(shù)據(jù)源:
CREATE TABLE dim_province (
province_id BIGINT, -- 省份id
province_name VARCHAR, -- 省份名稱
region_name VARCHAR -- 區(qū)域名稱
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',
'connector.table' = 'dim_province',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '123qwe',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
事實(shí)表存儲(chǔ)在kafka中,數(shù)據(jù)為用戶點(diǎn)擊行為,格式為JSON,具體數(shù)據(jù)樣例如下:
{"user_id":63401,"item_id":6244,"cat_id":143,"action":"pv","province":3,"ts":1573445919}
{"user_id":9164,"item_id":2817,"cat_id":611,"action":"fav","province":28,"ts":1573420486}
創(chuàng)建kafka數(shù)據(jù)源表,如下:
CREATE TABLE user_behavior (
user_id BIGINT, -- 用戶id
item_id BIGINT, -- 商品id
cat_id BIGINT, -- 品類id
action STRING, -- 用戶行為
province INT, -- 用戶所在的省份
ts BIGINT, -- 用戶行為發(fā)生的時(shí)間戳
proctime as PROCTIME(), -- 通過計(jì)算列產(chǎn)生一個(gè)處理時(shí)間列
eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件時(shí)間
WATERMARK FOR eventTime as eventTime - INTERVAL '5' SECOND -- 在eventTime上定義watermark
) WITH (
'connector.type' = 'kafka', -- 使用 kafka connector
'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
'connector.topic' = 'user_behavior', -- kafka主題
'connector.startup-mode' = 'earliest-offset', -- 偏移量,從起始 offset 開始讀取
'connector.properties.group.id' = 'group1', -- 消費(fèi)者組
'connector.properties.zookeeper.connect' = 'kms-2:2181,kms-3:2181,kms-4:2181', -- zookeeper 地址
'connector.properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', -- kafka broker 地址
'format.type' = 'json' -- 數(shù)據(jù)源格式為 json
);
創(chuàng)建MySQL的結(jié)果表,表示區(qū)域銷量
CREATE TABLE region_sales_sink (
region_name STRING, -- 區(qū)域名稱
buy_cnt BIGINT -- 銷量
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://192.168.10.203:3306/mydw',
'connector.table' = 'top_region', -- MySQL中的待插入數(shù)據(jù)的表
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = '123qwe',
'connector.write.flush.interval' = '1s'
);
用戶行為數(shù)據(jù)與省份維表數(shù)據(jù)join
CREATE VIEW user_behavior_detail AS
SELECT
u.user_id,
u.item_id,
u.cat_id,
u.action,
p.province_name,
p.region_name
FROM user_behavior AS u LEFT JOIN dim_province FOR SYSTEM_TIME AS OF u.proctime AS p
ON u.province = p.province_id;
計(jì)算區(qū)域的銷量,并將計(jì)算結(jié)果寫入MySQL
INSERT INTO region_sales_sink
SELECT
region_name,
COUNT(*) buy_cnt
FROM user_behavior_detail
WHERE action = 'buy'
GROUP BY region_name;
結(jié)果查看:
Flink SQL> select * from region_sales_sink; -- 在Flink SQL cli中查看

mysql> select * from top_region; -- 查看MySQL的數(shù)據(jù)

總結(jié)
本文主要介紹了FlinkSQL的維表join,使用的方式為Temporal Table Join。首先介紹了Flink SQL創(chuàng)建表的基本語法,并對(duì)其中的細(xì)節(jié)進(jìn)行了描述。接著介紹了如何創(chuàng)建Kafka以及MySQL的數(shù)據(jù)源表。然后介紹了Temporal Table Join的基本概念與使用語法。最后,給出了一個(gè)完整的維表JOIN案例。