摘要:本文整理自小米計(jì)算平臺(tái)高級(jí)工程師胡煥,在 FFA 數(shù)據(jù)集成專(zhuān)場(chǎng)的分享。本篇內(nèi)容主要分為四個(gè)部分:
- 發(fā)展現(xiàn)狀
- 思考實(shí)踐
- 引擎設(shè)計(jì)
- 未來(lái)規(guī)劃
一、發(fā)展現(xiàn)狀

首先介紹一下小米計(jì)算平臺(tái),小米計(jì)算平臺(tái)主要負(fù)責(zé)小米集團(tuán)的數(shù)據(jù)開(kāi)發(fā)平臺(tái)的建設(shè),體現(xiàn)在產(chǎn)品上是小米數(shù)據(jù)工場(chǎng),底層引擎上常見(jiàn)的 Flink、Spark、Iceberg、Hive 等等都是由計(jì)算平臺(tái)在負(fù)責(zé)。
上圖是小米數(shù)據(jù)工場(chǎng)的技術(shù)架構(gòu)圖。
正中間的藍(lán)色高亮框是小米自研的消息中間件 Talos,可以把它替換成大家比較熟悉的 Kafka,這對(duì)今天的分享內(nèi)容來(lái)說(shuō)幾乎沒(méi)有任何差別。
Talos 右下方的藍(lán)色高亮框是表格存儲(chǔ)的技術(shù)選型,小米的數(shù)據(jù)湖技術(shù)選型選擇了 Iceberg,Iceberg 是小米數(shù)據(jù)集成的主要場(chǎng)景之一。
右下角的紅色高亮框就是數(shù)據(jù)集成。小米數(shù)據(jù)集成目前的主要場(chǎng)景還是數(shù)據(jù)的入湖入倉(cāng),但數(shù)據(jù)出湖出倉(cāng)的場(chǎng)景也在快速增長(zhǎng)。我們的最終建設(shè)目標(biāo)是建立各種異構(gòu)數(shù)據(jù)系統(tǒng)之間對(duì)接的能力,所以目前我們把數(shù)據(jù)集成作為一種基礎(chǔ)服務(wù)在建設(shè)。

在產(chǎn)品層面,我們將數(shù)據(jù)集成劃分為四個(gè)主要場(chǎng)景。
- 第一個(gè)主要場(chǎng)景是數(shù)據(jù)采集,主要是一些時(shí)間敏感數(shù)據(jù)的采集,比如客戶(hù)端和服務(wù)端的埋點(diǎn)數(shù)據(jù)的采集、日志文件采集、物聯(lián)網(wǎng)數(shù)據(jù)采集等等。這些場(chǎng)景需要一些專(zhuān)用的采集技術(shù)來(lái)支持,基本上沒(méi)有辦法整合到一個(gè)引擎中,我們將這些數(shù)據(jù)集成場(chǎng)景拆分為獨(dú)立的采集和集成兩個(gè)部分,兩者通過(guò)消息隊(duì)列進(jìn)行對(duì)接。
采集的部分設(shè)計(jì)了單獨(dú)的數(shù)據(jù)采集中心,用來(lái)配置各種采集服務(wù)。這些采集服務(wù)統(tǒng)一都輸出到消息隊(duì)列,這樣我們可以把集成的部分統(tǒng)一起來(lái),通過(guò)組合不同的實(shí)時(shí)集成作業(yè)來(lái)支持各種數(shù)據(jù)集成的場(chǎng)景。
- 第二個(gè)主要場(chǎng)景是實(shí)時(shí)集成作業(yè),目前只支持消息隊(duì)列和數(shù)據(jù)庫(kù)作為上游數(shù)據(jù)源。消息隊(duì)列場(chǎng)景里很大一部分的數(shù)據(jù)來(lái)源就是來(lái)源于數(shù)據(jù)采集中心,不同的采集場(chǎng)景對(duì)應(yīng)的主流實(shí)時(shí)集成場(chǎng)景略有不同,例如埋點(diǎn)場(chǎng)景通常寫(xiě)入到 Iceberg 進(jìn)行進(jìn)一步的加工和分析,日志場(chǎng)景通常不做額外的加工直接寫(xiě)入到 ES 里,還有一些比較特殊的是 Schema On Read 場(chǎng)景,它們直接寫(xiě)入 HDFS 文件。
數(shù)據(jù)庫(kù)場(chǎng)景與消息隊(duì)列場(chǎng)景略有不同,消息隊(duì)列場(chǎng)景里的采集和集成這兩個(gè)部分是完全相互獨(dú)立的,在數(shù)據(jù)庫(kù)場(chǎng)景里,這兩個(gè)部分則是緊密結(jié)合的。
第三個(gè)主要場(chǎng)景是離線(xiàn)集成作業(yè),離線(xiàn)場(chǎng)景相對(duì)來(lái)說(shuō)簡(jiǎn)單很多,略微麻煩一點(diǎn)的就是 MySQL 的分庫(kù)分表和 Doris 這兩個(gè)場(chǎng)景,后面為大家做詳細(xì)介紹。
第四個(gè)主要場(chǎng)景是跨集群同步作業(yè),目前我們只支持 Hive 和 Iceberg 做跨集群的同步,這是因?yàn)闀?huì)涉及到跨國(guó)的數(shù)據(jù)傳輸,在合規(guī)和網(wǎng)絡(luò)方面都會(huì)存在一些額外要求,所以我們盡量避免在采集或集成的環(huán)節(jié)做跨集群同步。
這四個(gè)場(chǎng)景除了數(shù)據(jù)采集中心無(wú)法整合到一個(gè)引擎里,其余三個(gè)場(chǎng)景都通過(guò)小米數(shù)據(jù)集成引擎支持了。

上圖是集成作業(yè)的主要場(chǎng)景的作業(yè)數(shù)量,紅色是實(shí)時(shí)作業(yè),藍(lán)色是離線(xiàn)作業(yè),灰色字體的是存量作業(yè)。
我們將實(shí)時(shí)集成作業(yè)分為消息隊(duì)列場(chǎng)景和數(shù)據(jù)庫(kù)場(chǎng)景,再加上離線(xiàn)集成作業(yè)、跨集群同步作業(yè),構(gòu)成了四個(gè)主要場(chǎng)景。
注意到每個(gè)場(chǎng)景里還有相當(dāng)多的存量作業(yè),在孵化小米數(shù)據(jù)集成引擎以前,這些場(chǎng)景都是用不同的引擎來(lái)支持的。

