本文主要介紹如何使用 FLink SQL 自己的 DDL語言來構(gòu)建基于 kafka 的表和 基于Mysql 的表,并直接把從 kafka 接過來的 Json 格式的數(shù)據(jù)轉(zhuǎn)換為 表結(jié)構(gòu)后直接寫入到Mysql,有了這樣的經(jīng)驗之后,大家可以自行修改 DML操作來實現(xiàn)不同的業(yè)務。文章內(nèi)容參考了一些阿里云邪大佬的文章Link,寫的很好。
環(huán)境配置如下:
- zookeeper : 3.4.6
- kafka: 2.12-2.2.1
- mysql: 8.0
以上三個組件全部是通過Docker搭建,我的環(huán)境是使用VirtualBox搭建的Centos7虛擬機,在虛擬機上安裝Docker, 之后在本地主機IDE內(nèi)調(diào)試代碼。其中遇到了不少坑,主要是FLink與Kafka的通信,可以參考我的Docker-Compose文件的配置,已經(jīng)解決了網(wǎng)絡問題。注意KAFKA_ADVERTISED_LISTENERS的地址修改成自己的虛擬機IP地址。
version: '2.1'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_CREATE_TOPICS: "flink:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
mysql:
image: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: 123456
環(huán)境搭建好之后就是正式的代碼部分了:
核心是兩段SQL代碼,分別是用來連接Kafka和MYSQL的。
其中kafka使用json格式來解析。
樣例數(shù)據(jù)({"t":1570,"user_name":"xiaoming","cnt":100})
-- source
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'test-group',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.specific-offsets.0.partition' = '0',
'connector.specific-offsets.0.offset' = '0',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
-- sink
CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
);
在 Java代碼中,可以直接使用tEnv的sqlUpdate()方法來注冊這兩張表,之后就可以直接使用了。具體操作如下:
// 1. 連接kafka構(gòu)建源表
tEnv.sqlUpdate(kafkaSourceSql);
// 2. 定義要輸出的表
tEnv.sqlUpdate(mysqlSinkSql);
// 3. 自定義具體的 DML 操作,這里我直接將kafka寫入到mysql
// 對于Insert Into 操作,同樣還是要使用sqlUpdate()方法
tEnv.sqlUpdate("INSERT INTO sink " +
"SELECT * from log where cnt=100");
可以直接通過mysql的客戶端看到我們的寫入結(jié)果!

mysql.png
以上就是使用 Flink SQL 的 DDL 語言通過不同的外部數(shù)據(jù)源建立表的過程。