一. Flink 1.9 新特性
總的變更:https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12344601
發(fā)布時(shí)間:2019.8.22
1. Flink Table API/SQL
1.1 Flink SQL DDL 支持
?? 到目前為止,F(xiàn)link SQL 已經(jīng)支持 DML 語句(如 SELECT,INSERT)。但是外部表(table source 和 table sink)必須通過 Java/Scala 代碼或配置文件的方式注冊。1.9 版本中,支持 SQL DDL 語句的方式注冊和刪除表(CREATE TABLE,DROP TABLE)。不過目前還沒有增加流特定的語法擴(kuò)展來定義時(shí)間戳抽取和 watermark 生成策略等。流式的需求也將會在下一版本中完整支持。
flink1.6
?? - 只支持dml(select和insert)
flink1.9
?? - define/alter/delete sink/source table
?? - define/alter/delete view
?? - define/replace/delete 用戶定義的類型type(eg: CREATE [ OR REPLACE ] TYPE name AS fieldType)
?? - 可以加載外部的function來作為udf使用
1.2 新 Blink Table/SQL Planner
1.2.1 現(xiàn)在有兩個(gè)插件化的查詢處理器來執(zhí)行 Table API 和 SQL:1.9 以前的 Flink 處理器和新的基于 Blink 的處理器。兩個(gè)查詢處理器之間的語義和功能大部分是一致的,但未完全對齊。不過,Blink 的查詢處理器尚未完全集成。因此,1.9 之前的 Flink 處理器仍然是 1.9 版本的默認(rèn)處理器,建議用于生產(chǎn)設(shè)置。
1.2.2 基于 Blink 的查詢處理器還提供了更強(qiáng)大的流處理能力,包括一些社區(qū)期待已久的新功能(如維表 Join,TopN,去重)和聚合場景緩解數(shù)據(jù)傾斜的優(yōu)化,以及內(nèi)置更多常用的函數(shù)。
1.2.3 使用限制(非常細(xì)節(jié)的東西),在后面的版本會支持:
??- batch job如果使用blink planner,需要使用TableEnvironment,不能用BatchTableEnvironment。
??- StreamTableSink的實(shí)現(xiàn)類需要實(shí)現(xiàn)consumeDataStream()方法,而不是emitDataStream()方法。
??- 不支持Table.flatAggregate。
??- batch job不支持session/count window。
??- Blink planner只支持新Catalog API,不再支持ExternalCatalog。
1.3 Table API / SQL 的其他改進(jìn)
1.3.1 重構(gòu) Table API / SQL 的類型系統(tǒng)
??Flink 1.9 實(shí)現(xiàn)了一個(gè)新的數(shù)據(jù)類型系統(tǒng),以便從 Table API 中移除對 Flink TypeInformation的依賴,并提高其對 SQL 標(biāo)準(zhǔn)的遵從性,不過還在進(jìn)行中,預(yù)計(jì)將在下一版本1.10完工,并且在 Flink 1.9 中,UDF 尚未移植到新的類型系統(tǒng)上。如果直接升級到1.9,等1.10完善之后,改動還會比較大。由于blink planner和新的數(shù)據(jù)類型系統(tǒng)是同時(shí)進(jìn)行的,新的planner并沒有支持所有的數(shù)據(jù)類型,并且以后新的planner可能對有的數(shù)據(jù)類型不再進(jìn)行支持(風(fēng)險(xiǎn)點(diǎn))。
1.3.2 為 Table API / SQL 的 Java 用戶去除 Scala 依賴
1.3.3 Table API 的多列和多行轉(zhuǎn)換
1.3.4 重構(gòu)和統(tǒng)一 Catalog API
?? 在此之前,通過 Table API 或 SQL 定義的表都無法持久化保存;從 Flink 1.9 起,這些表的元數(shù)據(jù)可以被持久化到 catalog 中。這意味著用戶可以在 Hive Metastore Catalog 中創(chuàng)建 Kafka 表,并在 query 中直接引用該表。
1.3.5 Hive 集成預(yù)覽(beta)
??最近,社區(qū)開始為 Flink Table API 和 SQL 實(shí)現(xiàn)一個(gè)連接到 Hive Metastore 的外部 catalog。在 Flink 1.9 中,用戶能夠查詢和處理存儲在 Hive 中多種格式的數(shù)據(jù)。 Hive 集成還包括支持在 Flink Table API / SQL 中使用 Hive 的 UDF。
??在以前,Table API / SQL 中定義的表一直是臨時(shí)的。新的 catalog 連接器允許在 Metastore 中持久化存儲那些使用 SQL DDL 語句創(chuàng)建的表。這意味著可以直接連接到 Metastore 并注冊一個(gè)表,例如,Kafka topic 的表。從現(xiàn)在開始,只要 catalog 連接到 Metastore,就可以查詢該表。
??請注意 Flink 1.9 中提供的 Hive 支持目前還是實(shí)驗(yàn)性的,下一個(gè)版本中將穩(wěn)定這些功能。
1.3.6 為hbase增加Upsert table sink factory
1.3.7 為Table API增加map、flatmap、aggregate算子
2. Runtime & core
2.1 細(xì)粒度批作業(yè)恢復(fù) (FLIP-1)
批作業(yè)(DataSet、Table API 和 SQL)從 task 失敗中恢復(fù)的時(shí)間被顯著縮短。在 Flink 1.9 之前,批處理作業(yè)中的 task 失敗是通過取消所有 task 并重新啟動整個(gè)作業(yè)來恢復(fù)的,即作業(yè)從頭開始,所有進(jìn)度都會廢棄。在 1.9 版本中,F(xiàn)link 將中間結(jié)果保留在網(wǎng)絡(luò) shuffle 的邊緣,并使用這些數(shù)據(jù)恢復(fù)僅受故障影響的 tasks,即處在同一個(gè) failover region (故障區(qū))的 tasks。
“Region” 的故障策略也能同時(shí)提升 “embarrassingly parallel” 類型的流作業(yè)恢復(fù)速度,也就是沒有任何像 keyBy、rebalance 等 shuffle 的作業(yè)。當(dāng)這種作業(yè)在恢復(fù)時(shí),只有受影響的故障區(qū) task 需要重啟。對于其他類型的流作業(yè),故障恢復(fù)行為與之前的版本一樣。
2.2 Stop-with-Savepoint,https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=103090212
2.2.1 Rest API中,增加SUSPEND/TERMINATE命令。CheckpointType中增加了一種SYNC_SAVEPOINT類型,在suspending/terminating 作業(yè)的時(shí)候,savepoint采用同步的方式來制作。
2.2.2 “Cancel-with-savepoint”是停止、重啟或升級 Flink 作業(yè)的一個(gè)常用操作。然而,當(dāng)前的實(shí)現(xiàn)并沒有保證輸出到 exactly-once sink 的外部存儲的數(shù)據(jù)持久化。為了改進(jìn)停止作業(yè)時(shí)的端到端語義,F(xiàn)link 1.9 引入了一種新的 SUSPEND 模式,可以帶 savepoint 停止作業(yè),保證了輸出數(shù)據(jù)的一致性??梢允褂?Flink CLI 來 suspend 一個(gè)作業(yè):bin/flink stop -p [:targetSavepointDirectory] :jobId。最終作業(yè)的狀態(tài)會在成功時(shí)設(shè)置成 FINISHED 狀態(tài),方便用戶區(qū)別操作是否失敗。
備注:原來是cancel命令可以指定做不做save point,現(xiàn)在cancel/stop命令中都可以指定做不做save point,只是stop的時(shí)候會先掛起job。
2.3 State Processing API, 從1.9開始,F(xiàn)link state可以對外暴露,在外部可以使用client來查詢(beta)。https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html
2.4 在SQL-Client 的yaml配置文件中支持外部catalog。
2.5 source增加對 Parquet file的支持.
2.6 重構(gòu) Flink WebUI。重新設(shè)計(jì)的 UI 是 1.9.0 的默認(rèn)版本,不過仍保留了切換到舊版 WebUI 的按鈕。
二. Flink 1.10 新特性
- flink1.10新特性
發(fā)布時(shí)間:2020.2.11
1.1 內(nèi)存管理和配置優(yōu)化
原來的問題
?? 原來TaskExecutor內(nèi)存配置有缺點(diǎn),難做資源利用優(yōu)化,比如:
???? - 批和流的配置模型不同;
???? - 流計(jì)算場景,比如RocksDB這種不是基于堆內(nèi)存的state backend,需要用戶進(jìn)行復(fù)雜的配置
優(yōu)化點(diǎn)
?? 1. 管理的內(nèi)存拓展
???? - 批作業(yè)仍然可以繼續(xù)使用heap和off-heap內(nèi)存。
???? - 流作業(yè)如果使用RocksDBStateBackend,只能使用off-heap內(nèi)存
???? - 為了批流集群配置統(tǒng)一,管理內(nèi)存只能使用off-heap
?? 2. 簡化RocksDB配置
???? - 配置RocksDB原來需要手動調(diào)試,比如減小JVM堆大小、設(shè)置Flink使用的堆外內(nèi)存。現(xiàn)在Flink的開箱配置即可完成,且只需要簡單地調(diào)整managed內(nèi)存的大小即可調(diào)整RocksDBStateBackend的內(nèi)存預(yù)算。
1.2 job提交的邏輯統(tǒng)一
?? 原來提交作業(yè)是由環(huán)境變量負(fù)責(zé)的,并且和部署方式有關(guān)(比如:Yarn,Kubernetes, Mesos)。
?? 在flink1.10,作業(yè)提交邏輯抽象出Executor接口,新增加的ExecutorCLI為任意一個(gè)執(zhí)行目標(biāo)提供了統(tǒng)一的配置參數(shù)。另外,引入JobClient來獲取JobExecutionResult,這樣結(jié)果獲取和作業(yè)的提交邏輯解耦。
1.3 原生Kubernetes集成(Beta)
1.4 Table API/SQL:hive集成已經(jīng)可以在生產(chǎn)環(huán)境使用
?? Flink 1.9 推出了beta Hive 集成。
?? 1. Batch SQL 原生分區(qū)支持
?? 2. flink1.10 hive集成還引入許多數(shù)據(jù)讀取方面的優(yōu)化
???? - 投影下推
???? - limit下推
???? - 讀取數(shù)據(jù)時(shí)orc向量化
?? 3. SQL DDL中支持watermark 和計(jì)算列
?? 4. 其他DDL的拓展
???? - UDF
?????? CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
三. Flink 1.11 新特性
- flink1.11新特性,該部分內(nèi)容轉(zhuǎn)自 Flink 1.11.0 發(fā)布,有哪些值得關(guān)注的新特性?
發(fā)布時(shí)間:2020.7.7
改進(jìn)一: 生態(tài)完善和易用性提升
1 Table & SQL 支持 Change Data Capture(CDC)
CDC 被廣泛使用在復(fù)制數(shù)據(jù)、更新緩存、微服務(wù)間同步數(shù)據(jù)、審計(jì)日志等場景,很多公司都在使用開源的 CDC 工具,如 MySQL CDC。通過 Flink 支持在 Table & SQL 中接入和解析 CDC 是一個(gè)強(qiáng)需求,在過往的很多討論中都被提及過,可以幫助用戶以實(shí)時(shí)的方式處理 changelog 流,進(jìn)一步擴(kuò)展 Flink 的應(yīng)用場景,例如把 MySQL 中的數(shù)據(jù)同步到 PG 或 ElasticSearch 中,低延時(shí)的 temporal join 一個(gè) changelog 等。
除了考慮到上面的真實(shí)需求,F(xiàn)link 中定義的“Dynamic Table”概念在流上有兩種模型:append 模式和 update 模式。通過 append 模式把流轉(zhuǎn)化為“Dynamic Table”在之前的版本中已經(jīng)支持,因此在 1.11.0 中進(jìn)一步支持 update 模式也從概念層面完整的實(shí)現(xiàn)了“Dynamic Table”。