上圖展示的是小米數(shù)據(jù)集成的演進(jìn)。
消息隊(duì)列場(chǎng)景過(guò)去是由 Spark Streaming 支持的,現(xiàn)在換成了 Flink SQL。
數(shù)據(jù)庫(kù)實(shí)時(shí)場(chǎng)景過(guò)去是由自研采集服務(wù)支持的,且只支持 MySQL 數(shù)據(jù)庫(kù),現(xiàn)在整體升級(jí)到了 Flink CDC + Flink SQL,并借助 Flink CDC 支持了更多的數(shù)據(jù)庫(kù)。
離線(xiàn)場(chǎng)景里過(guò)去是由 DataX 支持的,現(xiàn)在我們也都換成了 Flink SQL。
跨集群同步場(chǎng)景過(guò)去由 Hadoop 的 DISTCP 命令來(lái)實(shí)現(xiàn)的,這個(gè)命令只能拷貝底層的文件,在拷貝 Hive 表的時(shí)候需要配合另外的命令來(lái)添加分區(qū),體驗(yàn)非常糟糕,換到了 Flink SQL 就沒(méi)有這個(gè)問(wèn)題了,還增加了對(duì) Iceberg 表的跨集群實(shí)時(shí)拷貝的支持。
直到現(xiàn)在小米還有大量的存量作業(yè)在等待遷移,這些存量作業(yè)的各種引擎給我們帶來(lái)了非常大的維護(hù)負(fù)擔(dān)。一方面占用了大量人力去熟悉和維護(hù)這些引擎,另一方面是我們很難在這些差異巨大的引擎上構(gòu)建出統(tǒng)一的產(chǎn)品,這需要增加很多分支判斷,也很容易出錯(cuò)。所以我們非常迫切地希望把這些場(chǎng)景由同一個(gè)引擎里支持起來(lái),目前來(lái)看,F(xiàn)link 在數(shù)據(jù)集成領(lǐng)域的優(yōu)勢(shì)極其明顯,小米的數(shù)據(jù)集成引擎就是基于 Flink 構(gòu)建的。
二、思考實(shí)踐
首先我們嘗試抽象出幾個(gè)核心概念,并通過(guò)核心問(wèn)題來(lái)界定每個(gè)核心概念的范圍和邊界。

我們將數(shù)據(jù)集成領(lǐng)域的生產(chǎn)實(shí)踐都?xì)w類(lèi)到這三個(gè)層級(jí)上,分別是數(shù)據(jù)集成領(lǐng)域、數(shù)據(jù)集成產(chǎn)品、數(shù)據(jù)集成引擎,這三個(gè)層級(jí)處理的核心問(wèn)題是一層一層遞進(jìn)的。

數(shù)據(jù)集成領(lǐng)域的核心問(wèn)題是連接。
數(shù)據(jù)集成的概念是相對(duì)于數(shù)據(jù)開(kāi)發(fā)的概念來(lái)定義的,上圖的右側(cè)展示的就是數(shù)據(jù)開(kāi)發(fā)領(lǐng)域最常見(jiàn)的技術(shù)棧,左側(cè)展示的則是數(shù)據(jù)集成領(lǐng)域的范圍。
在數(shù)據(jù)開(kāi)發(fā)領(lǐng)域,離線(xiàn)數(shù)倉(cāng)通常使用 Spark+Hive 的技術(shù)棧,實(shí)時(shí)數(shù)倉(cāng)的最新的技術(shù)棧是 Flink+數(shù)據(jù)湖,這里的 Hive 和數(shù)據(jù)湖都可以用 Flink 或 Spark 直接訪(fǎng)問(wèn)。我們通常所說(shuō)的數(shù)據(jù)開(kāi)發(fā)的工作主要就是在離線(xiàn)數(shù)倉(cāng)或數(shù)據(jù)湖內(nèi)進(jìn)行的,我們可以認(rèn)為數(shù)據(jù)開(kāi)發(fā)領(lǐng)域主要是基于現(xiàn)成的可以直接訪(fǎng)問(wèn)的數(shù)據(jù)進(jìn)行開(kāi)發(fā)。
在數(shù)據(jù)集成領(lǐng)域,關(guān)注的重點(diǎn)則是:數(shù)據(jù)在哪里,怎么訪(fǎng)問(wèn)這些數(shù)據(jù),怎么讓數(shù)據(jù)正確的參與到計(jì)算中。圖中最左側(cè)列出來(lái)的是最常見(jiàn)的幾個(gè)數(shù)據(jù)集成場(chǎng)景,在這些場(chǎng)景里我們不能像常規(guī)的數(shù)據(jù)開(kāi)發(fā)一樣直接訪(fǎng)問(wèn)這些數(shù)據(jù)。比如我們肯定不會(huì)在要進(jìn)行數(shù)據(jù)計(jì)算的時(shí)候才實(shí)時(shí)的連接到服務(wù)器甚至客戶(hù)端讀取數(shù)據(jù),即使是最常規(guī)的數(shù)據(jù)庫(kù)場(chǎng)景,我們通常也需要用一張 ODS 表來(lái)代替數(shù)據(jù)庫(kù)參與計(jì)算,在作業(yè)中直連數(shù)據(jù)庫(kù)很容易會(huì)讓數(shù)據(jù)庫(kù)壓力過(guò)大,產(chǎn)生各種異常。
我們希望在數(shù)據(jù)開(kāi)發(fā)過(guò)程中能夠以一種統(tǒng)一的方式訪(fǎng)問(wèn)上游的各種數(shù)據(jù)源,這個(gè)處理過(guò)程就是數(shù)據(jù)集成領(lǐng)域里要解決的核心問(wèn)題:連接。
注意這里強(qiáng)調(diào)的是連接,數(shù)據(jù)集成經(jīng)常與數(shù)據(jù)導(dǎo)入的概念混淆,把數(shù)據(jù)導(dǎo)入到 ODS 表之后再去做數(shù)據(jù)開(kāi)發(fā)是數(shù)據(jù)集成領(lǐng)域的一個(gè)優(yōu)秀實(shí)踐,但不是所有場(chǎng)景都必須要有導(dǎo)入這個(gè)步驟,在 Flink 中只要有 Connector 的支持,在簡(jiǎn)單場(chǎng)景中我們完全可以直接連接數(shù)據(jù)源做數(shù)據(jù)開(kāi)發(fā)。

