Flink?CDC?在新能源制造業(yè)的實(shí)踐

摘要:本文撰寫(xiě)自某新能源企業(yè)的研發(fā)工程師 單葛堯 老師。本文詳細(xì)介紹該新能源企業(yè)的大數(shù)據(jù)平臺(tái)中 CDC 技術(shù)架構(gòu)選型和 Flink CDC 的最佳實(shí)踐。主要有以下幾個(gè)內(nèi)容:

  1. CDC 方案選型
  1. 方案落地實(shí)施
  1. 平臺(tái)的優(yōu)越性
  1. 后續(xù)規(guī)劃

我們是一家專注于新能源動(dòng)力電池制造的企業(yè),致力于推動(dòng)能源技術(shù)的發(fā)展與應(yīng)用。作為一家具有多年行業(yè)經(jīng)驗(yàn)的企業(yè),我們?cè)谛履茉搭I(lǐng)域積累了深厚的技術(shù)實(shí)力和市場(chǎng)認(rèn)知,業(yè)務(wù)涵蓋了新能源產(chǎn)業(yè)鏈的關(guān)鍵環(huán)節(jié)。從上游的裝備制造到下游的應(yīng)用解決方案,為客戶提供了全方位的服務(wù)。

隨著企業(yè)業(yè)務(wù)的不斷發(fā)展,對(duì)數(shù)據(jù)實(shí)時(shí)性的要求日益提高。如何管理復(fù)雜的 Flink 任務(wù),提高任務(wù)的開(kāi)發(fā)效率,降低任務(wù)的運(yùn)維難度,成為我們亟待解決的問(wèn)題。接下來(lái),我將介紹在我們團(tuán)隊(duì)在新能源制造業(yè)中大數(shù)據(jù)平臺(tái) CDC 技術(shù)架構(gòu)的選型和 Flink CDC 的最佳實(shí)踐。

一、CDC 方案選型

目前,我們團(tuán)隊(duì)規(guī)劃的數(shù)據(jù)平臺(tái)架構(gòu)如上圖所示,數(shù)據(jù)源這一層主要以 Oracle 為主,且跨網(wǎng)絡(luò)的分散在各個(gè)基地,考慮到數(shù)據(jù)源支持和時(shí)效性,我們需要支持 Oracle CDC 數(shù)據(jù)采集,結(jié)合我們的業(yè)務(wù)場(chǎng)景,我們調(diào)研了實(shí)時(shí)采集的如下方案:

如上圖所示,從 CDC 機(jī)制、增量同步、斷點(diǎn)續(xù)傳、全量同步、全量+增量、架構(gòu)、數(shù)據(jù)計(jì)算、生態(tài)這八個(gè)方面做了對(duì)比,可以看出其中的佼佼者主要是 Flink CDC 和 Oracle OGG 以及 Debezium。

首先,讓我們了解這三種技術(shù)的基本概況。Flink CDC 是 Apache Flink 的一個(gè)官方子項(xiàng)目,它利用流處理能力來(lái)捕獲和轉(zhuǎn)換源數(shù)據(jù)庫(kù)的變更事件。OGG 是 Oracle提供的一種成熟的商業(yè)化 CDC 技術(shù),廣泛用于數(shù)據(jù)復(fù)制和實(shí)時(shí)數(shù)據(jù)分析。Debezium則是一個(gè)開(kāi)源的分布式 CDC 系統(tǒng),支持多種數(shù)據(jù)庫(kù)和消息中間件,易于擴(kuò)展和集成。

從實(shí)現(xiàn)機(jī)制上來(lái)看,F(xiàn)link CDC 依賴于數(shù)據(jù)庫(kù)的日志或 redo log來(lái)捕捉變更,并通過(guò)Flink的流處理引擎進(jìn)行后續(xù)處理。它的優(yōu)勢(shì)在于與 Flink 生態(tài)的無(wú)縫整合,為批處理和流處理提供了統(tǒng)一的解決方案。此外,F(xiàn)link CDC 支持豐富的轉(zhuǎn)換和處理功能,如窗口操作、狀態(tài)管理和時(shí)間處理,這對(duì)于復(fù)雜的實(shí)時(shí)分析場(chǎng)景非常有益。然而,它的局限性也較為明顯,即對(duì)源數(shù)據(jù)庫(kù)的依賴性較強(qiáng),需要特定的日志格式或訪問(wèn)權(quán)限。