為了支持解析和輸出 changelog,如何在外部系統(tǒng)和 Flink 系統(tǒng)之間編解碼這些更新操作是首要解決的問題。考慮到 source 和 sink 是銜接外部系統(tǒng)的一個(gè)橋梁,因此 FLIP-95 在定義全新的 Table source 和 Table sink 接口時(shí)解決了這個(gè)問題。
在公開的 CDC 調(diào)研報(bào)告中,Debezium 和 Canal 是用戶中最流行使用的 CDC 工具,這兩種工具用來同步 changelog 到其它的系統(tǒng)中,如消息隊(duì)列。據(jù)此,F(xiàn)LIP-105 首先支持了 Debezium 和 Canal 這兩種格式,而且 Kafka source 也已經(jīng)可以支持解析上述格式并輸出更新事件,在后續(xù)的版本中會進(jìn)一步支持 Avro(Debezium) 和 Protobuf(Canal)。
CREATE TABLE my_table (
...) WITH (
'connector'='...', -- e.g. 'kafka'
'format'='debezium-json',
'debezium-json.schema-include'='true' -- default: false (Debezium can be configured to include or exclude the message schema)
'debezium-json.ignore-parse-errors'='true' -- default: false
);
2 Table & SQL 支持 JDBC Catalog
1.11.0 之前,用戶如果依賴 Flink 的 source/sink 讀寫關(guān)系型數(shù)據(jù)庫或讀取 changelog 時(shí),必須要手動創(chuàng)建對應(yīng)的 schema。而且當(dāng)數(shù)據(jù)庫中的 schema 發(fā)生變化時(shí),也需要手動更新對應(yīng)的 Flink 作業(yè)以保持一致和類型匹配,任何不匹配都會造成運(yùn)行時(shí)報(bào)錯(cuò)使作業(yè)失敗。用戶經(jīng)常抱怨這個(gè)看似冗余且繁瑣的流程,體驗(yàn)極差。
實(shí)際上對于任何和 Flink 連接的外部系統(tǒng)都可能有類似的上述問題,在 1.11.0 中重點(diǎn)解決了和關(guān)系型數(shù)據(jù)庫對接的這個(gè)問題。FLIP-93 提供了 JDBC catalog 的基礎(chǔ)接口以及 Postgres catalog 的實(shí)現(xiàn),這樣方便后續(xù)實(shí)現(xiàn)與其它類型的關(guān)系型數(shù)據(jù)庫的對接。
1.11.0 版本后,用戶使用 Flink SQL 時(shí)可以自動獲取表的 schema 而不再需要輸入 DDL。除此之外,任何 schema 不匹配的錯(cuò)誤都會在編譯階段提前進(jìn)行檢查報(bào)錯(cuò),避免了之前運(yùn)行時(shí)報(bào)錯(cuò)造成的作業(yè)失敗。這是提升易用性和用戶體驗(yàn)的一個(gè)典型例子。
3 Hive 實(shí)時(shí)數(shù)倉
從 1.9.0 版本開始 Flink 從生態(tài)角度致力于集成 Hive,目標(biāo)打造批流一體的 Hive 數(shù)倉。經(jīng)過前兩個(gè)版本的迭代,已經(jīng)達(dá)到了 batch 兼容且生產(chǎn)可用,在 TPC-DS 10T benchmark 下性能達(dá)到 Hive 3.0 的 7 倍以上。
1.11.0 在 Hive 生態(tài)中重點(diǎn)實(shí)現(xiàn)了實(shí)時(shí)數(shù)倉方案,改善了端到端流式 ETL 的用戶體驗(yàn),達(dá)到了批流一體 Hive 數(shù)倉的目標(biāo)。同時(shí)在兼容性、性能、易用性方面也進(jìn)一步進(jìn)行了加強(qiáng)。
在實(shí)時(shí)數(shù)倉的解決方案中,憑借 Flink 的流式處理優(yōu)勢做到實(shí)時(shí)讀寫 Hive:
- Hive 寫入:FLIP-115 完善擴(kuò)展了 FileSystem connector 的基礎(chǔ)能力和實(shí)現(xiàn),Table/SQL 層的 sink 可以支持各種格式(CSV、Json、Avro、Parquet、ORC),而且支持 Hive table 的所有格式。
- Partition 支持:數(shù)據(jù)導(dǎo)入 Hive 引入 partition 提交機(jī)制來控制可見性,通過sink.partition-commit.trigger 控制 partition 提交的時(shí)機(jī),通過 sink.partition-commit.policy.kind 選擇提交策略,支持 SUCCESS 文件和 metastore 提交。
- Hive 讀?。簩?shí)時(shí)化的流式讀取 Hive,通過監(jiān)控 partition 生成增量讀取新 partition,或者監(jiān)控文件夾內(nèi)新文件生成來增量讀取新文件。
在 Hive 可用性方面的提升:
- FLIP-123 通過 Hive Dialect 為用戶提供語法兼容,這樣用戶無需在 Flink 和 Hive 的 CLI 之間切換,可以直接遷移 Hive 腳本到 Flink 中執(zhí)行。
- 提供 Hive 相關(guān)依賴的內(nèi)置支持,避免用戶自己下載所需的相關(guān)依賴?,F(xiàn)在只需要單獨(dú)下載一個(gè)包,配置 HADOOP_CLASSPATH 就可以運(yùn)行。
- 在 Hive 性能方面,1.10.0 中已經(jīng)支持了 ORC(Hive 2+)的向量化讀取,1.11.0 中我們補(bǔ)全了所有版本的 Parquet 和 ORC 向量化支持來提升性能。
3 全新 Source API
前面也提到過,source 和 sink 是 Flink 對接外部系統(tǒng)的一個(gè)橋梁,對于完善生態(tài)、可用性及端到端的用戶體驗(yàn)是很重要的環(huán)節(jié)。社區(qū)早在一年前就已經(jīng)規(guī)劃了 source 端的徹底重構(gòu),從 FLIP-27 的 ID 就可以看出是很早的一個(gè) feature。但是由于涉及到很多復(fù)雜的內(nèi)部機(jī)制和考慮到各種 source connector 的實(shí)現(xiàn),設(shè)計(jì)上需要考慮的很全面。從 1.10.0 就開始做 POC 的實(shí)現(xiàn),最終趕上了 1.11.0 版本的發(fā)布。
先簡要回顧下 source 之前的主要問題:
- 對用戶而言,在 Flink 中改造已有的 source 或者重新實(shí)現(xiàn)一個(gè)生產(chǎn)級的 source connector 不是一件容易的事情,具體體現(xiàn)在沒有公共的代碼可以復(fù)用,而且需要理解很多 Flink 內(nèi)部細(xì)節(jié)以及實(shí)現(xiàn)具體的 event time 分配、watermark 產(chǎn)出、idleness 監(jiān)測、線程模型等。
- 批和流的場景需要實(shí)現(xiàn)不同的 source。
- partitions/splits/shards 概念在接口中沒有顯式表達(dá),比如 split 的發(fā)現(xiàn)邏輯和數(shù)據(jù)消費(fèi)都耦合在 source function 的實(shí)現(xiàn)中,這樣在實(shí)現(xiàn) Kafka 或 Kinesis 類型的 source 時(shí)增加了復(fù)雜性。
- 在 runtime 執(zhí)行層,checkpoint 鎖被 source function 搶占會帶來一系列問題,框架很難進(jìn)行優(yōu)化。
FLIP-27 在設(shè)計(jì)時(shí)充分考慮了上述的痛點(diǎn):