數(shù)據(jù)集成產(chǎn)品的核心問(wèn)題是效率。
上圖展示了最常見(jiàn)的數(shù)倉(cāng)建模規(guī)范,我們主要關(guān)注其中的 ODS 表。
數(shù)據(jù)開(kāi)發(fā)過(guò)程中經(jīng)常需要多次讀取原始數(shù)據(jù),將原始數(shù)據(jù)導(dǎo)入到 ODS 表,再用 ODS 表替代原始數(shù)據(jù)參與數(shù)據(jù)開(kāi)發(fā),就可以避免重復(fù)連接上游的數(shù)據(jù)源。
現(xiàn)在經(jīng)常提到的用 ELT 替代 ETL 的做法,代表的是 ODS 表設(shè)計(jì)的一個(gè)優(yōu)秀實(shí)踐,我們稱(chēng)之為鏡像同步。鏡像同步要求 ODS 表結(jié)構(gòu)與上游數(shù)據(jù)的結(jié)構(gòu)盡可能保持完全一致,并盡可能的保留上游數(shù)據(jù)的所有細(xì)節(jié),數(shù)據(jù)清洗的步驟則往后放,改為基于 ODS 表實(shí)施,這樣如果清洗邏輯存在問(wèn)題,我們基于 ODS 表進(jìn)行修復(fù)的代價(jià)也非常小。
有鏡像同步這個(gè)優(yōu)秀實(shí)踐作為基礎(chǔ),我們就能夠?qū)?shù)據(jù)集成的過(guò)程標(biāo)準(zhǔn)化了。將鏡像同步的整個(gè)流程中的各種重復(fù)工作固化下來(lái),就形成了數(shù)據(jù)集成產(chǎn)品,從而可以大幅提高數(shù)據(jù)集成工作的開(kāi)發(fā)效率。我們可以認(rèn)為數(shù)據(jù)集成產(chǎn)品就是從數(shù)據(jù)集成領(lǐng)域的各種優(yōu)秀實(shí)踐上發(fā)展而來(lái)的。
這里需要關(guān)注一個(gè)細(xì)節(jié),將數(shù)據(jù)開(kāi)發(fā)的結(jié)果導(dǎo)出到數(shù)據(jù)應(yīng)用的過(guò)程中,同樣存在很多優(yōu)秀實(shí)踐,而且這些優(yōu)秀實(shí)踐的展現(xiàn)形式、底層技術(shù)與數(shù)據(jù)集成產(chǎn)品的相似度都非常高。很多情況下我們會(huì)把這兩種場(chǎng)景整合到一個(gè)產(chǎn)品里,在技術(shù)上這是非常合理的決策,但本質(zhì)上數(shù)據(jù)導(dǎo)出場(chǎng)景更合適的名稱(chēng)是數(shù)據(jù)系統(tǒng)集成,這種做法在某種意義上是拓展了數(shù)據(jù)集成領(lǐng)域的邊界。

數(shù)據(jù)集成引擎的核心問(wèn)題是異構(gòu)。
在底層技術(shù)上,我們需要有相應(yīng)的數(shù)據(jù)集成引擎來(lái)支持我們的數(shù)據(jù)集成產(chǎn)品。在引擎的設(shè)計(jì)中,最核心的問(wèn)題是解決異構(gòu)數(shù)據(jù)系統(tǒng)對(duì)接帶來(lái)的各種問(wèn)題,引擎涉及到的數(shù)據(jù)系統(tǒng)越多,碰到的問(wèn)題和解決方案也就越復(fù)雜。
上圖展示的是其中的一個(gè)例子,不同數(shù)據(jù)系統(tǒng)支持的字段類(lèi)型在語(yǔ)義上有細(xì)微差別,這些語(yǔ)義差別是數(shù)據(jù)集成引擎的主要問(wèn)題來(lái)源之一。
舉個(gè)例子,MySQL 沒(méi)有布爾類(lèi)型,通常我們用 tinyint(1)來(lái)實(shí)現(xiàn)布爾類(lèi)型,但轉(zhuǎn)換為布爾類(lèi)型還需要配置 JDBC 參數(shù)對(duì) Connector 行為做動(dòng)態(tài)的調(diào)整。當(dāng)鏡像同步到 Hive 或者 Iceberg 的時(shí)候,字段類(lèi)型如果沒(méi)有匹配上,就可能會(huì)出現(xiàn)類(lèi)型轉(zhuǎn)換的異常。
Unsigned 也是一個(gè)非常經(jīng)典的問(wèn)題,其中最容易出錯(cuò)的是 Bigint Unsigned。Flink、Hive、Iceberg 都沒(méi)有無(wú)符號(hào)類(lèi)型,如果考慮到精度問(wèn)題,我們就只能使用 decimal(20,0)來(lái)保存 Bigint Unsigned。但很多字段設(shè)計(jì)成 Unsigned 只是希望保證這個(gè)字段沒(méi)有負(fù)數(shù)值,并不會(huì)產(chǎn)生有符號(hào)數(shù)值溢出的情況,所以很多用戶(hù)仍然希望在 ODS 表中繼續(xù)使用 Bigint,這里就很容易導(dǎo)致作業(yè)出現(xiàn)各種問(wèn)題。
直到現(xiàn)在我們還有很多問(wèn)題在解決中,對(duì)于數(shù)據(jù)集成引擎來(lái)說(shuō),解決異構(gòu)數(shù)據(jù)系統(tǒng)對(duì)接帶來(lái)的各種問(wèn)題,屏蔽這些數(shù)據(jù)系統(tǒng)之間的差異,是最核心也是最有挑戰(zhàn)的問(wèn)題。

總結(jié)一下關(guān)鍵詞,數(shù)據(jù)集成領(lǐng)域的核心是連接,數(shù)據(jù)集成產(chǎn)品的核心問(wèn)題是效率,數(shù)據(jù)集成引擎的核心問(wèn)題是異構(gòu)。各種生產(chǎn)實(shí)踐基本都可以對(duì)應(yīng)到這三個(gè)層級(jí)。