OGG 作為一種成熟的商業(yè)產(chǎn)品,提供了強(qiáng)大的數(shù)據(jù)捕獲和傳輸能力。它通過(guò)解析源數(shù)據(jù)庫(kù)的日志文件來(lái)捕獲變更,并能夠在不同的系統(tǒng)之間高效地傳輸數(shù)據(jù)。OGG 的優(yōu)勢(shì)在于其穩(wěn)定性和廣泛的兼容性,尤其是在處理大規(guī)模數(shù)據(jù)和高并發(fā)場(chǎng)景時(shí)表現(xiàn)出色。但是,OGG 的成本較高,且對(duì)于開(kāi)源生態(tài)系統(tǒng)的支持不如其他方案那么豐富。

Debezium 則采用了不同的方法,它直接連接到數(shù)據(jù)庫(kù),通過(guò)訂閱 binlog 或 redo log 來(lái)捕獲變更事件。Debezium 的特點(diǎn)是其靈活性和易用性,支持多種數(shù)據(jù)庫(kù)類型,并且可以方便地與 Kafka 等消息中間件集成。這使得 Debezium 非常適合于多源異構(gòu)環(huán)境下的數(shù)據(jù)同步和實(shí)時(shí)分析。但是,Debezium 的性能可能不如Flink CDC和 OGG 那樣出色,尤其是在處理大量數(shù)據(jù)時(shí)可能需要更多的優(yōu)化工作。

在應(yīng)用場(chǎng)景方面,F(xiàn)link CDC 適合于需要復(fù)雜事件處理和流式分析的場(chǎng)景,尤其是那些已經(jīng)采用Flink作為流處理平臺(tái)的組織。OGG 則更適合于對(duì)穩(wěn)定性和性能有極高要求的大型或關(guān)鍵任務(wù)應(yīng)用。而 Debezium 由于其靈活性和易用性,適用于需要快速開(kāi)發(fā)和部署CDC解決方案的場(chǎng)景,尤其是面對(duì)多樣化的數(shù)據(jù)源和技術(shù)棧時(shí)。

此外,考慮到 OGG 成本高,鏈路長(zhǎng),且文檔資源少,而 Debezium 配置復(fù)雜,而且在后續(xù)大數(shù)據(jù)流量或存在復(fù)雜的計(jì)算邏輯時(shí)難以為繼,最終我們選擇了 Flink CDC 作為 Oracle 的實(shí)時(shí)采集的工具方案。

二、方案落地實(shí)施:

我們團(tuán)隊(duì)在 Flink CDC 中的實(shí)施落地主要分為以下幾步:

  1. 確保 Oracle 日志歸檔模式開(kāi)啟,開(kāi)通 Oracle 整庫(kù)補(bǔ)充日志,開(kāi)通表級(jí)別的全列補(bǔ)充日志。

  2. 賦予操作用戶各類權(quán)限,如表的操作權(quán)限,logminer 與 archive log 相關(guān)視圖查詢權(quán)限等。

  3. 開(kāi)啟跨基地網(wǎng)絡(luò)權(quán)限,打通 Hadoop 集群對(duì) Oracle 的訪問(wèn)權(quán)限。

  4. 通過(guò)平臺(tái)編寫(xiě) Flink SQL 任務(wù),將 Oracle 的數(shù)據(jù)直接存入到 Doris 中,下圖為線上作業(yè)效果圖。

開(kāi)發(fā)的過(guò)程中雖然整體順利,但也遇到了幾個(gè)小插曲,在此與各位讀者一起分享

  1. 該配置開(kāi)啟了 Oracle 日志在線挖掘模式,如果沒(méi)有這個(gè)設(shè)置,會(huì)導(dǎo)致生產(chǎn)環(huán)境默認(rèn)策略讀取 log 較慢,且默認(rèn)策略會(huì)寫(xiě)入數(shù)據(jù)字典信息到 redo log 中導(dǎo)致日志量增加較多。但該配置也有缺點(diǎn),不寫(xiě)入數(shù)據(jù)字典到 redo log 中,會(huì)導(dǎo)致無(wú)法處理 DDL 語(yǔ)句。
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
  1. 我們?cè)谕?Oracle11g 的數(shù)據(jù)時(shí),遇到了報(bào)錯(cuò)
Caused by: io.debezium.DebeziumException: Supplemental logging not configured for table xxx. Use command: ALTER TABLE xxx ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS

該錯(cuò)誤是因?yàn)閷?duì)于 Oracle11 版本,Debezium 會(huì)默認(rèn)把 tableIdCaseInsensitive 設(shè)置為 true, 導(dǎo)致表名被更新為小寫(xiě),因此在 Oracle 中查詢不到 這個(gè)表補(bǔ)全日志設(shè)置,導(dǎo)致誤報(bào)這個(gè) Supplemental logging not configured for table 錯(cuò)誤”,我們只需要開(kāi)啟如下設(shè)置即可