- 首先在 Job Manager 和 Task Manager 中分別引入兩種不同的組件 Split Enumerator 和 Source reader,解耦 split 發(fā)現(xiàn)和對應(yīng)的消費(fèi)處理,同時(shí)方便隨意組合不同的策略。比如現(xiàn)有的 Kafka connector 中有多種不同的 partition 發(fā)現(xiàn)策略和實(shí)現(xiàn)耦合在一起,在新的架構(gòu)下,我們只需要實(shí)現(xiàn)一種 source reader,就可以適配多種 split enumerator 的實(shí)現(xiàn)來對應(yīng)不同的 partition 發(fā)現(xiàn)策略。
- 在新架構(gòu)下實(shí)現(xiàn)的 source connector 可以做到批流統(tǒng)一,唯一的小區(qū)別是對批場景的有限輸入,split enumerator 會產(chǎn)出固定數(shù)量的 split 集合并且每個(gè) split 都是有限數(shù)據(jù)集;對于流場景的無限輸入,split enumerator 要么產(chǎn)出無限多的 split 或者 split 自身是無限數(shù)據(jù)集。
- 復(fù)雜的 timestamp assigner 以及 watermark generator 透明的內(nèi)置在 source reader 模塊內(nèi)運(yùn)行,對用戶來說是無感知的。這樣用戶如果想實(shí)現(xiàn)新的 source connector,一般不再需要重復(fù)實(shí)現(xiàn)這部分功能。
目前 Flink 已有的 source connector 會在后續(xù)的版本中基于新架構(gòu)來重新實(shí)現(xiàn),legacy source 也會繼續(xù)維護(hù)幾個(gè)版本保持兼容性,用戶也可以按照 release 文檔中的說明來嘗試體驗(yàn)新 source 的開發(fā)。
4 PyFlink 生態(tài)
眾所周知,Python 語言在機(jī)器學(xué)習(xí)和數(shù)據(jù)分析領(lǐng)域有著廣泛的使用。Flink 從 1.9.0 版本開始發(fā)力兼容 Python 生態(tài),Python 和 Flink 合力為 PyFlink,把 Flink 的實(shí)時(shí)分布式處理能力輸出給 Python 用戶。前兩個(gè)版本 PyFlink 已經(jīng)支持了 Python Table API 和 UDF,在 1.11.0 中擴(kuò)大對 Python 生態(tài)庫 Pandas 的支持以及和 SQL DDL/Client 的集成,同時(shí) Python UDF 性能有了極大的提升。
具體來說,之前普通的 Python UDF 每次調(diào)用只能處理一條數(shù)據(jù),而且在 Java 端和 Python 端都需要序列化/反序列化,開銷很大。1.11.0 中 Flink 支持在 Table & SQL 作業(yè)中自定義和使用向量化 Python UDF,用戶只需要在 UDF 修飾中額外增加一個(gè)參數(shù) udf_type=“pandas” 即可。這樣帶來的好處是:
- 每次調(diào)用可以處理 N 條數(shù)據(jù)。
- 數(shù)據(jù)格式基于 Apache Arrow,大大降低了 Java、Python 進(jìn)程之間的序列化/反序列化開銷。
- 方便 Python 用戶基于 Numpy 和 Pandas 等數(shù)據(jù)分析領(lǐng)域常用的 Python 庫,開發(fā)高性能的 Python UDF。
除此之外,1.11.0 中 PyFlink 還支持:
- PyFlink table 和 Pandas DataFrame 之間無縫切換(FLIP-120),增強(qiáng) Pandas 生態(tài)的易用性和兼容性。
- Table & SQL 中可以定義和使用 Python UDTF(FLINK-14500),不再必需 Java/Scala UDTF。
- Cython 優(yōu)化 Python UDF 的性能(FLIP-121),對比 1.10.0 可以提升 30 倍。
- Python UDF 中用戶自定義 metric(FLIP-112),方便監(jiān)控和調(diào)試 UDF 的執(zhí)行。
上述解讀的都是側(cè)重 API 層面,用戶開發(fā)作業(yè)可以直接感知到的易用性的提升。下面我們看看執(zhí)行引擎層在 1.11.0 中都有哪些值得關(guān)注的變化。
改進(jìn)二. 生產(chǎn)可用性和穩(wěn)定性提升
1 支持 application 模式和 Kubernetes 增強(qiáng)
1.11.0 版本前,F(xiàn)link 主要支持如下兩種模式運(yùn)行:
- Session 模式:提前啟動一個(gè)集群,所有作業(yè)都共享這個(gè)集群的資源運(yùn)行。優(yōu)勢是避免每個(gè)作業(yè)單獨(dú)啟動集群帶來的額外開銷,缺點(diǎn)是隔離性稍差。如果一個(gè)作業(yè)把某個(gè) Task Manager(TM)容器搞掛,會導(dǎo)致這個(gè)容器內(nèi)的所有作業(yè)都跟著重啟。雖然每個(gè)作業(yè)有自己獨(dú)立的 Job Manager(JM)來管理,但是這些 JM 都運(yùn)行在一個(gè)進(jìn)程中,容易帶來負(fù)載上的瓶頸。
- Per-job 模式:為了解決 session 模式隔離性差的問題,每個(gè)作業(yè)根據(jù)資源需求啟動獨(dú)立的集群,每個(gè)作業(yè)的 JM 也是運(yùn)行在獨(dú)立的進(jìn)程中,負(fù)載相對小很多。
以上兩種模式的共同問題是需要在客戶端執(zhí)行用戶代碼,編譯生成對應(yīng)的 Job Graph 提交到集群運(yùn)行。在這個(gè)過程需要下載相關(guān) jar 包并上傳到集群,客戶端和網(wǎng)絡(luò)負(fù)載壓力容易成為瓶頸,尤其當(dāng)一個(gè)客戶端被多個(gè)用戶共享使用。
1.11.0 中引入了 application 模式(FLIP-85)來解決上述問題,按照 application 粒度來啟動一個(gè)集群,屬于這個(gè) application 的所有 job 在這個(gè)集群中運(yùn)行。核心是 Job Graph 的生成以及作業(yè)的提交不在客戶端執(zhí)行,而是轉(zhuǎn)移到 JM 端執(zhí)行,這樣網(wǎng)絡(luò)下載上傳的負(fù)載也會分散到集群中,不再有上述 client 單點(diǎn)上的瓶頸。
用戶可以通過 bin/flink run-application 來使用 application 模式,目前 Yarn 和 Kubernetes(K8s)都已經(jīng)支持這種模式。Yarn application 會在客戶端將運(yùn)行作業(yè)需要的依賴都通過 Yarn Local Resource 傳遞到 JM。K8s application 允許用戶構(gòu)建包含用戶 jar 與依賴的鏡像,同時(shí)會根據(jù)作業(yè)自動創(chuàng)建 TM,并在結(jié)束后銷毀整個(gè)集群,相比 session 模式具有更好的隔離性。K8s 不再有嚴(yán)格意義上的 per-job 模式,application 模式相當(dāng)于 per-job 在集群進(jìn)行提交作業(yè)的實(shí)現(xiàn)。
除了支持 application 模式,F(xiàn)link 原生 K8s 在 1.11.0 中還完善了很多基礎(chǔ)的功能特性(FLINK-14460),以達(dá)到生產(chǎn)可用性的標(biāo)準(zhǔn)。例如 Node Selector、Label、Annotation、Toleration 等。為了更方便的與 Hadoop 集成,也支持根據(jù)環(huán)境變量自動掛載 Hadoop 配置的功能。
2 Checkpoint & Savepoint 優(yōu)化
checkpoint 和 savepoint 機(jī)制一直是 Flink 保持先進(jìn)性的核心競爭力之一,社區(qū)在這個(gè)領(lǐng)域的改動很謹(jǐn)慎,最近的幾個(gè)大版本中幾乎沒有大的功能和架構(gòu)上的調(diào)整。在用戶郵件列表中,我們經(jīng)常能看到用戶反饋和抱怨的相關(guān)問題:比如 checkpoint 長時(shí)間做不出來失敗,savepoint 在作業(yè)重啟后不可用等等。1.11.0 有選擇的解決了一些這方面的常見問題,提高生產(chǎn)可用性和穩(wěn)定性。
1.11.0 之前, savepoint 中 meta 數(shù)據(jù)和 state 數(shù)據(jù)分別保存在兩個(gè)不同的目錄中,這樣如果想遷移 state 目錄很難識別這種映射關(guān)系,也可能導(dǎo)致目錄被誤刪除,對于目錄清理也同樣有麻煩。1.11.0 把兩部分?jǐn)?shù)據(jù)整合到一個(gè)目錄下,這樣方便整體轉(zhuǎn)移和復(fù)用。另外,之前 meta 引用 state 采用的是絕對路徑,這樣 state 目錄遷移后路徑發(fā)生變化也不可用,1.11.0 把 state 引用改成了相對路徑解決了這個(gè)問題(FLINK-5763),這樣 savepoint 的管理維護(hù)、復(fù)用更加靈活方便。
實(shí)際生產(chǎn)環(huán)境中,用戶經(jīng)常遭遇 checkpoint 超時(shí)失敗、長時(shí)間不能完成帶來的困擾。一旦作業(yè) failover 會造成回放大量的歷史數(shù)據(jù),作業(yè)長時(shí)間沒有進(jìn)度,端到端的延遲增加。1.11.0 從不同維度對 checkpoint 的優(yōu)化和提速做了改進(jìn),目標(biāo)實(shí)現(xiàn)分鐘甚至秒級的輕量型 checkpoint。
首先,增加了 Checkpoint Coordinator 通知 task 取消 checkpoint 的機(jī)制(FLINK-8871),這樣避免 task 端還在執(zhí)行已經(jīng)取消的 checkpoint 而對系統(tǒng)帶來不必要的壓力。同時(shí) task 端放棄已經(jīng)取消的 checkpoint,可以更快的參與執(zhí)行 coordinator 新觸發(fā)的 checkpoint,某種程度上也可以避免新 checkpoint 再次執(zhí)行超時(shí)而失敗。這個(gè)優(yōu)化也對后面默認(rèn)開啟 local recovery 提供了便利,task 端可以及時(shí)清理失效 checkpoint 的資源。
其次,在反壓場景下,整個(gè)數(shù)據(jù)鏈路堆積了大量 buffer,導(dǎo)致 checkpoint barrier 排在數(shù)據(jù) buffer 后面,不能被 task 及時(shí)處理對齊,也就導(dǎo)致了 checkpoint 長時(shí)間不能執(zhí)行。1.11.0 中從兩個(gè)維度對這個(gè)問題進(jìn)行解決:
1)嘗試減少數(shù)據(jù)鏈路中的 buffer 總量(FLINK-16428),這樣 checkpoint barrier 可以盡快被處理對齊。
- 上游輸出端控制單個(gè) sub partition 堆積 buffer 的最大閾值(backlog),避免負(fù)載不均場景下單個(gè)鏈路上堆積大量 buffer。
- 在不影響網(wǎng)絡(luò)吞吐性能的情況下合理修改上下游默認(rèn)的 buffer 配置。
- 上下游數(shù)據(jù)傳輸?shù)幕A(chǔ)協(xié)議進(jìn)行了調(diào)整,允許單個(gè)數(shù)據(jù)鏈路可以配置 0 個(gè)獨(dú)占 buffer 而不死鎖,這樣總的 buffer 數(shù)量和作業(yè)并發(fā)規(guī)模解耦。根據(jù)實(shí)際需求在吞吐性能和 checkpoint 速度兩者之間權(quán)衡,自定義 buffer 配比。
這個(gè)優(yōu)化有一部分工作已經(jīng)在 1.11.0 中完成,剩余部分會在下個(gè)版本繼續(xù)推進(jìn)完成。
2)實(shí)現(xiàn)了全新的 unaligned checkpoint 機(jī)制(FLIP-76)從根本上解決了反壓場景下 checkpoint barrier 對齊的問題。實(shí)際上這個(gè)想法早在 1.10.0 版本之前就開始醞釀設(shè)計(jì),由于涉及到很多模塊的大改動,實(shí)現(xiàn)機(jī)制和線程模型也很復(fù)雜。我們實(shí)現(xiàn)了兩種不同方案的原型 POC 進(jìn)行了測試、性能對比,確定了最終的方案,因此直到 1.11.0 才完成了 MVP 版本,這也是 1.11.0 中執(zhí)行引擎層唯一的一個(gè)重量級 feature。其基本思想可以概括為:
- Checkpoint barrier 跨數(shù)據(jù) buffer 傳輸,不在輸入輸出隊(duì)列排隊(duì)等待處理,這樣就和算子的計(jì)算能力解耦,barrier 在節(jié)點(diǎn)之間的傳輸只有網(wǎng)絡(luò)延時(shí),可以忽略不計(jì)。
- 每個(gè)算子多個(gè)輸入鏈路之間不需要等待 barrier 對齊來執(zhí)行 checkpoint,第一個(gè)到的 barrier 就可以提前觸發(fā) checkpoint,這樣可以進(jìn)一步提速 checkpoint,不會因?yàn)閭€(gè)別鏈路的延遲而影響整體。
- 為了和之前 aligned checkpoint 的語義保持一致,所有未被處理的輸入輸出數(shù)據(jù) buffer 都將作為 channel state 在 checkpoint 執(zhí)行時(shí)進(jìn)行快照持久化,在 failover 時(shí)連同 operator state 一同進(jìn)行恢復(fù)。換句話說,aligned 機(jī)制保證的是 barrier 前面所有數(shù)據(jù)必須被處理完,狀態(tài)實(shí)時(shí)體現(xiàn)到 operator state 中;而 unaligned 機(jī)制把 barrier 前面的未處理數(shù)據(jù)所反映的 operator state 延后到 failover restart 時(shí)通過 channel state 回放進(jìn)行體現(xiàn),從狀態(tài)恢復(fù)的角度來說最終都是一致的。注意這里雖然引入了額外的 in-flight buffer 的持久化,但是這個(gè)過程實(shí)際是在 checkpoint 的異步階段完成的,同步階段只是進(jìn)行了輕量級的 buffer 引用,所以不會過多占用算子的計(jì)算時(shí)間而影響吞吐性能。