Auto Catalog 特性是小米在數(shù)據(jù)集成領(lǐng)域的一個(gè)核心實(shí)踐,通過(guò) Auto Catalog 特性可以大幅度提升涉及異構(gòu)數(shù)據(jù)系統(tǒng)的作業(yè)的開(kāi)發(fā)效率。
正中間的 SQL 就是通過(guò) Auto Catalog 的方式,實(shí)現(xiàn)的 MySQL 寫(xiě)入 Iceberg 的作業(yè)。右上角的語(yǔ)句是通過(guò) Create Table 語(yǔ)法來(lái)引用 MySQL 表的方式,我們使用 Catalog 語(yǔ)法,也就是下面 SQL 中的“mysql_order.order.orders”三層結(jié)構(gòu)來(lái)引用 MySQL 表的時(shí)候,就可以省略掉 Create Table 語(yǔ)句,在列比較多的情況下,這個(gè) Create Table 語(yǔ)句構(gòu)造很繁瑣也很容易出錯(cuò)。
常規(guī)情況下使用 Catalog 語(yǔ)法,我們需要提前使用 Create Catalog 語(yǔ)句對(duì) Catalog 進(jìn)行注冊(cè),左上角和下方的兩條語(yǔ)句就是 Create Catalog 語(yǔ)句。Auto Catalog 特性是在引擎中自動(dòng)解析 SQL 中的 Catalog 進(jìn)行自動(dòng)注冊(cè),這樣我們就可以省略這兩個(gè) Create Catalog 語(yǔ)句。這不僅可以提高我們的開(kāi)發(fā)效率,最主要還可以避免一些敏感信息的泄露。
省略掉這三個(gè) DDL 語(yǔ)句之后,整個(gè) SQL 就變得非常簡(jiǎn)潔了,對(duì)數(shù)據(jù)開(kāi)發(fā)和數(shù)據(jù)查詢(xún)的效率提升都非常顯著。

Auto Catalog 特性的兩個(gè)環(huán)節(jié)都需要配合底層技術(shù)的支持。
第一個(gè)是在使用 Catalog 語(yǔ)法引用庫(kù)表的環(huán)節(jié)。Catalog 語(yǔ)法雖然很簡(jiǎn)潔,但目前只有少數(shù) Connector 原生提供了相應(yīng)的 Catalog 實(shí)現(xiàn)。
這里我們用了兩個(gè)措施:
- 基于 Netflix Metacat 建設(shè)了統(tǒng)一的元數(shù)據(jù)服務(wù),我們把連接信息和賬號(hào)密碼等敏感信息都保存到 Metacat 里,這就避免了對(duì)外暴露。
- 利用 HiveCatalog 的兼容表特性,在 Flink 里變相實(shí)現(xiàn)其他數(shù)據(jù)系統(tǒng)的 Catalog。主要做法就是用 Metacat 實(shí)現(xiàn) HiveMetaStore 接口,這個(gè)做法有個(gè)缺點(diǎn),就是增加了類(lèi)型轉(zhuǎn)換的復(fù)雜度。比如原生提供的 Iceberg Catalog,它只需要關(guān)注 Iceberg 類(lèi)型與 Flink 類(lèi)型之間的雙向類(lèi)型轉(zhuǎn)換,但如果用 Metacat,類(lèi)型轉(zhuǎn)換過(guò)程就變成了原生類(lèi)型、Metacat 類(lèi)型、Hive 類(lèi)型到最后的 Flink 類(lèi)型的四重類(lèi)型轉(zhuǎn)換,復(fù)雜度顯著提升。所以建議有原生 Catalog 的情況下,盡量使用原生 Catalog。
第二個(gè)是自動(dòng)注冊(cè) Catalog 的環(huán)節(jié)。手工構(gòu)造 DDL 語(yǔ)句比較繁瑣,因?yàn)樗枰B接信息和賬號(hào)密碼,但正因如此構(gòu)造 DDL 語(yǔ)句的過(guò)程本身就包含了授權(quán)步驟。而自動(dòng)注冊(cè) Catalog 就規(guī)避掉了這個(gè)鑒權(quán)過(guò)程,所以我們引入了 Apache Ranger,它是一個(gè)安全管理的框架。我們基于 Apache Ranger 建設(shè)了統(tǒng)一權(quán)限機(jī)制,在 Flink SQL 中做了一個(gè)插件,通過(guò)在 SQL 優(yōu)化器中增加規(guī)則的方式,來(lái)實(shí)施表級(jí)別的鑒權(quán),這樣我們就可以避免用戶(hù)去訪(fǎng)問(wèn)無(wú)權(quán)限的 Catalog 或者庫(kù)表了。

集成作業(yè)是我們最主要的數(shù)據(jù)集成產(chǎn)品,用于提供異構(gòu)的數(shù)據(jù)源導(dǎo)入到 ODS 表的最佳實(shí)踐。集成作業(yè)底層引擎是基于 Flink SQL 的,但與常規(guī)的 Flink SQL 作業(yè)相比,它額外提供了三個(gè)特性:
鏡像同步,即在集成作業(yè)中整合了自動(dòng)創(chuàng)建目標(biāo)表的邏輯。在表的列非常多,或者包含一些很復(fù)雜的嵌套結(jié)構(gòu)類(lèi)型的情況下,這個(gè)特性可以節(jié)省很多工作量。
自動(dòng)同步,即在源表的表結(jié)構(gòu)發(fā)生變動(dòng)的時(shí)候,自動(dòng)將表結(jié)構(gòu)的改動(dòng)同步到目標(biāo)表中。既可以保證數(shù)據(jù)的完整性,同時(shí)也減少了人工介入。
流批一體,即保障一次作業(yè)提交就可以完成整體同步,自動(dòng)無(wú)縫銜接全量同步的批作業(yè)步驟和增量同步的流作業(yè)步驟。
這三個(gè)特性與常規(guī)的 Flink SQL 作業(yè)存在較大差異,所以我們?cè)?Flink SQL 作業(yè)的基礎(chǔ)上整合成了一個(gè)集成作業(yè)的產(chǎn)品,并進(jìn)而發(fā)展出了數(shù)據(jù)集成引擎。

