Reference documentation
https://github.com/ververica/flink-cdc-connectors
Installation
Download the jar packages
https://github.com/ververica/flink-cdc-connectors/wiki/Downloads
Downloading them through Maven is recommended:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.11.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <!-- add the dependency matching your database -->
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
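Put the jar packages into flink/lib on the Flink cluster
Note that both the flink-cdc-connectors jar and the flink-connector-jdbc jar need to be placed there
Restart the cluster
Example: MySQL source -> Flink SQL -> MySQL sink
Create a new database in MySQL
Create the database inventory
Create the tables products and company as the tables to be synced through CDC
Create the table result to store the output of joining products and company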
CREATE TABLE `products` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `company` (
`id` int NOT NULL,
`company` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `result` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
`company` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Manually insert some data into the database
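The original does not list the rows that were inserted. As a minimal sketch, assuming a few made-up sample rows (every name, description, weight and company value below is hypothetical), the seed data could look like this in the MySQL client:

```sql
-- Hypothetical sample rows; any values matching the table definitions work.
USE inventory;

INSERT INTO `products` (`id`, `name`, `description`, `weight`) VALUES
  (1, 'scooter', 'small two-wheel scooter', 3.140),
  (2, 'hammer', '12oz carpenter hammer', 0.750),
  (3, 'jacket', 'water resistant jacket', 0.100);

-- company.id lines up with products.id, because the join in this example uses a.id = b.id.
-- Product 3 deliberately has no company row, so the left join has a non-matching case.
INSERT INTO `company` (`id`, `company`) VALUES
  (1, 'Acme'),
  (2, 'Globex');
```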
Start the SQL client (sql-client.sh)
Create the dynamic tables with the following SQL
-- creates a mysql cdc table source
-- syncs the products table
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'inventory',
'table-name' = 'products'
);
-- syncs the company table
CREATE TABLE mysql_company (
id INT NOT NULL,
company STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'inventory',
'table-name' = 'company'
);
-- table that stores the join result; note that a primary key must be declared when creating it, otherwise an error is raised
CREATE TABLE mysql_result (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
company STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/inventory',
'username' = 'root',
'password' = 'root',
'table-name' = 'result'
);
Query the data
>show tables;
------------------------------------
mysql_binlog
------------------------------------
>select * from mysql_binlog;
The following result is displayed, which shows that the table data was synced successfully:

[Image: query result]
Add data in the MySQL client while the query is running, and the data shown in the SQL client changes along with it
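For instance, statements like the following could be run in the MySQL client while the SELECT is still open in the SQL client. This is only a sketch: the row values are hypothetical and it assumes that id 101 is not yet used in products.

```sql
-- Run in the MySQL client, not in the Flink SQL client.
USE inventory;

-- A new row should appear in the running Flink query.
INSERT INTO `products` (`id`, `name`, `description`, `weight`)
VALUES (101, 'spare tire', '24 inch spare tire', 22.200);

-- Changing the same row should update the displayed result as well.
UPDATE `products` SET `weight` = 21.500 WHERE `id` = 101;
```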
Run the following SQL to left outer join the table mysql_binlog with the table mysql_company and insert the result into mysql_result
Note: the insert statement is what triggers the data synchronization
insert into mysql_result (id,name,description,weight,company)
select
a.id,
a.name,
a.description,
a.weight,
b.company
from mysql_binlog a
left join mysql_company b
on a.id = b.id;
Generated plan

[Image: execution plan]
Running the statement above creates a session job that processes the data as a stream.
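One way to check that the job is writing data, assuming the same inventory database as above, is to query the sink table directly in MySQL. Because the job uses a left join, rows from products without a matching company row would be expected to have NULL in the company column.

```sql
-- Run in the MySQL client. `result` is quoted the same way as in the table definition.
SELECT `id`, `name`, `description`, `weight`, `company`
FROM inventory.`result`
ORDER BY `id`;
```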
Differences between the flink cdc connector and the flink jdbc connector
- The flink jdbc connector is closer to batch processing and cannot sync data in real time
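To illustrate the batch-like behavior, here is a minimal sketch of reading the same products table through the JDBC connector in the Flink SQL client. The table name jdbc_products is a hypothetical name chosen for this illustration; the connection options simply reuse those of the sink table defined earlier.

```sql
-- Flink SQL: reads the products table through the JDBC connector.
-- jdbc_products is a hypothetical name used only for this illustration.
CREATE TABLE jdbc_products (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://localhost:3306/inventory',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'products'
);

-- This query scans the table once and then finishes;
-- rows added to MySQL afterwards are not picked up.
SELECT * FROM jdbc_products;
```

Compared with the mysql-cdc table on the same source, only the 'connector' option and the connection settings differ, which makes the two easy to swap when testing.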
The flink cdc connector also has its limitations:
- Supported databases: MySQL, PostgreSQL
- When syncing new data, the cdc connector poses as a MySQL slave and reads the MySQL binlog; it only supports syncing inserted and updated data and cannot handle deleted data.