'debezium.database.tablename.case.insensitive'='false'
  1. 我們的任務(wù)在運(yùn)行一段時(shí)間后報(bào)錯(cuò)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.DataException: file is not a valid field name

這是因?yàn)?schema change event parsing 導(dǎo)致的問(wèn)題,已經(jīng)在3.1版本得到了解決,下載3.1版本的包即可,詳見(jiàn)Flink CDC 項(xiàng)目的 PR #2315。

  1. Flink Oracle CDC connector在采集分區(qū)表的時(shí)候存在問(wèn)題,報(bào)錯(cuò)為
Caused by: io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot

這是由于分區(qū)表未能被成功掃描到,我們進(jìn)入到源碼的類 OracleConnectionUtils 中可以看到

String queryTablesSql = "SELECT OWNER ,TABLE_NAME,TABLESPACE_NAME FROM ALL_TABLES WHERE TABLESPACE_NAME IS NOT NULL AND TABLESPACE_NAME NOT IN ('SYSTEM','SYSAUX') AND NESTED = 'NO' AND TABLE_NAME NOT IN (SELECT PARENT_TABLE_NAME FROM ALL_NESTED_TABLES)";

其中如果需要加入分區(qū)表的話,要在where條件中加入 PARTITIONED = 'YES',重新編譯打包即可解決該問(wèn)題。

當(dāng)然寫(xiě)入Doris的速度也可能存在慢的現(xiàn)象,特別是 StreamLoad 中進(jìn)程 stopped 與返回 loadResult 結(jié)果之間時(shí)間較長(zhǎng)時(shí)的問(wèn)題,得益于 Apache Doris 社區(qū)的幫助,我們總結(jié)以下幾點(diǎn)優(yōu)化方案:

  1. 看BE的資源是不是滿了,檢查 IO、CPU、內(nèi)存

  2. 換成 2pc 導(dǎo)?,? 2pc 會(huì)等待 publish,2pc 導(dǎo)? publish 是異步的

  3. mow 嘗試換成 mor 表

  4. show proc '/transactions' 看看事務(wù)的狀態(tài),如果都是 committed,說(shuō)明 publish 卡住了,如果 io 不?,可以增加 publish 的線程和超時(shí)

publish_version_timeout_second=60
publish_version_worker_count=16
  1. 增加 be.conf 的配置 ,適當(dāng)提?下?的參數(shù)的配置
doris_scanner_thread_pool_thread_num=1024
webserver_num_workers=1024
fragment_pool_queue_size=4096
fragment_pool_thread_num_max=2048
doris_max_remote_scanner_thread_pool_thread_num=2048

三、平臺(tái)的優(yōu)越性:

此外 Apache StreamPark 也給我們帶來(lái)了很多驚喜,在開(kāi)發(fā) FlinkSQL 任務(wù)的過(guò)程中,借助 StreamPark,極大的提升了開(kāi)發(fā)時(shí)的效率,我羅列了我們開(kāi)發(fā)團(tuán)隊(duì)覺(jué)得非常優(yōu)秀的點(diǎn):

1. 多版本Flink管理

在開(kāi)發(fā)過(guò)程中,可能會(huì)因?yàn)樾枨驠link新版本的某項(xiàng)能力,或是某些組件對(duì) Flink版本的強(qiáng)需求而引入多版本的 Flink,在 StreamPark 中,只需要配置一個(gè) Flink 的路徑并為其命名,即可在開(kāi)發(fā)任務(wù)中輕松地選擇我所需要的 Flink 版本。

2. 對(duì)于Git的集成

我們?cè)谝?Apache Dolphinscheduler 時(shí),曾多次接到數(shù)倉(cāng)團(tuán)隊(duì)表明調(diào)度系統(tǒng)未能接入版本控制的遺憾,但在引入實(shí)時(shí)平臺(tái)時(shí),這個(gè)遺憾被彌補(bǔ)了。StreamPark 的 Git 式項(xiàng)目管理,極大的簡(jiǎn)化了我們?cè)诰帉?xiě) JAR 后復(fù)雜的部署鏈路,一鍵編譯、部署、配置,一氣呵成。

3. 強(qiáng)大的作業(yè)運(yùn)維能力及告警機(jī)制