MySQL 實(shí)時(shí)集成作業(yè)里我們也多加了三個(gè)額外的特性。
第一個(gè),專(zhuān)用采集賬號(hào)。MySQL 的 Binlog 采集權(quán)限是整個(gè) MySQL 實(shí)例級(jí)別的。也就是說(shuō)只要有了采集權(quán)限,就相當(dāng)于有了該實(shí)例所有 DB 的讀權(quán)限。在小米 MySQL 實(shí)例部署多個(gè) DB 的情況非常普遍,對(duì)用戶(hù)賬號(hào)直接開(kāi)放采集權(quán)限的話(huà),DBA 那邊是完全沒(méi)有辦法接受的。所以我們?nèi)匀谎赜貌杉鳂I(yè)和集成作業(yè)分開(kāi)的一個(gè)架構(gòu),中間通過(guò)消息隊(duì)列對(duì)接。
這樣采集賬號(hào)就只在采集作業(yè)中使用,采集作業(yè)里我們需要去做一個(gè)控制,即我們只向 Topic 輸出單個(gè) DB 的 Binlog。通過(guò)這個(gè)方式,將采集權(quán)限給限制在了 DB 的級(jí)別。
第二個(gè),自動(dòng)斷點(diǎn)續(xù)傳。我們與 DBA 平臺(tái)打通了獲取主從庫(kù)拓?fù)浣Y(jié)構(gòu)的接口。這樣我們就可以?xún)?yōu)先連接從庫(kù)進(jìn)行采集,在從庫(kù)失效的情況下,我們還可以嘗試獲取其他可用的從庫(kù)做自動(dòng)重連。但這里有個(gè)前提是需要 MySQL 開(kāi)啟了 GTID,GTID 是 MySQL 的全局事務(wù)標(biāo)志,在主從庫(kù)中都能保持唯一,GTID 是自動(dòng)斷點(diǎn)續(xù)傳的基礎(chǔ)。
第三個(gè),千表同步連接問(wèn)題。MySQL 實(shí)例上,如果建立的采集作業(yè)太多,就會(huì)給我們的服務(wù)造成壓力,所以我們需要盡可能復(fù)用采集作業(yè)。因?yàn)榍懊嫖覀兙吞峁┝?DB 級(jí)別的 Binlog Topic,我們就直接共享了 DB 級(jí)別的 Binlog Topic。同一個(gè) DB 上的所有表都會(huì)復(fù)用同一個(gè) Binlog Topic。
在表特別多的情況下,Binlog Topic 的消費(fèi)速度仍然有可能會(huì)成為瓶頸,因此我們?cè)谙㈥?duì)列上還增加了按表過(guò)濾的環(huán)節(jié),把消息過(guò)濾的邏輯下推到消息隊(duì)列的服務(wù)端執(zhí)行,這樣能夠有效減少網(wǎng)絡(luò)流量、提高消費(fèi)速度。
整個(gè)架構(gòu)里,我們把采集作業(yè)的部分換成了 Flink CDC,但整體仍然是以消息隊(duì)列為核心的架構(gòu)。

分庫(kù)分表中間件主要有兩種實(shí)現(xiàn)。一種是將分片規(guī)則直接下發(fā)到 Client 端,這種情況中間件對(duì)外會(huì)直接暴露分表或者分庫(kù)的名稱(chēng)。另外一種是基于代理的,對(duì)外展示為單庫(kù)單表,實(shí)際上是由代理服務(wù)去轉(zhuǎn)發(fā)請(qǐng)求到各個(gè)分庫(kù)或者分表里。
第一種分庫(kù)分表的支持相對(duì)簡(jiǎn)單一點(diǎn),我們?cè)?Catalog 語(yǔ)法中拓展了正則匹配的支持,可以顯著的提升這種場(chǎng)景的開(kāi)發(fā)效率。
第二種基于代理的中間件就會(huì)麻煩很多,代理中間件不太友好的地方,一個(gè)是在實(shí)現(xiàn)細(xì)節(jié)上和 MySQL 服務(wù)端有很多差異,另外一個(gè)是通過(guò)代理服務(wù)也無(wú)法采集它的 Binlog。所以關(guān)鍵點(diǎn)還是依賴(lài)于前面提到的元數(shù)據(jù)管理服務(wù),就是我們需要通過(guò) Metacat 去獲取它真實(shí)的拓?fù)浣Y(jié)構(gòu)。
我們現(xiàn)在的實(shí)現(xiàn)是增加一個(gè)特殊的后綴,把它真正的分庫(kù)分表的名稱(chēng)暴露出來(lái),在實(shí)際執(zhí)行的時(shí)候,SQL 語(yǔ)句會(huì)被轉(zhuǎn)換成 UNION ALL 的形式后再執(zhí)行。

上圖是 Doris 寫(xiě)入支持分區(qū)覆蓋語(yǔ)義的案例。
Doris 本身不支持 OVERWRITE 這個(gè)語(yǔ)義,但在實(shí)際場(chǎng)景中,我們有很多用戶(hù)希望使用這個(gè)特性,而 Doris 本身又有類(lèi)似的機(jī)制可以實(shí)現(xiàn)相似效果,只是目前的 Connector 還沒(méi)有支持。
我們?cè)跀?shù)據(jù)集成引擎里加了一個(gè)處理,將 OVERWRITE 這個(gè)語(yǔ)句轉(zhuǎn)換成等價(jià)于右邊的三個(gè) SQL 語(yǔ)句的操作,用 Doris 的臨時(shí)分區(qū)特性來(lái)實(shí)現(xiàn)了 OVERWRITE 的語(yǔ)義。

