相當于mysql-cdc的大動作(后面我會講),我讀源碼之后發(fā)現, 這個mongoDb-cdc的實現(2.2.1)代碼不是很復雜,現在簡單記錄一下,方便自己后續(xù)查閱。
如何開始讀源碼?
我建議從怎么使用它入手,我們看到官網教我們這么用:
SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
.hosts("localhost:27017")
.username("flink")
.password("flinkpw")
.databaseList("inventory") // set captured database, support regex
.collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
很明顯,我們就從這個MongoDBSource.<String>builder來入手就好了。
MongoDBSource
我們打開這個類 ,看到上面全是一些變量的定義,初始化啥的,就不看了,直接跳到346行:build()方法,畫風直接變了,這個就是我們想要的呀??!
然鵝,我們再看這個方法,最有用的就是最后一句:
return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
繼續(xù)進入這個 DebeziumSourceFunction 類,注釋就非常棒,給了我們很多干貨!!我總結了一下,有以下幾點:
1 運行時有兩個worker。
一名worker定期從數據庫中提取記錄并將記錄推送到 Handover 。
另一個 worker 消費來自 Handover的記錄,并將記錄轉換為 Flink 樣式的數據。
不使用一個worker 是因為 debezium 在快照階段和流階段有不同的行為。
2 使用 Handover 作為緩沖區(qū),將數據從生產者提交給消費者。
因為兩個線程不直接相互通信,所以報錯也依賴于 Handover。
當引擎出現錯誤時,引擎使用 {@linkDebeziumEngine.CompletionCallback} 向
Handover 報告錯誤并喚醒消費者檢查錯誤。
但是,如果錯誤來自 Flink 端,source function 只是關閉引擎并喚醒生產者。
3 如果執(zhí)行被取消或完成(僅快照階段),退出邏輯與錯誤報告中的邏輯相同。
4 source function參與檢查點并保證在故障期間不會丟失任何數據,保證了“exactly once”
5 目前, source function不能在多個并行實例中運行
再往下讀,看到這個類名部分:
@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T>
RichSourceFunction --- 這個是flink的自定義source固定用法
CheckpointedFunction,CheckpointListener --- cp相關的接口,里面有初始化狀態(tài),保存狀態(tài)的方法,以及cp成功之后的動作
再往下,都是一些變量,其中值得我們注意的有幾個:
/** Data for pending but uncommitted offsets. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
這個變量就是個普通的linkedMap,用來保存沒有提交的offset
//The offsets to restore to, if the consumer restores state from a checkpoint.
private transient volatile String restoredOffsetState;
這個說的是,如果是從某個cp恢復的,就保存該offset,之所以用string,是為了用 JSON BYTE來編碼
/** Accessor for state in the operator state backend. */
private transient ListState<byte[]> offsetState;
這個就是整個靈魂,offset的值,看到用的是 ListState,
順藤摸瓜找下去,發(fā)現了這個狀態(tài)值的出處:
public byte[] snapshotCurrentState() throws Exception {
// this method assumes that the checkpoint lock is held
assert Thread.holdsLock(checkpointLock);
if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
return null;
}
return stateSerializer.serialize(debeziumOffset);
}
而 debeziumOffset.sourceOffset 和 debeziumOffset.sourcePartition 都是hashMap.
可以推斷出這個狀態(tài),會保存debezuim消費的分區(qū)信息,偏移量信息,也符合常識。
其實他這里只是用到了1個Byte[]而已,用ListState是出于什么考慮呢?
這個問題,我猜測是為了將來做擴展吧。 有了解詳情的,也請留言告知下,多謝了!
往下走,有幾個變量,更為關鍵
private transient DebeziumEngine<?> engine;
//這個就是debezuim引擎,是flink-cdc的根本所在,能讀數據全部仰仗它了。我們這篇文章先不深入介紹他了,我們知道數據最后靠它去獲取的即可。后續(xù)開一個文章專門寫他。
/** Consume the events from the engine and commit the offset to the engine. */
private transient DebeziumChangeConsumer changeConsumer;
consumer會消費engine從遠程數據庫拿到的數據,然后把數據放入 Handover 中(持有鎖時),他放數據到handover前會檢查 list有沒有數據,有數據就先不放了,讓出鎖。
/** The consumer to fetch records from {@link Handover}. */
private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
fetcher是用來消費 Handover 中的數據,讀到數據之后,就會執(zhí)行sourceContext.collect(record)方法,把數據輸出到flink的數據pipeline中了。他和上面的consumer會競爭handover的鎖,搶的話就消費數據,消費完了就清空List, 再釋放鎖。
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;
handover用一個list來保存數據。然后再設置了一個鎖給上述的 changeConsumer 和 debeziumChangeFetcher 來搶。
看完了變量,接下來就是重頭戲了,先是open方法:
public void open(Configuration parameters) throws Exception {
validator.validate();
super.open(parameters);
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
}
這個的意思是,先校驗,然后創(chuàng)建一個線程執(zhí)行器,創(chuàng)建一個handover(非常重要的家伙),創(chuàng)建一個changeConsumer。
然后我們看到 run方法里
這里關鍵的是以下這些代碼
this.engine =
DebeziumEngine.create(Connect.class)
.using(properties)
.notifying(changeConsumer)
.using(OffsetCommitPolicy.always())
.using(
(success, message, error) -> {
if (success) {
// Close the handover and prepare to exit.
handover.close();
} else {
handover.reportError(error);
}
})
.build();
.notifying(changeConsumer) ---這一句指的是 engine拿到的數據,將由 changeConsumer來處理。
// run the engine asynchronously
executor.execute(engine); --用一個線程來啟動 engine。順便把changeConsumer也啟動了。
debeziumStarted = true;
// start the real debezium consumer
debeziumChangeFetcher.runFetchLoop();
//另外一個線程啟動 debeziumChangeFetcher
總結一下:
flink-cdc-原理.JPG
該圖很清晰的看見,一共有4個重要的組件:
debezuim引擎負責數據抓取
changeConsumer負責消費引擎出來的數據以及將數據發(fā)給handover
handover類似中間件,做短暫的存儲(因為一共就兩個線程搶占鎖,一旦數據過來,會很快被消費)
fetcher負責消費handover的數據,然后輸出到flink的pipeline中。
最后還有一點:
flink-cdc 的mongodb的實現,前面的mongoDbsource僅僅虛晃一槍,絕大部分邏輯是在 DebeziumSourceFunction中完成的,而其他的四種db(目前flink-cdc就支持5種), oracle,pg,mysql(舊版本), sqlserver,也是調用了該類來實現的。