Unaligned checkpoint 在反壓嚴(yán)重的場景下可以明顯加速 checkpoint 的完成時(shí)間,因?yàn)樗辉僖蕾囉谡w的計(jì)算吞吐能力,而和系統(tǒng)的存儲性能更加相關(guān),相當(dāng)于計(jì)算和存儲的解耦。但是它的使用也有一定的局限性,它會增加整體 state 的大小,對存儲 IO 帶來額外的開銷,因此在 IO 已經(jīng)是瓶頸的場景下就不太適合使用 unaligned checkpoint 機(jī)制。
1.11.0 中 unaligned checkpoint 還沒有作為默認(rèn)模式,需要用戶手動配置來開啟,并且只在 exactly-once 模式下生效。但目前還不支持 savepoint 模式,因?yàn)?savepoint 涉及到作業(yè)的 rescale 場景,channel state 目前還不支持 state 拆分,在后面的版本會進(jìn)一步支持,所以 savepoint 目前還是會使用之前的 aligned 模式,在反壓場景下有可能需要很長時(shí)間才能完成。
四. Flink 1.12 新特性
- flink1.12新特性,該部分內(nèi)容轉(zhuǎn)自 官宣 | Apache Flink 1.12.0 正式發(fā)布,流批一體真正統(tǒng)一運(yùn)行!
發(fā)布時(shí)間:2020.12.10
DataStream API 支持批執(zhí)行模式
Flink 的核心 API 最初是針對特定的場景設(shè)計(jì)的,盡管 Table API / SQL 針對流處理和批處理已經(jīng)實(shí)現(xiàn)了統(tǒng)一的 API,但當(dāng)用戶使用較底層的 API 時(shí),仍然需要在批處理(DataSet API)和流處理(DataStream API)這兩種不同的 API 之間進(jìn)行選擇。鑒于批處理是流處理的一種特例,將這兩種 API 合并成統(tǒng)一的 API,有一些非常明顯的好處,比如:
· 可復(fù)用性:作業(yè)可以在流和批這兩種執(zhí)行模式之間自由地切換,而無需重寫任何代碼。因此,用戶可以復(fù)用同一個(gè)作業(yè),來處理實(shí)時(shí)數(shù)據(jù)和歷史數(shù)據(jù)。
· 維護(hù)簡單:統(tǒng)一的 API 意味著流和批可以共用同一組 connector,維護(hù)同一套代碼,并能夠輕松地實(shí)現(xiàn)流批混合執(zhí)行,例如 backfilling 之類的場景。
考慮到這些優(yōu)點(diǎn),社區(qū)已朝著流批統(tǒng)一的 DataStream API 邁出了第一步:支持高效的批處理(FLIP-134)。從長遠(yuǎn)來看,這意味著 DataSet API 將被棄用(FLIP-131),其功能將被包含在 DataStream API 和 Table API / SQL 中。
■ 有限流上的批處理
您已經(jīng)可以使用 DataStream API 來處理有限流(例如文件)了,但需要注意的是,運(yùn)行時(shí)并不“知道”作業(yè)的輸入是有限的。為了優(yōu)化在有限流情況下運(yùn)行時(shí)的執(zhí)行性能,新的 BATCH 執(zhí)行模式,對于聚合操作,全部在內(nèi)存中進(jìn)行,且使用 sort-based shuffle(FLIP-140)和優(yōu)化過的調(diào)度策略(請參見 Pipelined Region Scheduling 了解更多詳細(xì)信息)。因此,DataStream API 中的 BATCH 執(zhí)行模式已經(jīng)非常接近 Flink 1.12 中 DataSet API 的性能。有關(guān)性能的更多詳細(xì)信息,請查看 FLIP-140。

在 Flink 1.12 中,默認(rèn)執(zhí)行模式為 STREAMING,要將作業(yè)配置為以 BATCH 模式運(yùn)行,可以在提交作業(yè)的時(shí)候,設(shè)置參數(shù) execution.runtime-mode:
$ bin/flink run -Dexecution.runtime-mode=BATCH examples/streaming/WordCount.jar
或者通過編程的方式:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeMode.BATCH);
注意:盡管 DataSet API 尚未被棄用,但我們建議用戶優(yōu)先使用具有 BATCH 執(zhí)行模式的 DataStream API 來開發(fā)新的批作業(yè),并考慮遷移現(xiàn)有的 DataSet 作業(yè)。
新的 Data Sink API (Beta)
之前發(fā)布的 Flink 版本中[1],已經(jīng)支持了 source connector 工作在流批兩種模式下,因此在 Flink 1.12 中,社區(qū)著重實(shí)現(xiàn)了統(tǒng)一的 Data Sink API(FLIP-143)。新的抽象引入了 write/commit 協(xié)議和一個(gè)更加模塊化的接口。Sink 的實(shí)現(xiàn)者只需要定義 what 和 how:SinkWriter,用于寫數(shù)據(jù),并輸出需要 commit 的內(nèi)容(例如,committables);Committer 和 GlobalCommitter,封裝了如何處理 committables。框架會負(fù)責(zé) when 和 where:即在什么時(shí)間,以及在哪些機(jī)器或進(jìn)程中 commit。