前面兩個(gè)例子都是把輸入的 SQL 做了一些處理,實(shí)際執(zhí)行的 SQL 是在數(shù)據(jù)集成引擎內(nèi)生成的。這個(gè)機(jī)制,我們也同樣用在了自動(dòng)同步的特性上。
這里我們抽象出了一個(gè)叫 Schema Job 的概念。Schema Job 總是基于源表的表結(jié)構(gòu),按照最佳實(shí)踐生成一個(gè)目標(biāo)表的表結(jié)構(gòu),再把目標(biāo)表的表結(jié)構(gòu)替換掉,跑完了 Schema Job 我們就可以認(rèn)為源表和目標(biāo)表的表結(jié)構(gòu)已經(jīng)保持一致了。
離線(xiàn)集成作業(yè)支持自動(dòng)同步非常簡(jiǎn)單,只需要在跑 Batch Job 之前執(zhí)行一次 Schema Job 就可以了。
實(shí)時(shí)集成作業(yè)支持自動(dòng)同步會(huì)稍微麻煩一點(diǎn),仍然是先跑一個(gè) Schema Job 把源表和目標(biāo)表的表結(jié)構(gòu)變成一致的,然后再跑起 Stream Job。當(dāng) Stream Job 退出的時(shí)候,我們需要做一個(gè)額外的判斷,如果 Stream Job 是因?yàn)?Schema 變更而退出的,我們就再調(diào)度一個(gè) Schema Job 去保持表結(jié)構(gòu)一致,然后再?lài)L試按照新的表結(jié)構(gòu)跑起 Stream Job,就這樣一直循環(huán)下去。
這里有一個(gè)細(xì)節(jié),對(duì)于數(shù)據(jù)庫(kù)場(chǎng)景,在發(fā)生 DDL 變更時(shí),通常在 CDC Connector 里可以采集到一條 DDL 消息,我們可以用這個(gè) DDL 消息觸發(fā) Stream Job 的退出。但在消息隊(duì)列場(chǎng)景里,消息體的結(jié)構(gòu)變更是不會(huì)產(chǎn)生類(lèi)似 DDL 的消息的,這個(gè)時(shí)候如果我們不做任何處理,這個(gè)作業(yè)會(huì)一直正常的執(zhí)行下去,但這些新的字段可能就被遺漏掉或者丟棄掉了。
這個(gè)時(shí)候我們就依賴(lài)一個(gè)叫做 fail-on-unknown-field 的特性,設(shè)置了這個(gè)特性之后,我們會(huì)實(shí)時(shí)檢查消息結(jié)構(gòu)體中是否有 SQL 中沒(méi)有定義的字段。當(dāng)檢測(cè)到未知字段后,我們就會(huì)令當(dāng)前的 Stream Job 失敗,嘗試觸發(fā) Schema Job 的循環(huán)。
我們?cè)诎虢Y(jié)構(gòu)化的數(shù)據(jù)接入場(chǎng)景上非常依賴(lài)這個(gè)特性,舉例一個(gè)非常經(jīng)典的場(chǎng)景:
很多業(yè)務(wù)的后端團(tuán)隊(duì)和數(shù)據(jù)團(tuán)隊(duì)在組織架構(gòu)上是分開(kāi)的,中間通過(guò)消息隊(duì)列做數(shù)據(jù)對(duì)接。這種場(chǎng)景里,消息的生產(chǎn)端是后端團(tuán)隊(duì)在負(fù)責(zé),消費(fèi)端是數(shù)據(jù)團(tuán)隊(duì)自己建作業(yè)去消費(fèi),消息體很多情況下就是某個(gè)核心領(lǐng)域模型的 JSON。
在很多情況下,后端團(tuán)隊(duì)更新領(lǐng)域模型后,數(shù)據(jù)團(tuán)隊(duì)是不知道的。不做額外處理的情況下,F(xiàn)link SQL 作業(yè)會(huì)一直正常執(zhí)行并忽略消息體中的新字段,甚至在開(kāi)啟了 ignore-parse-errors 特性時(shí)可能導(dǎo)致整個(gè)消息都被丟棄。在這個(gè)場(chǎng)景里我們就可以用 fail-on-unknown-field 特性將作業(yè)主動(dòng)失敗掉,然后提示用戶(hù)更新消息體的 Schema。
基于 fail-on-unknown-field 特性實(shí)施 Schema Evolution 有兩個(gè)前提,第一個(gè)是消息體結(jié)構(gòu)變更不會(huì)特別頻繁,第二個(gè)是消息體結(jié)構(gòu)變更本身是能夠向前兼容的。如果不滿(mǎn)足這兩個(gè)前提,這套方案的可靠性就有很大的隱患。
這種情況下我們需要回歸到 Schema On Read 的高可靠性方案,也就是基于 Hive/HDFS 的方案。Hive 本身有一套非常成熟的 Schema On Read 的工具包,Schema On Read 在寫(xiě)鏈路上不需要解析消息的結(jié)構(gòu),直接把整個(gè)消息體按行存的格式寫(xiě)入到 HDFS 文件上,只在讀這些文件的時(shí)候才需要用到 Schema 去嘗試解析。這樣即使我們的 Schema 與消息體不匹配,也只是影響解析出來(lái)的數(shù)據(jù),原始數(shù)據(jù)本身是不會(huì)丟失的。
比較可惜的是,目前的幾個(gè)主流數(shù)據(jù)湖技術(shù)都是基于列式存儲(chǔ)的,沒(méi)有現(xiàn)成可用的 Schema On Read 方案,這也是我們后期可能要去拓展的一個(gè)方向。

