Flink CDC in Practice: Syncing MongoDB to MySQL

Introduction

Facing complex business scenarios, enterprises often adopt different databases, which complicates data exchange and analysis across systems. Data synchronization therefore plays an important role. There are many mature synchronization components in the industry; those supporting real-time sync include Canal, Maxwell, Debezium, and others. Flink, as a real-time processing engine, makes data synchronization quick and convenient through SQL. This article demonstrates syncing from MongoDB to MySQL, using Flink 1.13.5.

Setting up the MongoDB Environment

The Flink MongoDB CDC connector is built on MongoDB Change Streams, so a standalone MongoDB instance is not supported. MongoDB offers two cluster deployment modes: replica sets and sharded clusters. A replica set is roughly the equivalent of MySQL master-slave replication, while a sharded cluster is a multi-instance sharded storage cluster. This demo deploys a replica set in Docker.
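If you only need a quick local test, a single-node replica set also satisfies the Change Streams requirement. A minimal sketch (the container name, port, and replica-set name here are illustrative and differ from the three-node setup below):

```shell
# Start one MongoDB container as a single-member replica set (illustrative names/ports)
docker run --name mongo-single -p 27017:27017 -d mongo --replSet "rs-single"

# Initialize the replica set with just this one member
docker exec -it mongo-single mongosh --eval \
  'rs.initiate({_id: "rs-single", members: [{_id: 0, host: "127.0.0.1:27017"}]})'
```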

  1. Create three containers

docker run --name mongo0 -p 27000:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo1 -p 27001:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo2 -p 27002:27017 -d mongo --replSet "mg-cdc"

  2. Enter the mongo0 container

docker exec -it mongo0 /bin/bash

  3. Open the Mongo shell and configure the replica set
bin/mongosh

// use ifconfig to find the host machine's IP address
config = {"_id":"mg-cdc",
          "members":[
          {"_id":0,host:"192.168.1.9:27000"},
          {"_id":1,host:"192.168.1.9:27001"},
          {"_id":2,host:"192.168.1.9:27002"}
          ]
}
 
rs.initiate(config)
rs.status()

If the containers are restarted, re-running the initialization reports an error:

MongoServerError: already initialized

and the replica set must be reconfigured instead:

rs.reconfig(config)

This may fail with the following error:

MongoServerError: New config is rejected :: caused by :: replSetReconfig should only be run on a writable PRIMARY. Current state REMOVED;

As the error message suggests, force the reconfiguration:

rs.reconfig(config, {force:true})

  4. Create a new MongoDB user for Flink MongoDB CDC
use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" },
    { role: "readAnyDatabase", db: "admin" }
  ]
});
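Before wiring up Flink, it is worth confirming that the new user can authenticate; a sketch assuming the mongo0 container is mapped to port 27000 as above:

```shell
# Verify that flinkuser authenticates against the admin database
mongosh --host 127.0.0.1 --port 27000 \
  -u flinkuser -p flinkpw --authenticationDatabase admin \
  --eval 'db.adminCommand({ping: 1})'
```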
  5. Test the change stream
use wlapp;
cursor = db.plan_joined_user.watch()
cursor.next()

Flink CDC Code Implementation

  1. Dependencies
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>2.2.1</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
    <version>2.2.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>2.7.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.5</version>
</dependency>
  2. Code
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

public class FlinkCdcSync {

    // Illustrative values; tune to your environment
    private static final long BUFFER_TIMEOUT_MS = 100L;
    private static final long CHECKPOINT_INTERVAL_MS = 2 * 60 * 1000L;
    private static final long CHECKPOINT_TIMEOUT_MS = 10 * 60 * 1000L;
    private static final long CHECKPOINT_MIN_PAUSE_MS = 60 * 1000L;

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)));
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Source table: MongoDB CDC, backed by Change Streams
        tableEnv.executeSql("CREATE TABLE mongo_plan_joined_user (" +
                "  _id STRING," +
                "  plan_id STRING," +
                "  user_id STRING," +
                "  invite_share_log_id STRING," +
                "  joined_time STRING," +
                "  target_value STRING," +
                "  PRIMARY KEY(_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mongodb-cdc'," +
                "  'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002'," +
                "  'username' = 'flinkuser'," +
                "  'password' = 'flinkpw'," +
                "  'database' = 'wlapp'," +
                "  'collection' = 'plan_joined_user'" +
                ")");

        // Sink table: MySQL via the JDBC connector
        tableEnv.executeSql("CREATE TABLE mysql_plan_joined_user (" +
                "  id STRING," +
                "  plan_id STRING," +
                "  user_id STRING," +
                "  invite_share_log_id STRING," +
                "  joined_time STRING," +
                "  target_value STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "   'connector' = 'jdbc'," +
                "   'url' = 'jdbc:mysql://localhost:3306/wlapp'," +
                "   'table-name' = 'plan_joined_user'," +
                "   'driver' = 'com.mysql.cj.jdbc.Driver'," +
                "   'username' = 'root'," +
                "   'password' = '123456'," +
                "   'scan.fetch-size' = '200'" +
                ")");

        tableEnv.executeSql("insert into mysql_plan_joined_user select * from mongo_plan_joined_user");
    }
}

Flink CDC SQL Implementation

  1. Required jars
  • avro-1.11.0.jar
  • kafka-clients-2.7.0.jar
  • connect-api-2.7.0.jar
  • flink-sql-connector-mongodb-cdc-2.2.1.jar
  • flink-connector-jdbc_2.11-1.13.5.jar
  • mysql-connector-java-8.0.29.jar
  2. Notes
  • The kafka-clients version must be compatible; 2.7.0 works locally, while 1.1.1 produced the following error:
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
  • Do not place flink-connector-mongodb-cdc-2.2.1.jar or flink-connector-debezium-2.2.1.jar in the Flink client's lib directory: flink-sql-connector-mongodb-cdc-2.2.1.jar already shades them in, and duplicating them causes the following error:
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
    at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:369)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
  3. SQL script
SET 'table.dml-sync' = 'false';
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'hdfs://namespace/user/flink/checkpoints';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2min';
SET 'execution.checkpointing.min-pause' = '1min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
SET 'execution.runtime-mode' = 'streaming';

CREATE TABLE mongo_plan_joined_user (
    _id STRING,
    plan_id STRING,
    user_id STRING,
    invite_share_log_id STRING,
    joined_time STRING,
    target_value STRING,
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'wlapp',
    'collection' = 'plan_joined_user'
);

CREATE TABLE mysql_plan_joined_user (
    id STRING,
    plan_id STRING,
    user_id STRING,
    invite_share_log_id STRING,
    joined_time STRING,
    target_value STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/data',
    'table-name' = 'plan_joined_user',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '123456',
    'scan.fetch-size' = '200'
);

INSERT INTO mysql_plan_joined_user SELECT * FROM mongo_plan_joined_user;
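Once the job is running, a quick sanity check is to compare counts on both sides. A sketch reusing the credentials and database names from the scripts above (adjust to your environment; note the SQL script writes to the `data` database, while the Java example uses `wlapp`):

```shell
# Count documents on the MongoDB side
mongosh --host 127.0.0.1 --port 27000 \
  -u flinkuser -p flinkpw --authenticationDatabase admin \
  --eval 'db.getSiblingDB("wlapp").plan_joined_user.countDocuments()'

# Count rows on the MySQL side (database name matches the JDBC URL above)
mysql -h127.0.0.1 -uroot -p123456 -e "SELECT COUNT(*) FROM data.plan_joined_user;"
```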

Summary

This article walked through a simple test of syncing MongoDB to MySQL with Flink CDC; pitfalls encountered in production will be added in future updates.