這種模塊化的抽象允許為 BATCH 和 STREAMING 兩種執(zhí)行模式,實(shí)現(xiàn)不同的運(yùn)行時(shí)策略,以達(dá)到僅使用一種 sink 實(shí)現(xiàn),也可以使兩種模式都可以高效執(zhí)行。Flink 1.12 中,提供了統(tǒng)一的 FileSink connector,以替換現(xiàn)有的 StreamingFileSink connector (FLINK-19758)。其它的 connector 也將逐步遷移到新的接口。
基于 Kubernetes 的高可用 (HA) 方案
Flink 可以利用 Kubernetes 提供的內(nèi)置功能來實(shí)現(xiàn) JobManager 的 failover,而不用依賴 ZooKeeper。為了實(shí)現(xiàn)不依賴于 ZooKeeper 的高可用方案,社區(qū)在 Flink 1.12(FLIP-144)中實(shí)現(xiàn)了基于 Kubernetes 的高可用方案。該方案與 ZooKeeper 方案基于相同的接口[3],并使用 Kubernetes 的 ConfigMap[4] 對象來處理從 JobManager 的故障中恢復(fù)所需的所有元數(shù)據(jù)。關(guān)于如何配置高可用的 standalone 或原生 Kubernetes 集群的更多詳細(xì)信息和示例,請查閱文檔[5]。
注意:需要注意的是,這并不意味著 ZooKeeper 將被刪除,這只是為 Kubernetes 上的 Flink 用戶提供了另外一種選擇。
其它功能改進(jìn)
■ 將現(xiàn)有的 connector 遷移到新的 Data Source API
在之前的版本中,F(xiàn)link 引入了新的 Data Source API(FLIP-27),以允許實(shí)現(xiàn)同時(shí)適用于有限數(shù)據(jù)(批)作業(yè)和無限數(shù)據(jù)(流)作業(yè)使用的 connector 。在 Flink 1.12 中,社區(qū)從 FileSystem connector(FLINK-19161)出發(fā),開始將現(xiàn)有的 source connector 移植到新的接口。
注意: 新的 source 實(shí)現(xiàn),是完全不同的實(shí)現(xiàn),與舊版本的實(shí)現(xiàn)不兼容。
■ Pipelined Region 調(diào)度 (FLIP-119)
在之前的版本中,F(xiàn)link 對于批作業(yè)和流作業(yè)有兩套獨(dú)立的調(diào)度策略。Flink 1.12 版本中,引入了統(tǒng)一的調(diào)度策略, 該策略通過識別 blocking 數(shù)據(jù)傳輸邊,將 ExecutionGraph 分解為多個(gè) pipelined region。這樣一來,對于一個(gè) pipelined region 來說,僅當(dāng)有數(shù)據(jù)時(shí)才調(diào)度它,并且僅在所有其所需的資源都被滿足時(shí)才部署它;同時(shí)也可以支持獨(dú)立地重啟失敗的 region。對于批作業(yè)來說,新策略可顯著地提高資源利用率,并消除死鎖。
■ 支持 Sort-Merge Shuffle (FLIP-148)
為了提高大規(guī)模批作業(yè)的穩(wěn)定性、性能和資源利用率,社區(qū)引入了 sort-merge shuffle,以替代 Flink 現(xiàn)有的實(shí)現(xiàn)。這種方案可以顯著減少 shuffle 的時(shí)間,并使用較少的文件句柄和文件寫緩存(這對于大規(guī)模批作業(yè)的執(zhí)行非常重要)。在后續(xù)版本中(FLINK-19614),F(xiàn)link 會進(jìn)一步優(yōu)化相關(guān)性能。
注意:該功能是實(shí)驗(yàn)性的,在 Flink 1.12 中默認(rèn)情況下不啟用。要啟用 sort-merge shuffle,需要在 TaskManager 的網(wǎng)絡(luò)配置[6]中設(shè)置合理的最小并行度。
■ Flink WebUI 的改進(jìn) (FLIP-75)
作為對上一個(gè)版本中,F(xiàn)link WebUI 一系列改進(jìn)的延續(xù),F(xiàn)link 1.12 在 WebUI 上暴露了 JobManager 內(nèi)存相關(guān)的指標(biāo)和配置參數(shù)(FLIP-104)。對于 TaskManager 的指標(biāo)頁面也進(jìn)行了更新,為 Managed Memory、Network Memory 和 Metaspace 添加了新的指標(biāo),以反映自 Flink 1.10(FLIP-102)開始引入的 TaskManager 內(nèi)存模型的更改[7]。
Table API/SQL: SQL Connectors 中的 Metadata 處理
如果可以將某些 source(和 format)的元數(shù)據(jù)作為額外字段暴露給用戶,對于需要將元數(shù)據(jù)與記錄數(shù)據(jù)一起處理的用戶來說很有意義。一個(gè)常見的例子是 Kafka,用戶可能需要訪問 offset、partition 或 topic 信息、讀寫 kafka 消息中的 key 或 使用消息 metadata中的時(shí)間戳進(jìn)行時(shí)間相關(guān)的操作。
在 Flink 1.12 中,F(xiàn)link SQL 支持了元數(shù)據(jù)列用來讀取和寫入每行數(shù)據(jù)中 connector 或 format 相關(guān)的列(FLIP-107)。這些列在 CREATE TABLE 語句中使用 METADATA(保留)關(guān)鍵字來聲明。
CREATE TABLE kafka_table (
id BIGINT,
name STRING,
event_time TIMESTAMP(3) METADATA FROM 'timestamp', -- access Kafka 'timestamp' metadata
headers MAP METADATA -- access Kafka 'headers' metadata
) WITH (
'connector' = 'kafka',
'topic' = 'test-topic',
'format' = 'avro'
);
在 Flink 1.12 中,已經(jīng)支持 Kafka 和 Kinesis connector 的元數(shù)據(jù),并且 FileSystem connector 上的相關(guān)工作也已經(jīng)在計(jì)劃中(FLINK-19903)。由于 Kafka record 的結(jié)構(gòu)比較復(fù)雜,社區(qū)還專門為 Kafka connector 實(shí)現(xiàn)了新的屬性[8],以控制如何處理鍵/值對。關(guān)于 Flink SQL 中元數(shù)據(jù)支持的完整描述,請查看每個(gè) connector 的文檔[9]以及 FLIP-107 中描述的用例。
Table API/SQL: Upsert Kafka Connector
在某些場景中,例如讀取 compacted topic 或者輸出(更新)聚合結(jié)果的時(shí)候,需要將 Kafka 消息記錄的 key 當(dāng)成主鍵處理,用來確定一條數(shù)據(jù)是應(yīng)該作為插入、刪除還是更新記錄來處理。為了實(shí)現(xiàn)該功能,社區(qū)為 Kafka 專門新增了一個(gè) upsert connector(upsert-kafka),該 connector 擴(kuò)展自現(xiàn)有的 Kafka connector,工作在 upsert 模式(FLIP-149)下。新的 upsert-kafka connector 既可以作為 source 使用,也可以作為 sink 使用,并且提供了與現(xiàn)有的 kafka connector 相同的基本功能和持久性保證,因?yàn)閮烧咧g復(fù)用了大部分代碼。
要使用 upsert-kafka connector,必須在創(chuàng)建表時(shí)定義主鍵,并為鍵(key.format)和值(value.format)指定序列化反序列化格式。完整的示例,請查看最新的文檔[10]。
Table API/SQL: SQL 中 支持 Temporal Table Join
在之前的版本中,用戶需要通過創(chuàng)建時(shí)態(tài)表函數(shù)(temporal table function) 來支持時(shí)態(tài)表 join(temporal table join) ,而在 Flink 1.12 中,用戶可以使用標(biāo)準(zhǔn)的 SQL 語句 FOR SYSTEM_TIME AS OF(SQL:2011)來支持 join。此外,現(xiàn)在任意包含時(shí)間列和主鍵的表,都可以作為時(shí)態(tài)表,而不僅僅是 append-only 表。這帶來了一些新的應(yīng)用場景,比如將 Kafka compacted topic 或數(shù)據(jù)庫變更日志(來自 Debezium 等)作為時(shí)態(tài)表。
CREATE TABLE orders (
order_id STRING,
currency STRING,
amount INT,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '30' SECOND
) WITH (
…
);
-- Table backed by a Kafka compacted topic
CREATE TABLE latest_rates (
currency STRING,
rate DECIMAL(38, 10),
currency_time TIMESTAMP(3),
WATERMARK FOR currency_time AS currency_time - INTERVAL ‘5’ SECOND,
PRIMARY KEY (currency) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
…
);
-- Event-time temporal table join
SELECT
o.order_id,
o.order_time,
o.amount * r.rate AS amount,
r.currency
FROM orders AS o, latest_rates FOR SYSTEM_TIME AS OF o.order_time r
ON o.currency = r.currency;
上面的示例同時(shí)也展示了如何在 temporal table join 中使用 Flink 1.12 中新增的 upsert-kafka connector。
■ 使用 Hive 表進(jìn)行 Temporal Table Join
用戶也可以將 Hive 表作為時(shí)態(tài)表來使用,F(xiàn)link 既支持自動讀取 Hive 表的最新分區(qū)作為時(shí)態(tài)表(FLINK-19644),也支持在作業(yè)執(zhí)行時(shí)追蹤整個(gè) Hive 表的最新版本作為時(shí)態(tài)表。請參閱文檔,了解更多關(guān)于如何在 temporal table join 中使用 Hive 表的示例。
Table API/SQL 中的其它改進(jìn)
**■ Kinesis Flink SQL Connector (FLINK-18858)
**
從 Flink 1.12 開始,Table API / SQL 原生支持將 Amazon Kinesis Data Streams(KDS)作為 source 和 sink 使用。新的 Kinesis SQL connector 提供了對于增強(qiáng)的Fan-Out(EFO)以及 Sink Partition 的支持。如需了解 Kinesis SQL connector 所有支持的功能、配置選項(xiàng)以及對外暴露的元數(shù)據(jù)信息,請查看最新的文檔。
**■ 在 FileSystem/Hive connector 的流式寫入中支持小文件合并 (FLINK-19345)
**
很多 bulk format,例如 Parquet,只有當(dāng)寫入的文件比較大時(shí),才比較高效。當(dāng) checkpoint 的間隔比較小時(shí),這會成為一個(gè)很大的問題,因?yàn)闀?chuàng)建大量的小文件。在 Flink 1.12 中,F(xiàn)ile Sink 增加了小文件合并功能,從而使得即使作業(yè) checkpoint 間隔比較小時(shí),也不會產(chǎn)生大量的文件。要開啟小文件合并,可以按照文檔[11]中的說明在 FileSystem connector 中設(shè)置 auto-compaction = true 屬性。
■ Kafka Connector 支持 Watermark 下推 (FLINK-20041)
為了確保使用 Kafka 的作業(yè)的結(jié)果的正確性,通常來說,最好基于分區(qū)來生成 watermark,因?yàn)榉謪^(qū)內(nèi)數(shù)據(jù)的亂序程度通常來說比分區(qū)之間數(shù)據(jù)的亂序程度要低很多。Flink 現(xiàn)在允許將 watermark 策略下推到 Kafka connector 里面,從而支持在 Kafka connector 內(nèi)部構(gòu)造基于分區(qū)的 watermark[12]。一個(gè) Kafka source 節(jié)點(diǎn)最終所產(chǎn)生的 watermark 由該節(jié)點(diǎn)所讀取的所有分區(qū)中的 watermark 的最小值決定,從而使整個(gè)系統(tǒng)可以獲得更好的(即更接近真實(shí)情況)的 watermark。該功能也允許用戶配置基于分區(qū)的空閑檢測策略,以防止空閑分區(qū)阻礙整個(gè)作業(yè)的 event time 增長。
■ 新增的 Formats

