flink學(xué)習(xí)2 流式概念

流式概念

Flink 的 Table API 和 SQL 是流批統(tǒng)一的 API。 這意味著 Table API & SQL 在無論有限的批式輸入還是無限的流式輸入下,都具有相同的語義。 因?yàn)閭鹘y(tǒng)的關(guān)系代數(shù)以及 SQL 最開始都是為了批式處理而設(shè)計(jì)的, 關(guān)系型查詢在流式場景下不如在批式場景下容易懂。

1、狀態(tài)管理

一個(gè)表程序(Table program)可以配置一個(gè) state backend 和多個(gè)不同的 checkpoint 選項(xiàng) 以處理對不同狀態(tài)大小和容錯(cuò)需求。這可以對正在運(yùn)行的 Table API & SQL 管道(pipeline)生成 savepoint,并在這之后用其恢復(fù)應(yīng)用程序的狀態(tài)。

狀態(tài)使用

  • 由于 Table API & SQL 程序是聲明式的,管道內(nèi)的狀態(tài)會(huì)在哪以及如何被使用并不明確。 Planner 會(huì)確認(rèn)是否需要狀態(tài)來得到正確的計(jì)算結(jié)果, 管道會(huì)被現(xiàn)有優(yōu)化規(guī)則集優(yōu)化成盡可能少地使用狀態(tài)。
  • 形如 SELECT ... FROM ... WHERE 這種只包含字段映射或過濾器的查詢的查詢語句通常是無狀態(tài)的管道。
  • 然而諸如 join、 聚合或去重操作需要在 Flink 抽象的容錯(cuò)存儲(chǔ)內(nèi)保存中間結(jié)果。flink進(jìn)行 join 操作
    提供了優(yōu)化窗口和時(shí)段Join聚合 以利用 watermarks概念來讓保持較小的狀態(tài)規(guī)模。

2、動(dòng)態(tài)表

  • Flink 如何在++無界數(shù)據(jù)集++上實(shí)現(xiàn)與++數(shù)據(jù)庫引擎在有界數(shù)據(jù)上的處理++具有相同的語義

1) DataStream 上的關(guān)系查詢

關(guān)系代數(shù) / SQL 流處理
關(guān)系(或表)是有界(多)元組集合。 流是一個(gè)無限元組序列。
對批數(shù)據(jù)(例如關(guān)系數(shù)據(jù)庫中的表)執(zhí)行的查詢可以訪問完整的輸入數(shù)據(jù)。 流式查詢在啟動(dòng)時(shí)不能訪問所有數(shù)據(jù),必須“等待”數(shù)據(jù)流入。
批處理查詢在產(chǎn)生固定大小的結(jié)果后終止。 流查詢不斷地根據(jù)接收到的記錄更新其結(jié)果,并且始終不會(huì)結(jié)束。

如何使用++關(guān)系查詢++和++sql++處理流計(jì)算呢?-- 2)3)4)

2)動(dòng)態(tài)表 & 連續(xù)查詢(Continuous Query)

動(dòng)態(tài)表 是 Flink 的支持流數(shù)據(jù)的 Table API 和 SQL 的核心概念。與表示批處理數(shù)據(jù)的靜態(tài)表不同,動(dòng)態(tài)表是隨時(shí)間變化的。可以像查詢靜態(tài)批處理表一樣查詢它們。

3)在流上定義表

  • 連續(xù)查詢:在動(dòng)態(tài)表上計(jì)算一個(gè)連續(xù)查詢,并生成一個(gè)新的動(dòng)態(tài)表。連續(xù)查詢從不終止,并根據(jù)其輸入表上的更新更新其結(jié)果表。在任何時(shí)候,連續(xù)查詢的結(jié)果在語義上與以批處理模式在輸入表快照上執(zhí)行的相同查詢的結(jié)果相同。

4)表到流的轉(zhuǎn)換

動(dòng)態(tài)表可以像普通數(shù)據(jù)庫表一樣通過 INSERT、UPDATE 和 DELETE 來不斷修改。它可能是一個(gè)只有一行、不斷更新的表,也可能是一個(gè) insert-only 的表,沒有 UPDATE 和 DELETE 修改,或者介于兩者之間的其他表。

3、時(shí)間屬性

背景:
確定性概念“如果一個(gè)操作在重復(fù)相同的輸入值時(shí)能保證計(jì)算出相同的結(jié)果,那么該操作就是確定性的”。流計(jì)算任務(wù)中的動(dòng)態(tài)函數(shù),會(huì)造成不確定性。
時(shí)間屬性:
Flink 可以基于幾種不同的 時(shí)間 概念來處理數(shù)據(jù)。

  • 處理時(shí)間 指的是執(zhí)行具體操作時(shí)的機(jī)器時(shí)間(大家熟知的絕對時(shí)間, 例如 Java的 System.currentTimeMillis()) )
  • 事件時(shí)間 指的是數(shù)據(jù)本身攜帶的時(shí)間。這個(gè)時(shí)間是在事件產(chǎn)生時(shí)的時(shí)間。
  • 攝入時(shí)間 指的是數(shù)據(jù)進(jìn)入 Flink 的時(shí)間;在系統(tǒng)內(nèi)部,會(huì)把它當(dāng)做事件時(shí)間來處理

