flink-cdc 读取mysql数据

通过flink-cdc的Connector读取mysql数据，并写入到其他系统或者数据库。使用前需要先开启mysql的binlog功能。

1. 导入maven依赖

 <dependencies>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.19</version>
    </dependency>
 <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.6</version>
    </dependency>
    <!-- Logging-related dependencies.
         NOTE(review): ${slf4j.version} is not defined anywhere in this snippet; it must be
         declared under <properties> in the enclosing pom. -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-to-slf4j</artifactId>
      <version>2.14.0</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.16.18</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.6</version>
    </dependency>
    <!-- Flink CDC MySQL connector and the Flink runtime it runs on. -->
    <dependency>
      <groupId>com.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>2.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.12</artifactId>
      <version>1.13.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_2.12</artifactId>
      <version>1.13.6</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.13.6</version>
    </dependency>
    <!-- NOTE(review): <type>test-jar</type> selects the planner's test artifact rather than
         its main jar; confirm this is intended. -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.12</artifactId>
      <version>1.13.6</version>
      <type>test-jar</type>
    </dependency>
  </dependencies>

2. 新建Flink-cdc测试类

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Flink job that reads MySQL change events through the Flink CDC connector and hands
 * each event, serialized as a string, to {@link CustomSink}.
 */
@Slf4j
public class FlinkCDC {

    /**
     * Builds the CDC source, configures checkpointing and runs the pipeline.
     *
     * @param args optional; when non-empty, {@code args[0]} is the checkpoint path to
     *             restore from, e.g.
     *             {@code D:\flink\checkpoints\8bdf5d49bb1b4cda56aaa0a590fc2cef\chk-55}
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        MySqlSource<String> binlogSource = createBinlogSource();

        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(restoreSettings(args));

        // Take a checkpoint every 3000 ms and keep checkpoint data on the local file system.
        env.enableCheckpointing(3000);
        env.setStateBackend(new FsStateBackend("file:///D:\\flink\\checkpoints"));

        // Four parallel source tasks; a single sink task keeps the messages ordered.
        env
            .fromSource(binlogSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
            .setParallelism(4)
            .addSink(new CustomSink())
            .setParallelism(1);

        env.execute("flinkcdc");
    }

    /**
     * Creates the MySQL CDC source for table {@code user.log_info}.
     *
     * <p>The source starts from the latest binlog position (incremental only).
     * {@code StartupOptions.initial()} would instead read a full snapshot first and then
     * continue incrementally.
     */
    private static MySqlSource<String> createBinlogSource() {
        return MySqlSource.<String>builder()
            .hostname("127.0.0.1")
            .port(3306)
            .databaseList("user")       // captured database
            .tableList("user.log_info") // captured table
            .username("root")
            .password("password")
            .deserializer(new CustomDeserialization()) // custom event-to-string conversion
            .startupOptions(StartupOptions.latest())
            .build();
    }

    /**
     * Builds the environment configuration; when a checkpoint path is passed on the
     * command line the job resumes reading from that checkpoint.
     */
    private static Configuration restoreSettings(String[] args) {
        Configuration settings = new Configuration();
        if (ArrayUtils.isEmpty(args)) {
            return settings;
        }
        String latestCheckpointPath = args[0];
        settings.setString("execution.savepoint.path", latestCheckpointPath);
        return settings;
    }
}

3. 自定义反序列化类

import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

/**
 * Custom deserializer that turns each Debezium change record into a JSON string.
 *
 * <p>The emitted JSON object has the keys {@code database}, {@code tableName},
 * {@code before}, {@code after} and {@code type}. {@code before} and {@code after} are
 * JSON objects holding the row's columns; each is empty when the record carries no such
 * row image.
 */
public class CustomDeserialization implements DebeziumDeserializationSchema<String> {

    /** Value emitted for {@code type} in place of Debezium's {@code create}. */
    private static final String INSERT_TYPE = "insert";

    /**
     * Converts one change record to JSON and emits it.
     *
     * <p>Records without a payload, or without a recognizable operation, are not row
     * changes and are skipped instead of failing the job.
     *
     * @param sourceRecord the Debezium record to convert
     * @param collector    receives the resulting JSON string
     * @throws Exception if the record cannot be converted
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
        throws Exception {

        Struct value = (Struct) sourceRecord.value();
        if (value == null) {
            // Tombstone-style record: there is no payload to describe.
            return;
        }

        // READ / CREATE / UPDATE / DELETE; null when the payload has no operation field.
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        if (operation == null) {
            return;
        }
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = INSERT_TYPE;
        }

        // Database and table names are taken from the 2nd and 3rd dot-separated topic
        // segments. Assumes the topic looks like <server>.<database>.<table> — TODO confirm.
        String[] topicParts = sourceRecord.topic().split("\\.");
        String database = topicParts[1];
        String tableName = topicParts[2];

        JSONObject res = new JSONObject();
        res.put("database", database);
        res.put("tableName", tableName);
        res.put("before", toJson(value.getStruct("before")));
        res.put("after", toJson(value.getStruct("after")));
        res.put("type", type);

        collector.collect(res.toString());
    }

    /** Copies every field of {@code row} into a JSON object; a null row yields an empty object. */
    private static JSONObject toJson(Struct row) {
        JSONObject json = new JSONObject();
        if (row == null) {
            return json;
        }
        for (Field field : row.schema().fields()) {
            json.put(field.name(), row.get(field));
        }
        return json;
    }

    /** Declares that this schema produces {@code String} elements. */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

4. 自定义Sink输出

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * Sink that parses each incoming JSON change event into a {@code TableData<LogInfo>}
 * and prints it to standard output.
 */
public class CustomSink extends RichSinkFunction<String> {

    /**
     * Handles one change event.
     *
     * @param value   JSON string describing the change event
     * @param context sink context supplied by Flink; not used here
     * @throws Exception if the event cannot be parsed
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        TableData<LogInfo> tableData =
            JSON.parseObject(value, new TypeReference<TableData<LogInfo>>() {});

        // println(Object) also copes with a null parse result.
        System.out.println(tableData);

        // TODO: forward the event to another system, middleware or database as needed, e.g.
        //   - publish to a message queue (RabbitMQ / Kafka): rabbitmqtemplate.send(tableData);
        //   - persist to a database:                         testMapper.insert(tableData.getAfter());
    }
}


/**
 * Generic holder for one table change event.
 *
 * @param <T> type that the {@code before} and {@code after} row images are mapped to
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableData<T> {

    /** Name of the database the change belongs to. */
    private String database;

    /** Name of the table the change belongs to. */
    private String tableName;

    // NOTE(review): CustomDeserialization writes the operation under the JSON key "type",
    // not "update" — confirm this field is populated as intended.
    private String update;

    /** Row image before the change. */
    private T before;

    /** Row image after the change. */
    private T after;
}

最后编辑于
©著作权归作者所有，转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成，浏览时请结合常识与多方信息审慎甄别。
平台声明：文章内容（如有图片或视频亦包括在内）由作者上传并发布，文章内容仅代表作者本人观点，简书系信息发布平台，仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容