流式概念
Flink 的 Table API 和 SQL 是流批統(tǒng)一的 API。 這意味著 Table API & SQL 在無論有限的批式輸入還是無限的流式輸入下,都具有相同的語義。 因?yàn)閭鹘y(tǒng)的關(guān)系代數(shù)以及 SQL 最開始都是為了批式處理而設(shè)計(jì)的, 關(guān)系型查詢在流式場景下不如在批式場景下容易懂。
1、狀態(tài)管理
一個(gè)表程序(Table program)可以配置一個(gè) state backend 和多個(gè)不同的 checkpoint 選項(xiàng) 以處理對不同狀態(tài)大小和容錯(cuò)需求。這可以對正在運(yùn)行的 Table API & SQL 管道(pipeline)生成 savepoint,并在這之后用其恢復(fù)應(yīng)用程序的狀態(tài)。
狀態(tài)使用
- 由于 Table API & SQL 程序是聲明式的,管道內(nèi)的狀態(tài)會(huì)在哪以及如何被使用并不明確。 Planner 會(huì)確認(rèn)是否需要狀態(tài)來得到正確的計(jì)算結(jié)果, 管道會(huì)被現(xiàn)有優(yōu)化規(guī)則集優(yōu)化成盡可能少地使用狀態(tài)。
- 形如 SELECT ... FROM ... WHERE 這種只包含字段映射或過濾器的查詢的查詢語句通常是無狀態(tài)的管道。
- 然而諸如 join、 聚合或去重操作需要在 Flink 抽象的容錯(cuò)存儲(chǔ)內(nèi)保存中間結(jié)果。flink進(jìn)行 join 操作
提供了優(yōu)化窗口和時(shí)段Join聚合 以利用 watermarks概念來讓保持較小的狀態(tài)規(guī)模。
2、動(dòng)態(tài)表
- Flink 如何在++無界數(shù)據(jù)集++上實(shí)現(xiàn)與++數(shù)據(jù)庫引擎在有界數(shù)據(jù)上的處理++具有相同的語義
1) DataStream 上的關(guān)系查詢
| 關(guān)系代數(shù) / SQL | 流處理 |
|---|---|
| 關(guān)系(或表)是有界(多)元組集合。 | 流是一個(gè)無限元組序列。 |
| 對批數(shù)據(jù)(例如關(guān)系數(shù)據(jù)庫中的表)執(zhí)行的查詢可以訪問完整的輸入數(shù)據(jù)。 | 流式查詢在啟動(dòng)時(shí)不能訪問所有數(shù)據(jù),必須“等待”數(shù)據(jù)流入。 |
| 批處理查詢在產(chǎn)生固定大小的結(jié)果后終止。 | 流查詢不斷地根據(jù)接收到的記錄更新其結(jié)果,并且始終不會(huì)結(jié)束。 |
如何使用++關(guān)系查詢++和++sql++處理流計(jì)算呢?-- 2)3)4)
2)動(dòng)態(tài)表 & 連續(xù)查詢(Continuous Query)
動(dòng)態(tài)表 是 Flink 的支持流數(shù)據(jù)的 Table API 和 SQL 的核心概念。與表示批處理數(shù)據(jù)的靜態(tài)表不同,動(dòng)態(tài)表是隨時(shí)間變化的。可以像查詢靜態(tài)批處理表一樣查詢它們。
3)在流上定義表
- 連續(xù)查詢:在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢,并生成一個(gè)新的動(dòng)態(tài)表。連續(xù)查詢從不終止,并根據(jù)其輸入表上的更新更新其結(jié)果表。在任何時(shí)候,連續(xù)查詢的結(jié)果在語義上與以批處理模式在輸入表快照上執(zhí)行的相同查詢的結(jié)果相同。
4)表到流的轉(zhuǎn)換
動(dòng)態(tài)表可以像普通數(shù)據(jù)庫表一樣通過 INSERT、UPDATE 和 DELETE 來不斷修改。它可能是一個(gè)只有一行、不斷更新的表,也可能是一個(gè) insert-only 的表,沒有 UPDATE 和 DELETE 修改,或者介于兩者之間的其他表。
3、時(shí)間屬性
背景:
確定性概念“如果一個(gè)操作在重復(fù)相同的輸入值時(shí)能保證計(jì)算出相同的結(jié)果,那么該操作就是確定性的”。流計(jì)算任務(wù)中的動(dòng)態(tài)函數(shù),會(huì)造成不確定性。
時(shí)間屬性:
Flink 可以基于幾種不同的 時(shí)間 概念來處理數(shù)據(jù)。
- 處理時(shí)間 指的是執(zhí)行具體操作時(shí)的機(jī)器時(shí)間(大家熟知的絕對時(shí)間, 例如 Java的 System.currentTimeMillis()) )
- 事件時(shí)間 指的是數(shù)據(jù)本身攜帶的時(shí)間。這個(gè)時(shí)間是在事件產(chǎn)生時(shí)的時(shí)間。
- 攝入時(shí)間 指的是數(shù)據(jù)進(jìn)入 Flink 的時(shí)間;在系統(tǒng)內(nèi)部,會(huì)把它當(dāng)做事件時(shí)間來處理
1)處理時(shí)間
共有三種方法可以定義處理時(shí)間
- 在創(chuàng)建表的 DDL 中定義:處理時(shí)間屬性可以在創(chuàng)建表的 DDL 中用計(jì)算列的方式定義,用 PROCTIME() 就可以定義處理時(shí)間,函數(shù) PROCTIME() 的返回類型是 TIMESTAMP_LTZ 。
user_action_time AS PROCTIME() -- 聲明一個(gè)額外的列作為處理時(shí)間屬性
- 在 DataStream 到 Table 轉(zhuǎn)換時(shí)定義
處理時(shí)間屬性可以在 schema 定義的時(shí)候用 .proctime 后綴來定義。時(shí)間屬性一定不能定義在一個(gè)已有字段上,所以它只能定義在 schema 定義的最后。 - 使用 TableSource 定義:在實(shí)現(xiàn)了 DefinedProctimeAttribute 的 TableSource 中定義
2)事件時(shí)間
事件時(shí)間允許程序按照數(shù)據(jù)中包含的時(shí)間來處理,這樣可以在有亂序或者晚到的數(shù)據(jù)的情況下產(chǎn)生一致的處理結(jié)果。它可以保證從外部存儲(chǔ)讀取數(shù)據(jù)后產(chǎn)生可以復(fù)現(xiàn)(replayable)的結(jié)果。為了能夠處理亂序的事件,并且區(qū)分正常到達(dá)和晚到的事件,F(xiàn)link 需要從事件中獲取事件時(shí)間并且產(chǎn)生 watermark(watermarks)。 同樣也有3中定義方式
- 在 DDL 中定義:WATERMARK 語句在一個(gè)已有字段上定義一個(gè) watermark 生成表達(dá)式,同時(shí)標(biāo)記這個(gè)已有字段為時(shí)間屬性字段
-- 聲明 user_action_time 是事件時(shí)間屬性,并且用 延遲 5 秒的策略來生成 watermark
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
在 DataStream 到 Table 轉(zhuǎn)換時(shí)定義:事件時(shí)間屬性可以用 .rowtime 后綴在定義 DataStream schema 的時(shí)候來定義(可以是在schema的結(jié)尾追加一個(gè)新字段,也可以替換一個(gè)已經(jīng)存在的字段)
使用 TableSource 定義:在實(shí)現(xiàn)了 DefinedRowTimeAttributes 的 TableSource 中定義。
4、時(shí)態(tài)表
時(shí)態(tài)表(Temporal Table)是一張隨時(shí)間變化的表。
版本表: 如果時(shí)態(tài)表中的記錄可以追蹤和并訪問它的歷史版本,這種表我們稱之為版本表,來自數(shù)據(jù)庫的 changelog 可以定義成版本表。
普通表: 如果時(shí)態(tài)表中的記錄僅僅可以追蹤并和它的最新版本,這種表我們稱之為普通表,來自數(shù)據(jù)庫 或 HBase 的表可以定義成普通表。
聲明版本表
在 Flink 中,定義了主鍵約束和事件時(shí)間屬性的表就是版本表
-- 定義一張版本表
CREATE TABLE product_changelog (
product_id STRING,
product_name STRING,
product_price DECIMAL(10, 4),
update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
PRIMARY KEY(product_id) NOT ENFORCED, -- (1) 定義主鍵約束
WATERMARK FOR update_time AS update_time -- (2) 通過 watermark 定義事件時(shí)間
) WITH (
'connector' = 'kafka',
'topic' = 'products',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'value.format' = 'debezium-json'
);
聲明版本視圖
Flink 也支持定義版本視圖只要一個(gè)視圖包含主鍵和事件時(shí)間便是一個(gè)版本視圖。
CREATE VIEW versioned_rates AS
SELECT currency, rate, currency_time -- (1) `currency_time` 保留了事件時(shí)間
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY currency -- (2) `currency` 是去重 query 的 unique key,可以作為主鍵
ORDER BY currency_time DESC) AS rowNum
FROM RatesHistory ) -- RatesHistory是一張版本表
WHERE rowNum = 1;
聲明普通表
普通表的聲明和 Flink 建表 DDL 一致
-- 用 DDL 定義一張 HBase 表,然后我們可以在 SQL 中將其當(dāng)作一張時(shí)態(tài)表使用
-- 'currency' 列是 HBase 表中的 rowKey
CREATE TABLE LatestRates (
currency STRING,
fam1 ROW<rate DOUBLE>
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'rates',
'zookeeper.quorum' = 'localhost:2181'
);