StreamPark 有著完善的作業(yè)操作記錄,可以清晰的記錄所有任務(wù) Release、Start、Cancel 的記錄,對(duì)于 yarn application 形式的任務(wù),記錄了每次運(yùn)行的 appId,便于后續(xù)的 yarn 日志翻閱、運(yùn)行狀態(tài)的監(jiān)測(cè)等等。一旦報(bào)錯(cuò),StreamPark 會(huì)迅速發(fā)送告警,如果配置了重啟機(jī)制,可以實(shí)現(xiàn)分鐘級(jí)的重啟效率。

4. Flink SQL 在線開(kāi)發(fā)

StreamPark 的類 IDE 樣式的開(kāi)發(fā)窗口相當(dāng)優(yōu)越,可以幫助我們檢查低級(jí)的語(yǔ)法錯(cuò)誤,簡(jiǎn)單的輕松地便可以配置各類想要配置的參數(shù),如分配 JM、TM 的資源,配置預(yù)備提交的 yarn 隊(duì)列,它的作業(yè)依賴配置更是相當(dāng)簡(jiǎn)易,提供了填寫(xiě) pom 和上傳 jar 包兩種形式,再也無(wú)需將包傻傻地通過(guò)hdfs上傳,極大地提升了我們的開(kāi)發(fā)效率。

四、后續(xù)規(guī)劃:

感謝 Flink CDC,作為 Apache Flink 社區(qū)的重點(diǎn)項(xiàng)目,它提供給了我們開(kāi)箱即用實(shí)時(shí)數(shù)據(jù)采集方案,極大的簡(jiǎn)化了 CDC 的實(shí)施過(guò)程,也規(guī)避了 OGG 昂貴的使用費(fèi)用。

不過(guò)目前 Flink CDC Oracle Connector 也存在一些性能問(wèn)題,眾所周知,Oracle CDC依賴Debezium組件解析 Redo Log 與 Archive Log,Debezium 通過(guò)Oracle 的 Logminer 解析 Log。最大的問(wèn)題就在于 LogMiner 的性能限制, 熟悉 Oracle 的開(kāi)發(fā)者會(huì)知道,LogMiner是運(yùn)行在 Oracle 內(nèi)部, 并且運(yùn)行在日志落地之后, 不可避免地需要消耗數(shù)據(jù)庫(kù)的算力去完成工作, 為了降低這個(gè)不在主流程的進(jìn)程的資源消耗, Oracle 對(duì) LogMiner 做了非常嚴(yán)格的資源限制, 每個(gè) LogMiner 進(jìn)程, 他的資源消耗都不能超過(guò) 1 個(gè) cpu 核心, 在大多數(shù)場(chǎng)景下, 這個(gè)將 LogMiner 的日志解析速度限制在1w條每秒以下, 在一些制造業(yè)的場(chǎng)景下, 這個(gè)速度是遠(yuǎn)遠(yuǎn)不夠的, Oracle 是一個(gè)事務(wù)數(shù)據(jù)庫(kù), 一個(gè)大的 Update, 可能會(huì)帶來(lái)數(shù)十萬(wàn)上百萬(wàn)的更新, 在這種情況下, 每秒1w的解析速度會(huì)使得下游延遲增大到數(shù)分鐘級(jí)別, 更糟糕的是, 在數(shù)據(jù)庫(kù)本身負(fù)載較高的情況下, 由于 LogMiner 的解析與數(shù)據(jù)庫(kù)共享負(fù)載, 會(huì)讓解析速度進(jìn)一步下降。

后續(xù)團(tuán)隊(duì)考慮通過(guò) Oracle 的機(jī)制, 將 redo log 異步傳輸?shù)搅硗庖慌_(tái)沒(méi)有業(yè)務(wù)壓力的 Oracle 實(shí)例上, 然后在另外一臺(tái)機(jī)器上開(kāi)啟并發(fā)切分 scn range 事件,開(kāi)啟 LogMiner 線程并發(fā)的解析對(duì)應(yīng) scn range 事件,順序的處理獲取到的事件。


歡迎大家多多關(guān)注 Flink CDC,從釘釘用戶交流群[1]、微信公眾號(hào)[2]、Slack 頻道[3]、郵件列表[4]加入 CDC 用戶社區(qū),以及在 Flink CDC GitHub 倉(cāng)庫(kù)[5]上參與代碼貢獻(xiàn)!


[1] “ Flink CDC 社區(qū) ② 群”群的釘釘群號(hào):80655011780

[2] ” Flink CDC 公眾號(hào)“的微信號(hào):ApacheFlinkCDC

[3] https://flink.apache.org/what-is-flink/community/#slack

[4] https://flink.apache.org/what-is-flink/community/#mailing-lists

[5] https://github.com/apache/flink-cdc

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容