https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html
Exactly-Once 處理
MySQL CDC 連接器是一個(gè) Flink Source 連接器,它將首先讀取表快照塊,然后繼續(xù)讀取 binlog, 無(wú)論是在快照階段還是讀取 binlog 階段,MySQL CDC 連接器都會(huì)在處理時(shí)準(zhǔn)確讀取數(shù)據(jù),即使任務(wù)出現(xiàn)了故障。
啟動(dòng)模式
配置選項(xiàng)scan.startup.mode指定 MySQL CDC 使用者的啟動(dòng)模式。有效枚舉包括:
● initial (默認(rèn)):在第一次啟動(dòng)時(shí)對(duì)受監(jiān)視的數(shù)據(jù)庫(kù)表執(zhí)行初始快照,并繼續(xù)讀取最新的 binlog。
● earliest-offset:跳過(guò)快照階段,從可讀取的最早 binlog 位點(diǎn)開(kāi)始讀取
● latest-offset:首次啟動(dòng)時(shí),從不對(duì)受監(jiān)視的數(shù)據(jù)庫(kù)表執(zhí)行快照, 連接器僅從 binlog 的結(jié)尾處開(kāi)始讀取,這意味著連接器只能讀取在連接器啟動(dòng)之后的數(shù)據(jù)更改。
● specific-offset:跳過(guò)快照階段,從指定的 binlog 位點(diǎn)開(kāi)始讀取。位點(diǎn)可通過(guò) binlog 文件名和位置指定,或者在 GTID 在集群上啟用時(shí)通過(guò) GTID 集合指定。
● timestamp:跳過(guò)快照階段,從指定的時(shí)間戳開(kāi)始讀取 binlog 事件。
使用 SQL:
CREATE TABLE mysql_source (...) WITH (
'connector' = 'mysql-cdc',
'scan.startup.mode' = 'earliest-offset', -- 從最早位點(diǎn)啟動(dòng)
'scan.startup.mode' = 'latest-offset', -- 從最晚位點(diǎn)啟動(dòng)
'scan.startup.mode' = 'specific-offset', -- 從特定位點(diǎn)啟動(dòng)
'scan.startup.mode' = 'timestamp', -- 從特定位點(diǎn)啟動(dòng)
'scan.startup.specific-offset.file' = 'mysql-bin.000003', -- 在特定位點(diǎn)啟動(dòng)模式下指定 binlog 文件名
'scan.startup.specific-offset.pos' = '4', -- 在特定位點(diǎn)啟動(dòng)模式下指定 binlog 位置
'scan.startup.specific-offset.gtid-set' = '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19', -- 在特定位點(diǎn)啟動(dòng)模式下指定 GTID 集合
'scan.startup.timestamp-millis' = '1667232000000' -- 在時(shí)間戳啟動(dòng)模式下指定啟動(dòng)時(shí)間戳
...
)
注意:
- MySQL source 會(huì)在 checkpoint 時(shí)將當(dāng)前位點(diǎn)以 INFO 級(jí)別打印到日志中,日志前綴為 “Binlog offset on checkpoint {checkpoint-id}”。 該日志可以幫助將作業(yè)從某個(gè) checkpoint 的位點(diǎn)開(kāi)始啟動(dòng)的場(chǎng)景。
- 如果捕獲變更的表曾經(jīng)發(fā)生過(guò)表結(jié)構(gòu)變化,從最早位點(diǎn)、特定位點(diǎn)或時(shí)間戳啟動(dòng)可能會(huì)發(fā)生錯(cuò)誤,因?yàn)?Debezium 讀取器會(huì)在內(nèi)部保存當(dāng)前的最新表結(jié)構(gòu),結(jié)構(gòu)不匹配的早期數(shù)據(jù)無(wú)法被正確解析。