Flink Series, Part 10: Implementing Dynamic Stream Splitting with Flink CDC for a Real-Time Data Warehouse

??Since Flink introduced Flink CDC, capturing database changelogs has become much easier, removing the tedious setup of third-party components such as Maxwell, Canal, and OGG. There are currently two ways to implement CDC with Flink: the SQL API and the DataStream API (recommended).
??To learn more about CDC, see:
??1. Documentation -> https://ververica.github.io/flink-cdc-connectors/master/
??2. Project -> https://github.com/ververica/flink-cdc-connectors

??Without further ado, let's use Flink CDC to implement dynamic splitting of business data.

1. Dynamic Splitting

??Flink CDC writes all captured data into a single Topic, which is inconvenient for downstream processing, so the records must be split out per table. The tables differ in nature: some are dimension tables and some are fact tables.
??In real-time computing, dimension data is usually written to a store that supports fast primary-key lookups, such as HBase, Redis, or MySQL, while fact data is kept in the stream for further processing and is eventually joined into wide tables.
??This routing information should not live in a static configuration file: every time the business adds a table, we would have to edit the file and restart the job. What we need instead is a dynamic configuration that is persisted long-term and whose changes the streaming job picks up automatically.
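The routing decision itself is just a keyed lookup against that persisted configuration. Below is a minimal, Flink-free sketch of the idea; the class name and sample entries are illustrative only, and in the real job this map lives in Flink broadcast state:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch: the config lookup that drives dynamic splitting.
// In the actual job this map is Flink broadcast state fed by Flink CDC.
class RoutingSketch {
    static final Map<String, String> CONFIG = new HashMap<>();
    static {
        // key = sourceTable + "-" + operateType, value = sink type
        CONFIG.put("order_detail-insert", "hbase");
        CONFIG.put("base_attr_info-read", "kafka");
    }

    // Decide where a change record goes; null means "no config entry, ignore".
    static String route(String tableName, String type) {
        return CONFIG.get(tableName + "-" + type);
    }

    public static void main(String[] args) {
        System.out.println(route("order_detail", "insert")); // hbase
        System.out.println(route("user_info", "delete"));    // null
    }
}
```

When a new table appears, only a new entry (a new row in the config table) is needed; the lookup logic never changes.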

2. Implementation Flow
[Flow diagram: 企業(yè)微信截圖_16385248902249.png]

As the diagram shows, each split stream is saved to its corresponding table or topic:
1) Business (fact) data goes to Kafka topics
2) Dimension data goes to HBase tables

3. Code Implementation

3.1) Key dependencies in pom.xml

<dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.2</version>
        </dependency>
      
        <!-- Needed only if checkpoints are stored on HDFS -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!--phoenix-->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-core</artifactId>
            <version>4.7.0-HBase-1.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

      <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>5.0.0-HBase-2.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.glassfish</groupId>
                    <artifactId>javax.el</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

</dependencies>

3.2) Main job logic

public static void main(String[] args) throws Exception {

        //TODO 1. Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 Checkpointing & state backend (commented out for local testing)
        //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
        //env.enableCheckpointing(5000L);
        //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //env.getCheckpointConfig().setCheckpointTimeout(10000L);
        //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);

        //fixedDelayRestart has no zero-arg overload; pass attempts and delay
        //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));

        //TODO 2. Create a stream from the Kafka topic ods_base_db
        String sourceTopic = "ods_base_db";
        String groupId = "base_db_app_211212";
        DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));

        //TODO 3. Map each line to a JSONObject and filter out deletes (main stream)
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
                .filter(new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject value) throws Exception {
                        //Extract the operation type
                        String type = value.getString("type");

                        return !"delete".equals(type);
                    }
                });

        //TODO 4. Consume the config table with Flink CDC and turn it into a broadcast stream
        SourceFunction<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("your hostname")
                .port(3306)
                .databaseList("lcy_db") // the database to capture
                .tableList("lcy_db.table_process")
                .username("luchangyin")
                .password("123456")
                .deserializer(new MyStringDeserializationSchema()) // custom: converts SourceRecord to a JSON string
                .startupOptions(StartupOptions.initial()) // initial(): snapshot first, then binlog; latest() is also available
                .build();

        DataStream<String> tbProcessDStream = env.addSource(mySqlSource).name("source-cdc-table_process");
        MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
        BroadcastStream<String> broadcastStream = tbProcessDStream.broadcast(mapStateDescriptor);

        //TODO 5. Connect the main stream with the broadcast stream
        BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);

        //TODO 6. Split the streams: broadcast elements update the config; main-stream elements are routed according to it
        OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
        SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new MyTableProcessFunction(hbaseTag, mapStateDescriptor));

        //TODO 7. Extract the Kafka stream and the HBase side-output stream
        DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);

        //TODO 8. Write Kafka records to Kafka topics and HBase records to Phoenix tables
        kafka.print("Kafka >>>>>>>>");
        hbase.print("HBase >>>>>>>");

//  ------- The custom HBase and Kafka sink functions are left to the reader and not covered here ---------
//        hbase.addSink(new DimSinkFunction());
//        kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
//            @Override
//            public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long ts) {
//                return new ProducerRecord<>(jsonObject.getString("sinkTable"),jsonObject.getString("after").getBytes());
//            }
//        }));


        //TODO 9. Launch the job
        env.execute("BaseDBApp");
    }

3.3) Custom deserialization schema. Anyone who has used Flink CDC knows the raw changelog format is verbose and varies by operation, so we extract just the generic fields we need. This class lives in the utils package:

public class MyStringDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

        //Build the result object
        JSONObject result = new JSONObject();

        //Extract the database and table name from the record topic
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];

        //Get the record payload
        Struct value = (Struct) sourceRecord.value();

        //After
        Struct after = value.getStruct("after");
        JSONObject data = new JSONObject();
        if (after != null) { //after is null for delete records
            Schema schema = after.schema();
            List<Field> fieldList = schema.fields();

            for (int i = 0; i < fieldList.size(); i++) {
                Field field = fieldList.get(i);
                Object fieldValue = after.get(field);
                data.put(field.name(), fieldValue);
            }
        }

        //Before
        Struct before = value.getStruct("before");
        JSONObject beforeData = new JSONObject();
        if (before != null) { //before is null for insert and snapshot-read records
            Schema schema = before.schema();
            List<Field> fieldList = schema.fields();

            for (int i = 0; i < fieldList.size(); i++) {
                Field field = fieldList.get(i);
                Object fieldValue = before.get(field);
                beforeData.put(field.name(), fieldValue);
            }
        }

        //Extract the event timestamp
//        long ts = Long.parseLong(value.get("ts_ms").toString());
//        String dt = MyDateUtils.getTimestamp2Fm(ts);

        //Get the operation type: CREATE, UPDATE, DELETE (READ for snapshot records)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }

        //Assemble the output
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("after", data);
        result.put("before", beforeData);
        result.put("type", type);
//        result.put("ts", ts);
//        result.put("dt", dt);

        //Emit the assembled record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

}
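A note on the topic parsing above: the Debezium SourceRecord topic has the form serverName.database.table, which is why the code takes fields[1] and fields[2]. The sample topic below assumes the connector's default server name. As a standalone sketch:

```java
// Standalone sketch of the topic parsing done in deserialize().
// The sample topic's server name is an assumption based on connector defaults.
class TopicParseSketch {
    static String[] parse(String topic) {
        String[] fields = topic.split("\\.");
        // fields[0] = server name, fields[1] = database, fields[2] = table
        return new String[]{fields[1], fields[2]};
    }

    public static void main(String[] args) {
        String[] dbAndTable = parse("mysql_binlog_source.lcy_db.table_process");
        System.out.println(dbAndTable[0] + " / " + dbAndTable[1]); // lcy_db / table_process
    }
}
```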

3.4) The TableProcess entity class, mirroring our config table:

@Data
public class TableProcess {
    //Sink-type constants for dynamic splitting
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //source table
    String sourceTable;
    //operation type: insert, update, delete
    String operateType;
    //sink type: hbase or kafka
    String sinkType;
    //sink table (or topic)
    String sinkTable;
    //sink columns
    String sinkColumns;
    //primary-key column
    String sinkPk;
    //extra DDL clause for table creation
    String sinkExtend;
}

3.5) The MyTableProcessFunction class that processes the connected streams:

public class MyTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {

    private OutputTag<JSONObject> objectOutputTag;
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private Connection connection;

    public MyTableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.objectOutputTag = objectOutputTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) {
        try {
            Class.forName(ShoppingConfig.PHOENIX_DRIVER);
            connection = DriverManager.getConnection(ShoppingConfig.PHOENIX_SERVER);
        } catch (Exception e) {
            System.out.println("Failed to connect to Phoenix -> " + e);
            e.printStackTrace();
        }
    }

    @Override
    public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
        //TODO 1. Parse the incoming config record
        JSONObject jsonObject = JSON.parseObject(s);
        String data = jsonObject.getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);

        //TODO 2. Create the Phoenix table if the sink is HBase
        if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }

        //TODO 3. Write the config into broadcast state
        BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }

    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1. Look up the config in broadcast state
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("tableName") + "-" + value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);

        if (tableProcess != null) {
            //TODO 2. Keep only the configured columns
            filterColumn(value.getJSONObject("after"), tableProcess.getSinkColumns());

            //TODO 3. Route the record
            //Attach the sink table/topic to the record
            value.put("sinkTable", tableProcess.getSinkTable());
            String sinkType = tableProcess.getSinkType();
            if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                //Kafka data goes to the main output
                out.collect(value);
            } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                //HBase data goes to the side output
                ctx.output(objectOutputTag, value);
            }
        } else {
            System.out.println("No config entry for key: " + key);
        }
    }

    private void filterColumn(JSONObject data, String sinkColumns) {

        String[] fields = sinkColumns.split(",");
        List<String> columns = Arrays.asList(fields);
        data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
    }

    //DDL pattern: create table if not exists db.tn(id varchar primary key, tm_name varchar) xxx;
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {

        PreparedStatement preparedStatement = null;

        try {
            if (sinkPk == null) {
                sinkPk = "id";
            }

            if (sinkExtend == null) {
                sinkExtend = "";
            }

            StringBuilder createTableSQL = new StringBuilder("create table if not exists ")
                    .append(ShoppingConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");

            String[] fields = sinkColumns.split(",");
            for (int i = 0; i < fields.length; i++) {
                String field = fields[i];
                //Mark the primary-key column
                if (sinkPk.equals(field)) {
                    createTableSQL.append(field).append(" varchar primary key ");
                } else {
                    createTableSQL.append(field).append(" varchar ");
                }

                //Append "," after every column except the last
                if (i < fields.length - 1) {
                    createTableSQL.append(",");
                }
            }

            createTableSQL.append(")").append(sinkExtend);

            //Log the DDL
            System.out.println(createTableSQL);

            //Prepare and execute it
            preparedStatement = connection.prepareStatement(createTableSQL.toString());
            preparedStatement.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!", e);
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }

    }

}
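To make the generated DDL easy to inspect, here is a dependency-free replica of checkTable's string assembly; "GMALL" stands in for the project-specific ShoppingConfig.HBASE_SCHEMA:

```java
// Dependency-free replica of checkTable's SQL assembly, for inspection only.
// "GMALL" is a stand-in for ShoppingConfig.HBASE_SCHEMA, which is project-specific.
class DdlSketch {
    static String buildDdl(String schema, String sinkTable, String sinkColumns,
                           String sinkPk, String sinkExtend) {
        if (sinkPk == null) sinkPk = "id";          // default primary key
        if (sinkExtend == null) sinkExtend = "";    // no extra DDL clause
        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(schema).append(".").append(sinkTable).append("(");
        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            sql.append(fields[i])
               .append(sinkPk.equals(fields[i]) ? " varchar primary key " : " varchar ");
            if (i < fields.length - 1) sql.append(",");
        }
        return sql.append(")").append(sinkExtend).toString();
    }

    public static void main(String[] args) {
        System.out.println(buildDdl("GMALL", "user_info", "id,name", null, null));
        // create table if not exists GMALL.user_info(id varchar primary key ,name varchar )
    }
}
```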

3.6) Create the config table in MySQL

-- Config table
CREATE TABLE `table_process` (
     `source_table` varchar(200) NOT NULL COMMENT 'source table',
     `operate_type` varchar(200) NOT NULL COMMENT 'operation type: insert, update, delete',
     `sink_type` varchar(200) DEFAULT NULL COMMENT 'sink type: hbase or kafka',
     `sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table (topic)',
     `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'sink columns',
     `sink_pk` varchar(200) DEFAULT NULL COMMENT 'primary-key column',
     `sink_extend` varchar(200) DEFAULT NULL COMMENT 'extra DDL clause',
     PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('order_detail','insert','hbase','order_detail','id,order_id,sku_name,sku_num,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('user_info','read','hbase','user_info','id,name,phone_num,user_level,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('base_attr_info','read','kafka','base_attr_info','id,attr_name,category_id');
select * from table_process;
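The payoff of this design is that onboarding a new table is a pure configuration change. For example, to start routing a hypothetical sku_info table to HBase (the table and columns here are made up for illustration), a single row suffices and no job redeploy is needed:

```sql
-- Hypothetical example: route a new sku_info table to HBase by config alone
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns,sink_pk)
values ('sku_info','insert','hbase','sku_info','id,sku_name,price,create_time','id');
```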

4. Integration Testing

4.1) Sample Kafka source data:

kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"鐘表","id":10},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"鞋靴","id":11},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"母嬰","id":12},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"禮品箱包","id":13},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"食品飲料、保健食品","id":14},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"珠寶","id":15},"type":"read","tableName":"base_category1"}

4.2) Sample config-table data:

config-table data :> {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"kafka","sink_table":"base_attr_info","source_table":"base_attr_info","sink_columns":"id,attr_name,category_id"},"type":"read","tableName":"table_process"}
config-table data :> {"database":"lcy_db","before":{},"after":{"operate_type":"insert","sink_type":"hbase","sink_table":"order_detail","source_table":"order_detail","sink_columns":"id,order_id,sku_name,sku_num,create_time"},"type":"read","tableName":"table_process"}
config-table data :> {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"hbase","sink_table":"user_info","source_table":"user_info","sink_columns":"id,name,phone_num,user_level,create_time"},"type":"read","tableName":"table_process"}

4.3) Split output:

# Dimension data routed to HBase
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"時(shí)心邦","user_level":"1","phone_num":"13794339138","id":3998},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"宋厚慶","user_level":"1","phone_num":"13274778653","id":3999},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"康素云","user_level":"1","phone_num":"13739911376","id":4000},"type":"read","tableName":"user_info"}
# Fact data routed to Kafka
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"香水彩妝","id":112},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"面部護(hù)膚","id":113},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":473,"attr_name":"香調(diào)","id":114},"type":"read","tableName":"base_attr_info"}

5. Summary

??That completes the dynamic-splitting implementation: not trivial if you have never seen it, but not that hard either. The reason it deserves a write-up is its importance in real projects. All change records from the source business systems land in one topic, and while you could hard-code a branch per business table and redeploy the job, every new source table would then force a code change and a fresh release, which is impractical in production. With dynamic splitting, onboarding a new business table only requires inserting a row into the config table; we maintain the config table instead of the program, which greatly reduces development cost and improves maintainability. And that's a wrap.

?Copyright belongs to the author; contact the author for reprints or content cooperation.
Platform statement: the article content (including any images or videos) was uploaded and published by the author and represents the author's own views; Jianshu is an information-publishing platform providing storage services only.