這里再分享一下 TiDB 百億級(jí)單表實(shí)時(shí)集成的案例。
TIDB 是一款非常優(yōu)秀的國(guó)產(chǎn)分布式開(kāi)源數(shù)據(jù)庫(kù),從數(shù)據(jù)集成的角度來(lái)看,它有兩個(gè)非常顯著的特點(diǎn):第一是單表的數(shù)據(jù)量能夠支持非常大的規(guī)模,可以上到百億行/數(shù)十 TB 的規(guī)模;第二是支持快照機(jī)制,這對(duì)流批一體是非常友好的特性。
我們?cè)?TIDB 實(shí)時(shí)集成的開(kāi)發(fā)過(guò)程中,碰到的主要困難都是在全量同步步驟中寫(xiě)入 Iceberg 的過(guò)程發(fā)生的。這里最主要的問(wèn)題是,Iceberg 的 Flink Connector 實(shí)現(xiàn)只提供了 Stream Writer,Stream Writer 在數(shù)據(jù)量巨大的批處理場(chǎng)景下的性能比較差,我們主要做了兩個(gè)優(yōu)化。
上圖展示的是 write-distribution-mode 的優(yōu)化,從上圖可以看到集成作業(yè)的邏輯非常簡(jiǎn)單,作業(yè)通過(guò) TableSourceScan 從 TIDB 讀數(shù)據(jù),再通過(guò) IcebergStreamWriter 往 Iceberg 里寫(xiě)數(shù)據(jù)。TableSourceScan 在讀到數(shù)據(jù)之后,怎么把數(shù)據(jù)發(fā)送給 IcebergStreamWriter 呢?這里就是 Iceberg 的 write-distribution-mode 的配置。
目前有兩種模式,左上方是 None 模式,這個(gè)模式里 Writer 不占用單獨(dú)的 stage,而是直接在 TableSourceScan 的 TaskManager 上寫(xiě)入 Iceberg 中。這個(gè)模式少了一個(gè) shuffle 階段,如果 TableSourceScan 的數(shù)據(jù)分布比較均勻,它的入湖速度就會(huì)非常快。但因?yàn)?Iceberg 每個(gè) Writer 寫(xiě)入每個(gè)分區(qū)的時(shí)候都會(huì)產(chǎn)生一批寫(xiě)入文件,這樣寫(xiě)入文件數(shù)量就等于 Partition 數(shù)量乘以 Writer 的數(shù)量。當(dāng)表規(guī)模很大的時(shí)候就會(huì)產(chǎn)生大量的小文件,對(duì) Compaction 和 HDFS NameNode 造成很大的壓力。
左下方是 Hash 模式,這個(gè)模式專(zhuān)門(mén)為小文件數(shù)量做優(yōu)化,保證每個(gè) Partition 只能由一個(gè) Writer 寫(xiě)入。但分配 Partition 到 Writer 時(shí)是用的哈希算法進(jìn)行分配,因?yàn)?Partition 的數(shù)量本身就非常少,用哈希算法去分配的時(shí)候,幾乎無(wú)法避免的會(huì)產(chǎn)生數(shù)據(jù)傾斜的問(wèn)題。
這兩種模式在流場(chǎng)景下表現(xiàn)都很不錯(cuò),但是在 TIDB 的全量同步的過(guò)程中,它的問(wèn)題就會(huì)被放大到令我們無(wú)法接受。所以我們就引入了 RoundRobin 模式,主要還是在哈希模式的基礎(chǔ)上去解決數(shù)據(jù)傾斜的問(wèn)題。
我們分析了幾個(gè)最常見(jiàn)的分區(qū)函數(shù),通過(guò)設(shè)定一個(gè)特殊排序,按照順序逐個(gè)把 Partition 分配到 Writer,來(lái)確保 Partition 與 Writer 的均衡。這里用到了 PartitionCustom 的分區(qū)方法,通過(guò)自定義的 Partitioner 對(duì)分區(qū)進(jìn)行匹配,目前完成適配的分區(qū)函數(shù)如下:
Bucket 分區(qū)函數(shù),只需要將 bucket_id 按 Writer 數(shù)量取模就可以達(dá)到理論上最好的均衡效果。
Truncate 分區(qū)函數(shù),只能支持?jǐn)?shù)值型,用分區(qū)名稱(chēng)除以分區(qū)函數(shù)的寬度,就可以得到一個(gè)連續(xù)的整數(shù)值,再按 Writer 數(shù)量取模即可,這個(gè)方式在常見(jiàn)場(chǎng)景和合理配置下可以達(dá)到最優(yōu)的均衡效果,但如果寬度設(shè)置過(guò)大,反而可能導(dǎo)致數(shù)據(jù)被集中在少數(shù) Writer 中。
Identity 分區(qū)函數(shù),只能支持?jǐn)?shù)值型,將分區(qū)名稱(chēng)代表的數(shù)值取整,再按 Writer 數(shù)量取模,當(dāng)分區(qū)名稱(chēng)連續(xù)變化時(shí)效果比較好,分區(qū)名稱(chēng)是離散值時(shí)效果較差。
RoundRobin 模式在常見(jiàn)場(chǎng)景的效果非常顯著,實(shí)際測(cè)出來(lái)的性能相比前兩者能有三倍的提升。

上圖左邊展示的是 Iceberg 實(shí)現(xiàn) Row Level Delete 的核心邏輯。Iceberg 有個(gè) Delete Storage 來(lái)緩存 Checkpoint 期間的所有新增操作,更新和刪除操作會(huì)根據(jù) Delete Storage 中是否有相應(yīng)的記錄,決定是寫(xiě)入到 eq-delete-file 還是 pos-delete-file,正常情況下 Iceberg 一次 Checkpoint 會(huì)提交三個(gè)數(shù)據(jù)文件。
在做大表的全量同步時(shí),Delete Storage 經(jīng)常緩存了太多數(shù)據(jù)觸發(fā) OOM,我們最終決定在全量同步的階段跳過(guò) Delete Storage 的步驟,因?yàn)槿客诫A段只有新增,沒(méi)有更新和刪除,實(shí)際上用不到 Delete Storage。
這個(gè)效果非常顯著,我們終于成功的支持了百億級(jí)別單表的全量同步。

但跳過(guò) Delete Storage 會(huì)帶來(lái)一個(gè)問(wèn)題,增量同步過(guò)程是必須依賴(lài) Delete Storage 的,這就導(dǎo)致全量同步和增量同步無(wú)法一起執(zhí)行,基于 HybridSource 的方案就不適合使用這個(gè)優(yōu)化措施了。
我們只能將 TIDB 實(shí)時(shí)集成作業(yè)拆分成兩個(gè)單獨(dú)的作業(yè):Batch Job 和 Stream Job。
Batch Job 基于快照讀取 TiDB 的全量數(shù)據(jù),它會(huì)配置前面說(shuō)的各個(gè)優(yōu)化項(xiàng),并行度設(shè)置也會(huì)大一點(diǎn)。
Batch Job 執(zhí)行完成后,再執(zhí)行 Stream Job,Stream Job 從消息隊(duì)列中按照快照時(shí)間點(diǎn)接上全量同步的進(jìn)度,繼續(xù)消費(fèi) CDC 事件執(zhí)行實(shí)時(shí)同步的步驟,這個(gè)階段的配置是單獨(dú)優(yōu)化的,并行度也會(huì)設(shè)置的相對(duì)小一些。
我們將這兩個(gè)作業(yè)的調(diào)動(dòng)邏輯放在 Flink Application 中實(shí)施,這樣在用戶(hù)層面看起來(lái)就只調(diào)度了一個(gè)作業(yè),但在實(shí)際執(zhí)行的過(guò)程中,F(xiàn)link Application 會(huì)按需調(diào)度不同的 Flink Job。

這里重溫一下 Flink Application 這個(gè)概念,在 Flink on Yarn 模式中,F(xiàn)link 作業(yè)的 jar 包提交到 Yarn 集群后,在 main 方法中跑的邏輯就是 Flink Application。Flink Application 跑在 JobManager 的節(jié)點(diǎn)上,但邏輯上仍然是兩個(gè)獨(dú)立的模塊。而且我們可以在 Flink Application 中提交多個(gè) Flink Job。
目前我們?cè)?Flink Application 中是串行調(diào)度各個(gè) Flink Job,這樣在狀態(tài)恢復(fù)的時(shí)候就會(huì)比較簡(jiǎn)單,因?yàn)槊看沃挥幸粋€(gè) Flink Job 需要恢復(fù)。從本質(zhì)上來(lái)說(shuō),外部調(diào)度 Flink Job 也能達(dá)到完全一致的效果,只是 Flink Application 中剛好有一個(gè)比較合適的地方可以放這些邏輯。
小米數(shù)據(jù)集成引擎的核心邏輯就是跑在 Flink Application 中的。
三、引擎設(shè)計(jì)

