flink使用15-在Flink SQL中連接kafka和mysql

本文主要介紹如何使用 FLink SQL 自己的 DDL語言來構(gòu)建基于 kafka 的表和 基于Mysql 的表,并直接把從 kafka 接過來的 Json 格式的數(shù)據(jù)轉(zhuǎn)換為 表結(jié)構(gòu)后直接寫入到Mysql,有了這樣的經(jīng)驗之后,大家可以自行修改 DML操作來實現(xiàn)不同的業(yè)務。文章內(nèi)容參考了一些阿里云邪大佬的文章Link,寫的很好。

環(huán)境配置如下:

  • zookeeper : 3.4.6
  • kafka: 2.12-2.2.1
  • mysql: 8.0

以上三個組件全部是通過Docker搭建,我的環(huán)境是使用VirtualBox搭建的Centos7虛擬機,在虛擬機上安裝Docker, 之后在本地主機IDE內(nèi)調(diào)試代碼。其中遇到了不少坑,主要是FLink與Kafka的通信,可以參考我的Docker-Compose文件的配置,已經(jīng)解決了網(wǎng)絡問題。注意KAFKA_ADVERTISED_LISTENERS的地址修改成自己的虛擬機IP地址。

version: '2.1'
services:
  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka:2.12-2.2.1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_CREATE_TOPICS: "flink:1:1"
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  mysql:
    image: mysql
    command: --default-authentication-plugin=mysql_native_password
    restart: always
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: 123456

環(huán)境搭建好之后就是正式的代碼部分了:

核心是兩段SQL代碼,分別是用來連接Kafka和MYSQL的。

其中kafka使用json格式來解析。

樣例數(shù)據(jù)({"t":1570,"user_name":"xiaoming","cnt":100})

-- source
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'earliest-offset',
    'connector.properties.0.key' = 'group.id',
    'connector.properties.0.value' = 'test-group',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'localhost:9092',
    'connector.specific-offsets.0.partition' = '0',
    'connector.specific-offsets.0.offset' = '0',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
-- sink
CREATE TABLE pvuv_sink (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
    'connector.table' = 'pvuv_sink',
    'connector.username' = 'root',
    'connector.password' = '123456',
    'connector.write.flush.max-rows' = '1'
);

在 Java代碼中,可以直接使用tEnv的sqlUpdate()方法來注冊這兩張表,之后就可以直接使用了。具體操作如下:

// 1. 連接kafka構(gòu)建源表
tEnv.sqlUpdate(kafkaSourceSql);

// 2. 定義要輸出的表
tEnv.sqlUpdate(mysqlSinkSql);

// 3. 自定義具體的 DML 操作,這里我直接將kafka寫入到mysql
// 對于Insert Into 操作,同樣還是要使用sqlUpdate()方法
tEnv.sqlUpdate("INSERT INTO sink " +
"SELECT * from log where cnt=100");

可以直接通過mysql的客戶端看到我們的寫入結(jié)果!

mysql.png

以上就是使用 Flink SQL 的 DDL 語言通過不同的外部數(shù)據(jù)源建立表的過程。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容