flink-cdc 之mongoDb源碼分析-1

相當于mysql-cdc的大動作(后面我會講),我讀源碼之后發(fā)現, 這個mongoDb-cdc的實現(2.2.1)代碼不是很復雜,現在簡單記錄一下,方便自己后續(xù)查閱。

如何開始讀源碼?
我建議從怎么使用它入手,我們看到官網教我們這么用:

  SourceFunction<String> sourceFunction = MongoDBSource.<String>builder()
                .hosts("localhost:27017")
                .username("flink")
                .password("flinkpw")
                .databaseList("inventory") // set captured database, support regex
                .collectionList("inventory.products", "inventory.orders") //set captured collections, support regex
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();

很明顯,我們就從這個MongoDBSource.<String>builder來入手就好了。

MongoDBSource

我們打開這個類 ,看到上面全是一些變量的定義,初始化啥的,就不看了,直接跳到346行:build()方法,畫風直接變了,這個就是我們想要的呀??!
然鵝,我們再看這個方法,最有用的就是最后一句:

         return new DebeziumSourceFunction<>(
                    deserializer, props, null, Validator.getDefaultValidator());

繼續(xù)進入這個 DebeziumSourceFunction 類,注釋就非常棒,給了我們很多干貨!!我總結了一下,有以下幾點:

1  運行時有兩個worker。
  一名worker定期從數據庫中提取記錄并將記錄推送到 Handover 。
  另一個 worker 消費來自 Handover的記錄,并將記錄轉換為 Flink 樣式的數據。
  不使用一個worker 是因為 debezium 在快照階段和流階段有不同的行為。

2   使用 Handover 作為緩沖區(qū),將數據從生產者提交給消費者。
  因為兩個線程不直接相互通信,所以報錯也依賴于 Handover。
  當引擎出現錯誤時,引擎使用 {@linkDebeziumEngine.CompletionCallback} 向 
  Handover 報告錯誤并喚醒消費者檢查錯誤。
 但是,如果錯誤來自 Flink 端,source function 只是關閉引擎并喚醒生產者。

3 如果執(zhí)行被取消或完成(僅快照階段),退出邏輯與錯誤報告中的邏輯相同。

4 source function參與檢查點并保證在故障期間不會丟失任何數據,保證了“exactly once”

5 目前, source function不能在多個并行實例中運行

再往下讀,看到這個類名部分:

@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
        implements CheckpointedFunction, CheckpointListener, ResultTypeQueryable<T> 

RichSourceFunction ---  這個是flink的自定義source固定用法

CheckpointedFunction,CheckpointListener  --- cp相關的接口,里面有初始化狀態(tài),保存狀態(tài)的方法,以及cp成功之后的動作

再往下,都是一些變量,其中值得我們注意的有幾個:

    /** Data for pending but uncommitted offsets. */
    private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
這個變量就是個普通的linkedMap,用來保存沒有提交的offset

//The offsets to restore to, if the consumer restores state from a checkpoint.
    private transient volatile String restoredOffsetState;
這個說的是,如果是從某個cp恢復的,就保存該offset,之所以用string,是為了用 JSON BYTE來編碼

/** Accessor for state in the operator state backend. */
    private transient ListState<byte[]> offsetState;
這個就是整個靈魂,offset的值,看到用的是 ListState,

順藤摸瓜找下去,發(fā)現了這個狀態(tài)值的出處:
    public byte[] snapshotCurrentState() throws Exception {
        // this method assumes that the checkpoint lock is held
        assert Thread.holdsLock(checkpointLock);
        if (debeziumOffset.sourceOffset == null || debeziumOffset.sourcePartition == null) {
            return null;
        }
        return stateSerializer.serialize(debeziumOffset);
    }
而 debeziumOffset.sourceOffset 和 debeziumOffset.sourcePartition 都是hashMap.
可以推斷出這個狀態(tài),會保存debezuim消費的分區(qū)信息,偏移量信息,也符合常識。
其實他這里只是用到了1個Byte[]而已,用ListState是出于什么考慮呢? 
這個問題,我猜測是為了將來做擴展吧。 有了解詳情的,也請留言告知下,多謝了!

往下走,有幾個變量,更為關鍵

 private transient DebeziumEngine<?> engine;
//這個就是debezuim引擎,是flink-cdc的根本所在,能讀數據全部仰仗它了。我們這篇文章先不深入介紹他了,我們知道數據最后靠它去獲取的即可。后續(xù)開一個文章專門寫他。

 /** Consume the events from the engine and commit the offset to the engine. */
  private transient DebeziumChangeConsumer changeConsumer;
consumer會消費engine從遠程數據庫拿到的數據,然后把數據放入 Handover 中(持有鎖時),他放數據到handover前會檢查 list有沒有數據,有數據就先不放了,讓出鎖。

    /** The consumer to fetch records from {@link Handover}. */
 private transient DebeziumChangeFetcher<T> debeziumChangeFetcher;
fetcher是用來消費 Handover 中的數據,讀到數據之后,就會執(zhí)行sourceContext.collect(record)方法,把數據輸出到flink的數據pipeline中了。他和上面的consumer會競爭handover的鎖,搶的話就消費數據,消費完了就清空List, 再釋放鎖。

    /** Buffer the events from the source and record the errors from the debezium. */
    private transient Handover handover;
handover用一個list來保存數據。然后再設置了一個鎖給上述的 changeConsumer 和 debeziumChangeFetcher 來搶。

看完了變量,接下來就是重頭戲了,先是open方法:

    public void open(Configuration parameters) throws Exception {
        validator.validate();
        super.open(parameters);
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder().setNameFormat("debezium-engine").build();
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
        this.handover = new Handover();
        this.changeConsumer = new DebeziumChangeConsumer(handover);
    }
這個的意思是,先校驗,然后創(chuàng)建一個線程執(zhí)行器,創(chuàng)建一個handover(非常重要的家伙),創(chuàng)建一個changeConsumer。

然后我們看到 run方法里

這里關鍵的是以下這些代碼

 this.engine =
                DebeziumEngine.create(Connect.class)
                        .using(properties)
                        .notifying(changeConsumer) 
                        .using(OffsetCommitPolicy.always())
                        .using(
                                (success, message, error) -> {
                                    if (success) {
                                        // Close the handover and prepare to exit.
                                        handover.close();
                                    } else {
                                        handover.reportError(error);
                                    }
                                })
                        .build();

   .notifying(changeConsumer)  ---這一句指的是 engine拿到的數據,將由 changeConsumer來處理。

   // run the engine asynchronously
        executor.execute(engine); --用一個線程來啟動 engine。順便把changeConsumer也啟動了。
        debeziumStarted = true;

     // start the real debezium consumer
        debeziumChangeFetcher.runFetchLoop(); 
//另外一個線程啟動 debeziumChangeFetcher

總結一下:


flink-cdc-原理.JPG
 該圖很清晰的看見,一共有4個重要的組件:
   debezuim引擎負責數據抓取
  changeConsumer負責消費引擎出來的數據以及將數據發(fā)給handover
 handover類似中間件,做短暫的存儲(因為一共就兩個線程搶占鎖,一旦數據過來,會很快被消費)
 fetcher負責消費handover的數據,然后輸出到flink的pipeline中。

最后還有一點:
flink-cdc 的mongodb的實現,前面的mongoDbsource僅僅虛晃一槍,絕大部分邏輯是在 DebeziumSourceFunction中完成的,而其他的四種db(目前flink-cdc就支持5種), oracle,pg,mysql(舊版本), sqlserver,也是調用了該類來實現的。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容