1)處理時(shí)間

共有三種方法可以定義處理時(shí)間

  • 在創(chuàng)建表的 DDL 中定義:處理時(shí)間屬性可以在創(chuàng)建表的 DDL 中用計(jì)算列的方式定義,用 PROCTIME() 就可以定義處理時(shí)間,函數(shù) PROCTIME() 的返回類型是 TIMESTAMP_LTZ 。
user_action_time AS PROCTIME() -- 聲明一個(gè)額外的列作為處理時(shí)間屬性
  • 在 DataStream 到 Table 轉(zhuǎn)換時(shí)定義
    處理時(shí)間屬性可以在 schema 定義的時(shí)候用 .proctime 后綴來定義。時(shí)間屬性一定不能定義在一個(gè)已有字段上,所以它只能定義在 schema 定義的最后。
  • 使用 TableSource 定義:在實(shí)現(xiàn)了 DefinedProctimeAttribute 的 TableSource 中定義

2)事件時(shí)間

事件時(shí)間允許程序按照數(shù)據(jù)中包含的時(shí)間來處理,這樣可以在有亂序或者晚到的數(shù)據(jù)的情況下產(chǎn)生一致的處理結(jié)果。它可以保證從外部存儲(chǔ)讀取數(shù)據(jù)后產(chǎn)生可以復(fù)現(xiàn)(replayable)的結(jié)果。為了能夠處理亂序的事件,并且區(qū)分正常到達(dá)和晚到的事件,F(xiàn)link 需要從事件中獲取事件時(shí)間并且產(chǎn)生 watermark(watermarks)。 同樣也有3中定義方式

  • 在 DDL 中定義:WATERMARK 語句在一個(gè)已有字段上定義一個(gè) watermark 生成表達(dá)式,同時(shí)標(biāo)記這個(gè)已有字段為時(shí)間屬性字段
  -- 聲明 user_action_time 是事件時(shí)間屬性,并且用 延遲 5 秒的策略來生成 watermark
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
  • 在 DataStream 到 Table 轉(zhuǎn)換時(shí)定義:事件時(shí)間屬性可以用 .rowtime 后綴在定義 DataStream schema 的時(shí)候來定義(可以是在schema的結(jié)尾追加一個(gè)新字段,也可以替換一個(gè)已經(jīng)存在的字段)

  • 使用 TableSource 定義:在實(shí)現(xiàn)了 DefinedRowTimeAttributes 的 TableSource 中定義。

4、時(shí)態(tài)表

時(shí)態(tài)表(Temporal Table)是一張隨時(shí)間變化的表。

版本表: 如果時(shí)態(tài)表中的記錄可以追蹤和并訪問它的歷史版本,這種表我們稱之為版本表,來自數(shù)據(jù)庫的 changelog 可以定義成版本表。

普通表: 如果時(shí)態(tài)表中的記錄僅僅可以追蹤并和它的最新版本,這種表我們稱之為普通表,來自數(shù)據(jù)庫 或 HBase 的表可以定義成普通表。

聲明版本表

在 Flink 中,定義了主鍵約束事件時(shí)間屬性的表就是版本表

-- 定義一張版本表
CREATE TABLE product_changelog (
  product_id STRING,
  product_name STRING,
  product_price DECIMAL(10, 4),
  update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
  PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定義主鍵約束
  WATERMARK FOR update_time AS update_time   -- (2) 通過 watermark 定義事件時(shí)間              
) WITH (
  'connector' = 'kafka',
  'topic' = 'products',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'localhost:9092',
  'value.format' = 'debezium-json'
);

聲明版本視圖

Flink 也支持定義版本視圖只要一個(gè)視圖包含主鍵事件時(shí)間便是一個(gè)版本視圖。

CREATE VIEW versioned_rates AS              
SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件時(shí)間
  FROM (
      SELECT *,
      ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作為主鍵
         ORDER BY currency_time DESC) AS rowNum 
      FROM RatesHistory ) -- RatesHistory是一張版本表
WHERE rowNum = 1; 

聲明普通表

普通表的聲明和 Flink 建表 DDL 一致

-- 用 DDL 定義一張 HBase 表,然后我們可以在 SQL 中將其當(dāng)作一張時(shí)態(tài)表使用
-- 'currency' 列是 HBase 表中的 rowKey
 CREATE TABLE LatestRates (   
     currency STRING,   
     fam1 ROW<rate DOUBLE>   
 ) WITH (   
    'connector' = 'hbase-1.4',   
    'table-name' = 'rates',   
    'zookeeper.quorum' = 'localhost:2181'   
 );
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容