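Introduction
Faced with complex business scenarios, a company may end up using several different databases, which complicates data exchange between services and data analysis. Data synchronization plays an important role here. The industry has many mature synchronization components, and those that support real-time synchronization include Canal, Maxwell, and Debezium. Flink, as a real-time processing engine, makes data synchronization quick and convenient through SQL. As a demonstration, this post syncs MongoDB to MySQL, using Flink 1.13.5.
Setting up the MongoDB environment
The Flink MongoDB CDC Connector is built on MongoDB Change Streams, so a standalone MongoDB instance is not supported. MongoDB offers two cluster deployment modes: replica sets and sharded clusters. A replica set is comparable to MySQL primary-replica replication, while a sharded cluster is a multi-instance cluster that stores data in shards. For this demo I deployed a replica set in Docker.
- Create three containers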
docker run --name mongo0 -p 27000:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo1 -p 27001:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo2 -p 27002:27017 -d mongo --replSet "mg-cdc"
- Enter the mongo0 container
docker exec -it mongo0 /bin/bash
- Open the client and configure the cluster
bin/mongosh
// Run ifconfig to find the host machine's IP address
config = {"_id":"mg-cdc",
"members":[
{"_id":0,host:"192.168.1.9:27000"},
{"_id":1,host:"192.168.1.9:27001"},
{"_id":2,host:"192.168.1.9:27002"}
]
}
rs.initiate(config)
rs.status()
If the containers have been restarted, rs.initiate reports an error
MongoServerError: already initialized
and the cluster has to be reconfigured
rs.reconfig(config)
which may in turn report the following error
MongoServerError: New config is rejected :: caused by :: replSetReconfig should only be run on a writable PRIMARY. Current state REMOVED;
Going by the error message, rerun the command with the force option
rs.reconfig(config, {force:true})
- Create a new MongoDB user for Flink MongoDB CDC
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
});
- Test the change stream
use wlapp;
cursor = db.plan_joined_user.watch()
cursor.next()
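The replica-set name and member addresses configured above are what a client needs to find the cluster. As a minimal sketch (the class and method names are hypothetical and not part of any library), the Java helper below assembles the comma-separated member list and a standard MongoDB connection string for the three members. The printed URI can be passed to `mongosh` from the machine that will run Flink to check that the members are reachable. Drivers generally discover members through the addresses stored in the replica-set config (`192.168.1.9:2700x` here), so those addresses should be reachable from that machine as well.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: builds a MongoDB connection string for a replica set. */
final class ReplicaSetUri {

    /** Joins host:port pairs into a comma-separated member list. */
    static String hosts(String host, List<Integer> ports) {
        return ports.stream()
                .map(port -> host + ":" + port)
                .collect(Collectors.joining(","));
    }

    /** Standard connection string shape: mongodb://h1,h2,h3/?replicaSet=name */
    static String uri(String hosts, String replicaSet) {
        return "mongodb://" + hosts + "/?replicaSet=" + replicaSet;
    }

    public static void main(String[] args) {
        // Host IP, ports, and replica-set name are the ones used in this post.
        String hosts = hosts("192.168.1.9", List.of(27000, 27001, 27002));
        System.out.println(hosts);
        System.out.println(uri(hosts, "mg-cdc"));
    }
}
```

Credentials are left out on purpose; if they are added to the URI, the user name and password need to be percent-encoded.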
Flink CDC code implementation
- Dependencies
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.5</version>
</dependency>
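- Code
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkCdcSync {
// Tuning constants used below. Their values are not given in this post, so these are placeholders to adjust:
// the interval and min pause mirror the SQL settings later on, the other two are Flink's defaults.
private static final long BUFFER_TIMEOUT_MS = 100L;
private static final long CHECKPOINT_INTERVAL_MS = 120_000L;
private static final long CHECKPOINT_TIMEOUT_MS = 600_000L;
private static final long CHECKPOINT_MIN_PAUSE_MS = 60_000L;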
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(BUFFER_TIMEOUT_MS);
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE mongo_plan_joined_user (" +
" _id STRING," +
" plan_id STRING," +
" user_id STRING," +
" invite_share_log_id STRING," +
" joined_time STRING," +
" target_value STRING," +
" PRIMARY KEY(_id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'mongodb-cdc'," +
" 'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002'," +
" 'username' = 'flinkuser'," +
" 'password' = 'flinkpw'," +
" 'database' = 'wlapp'," +
" 'collection' = 'plan_joined_user'" +
")");
tableEnv.executeSql("CREATE TABLE mysql_plan_joined_user (" +
" id STRING," +
" plan_id STRING," +
" user_id STRING," +
" invite_share_log_id STRING," +
" joined_time STRING," +
" target_value STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://localhost:3306/wlapp'," +
" 'table-name' = 'plan_joined_user'," +
" 'driver' = 'com.mysql.cj.jdbc.Driver'," +
" 'username' = 'root'," +
" 'password' = '123456'," +
" 'scan.fetch-size' = '200'" +
")");
tableEnv.executeSql("insert into mysql_plan_joined_user select * from mongo_plan_joined_user");
}
}
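The job above ends with `insert into ... select *`, which matches source and sink columns by position, so the source's `_id` reaches the sink's `id` only because both tables declare their columns in the same order. A minimal sketch of a more explicit alternative follows: a small, hypothetical helper (not part of Flink) that builds the `INSERT` statement with every column spelled out and `_id` renamed to `id`. The resulting string could be passed to `tableEnv.executeSql(...)` in place of the `select *` statement.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: builds an INSERT ... SELECT with an explicit column list. */
final class InsertSqlBuilder {

    /**
     * @param mapping sink column name -> source column name, in sink column order
     */
    static String build(String sinkTable, String sourceTable, Map<String, String> mapping) {
        String selectList = mapping.entrySet().stream()
                // Add an alias only when the source and sink names differ.
                .map(e -> e.getKey().equals(e.getValue())
                        ? e.getValue()
                        : e.getValue() + " AS " + e.getKey())
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + sinkTable + " SELECT " + selectList + " FROM " + sourceTable;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, which must match the sink table's column order.
        Map<String, String> mapping = new LinkedHashMap<>();
        mapping.put("id", "_id");
        mapping.put("plan_id", "plan_id");
        mapping.put("user_id", "user_id");
        mapping.put("invite_share_log_id", "invite_share_log_id");
        mapping.put("joined_time", "joined_time");
        mapping.put("target_value", "target_value");
        System.out.println(build("mysql_plan_joined_user", "mongo_plan_joined_user", mapping));
    }
}
```

The mapping is still positional on the sink side, but the select list no longer depends on the order in which the source table happens to declare its columns.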
Flink CDC SQL implementation
- Required jars
- avro-1.11.0.jar
- kafka-clients-2.7.0.jar
- connect-api-2.7.0.jar
- flink-sql-connector-mongodb-cdc-2.2.1.jar
- flink-connector-jdbc_2.11-1.13.5.jar
- mysql-connector-java-8.0.29.jar
- Notes
- The kafka-clients version has to be compatible. I used 2.7.0 locally; trying 1.1.1 along the way produced this error
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
- The Flink client's lib directory should not contain flink-connector-mongodb-cdc-2.2.1.jar or flink-connector-debezium-2.2.1.jar, because flink-sql-connector-mongodb-cdc-2.2.1.jar already bundles both as shaded classes. Otherwise the following error is reported
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:369)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
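The second note can be checked mechanically before a job is submitted. The following is a minimal sketch of such a check (the class name and the prefix-matching rule are assumptions made for illustration; this is not a Flink tool): given the file names in the client's `lib` directory, it lists the thin connector jars that should not sit next to the shaded `flink-sql-connector-mongodb-cdc` jar.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical pre-flight check for conflicting connector jars in Flink's lib directory. */
final class LibConflictCheck {

    private static final String SHADED_JAR_PREFIX = "flink-sql-connector-mongodb-cdc-";
    private static final List<String> THIN_JAR_PREFIXES =
            List.of("flink-connector-mongodb-cdc-", "flink-connector-debezium-");

    /** Returns the thin jars that conflict with the shaded jar, sorted by name. */
    static List<String> findConflicts(List<String> jarNames) {
        boolean hasShadedJar = jarNames.stream().anyMatch(n -> n.startsWith(SHADED_JAR_PREFIX));
        if (!hasShadedJar) {
            return List.of(); // nothing to conflict with
        }
        return jarNames.stream()
                .filter(n -> THIN_JAR_PREFIXES.stream().anyMatch(n::startsWith))
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws IOException {
        // Directory to scan; defaults to ./lib when no argument is given.
        Path lib = Paths.get(args.length > 0 ? args[0] : "lib");
        if (!Files.isDirectory(lib)) {
            System.out.println("Not a directory: " + lib);
            return;
        }
        List<String> names;
        try (Stream<Path> files = Files.list(lib)) {
            names = files.map(p -> p.getFileName().toString())
                    .filter(n -> n.endsWith(".jar"))
                    .collect(Collectors.toList());
        }
        findConflicts(names).forEach(n -> System.out.println("Remove from lib: " + n));
    }
}
```

Run it with the path to the Flink `lib` directory as the only argument; no output means no conflicting jars were found.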
- Code
SET 'table.dml-sync' = 'false';
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'hdfs://namespace/user/flink/checkpoints';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2min';
SET 'execution.checkpointing.min-pause' = '1min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
SET 'execution.runtime-mode' = 'streaming';
CREATE TABLE mongo_plan_joined_user (
_id STRING,
plan_id STRING,
user_id STRING,
invite_share_log_id STRING,
joined_time STRING,
target_value STRING,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'wlapp',
'collection' = 'plan_joined_user'
);
CREATE TABLE mysql_plan_joined_user (
id STRING,
plan_id STRING,
user_id STRING,
invite_share_log_id STRING,
joined_time STRING,
target_value STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/data',
'table-name' = 'plan_joined_user',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '123456',
'scan.fetch-size' = '200'
);
INSERT INTO mysql_plan_joined_user SELECT * FROM mongo_plan_joined_user;
Conclusion
This example is a simple test of syncing MongoDB to MySQL with Mongo CDC. Any pitfalls I run into later in production will be added here as updates.