flink cdc connector簡單案例

參考文檔

https://github.com/ververica/flink-cdc-connectors

安裝

下載jar包

https://github.com/ververica/flink-cdc-connectors/wiki/Downloads
這里建議使用maven下載

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.11.0</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <!-- add the dependency matching your database -->
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.1.0</version>
        </dependency>

將jar包放到flink集群flink/lib下

注意這里需要放置flink-cdc-connectors和flink-jdbc-connectors

重啟集群

案例:MySQLSource—>Flink SQL—>MysqlSink

在MySQL數(shù)據(jù)庫中新建數(shù)據(jù)庫

新建數(shù)據(jù)庫inventory
新建表products,company作為cdc同步表
新建表result作為products,company join后結(jié)果存放的表

CREATE TABLE `products` (
   `id` int NOT NULL,
   `name` varchar(45) DEFAULT NULL,
   `description` varchar(45) DEFAULT NULL,
   `weight` decimal(10,3) DEFAULT NULL,
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `company` (
   `id` int NOT NULL,
   `company` varchar(45) DEFAULT NULL,
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

CREATE TABLE `result` (
   `id` int NOT NULL,
   `name` varchar(45) DEFAULT NULL,
   `description` varchar(45) DEFAULT NULL,
   `weight` decimal(10,3) DEFAULT NULL,
   `company` varchar(45) DEFAULT NULL,
   PRIMARY KEY (`id`)
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

在數(shù)據(jù)庫中手動插入數(shù)據(jù)

啟動SQL-client.sh客戶端

新建動態(tài)表sql

-- creates a mysql cdc table source
--同步products表
CREATE TABLE mysql_binlog (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3)
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'inventory',
 'table-name' = 'products'
);

--同步company表
CREATE TABLE mysql_company (
 id INT NOT NULL,
 company STRING
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'localhost',
 'port' = '3306',
 'username' = 'root',
 'password' = 'root',
 'database-name' = 'inventory',
 'table-name' = 'company'
);

--存放join之后結(jié)果的表,注意建表時(shí)必須指定主鍵,不然會報(bào)錯(cuò)
CREATE TABLE mysql_result (
 id INT NOT NULL,
 name STRING,
 description STRING,
 weight DECIMAL(10,3),
 company STRING,
 PRIMARY KEY (id) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'url' = 'jdbc:mysql://localhost:3306/inventory',
 'username' = 'root',
 'password' = 'root',
 'table-name' = 'result'
);

查詢數(shù)據(jù)

>show tables;
------------------------------------
mysql_binlog
------------------------------------
>select * from mysql_binlog;

顯示以下結(jié)果,證明表數(shù)據(jù)同步成功

數(shù)據(jù)結(jié)果.png

在MySQL客戶端動態(tài)新增數(shù)據(jù),可以觀察到SQL-client客戶端數(shù)據(jù)也一起跟著變化

執(zhí)行以下SQL將表mysql_binlog 和表mysql_company 進(jìn)行左外連接,結(jié)果插入到mysql_result

注意:這里執(zhí)行insert語句來觸發(fā)數(shù)據(jù)同步執(zhí)行

insert into mysql_result (id,name,description,weight,company)
select 
  a.id,
  a.name,
  a.description,
  a.weight,
  b.company
from mysql_binlog a
left join mysql_company b
on a.id = b.id;

生成計(jì)劃

執(zhí)行計(jì)劃.png

執(zhí)行以上語句會生成一個(gè)session job任務(wù),對數(shù)據(jù)進(jìn)行流式處理。

flink cdc connector和 flink jdbc connector區(qū)別

  1. flink jdbc connector更接近批處理,沒有實(shí)時(shí)同步數(shù)據(jù)的能力

flink cdc connector也有其局限性:

  1. 支持的數(shù)據(jù)庫:MySQL,PostgreSql
  2. 由于cdc connector在同步新增數(shù)據(jù)時(shí)是偽裝成為MySQL slave同步MySQL的binlog,僅僅支持同步新增和修改的數(shù)據(jù),對刪除的數(shù)據(jù)無法做出處理。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容