■ 利用 Multi-input 算子進(jìn)行 Join 優(yōu)化 (FLINK-19621)
Shuffling 是一個(gè) Flink 作業(yè)中最耗時(shí)的操作之一。為了消除不必要的序列化反序列化開銷、數(shù)據(jù) spilling 開銷,提升 Table API / SQL 上批作業(yè)和流作業(yè)的性能, planner 當(dāng)前會利用上一個(gè)版本中已經(jīng)引入的N元算子(FLIP-92),將由 forward 邊所連接的多個(gè)算子合并到一個(gè) Task 里執(zhí)行。
■ Type Inference for Table API UDAFs (FLIP-65)
Flink 1.12 完成了從 Flink 1.9 開始的,針對 Table API 上的新的類型系統(tǒng)[2]的工作,并在聚合函數(shù)(UDAF)上支持了新的類型系統(tǒng)。從 Flink 1.12 開始,與標(biāo)量函數(shù)和表函數(shù)類似,聚合函數(shù)也支持了所有的數(shù)據(jù)類型。
PyFlink: Python DataStream API
為了擴(kuò)展 PyFlink 的可用性,F(xiàn)link 1.12 提供了對于 Python DataStream API(FLIP-130)的初步支持,該版本支持了無狀態(tài)類型的操作(例如 Map,F(xiàn)latMap,F(xiàn)ilter,KeyBy 等)。如果需要嘗試 Python DataStream API,可以安裝PyFlink,然后按照該文檔[14]進(jìn)行操作,文檔中描述了如何使用 Python DataStream API 構(gòu)建一個(gè)簡單的流應(yīng)用程序。
from pyflink.common.typeinfo import Types
from pyflink.datastream import MapFunction, StreamExecutionEnvironment
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
env = StreamExecutionEnvironment.get_execution_environment()
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
mapped_stream.print()
env.execute("datastream job")
PyFlink 中的其它改進(jìn)
**■ PyFlink Jobs on Kubernetes (FLINK-17480)
**
除了 standalone 部署和 YARN 部署之外,現(xiàn)在也原生支持將 PyFlink 作業(yè)部署在 Kubernetes 上。最新的文檔中詳細(xì)描述了如何在 Kubernetes 上啟動 session 或 application 集群。
**■ 用戶自定義聚合函數(shù) (UDAFs)
**
從 Flink 1.12 開始,您可以在 PyFlink 作業(yè)中定義和使用 Python UDAF 了(FLIP-139)。普通的 UDF(標(biāo)量函數(shù))每次只能處理一行數(shù)據(jù),而 UDAF(聚合函數(shù))則可以處理多行數(shù)據(jù),用于計(jì)算多行數(shù)據(jù)的聚合值。您也可以使用 Pandas UDAF[15](FLIP-137),來進(jìn)行向量化計(jì)算(通常來說,比普通 Python UDAF 快10倍以上)。
注意: 普通 Python UDAF,當(dāng)前僅支持在 group aggregations 以及流模式下使用。如果需要在批模式或者窗口聚合中使用,建議使用 Pandas UDAF。
五. Flink 1.13 新特性
- flink1.13新特性,該部分內(nèi)容轉(zhuǎn)自 官宣|Apache Flink 1.13.0 正式發(fā)布,流處理應(yīng)用更加簡單高效!
發(fā)布時(shí)間:2021.5.3
1. 被動擴(kuò)縮容
Flink 項(xiàng)目的一個(gè)初始目標(biāo),就是希望流處理應(yīng)用可以像普通應(yīng)用一樣簡單和自然,被動擴(kuò)縮容是 Flink 針對這一目標(biāo)上的最新進(jìn)展。
當(dāng)考慮資源管理和部分的時(shí)候,F(xiàn)link 有兩種可能的模式。用戶可以將 Flink 應(yīng)用部署到 k8s、yarn 等資源管理系統(tǒng)之上,并且由 Flink 主動的來管理資源并按需分配和釋放資源。這一模式對于經(jīng)常改變資源需求的作業(yè)和應(yīng)用非常有用,比如批作業(yè)和實(shí)時(shí) SQL 查詢。在這種模式下,F(xiàn)link 所啟動的 Worker 數(shù)量是由應(yīng)用設(shè)置的并發(fā)度決定的。在 Flink 中我們將這一模式叫做主動擴(kuò)縮容。
對于長時(shí)間運(yùn)行的流處理應(yīng)用,一種更適合的模型是用戶只需要將作業(yè)像其它的長期運(yùn)行的服務(wù)一樣啟動起來,而不需要考慮是部署在 k8s、yarn 還是其它的資源管理平臺上,并且不需要考慮需要申請的資源的數(shù)量。相反,它的規(guī)模是由所分配的 worker 數(shù)量來決定的。當(dāng) worker 數(shù)量發(fā)生變化時(shí),F(xiàn)link 自動的改動應(yīng)用的并發(fā)度。在 Flink 中我們將這一模式叫做被動擴(kuò)縮容。
Flink 的 Application 部署模式開啟了使 Flink 作業(yè)更接近普通應(yīng)用(即啟動 Flink 作業(yè)不需要執(zhí)行兩個(gè)獨(dú)立的步驟來啟動集群和提交應(yīng)用)的努力,而被動擴(kuò)縮容完成了這一目標(biāo):用戶不再需要使用額外的工具(如腳本、K8s 算子)來讓 worker 的數(shù)量與應(yīng)用并發(fā)度設(shè)置保持一致。
用戶現(xiàn)在可以將自動擴(kuò)縮容的工具應(yīng)用到 Flink 應(yīng)用之上,就像普通的應(yīng)用程序一樣,只要用戶了解擴(kuò)縮容的代價(jià):有狀態(tài)的流應(yīng)用在擴(kuò)縮容的時(shí)候需要將狀態(tài)重新分發(fā)。
如果想要嘗試被動擴(kuò)縮容,用戶可以增加 scheduler-mode: reactive 這一配置項(xiàng),然后啟動一個(gè)應(yīng)用集群(Standalone 或者 K8s)。更多細(xì)節(jié)見被動擴(kuò)縮容的文檔。
2. 分析應(yīng)用的性能
對所有應(yīng)用程序來說,能夠簡單的分析和理解應(yīng)用的性能是非常關(guān)鍵的功能。這一功能對 Flink 更加重要,因?yàn)?Flink 應(yīng)用一般是數(shù)據(jù)密集的(即需要處理大量的數(shù)據(jù))并且需要在(近)實(shí)時(shí)的延遲內(nèi)給出結(jié)果。
當(dāng) Flink 應(yīng)用處理的速度跟不上數(shù)據(jù)輸入的速度時(shí),或者當(dāng)一個(gè)應(yīng)用占用的資源超過預(yù)期,下文介紹的這些工具可以幫你分析原因。
2.1 瓶頸檢測與反壓監(jiān)控
Flink 性能分析首先要解決的問題經(jīng)常是:哪個(gè)算子是瓶頸?
為了回答這一問題,F(xiàn)link 引入了描述作業(yè)繁忙(即在處理數(shù)據(jù))與反壓(由于下游算子不能及時(shí)處理結(jié)果而無法繼續(xù)輸出)程度的指標(biāo)。應(yīng)用中可能的瓶頸是那些繁忙并且上游被反壓的算子。
Flink 1.13 優(yōu)化了反壓檢測的邏輯(使用基于任務(wù) Mailbox 計(jì)時(shí),而不在再于堆棧采樣),并且重新實(shí)現(xiàn)了作業(yè)圖的 UI 展示:Flink 現(xiàn)在在 UI 上通過顏色和數(shù)值來展示繁忙和反壓的程度。

2.2 Web UI 中的 CPU 火焰圖
Flink 關(guān)于性能另一個(gè)經(jīng)常需要回答的問題:瓶頸算子中的哪部分計(jì)算邏輯消耗巨大?
針對這一問題,一個(gè)有效的可視化工具是火焰圖。它可以幫助回答以下問題:
- 哪個(gè)方法調(diào)現(xiàn)在在占用 CPU?
- 不同方法占用 CPU 的比例如何?
- 一個(gè)方法被調(diào)用的棧是什么樣子的?
火焰圖是通過重復(fù)采樣線程的堆棧來構(gòu)建的。在火焰圖中,每個(gè)方法調(diào)用被表示為一個(gè)矩形,矩形的長度與這個(gè)方法出現(xiàn)在采樣中的次數(shù)成正比?;鹧鎴D在 UI 上的一個(gè)例子如下圖所示。