上圖是小米數(shù)據(jù)集成引擎的總體架構(gòu)圖,目前稱(chēng)這個(gè)引擎為 MIDI(Mi Data Integration)。MIDI 的核心邏輯都跑在 Flink Application 中,F(xiàn)link Application 會(huì)在適當(dāng)?shù)臅r(shí)間調(diào)度三種作業(yè):Batch Job、Stream Job、Schema Job。
MIDI 的輸入我們稱(chēng)之為 MIDI SQL,是在 Flink SQL 的基礎(chǔ)上增加了一些自定義語(yǔ)法,MIDI SQL 目前只支持簡(jiǎn)單的數(shù)據(jù)集成場(chǎng)景。
從 MIDI SQL 中我們會(huì)解析出三張表,Source Table 是上圖最左邊源數(shù)據(jù)系統(tǒng)中的表,Sink Table 是右邊的目標(biāo)數(shù)據(jù)系統(tǒng)中的表,Middle Table 是下方的長(zhǎng)條,代表的是包含源表 CDC 事件的 Topic。此外還有一個(gè)叫做 Application State Backend 的概念,主要用來(lái)記錄 Flink Job 的執(zhí)行情況。

上圖是一個(gè)典型的數(shù)據(jù)庫(kù)實(shí)時(shí)集成作業(yè)的時(shí)序圖,包含了自動(dòng)同步和流批一體的特性。
MIDI 首先會(huì)調(diào)度一個(gè) Schema Job 去保證源表和目標(biāo)表的表結(jié)構(gòu)完全一致,然后生成 Batch SQL 并調(diào)度一個(gè) Batch Job 來(lái)執(zhí)行全量同步的步驟。全量同步步驟采用批模式執(zhí)行,并行度設(shè)置相對(duì)高一些。
之后 MIDI 會(huì)獲取全量同步的進(jìn)度點(diǎn),然后按進(jìn)度點(diǎn)生成對(duì)應(yīng)的 Stream SQL,并調(diào)度 Stream Job 接上之前的進(jìn)度繼續(xù)執(zhí)行實(shí)時(shí)的增量同步的步驟。實(shí)時(shí)同步階段采用流模式執(zhí)行,并行度設(shè)置相對(duì)會(huì)低一點(diǎn)。
MIDI 執(zhí)行完每一個(gè) Flink Job 都會(huì)記錄一個(gè)執(zhí)行日志,也就是 Flink Application State Backend 的作用。它實(shí)際上就是一個(gè)文本文件,與 Checkpoint 目錄放在一起。當(dāng)作業(yè)中斷后再恢復(fù)的時(shí)候,MIDI 會(huì)先從執(zhí)行日志里找到當(dāng)時(shí)正在跑的那個(gè) Flink Job,再去執(zhí)行相應(yīng)的恢復(fù)動(dòng)作。
再回到時(shí)序圖里。如果源表 Schema 沒(méi)有任何變更,Stream Job 跑起來(lái)之后會(huì)一直執(zhí)行。當(dāng)源表發(fā)生了 Schema 變更,就會(huì)觸發(fā)作業(yè)退出,然后進(jìn)入一個(gè)循環(huán)。我們會(huì)嘗試調(diào)度 Schema Job 來(lái)完成表結(jié)構(gòu)同步,再基于新的表結(jié)構(gòu)調(diào)度 Stream Job。這樣我們就能始終保持當(dāng)前正在跑的 Stream Job,一定是以最新的表結(jié)構(gòu)在進(jìn)行同步。
在這個(gè)設(shè)計(jì)思路下,Stream Job 成為了一種特殊的有界流,Stream Job 的生命周期與它執(zhí)行的 Stream SQL 的 Schema 是強(qiáng)綁定的,Schema 失效后,Stream Job 也就相應(yīng)的結(jié)束退出了。

這是我們定義的 MIDI-SQL,主要引入三個(gè)自定義語(yǔ)法:Auto、Stream、Stream With。
Auto 語(yǔ)法是用來(lái)開(kāi)啟自動(dòng)同步的,它必須與“Select *”共同使用。在實(shí)際執(zhí)行的時(shí)候,“Auto *”會(huì)被替換為當(dāng)前最新的表結(jié)構(gòu)字段。
Stream 語(yǔ)法其實(shí)早就被 Flink 拋棄了,現(xiàn)在 Flink SQL 語(yǔ)法上并不區(qū)分批作業(yè)和流作業(yè),主要以 Source 是否為有界流來(lái)確定執(zhí)行模式。但 MIDI 因?yàn)槭褂昧?Catalog 語(yǔ)法來(lái)引用庫(kù)表,同樣的 SQL 語(yǔ)句,使用 CDC Connector 時(shí)就是流模式,使用 JDBC Connector 時(shí)就是批模式,我們無(wú)法通過(guò) SQL 語(yǔ)句區(qū)分兩種情況,所以就把 Stream 語(yǔ)法又加回來(lái)了,用來(lái)判斷執(zhí)行模式,在 Catalog 中選擇不同的 Connector。
Stream With 語(yǔ)法是用來(lái)實(shí)施流批一體的,MIDI 的流批一體方案還是基于消息隊(duì)列的,我們用這個(gè)語(yǔ)法將消息隊(duì)列與源表關(guān)聯(lián)起來(lái)。
四、未來(lái)規(guī)劃

未來(lái)我們將對(duì)上圖提到的幾點(diǎn)進(jìn)行探索,這里重點(diǎn)提三個(gè)點(diǎn):
Schema On Read 場(chǎng)景支持:基于 Flink 和數(shù)據(jù)湖的方案更適合的是結(jié)構(gòu)化數(shù)據(jù)集成的場(chǎng)景,在半結(jié)構(gòu)化和非結(jié)構(gòu)化場(chǎng)景里,Schema On Read 仍然是一個(gè)最佳實(shí)踐,未來(lái)我們希望繼續(xù)探索如何在數(shù)據(jù)湖技術(shù)上提供 Schema On Read 的支持。
智能數(shù)據(jù)補(bǔ)償:這是我們嘗試增加的第四種 Flink Job,我們希望定時(shí)執(zhí)行這個(gè)步驟,自動(dòng)的增量的對(duì)源表和目標(biāo)表的數(shù)據(jù)做比對(duì)和補(bǔ)償。
引擎特性打磨:MIDI 目前仍然在比較初期的階段,很多特性還需要打磨,目前正在整理部分特性反饋到社區(qū)共同建設(shè)。