火焰圖的文檔 包括啟用這一功能的更多細(xì)節(jié)和指令。
2.3 State 訪問延遲指標(biāo)
另一個(gè)可能的性能瓶頸是 state backend,尤其是當(dāng)作業(yè)的 state 超過內(nèi)存容量而必須使用 RocksDB state backend 時(shí)。
這里并不是想說 RocksDB 性能不夠好(我們非常喜歡 RocksDB?。?,但是它需要滿足一些條件才能達(dá)到最好的性能。例如,用戶可能很容易遇到非故意的在云上由于使用了錯(cuò)誤的磁盤資源類型而不能滿足 RockDB 的 IO 性能需求的問題。
基于 CPU 火焰圖,新的 State Backend 的延遲指標(biāo)可以幫助用戶更好的判斷性能不符合預(yù)期是否是由 State Backend 導(dǎo)致的。例如,如果用戶發(fā)現(xiàn) RocksDB 的單次訪問需要幾毫秒的時(shí)間,那么就需要查看內(nèi)存和 I/O 的配置。這些指標(biāo)可以通過設(shè)置 state.backend.rocksdb.latency-track-enabled 這一選項(xiàng)來啟用。這些指標(biāo)是通過采樣的方式來監(jiān)控性能的,所以它們對 RocksDB State Backend 的性能影響是微不足道的。
3. 通過 Savepoint 來切換 State Backend
用戶現(xiàn)在可以在從一個(gè) Savepoint 重啟時(shí)切換一個(gè) Flink 應(yīng)用的 State Backend。這使得 Flink 應(yīng)用不再被限制只能使用應(yīng)用首次運(yùn)行時(shí)選擇的 State Backend。
基于這一功能,用戶現(xiàn)在可以首先使用一個(gè) HashMap State Backend(純內(nèi)存的 State Backend),如果后續(xù)狀態(tài)變得過大的話,就切換到 RocksDB State Backend 中。
在實(shí)現(xiàn)層,F(xiàn)link 現(xiàn)在統(tǒng)一了所有 State Backend 的 Savepoint 格式來實(shí)現(xiàn)這一功能。
4. K8s 部署時(shí)使用用戶指定的 Pod 模式
原生 kubernetes 部署(Flink 主動要求 K8s 來啟動 Pod)中,現(xiàn)在可以使用自定義的 Pod 模板。
使用這些模板,用戶可以使用一種更符合 K8s 的方式來設(shè)置 JM 和 TM 的 Pod,這種方式比 Flink K8s 集成內(nèi)置的配置項(xiàng)更加靈活。
5. 生產(chǎn)可用的 Unaligned Checkpoint
Unaligned Checkpoint 目前已達(dá)到了生產(chǎn)可用的狀態(tài),我們鼓勵用戶在存在反壓的情況下試用這一功能。
具體來說,F(xiàn)link 1.13 中引入的這些功能使 Unaligned Checkpoint 更容易使用:
- 用戶現(xiàn)在使用 Unaligned Checkpoint 時(shí)也可以擴(kuò)縮容應(yīng)用。如果用戶需要因?yàn)樾阅茉虿荒苁褂?Savepoint而必須使用 Retained checkpoint 時(shí),這一功能會非常方便。
- 對于沒有反壓的應(yīng)用,啟用 Unaligned Checkpoint 現(xiàn)在代價(jià)更小。Unaligned Checkpoint 現(xiàn)在可以通過超時(shí)來自動觸發(fā),即一個(gè)應(yīng)用默認(rèn)會使用 Aligned Checkpoint(不存儲傳輸中的數(shù)據(jù)),而只在對齊超過一定時(shí)間范圍時(shí)自動切換到 Unaligned Checkpoint(存儲傳輸中的數(shù)據(jù))。
關(guān)于如何啟用 Unaligned Checkpoint 可以參考相關(guān)文檔。
6. 機(jī)器學(xué)習(xí)遷移到單獨(dú)的倉庫
為了加速 Flink 機(jī)器學(xué)習(xí)的進(jìn)展(流批統(tǒng)一的機(jī)器學(xué)習(xí)),現(xiàn)在 Flink 機(jī)器學(xué)習(xí)開啟了新的 flink-ml 倉庫。我們采用類似于 Stateful Function 項(xiàng)目的管理方式,通過使用一個(gè)單獨(dú)的倉庫從而簡化代碼合并的流程并且可以進(jìn)行單獨(dú)的版本發(fā)布,從而提高開發(fā)的效率。
用戶可以關(guān)注 Flink 在機(jī)器學(xué)習(xí)方面的進(jìn)展,比如與 Alink(Flink 常用機(jī)器學(xué)習(xí)算法套件)的互操作以及 Flink 與 Tensorflow 的集成。
二、SQL / Table API 進(jìn)展
與之前的版本類似,SQL 和 Table API 仍然在所有開發(fā)中占用很大的比例。
1. 通過 Table-valued 函數(shù)來定義時(shí)間窗口
在流式 SQL 查詢中,一個(gè)最經(jīng)常使用的是定義時(shí)間窗口。Flink 1.13 中引入了一種新的定義窗口的方式:通過 Table-valued 函數(shù)。這一方式不僅有更強(qiáng)的表達(dá)能力(允許用戶定義新的窗口類型),并且與 SQL 標(biāo)準(zhǔn)更加一致。
Flink 1.13 在新的語法中支持 TUMBLE 和 HOP 窗口,在后續(xù)版本中也會支持 SESSION 窗口。我們通過以下兩個(gè)例子來展示這一方法的表達(dá)能力:
- 例 1:一個(gè)新引入的 CUMULATE 窗口函數(shù),它可以支持按特定步長擴(kuò)展的窗口,直到達(dá)到最大窗口大?。?/li>
SELECT window_time, window_start, window_end, SUM(price) AS total_price
FROM TABLE(CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, window_time;
- 例 2:用戶在 table-valued 窗口函數(shù)中可以訪問窗口的起始和終止時(shí)間,從而使用戶可以實(shí)現(xiàn)新的功能。例如,除了常規(guī)的基于窗口的聚合和 Join 之外,用戶現(xiàn)在也可以實(shí)現(xiàn)基于窗口的 Top-K 聚合:
SELECT window_time, ...
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY total_price DESC)
as rank
FROM t
) WHERE rank <= 100;
2. 提高 DataStream API 與 Table API / SQL 的互操作能力
這一版本極大的簡化了 DataStream API 與 Table API 混合的程序。
Table API 是一種非常方便的應(yīng)用開發(fā)接口,因?yàn)檫@經(jīng)支持表達(dá)式的程序編寫并提供了大量的內(nèi)置函數(shù)。但是有時(shí)候用戶也需要切換回 DataStream,例如當(dāng)用戶存在表達(dá)能力、靈活性或者 State 訪問的需求時(shí)。
Flink 新引入的 StreamTableEnvironment.toDataStream()/.fromDataStream() 可以將一個(gè) DataStream API 聲明的 Source 或者 Sink 當(dāng)作 Table 的 Source 或者 Sink 來使用。主要的優(yōu)化包括:
- DataStream 與 Table API 類型系統(tǒng)的自動轉(zhuǎn)換。
- Event Time 配置的無縫集成,Watermark 行為的高度一致性。
- Row 類型(即 Table API 中數(shù)據(jù)的表示)有了極大的增強(qiáng),包括 toString() / hashCode() 和 equals() 方法的優(yōu)化,按名稱訪問字段值的支持與稀疏表示的支持。
Table table = tableEnv.fromDataStream(
dataStream,
Schema.newBuilder()
.columnByMetadata("rowtime", "TIMESTAMP(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build());
DataStream<Row> dataStream = tableEnv.toDataStream(table)
.keyBy(r -> r.getField("user"))
.window(...);
3. SQL Client: 初始化腳本和語句集合 (Statement Sets)
SQL Client 是一種直接運(yùn)行和部署 SQL 流或批作業(yè)的簡便方式,用戶不需要編寫代碼就可以從命令行調(diào)用 SQL,或者作為 CI / CD 流程的一部分。
這個(gè)版本極大的提高了 SQL Client 的功能?,F(xiàn)在基于所有通過 Java 編程(即通過編程的方式調(diào)用 TableEnvironment 來發(fā)起查詢)可以支持的語法,現(xiàn)在 SQL Client 和 SQL 腳本都可以支持。這意味著 SQL 用戶不再需要添加膠水代碼來部署他們的SQL作業(yè)。
3.1 配置簡化和代碼共享
Flink 后續(xù)將不再支持通過 Yaml 的方式來配置 SQL Client(注:目前還在支持,但是已經(jīng)被標(biāo)記為廢棄)。作為替代,SQL Client 現(xiàn)在支持使用一個(gè)初始化腳本在主 SQL 腳本執(zhí)行前來配置環(huán)境。
這些初始化腳本通??梢栽诓煌瑘F(tuán)隊(duì)/部署之間共享。它可以用來加載常用的 catalog,應(yīng)用通用的配置或者定義標(biāo)準(zhǔn)的視圖。
./sql-client.sh -i init1.sql init2.sql -f sqljob.sql
3.2 更多的配置項(xiàng)
通過增加配置項(xiàng),優(yōu)化 SET / RESET 命令,用戶可以更方便的在 SQL Client 和 SQL 腳本內(nèi)部來控制執(zhí)行的流程。
3.3 通過語句集合來支持多查詢
多查詢允許用戶在一個(gè) Flink 作業(yè)中執(zhí)行多個(gè) SQL 查詢(或者語句)。這對于長期運(yùn)行的流式 SQL 查詢非常有用。
語句集可以用來將一組查詢合并為一組同時(shí)執(zhí)行。
以下是一個(gè)可以通過 SQL Client 來執(zhí)行的 SQL 腳本的例子。它初始化和配置了執(zhí)行多查詢的環(huán)境。這一腳本包括了所有的查詢和所有的環(huán)境初始化和配置的工作,從而使它可以作為一個(gè)自包含的部署組件。
-- set up a catalog
CREATE CATALOG hive_catalog WITH ('type' = 'hive');
USE CATALOG hive_catalog;
-- or use temporary objects
CREATE TEMPORARY TABLE clicks (
user_id BIGINT,
page_id BIGINT,
viewtime TIMESTAMP
) WITH (
'connector' = 'kafka',
'topic' = 'clicks',
'properties.bootstrap.servers' = '...',
'format' = 'avro'
);
-- set the execution mode for jobs
SET execution.runtime-mode=streaming;
-- set the sync/async mode for INSERT INTOs
SET table.dml-sync=false;
-- set the job's parallelism
SET parallism.default=10;
-- set the job name
SET pipeline.name = my_flink_job;
-- restore state from the specific savepoint path
SET execution.savepoint.path=/tmp/flink-savepoints/savepoint-bb0dab;
BEGIN STATEMENT SET;
INSERT INTO pageview_pv_sink
SELECT page_id, count(1) FROM clicks GROUP BY page_id;
INSERT INTO pageview_uv_sink
SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;
4. Hive 查詢語法兼容性
用戶現(xiàn)在在 Flink 上也可以使用 Hive SQL 語法。除了 Hive DDL 方言之外,F(xiàn)link現(xiàn)在也支持常用的 Hive DML 和 DQL 方言。
為了使用 Hive SQL 方言,需要設(shè)置 table.sql-dialect 為 hive 并且加載 HiveModule。后者非常重要,因?yàn)楸仨氁虞d Hive 的內(nèi)置函數(shù)后才能正確實(shí)現(xiàn)對 Hive 語法和語義的兼容性。例子如下:
CREATE CATALOG myhive WITH ('type' = 'hive'); -- setup HiveCatalog
USE CATALOG myhive;
LOAD MODULE hive; -- setup HiveModule
USE MODULES hive,core;
SET table.sql-dialect = hive; -- enable Hive dialect
SELECT key, value FROM src CLUSTER BY key; -- run some Hive queries
需要注意的是, Hive 方言中不再支持 Flink 語法的 DML 和 DQL 語句。如果要使用 Flink 語法,需要切換回 default 的方言配置。
5. 優(yōu)化的 SQL 時(shí)間函數(shù)
在數(shù)據(jù)處理中時(shí)間處理是一個(gè)重要的任務(wù)。但是與此同時(shí),處理不同的時(shí)區(qū)、日期和時(shí)間是一個(gè)日益復(fù)雜的任務(wù)。
在 Flink 1.13 中,我們投入了大量的精力來簡化時(shí)間函數(shù)的使用。我們調(diào)整了時(shí)間相關(guān)函數(shù)的返回類型使其更加精確,例如 PROCTIME(),CURRENT_TIMESTAMP() 和 NOW()。
其次,用戶現(xiàn)在還可以基于一個(gè) TIMESTAMP_LTZ 類型的列來定義 Event Time 屬性,從而可以優(yōu)雅的在窗口處理中支持夏令時(shí)。
用戶可以參考 Release Note 來查看該部分的完整變更。
三、PyFlink 核心優(yōu)化
這個(gè)版本對 PyFlink 的改進(jìn)主要是使基于 Python 的 DataStream API 與 Table API 與 Java/scala 版本的對應(yīng)功能更加一致。
1. Python DataStream API 中的有狀態(tài)算子
在 Flink 1.13 中,Python 程序員可以享受到 Flink 狀態(tài)處理 API 的所有能力。在 Flink 1.12 版本重構(gòu)過的 Python DataStream API 現(xiàn)在已經(jīng)擁有完整的狀態(tài)訪問能力,從而使用戶可以將數(shù)據(jù)的信息記錄到 state 中并且在后續(xù)訪問。
帶狀態(tài)的處理能力是許多依賴跨記錄狀態(tài)共享(例如 Window Operator)的復(fù)雜數(shù)據(jù)處理場景的基礎(chǔ)。
以下例子展示了一個(gè)自定義的計(jì)算窗口的實(shí)現(xiàn):
class CountWindowAverage(FlatMapFunction):
def __init__(self, window_size):
self.window_size = window_size
def open(self, runtime_context: RuntimeContext):
descriptor = ValueStateDescriptor("average", Types.TUPLE([Types.LONG(), Types.LONG()]))
self.sum = runtime_context.get_state(descriptor)
def flat_map(self, value):
current_sum = self.sum.value()
if current_sum is None:
current_sum = (0, 0)
# update the count
current_sum = (current_sum[0] + 1, current_sum[1] + value[1])
# if the count reaches window_size, emit the average and clear the state
if current_sum[0] >= self.window_size:
self.sum.clear()
yield value[0], current_sum[1] // current_sum[0]
else:
self.sum.update(current_sum)
ds = ... # type: DataStream
ds.key_by(lambda row: row[0]) \
.flat_map(CountWindowAverage(5))
2. PyFlink DataStream API 中的用戶自定義窗口
Flink 1.13 中 PyFlink DataStream 接口增加了對用戶自定義窗口的支持,現(xiàn)在用戶可以使用標(biāo)準(zhǔn)窗口之外的窗口定義。
由于窗口是處理無限數(shù)據(jù)流的核心機(jī)制 (通過將流切分為多個(gè)有限的『桶』),這一功能極大的提高的 API 的表達(dá)能力。
3. PyFlink Table API 中基于行的操作
Python Table API 現(xiàn)在支持基于行的操作,例如用戶對行數(shù)據(jù)的自定義函數(shù)。這一功能使得用戶可以使用非內(nèi)置的數(shù)據(jù)處理函數(shù)。
一個(gè)使用 map() 操作的 Python Table API 示例如下:
@udf(result_type=DataTypes.ROW(
[DataTypes.FIELD("c1", DataTypes.BIGINT()),
DataTypes.FIELD("c2", DataTypes.STRING())]))
def increment_column(r: Row) -> Row:
return Row(r[0] + 1, r[1])
table = ... # type: Table
mapped_result = table.map(increment_column)
除了 map(),這一 API 還支持 flat_map(),aggregate(),flat_aggregate() 和其它基于行的操作。這使 Python Table API 的功能與 Java Table API 的功能更加接近。
4. PyFlink DataStream API 支持 Batch 執(zhí)行模式
對于有限流,PyFlink DataStream API 現(xiàn)在已經(jīng)支持 Flink 1.12 DataStream API 中引入的 Batch 執(zhí)行模式。
通過復(fù)用數(shù)據(jù)有限性來跳過 State backend 和 Checkpoint 的處理,Batch 執(zhí)行模式可以簡化運(yùn)維,并且提高有限流處理的性能。
四、其它優(yōu)化
1. 基于 Hugo 的 Flink 文檔
Flink 文檔從 JekyII 遷移到了 Hugo。如果您發(fā)現(xiàn)有問題,請務(wù)必通知我們,我們非常期待用戶對新的界面的感受。
2. Web UI 支持歷史異常
Flink Web UI 現(xiàn)在可以展示導(dǎo)致作業(yè)失敗的 n 次歷史異常,從而提升在一個(gè)異常導(dǎo)致多個(gè)后續(xù)異常的場景下的調(diào)試體驗(yàn)。用戶可以在異常歷史中找到根異常。
3. 優(yōu)化失敗 Checkpoint 的異常和失敗原因的匯報(bào)
Flink 現(xiàn)在提供了失敗或被取消的 Checkpoint 的統(tǒng)計(jì),從而使用戶可以更簡單的判斷 Checkpoint 失敗的原因,而不需要去查看日志。
Flink 之前的版本只有在 Checkpoint 成功的時(shí)候才會匯報(bào)指標(biāo)(例如持久化數(shù)據(jù)的大小、觸發(fā)時(shí)間等)。
4. 提供『恰好一次』一致性的 JDBC Sink
從 1.13 開始,通過使用事務(wù)提交數(shù)據(jù),JDBC Sink 可以對支持 XA 事務(wù)的數(shù)據(jù)庫提供『恰好一次』的一致性支持。這一特性要求目標(biāo)數(shù)據(jù)庫必須有(或鏈接到)一個(gè) XA 事務(wù)處理器。
這一 Sink 現(xiàn)在只能在 DataStream API 中使用。用戶可以通過 JdbcSink.exactlyOnceSink(…) 來創(chuàng)建這一 Sink(或者通過顯式初始化一個(gè) JdbcXaSinkFunction)。
5. PyFlink Table API 在 Group 窗口上支持用戶自定義的聚合函數(shù)
PyFlink Table API 現(xiàn)在對 Group 窗口同時(shí)支持基于 Python 的用戶自定義聚合函數(shù)(User-defined Aggregate Functions, UDAFs)以及 Pandas UDAFs。這些函數(shù)對許多數(shù)據(jù)分析或機(jī)器學(xué)習(xí)訓(xùn)練的程序非常重要。
在 Flink 1.13 之前,這些函數(shù)僅能在無限的 Group-by 聚合場景下使用。Flink 1.13 優(yōu)化了這一限制。
6. Batch 執(zhí)行模式下 Sort-merge Shuffle 優(yōu)化
Flink 1.13 優(yōu)化了針對批處理程序的 Sort-merge Blocking Shuffle 的性能和內(nèi)存占用情況。這一 Shuffle 模式是在Flink 1.12 的 FLIP-148 中引入的。
這一優(yōu)化避免了大規(guī)模作業(yè)下不斷出現(xiàn) OutOfMemoryError: Direct Memory 的問題,并且通過 I/O 調(diào)度和 broadcast 優(yōu)化提高了性能(尤其是在機(jī)械硬盤上)。
7. HBase 連接器支持異步維表查詢和查詢緩存
HBase Lookup Table Source 現(xiàn)在可以支持異步查詢模式和查詢緩存。這極大的提高了使用這一 Source 的 Table / SQL 維表 Join 的性能,并且在一些典型情況下可以減少對 HBase 的 I/O 請求數(shù)量。
在之前的版本中,HBase Lookup Source 僅支持同步通信,從而導(dǎo)致作業(yè)吞吐以及資源利用率降低。
8. 升級 Flink 1.13 需要注意的改動
- FLINK-21709 – 老的 Table & SQL API 計(jì)劃器已經(jīng)被標(biāo)記為廢棄,并且將在 Flink 1.14 中被刪除。Blink 計(jì)劃器在若干版本之前已經(jīng)被設(shè)置為默認(rèn)計(jì)劃器,并且將成為未來版本中的唯一計(jì)劃器。這意味著 BatchTableEnvironment 和 DataSet API 互操作后續(xù)也將不再支持。用戶需要切換到統(tǒng)一的 TableEnvironment 來編寫流或者批的作業(yè)。
- FLINK-22352 – Flink 社區(qū)決定廢棄對 Apache mesos 的支持,未來有可能會進(jìn)一步刪除這部分功能。用戶最好能夠切換到其它的資源管理系統(tǒng)上。
- FLINK-21935 – state.backend.async 這一配置已經(jīng)被禁用了,因?yàn)楝F(xiàn)在 Flink 總是會異步的來保存快照(即之前的配置默認(rèn)值),并且現(xiàn)在沒有實(shí)現(xiàn)可以支持同步的快照保存操作。
- FLINK-17012 – Task 的 RUNNING 狀態(tài)被細(xì)分為兩步:INITIALIZING 和 RUNNING。Task 的 INITIALIZING 階段包括加載 state 和在啟用 unaligned checkpoint 時(shí)恢復(fù) In-flight 數(shù)據(jù)的過程。通過顯式區(qū)分這兩種狀態(tài),監(jiān)控系統(tǒng)可以更好的區(qū)分任務(wù)是否已經(jīng)在實(shí)際工作。
- FLINK-21698 – NUMERIC 和 TIMESTAMP 類型之間的直接轉(zhuǎn)換存在問題,現(xiàn)在已經(jīng)被禁用,例如 CAST(numeric AS TIMESTAMP(3))。用戶應(yīng)該使用 TO_TIMESTAMP(FROM_UNIXTIME(numeric)) 來代替。
- FLINK-22133 – 新的 Source 接口有一個(gè)小的不兼容的修改,即 SplitEnumerator.snapshotState() 方法現(xiàn)在多接受一個(gè) checkpoint id 參數(shù)來表示正在進(jìn)行的 snapshot 操作所屬的 checkpoint 的 id。
- FLINK-19463 – 由于老的 Statebackend 接口承載了過多的語義并且容易引起困惑,這一接口被標(biāo)記為廢棄。這是一個(gè)純 API 層的改動,而并不會影響應(yīng)用運(yùn)行時(shí)。對于如何升級現(xiàn)有作業(yè),請參考 作業(yè)遷移指引 。