Flink系列 - 實(shí)時(shí)數(shù)倉(cāng)之ETL實(shí)戰(zhàn)(二)

一、概述

??上一篇實(shí)戰(zhàn)中我們已經(jīng)使用ogg實(shí)現(xiàn)了mysql數(shù)據(jù)以json的格式同步到了kafka里邊去了,也就是說我們的源端的埋點(diǎn)的數(shù)據(jù)已經(jīng)處理好咯;那么接下來(lái)我們就可以使用 Flink 開始對(duì)數(shù)據(jù)源進(jìn)行處理計(jì)算,當(dāng)然這里值得一提的是:ogg 同步過來(lái)的json數(shù)據(jù)格式是嵌套型的,而且我們的數(shù)據(jù)不是想普通的網(wǎng)站日志那么簡(jiǎn)單,因?yàn)榈脑磾?shù)據(jù)是從數(shù)據(jù)庫(kù)中過來(lái)的-會(huì)涉及到增、刪、改,因此我們要對(duì)剛從源庫(kù)中過來(lái)的數(shù)據(jù)進(jìn)行簡(jiǎn)單的 ETL 處理。廢話不多說,先看下數(shù)據(jù)格式是長(zhǎng)啥樣的:

# 添加
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.796000","pos":"00000000010000036968","after":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.797000","pos":"00000000010000037147","after":{"EID":"102","ENAME":"siling","ESAL":1234.12}}
# 修改
{"table":"bms_st.employees","op_type":"U","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.247000","pos":"00000000010000037501","before":{"EID":"102","ENAME":"siling","ESAL":1234.12},"after":{"EID":"102","ENAME":"sunsiling","ESAL":1000.00}}
# 刪除
{"table":"bms_st.employees","op_type":"D","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.248000","pos":"00000000010000037636","before":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}

??從數(shù)據(jù)格式中可以看得出:op_type 是我們對(duì)數(shù)據(jù)源的增刪改的標(biāo)志,真正的數(shù)據(jù)是在 after 或者 before 的值里邊的。接下來(lái)我們將用 Flink 對(duì)這些數(shù)據(jù)進(jìn)行 ETL處理 并發(fā)往 kafka 供下一層數(shù)倉(cāng)計(jì)算使用:

二、項(xiàng)目結(jié)構(gòu)
image.png

mmain: 程序入口
utils:工具類
entity:實(shí)體類
commonbase:抽象父類
achieve:實(shí)現(xiàn)類

三、項(xiàng)目的實(shí)現(xiàn)
3.1 靜態(tài)的資源文件,用于配置信息 application.properties:
# source kafka config
PJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092
PJgroupId1: test
PJoffsetReset1: latest
PJtopicStr1: piaoju-topic

# sink kafka config
pj-BtServers2: cdh101:9092,cdh102:9092,cdh103:9092
pj-ZkStr2: cdh101:2181,cdh102:2181,cdh102:2181
pj-GroupId2: test
pj-OffsetReset2: latest
pj-TopicStr2: piaoju-to-kafka-topic

# ---------------------------------------------------------------------------------------------------------

# 員工日增薪資
employee_tb_name: bms_st.employees
employee_job_name: EmployeeSource
#employee_create_table: employee_money
#employee_row_col: tb_name VARCHAR, op_type VARCHAR, ts VARCHAR, eId VARCHAR, eName VARCHAR, eSal VARCHAR

3.2 在 utils目錄 下創(chuàng)建獲取以上文件信息值的類 LoadPropertiesFile.java:
import java.io.InputStream;
import java.util.Properties;
/**
 * @author feiniu
 * @create 2020-03-26 9:37
 */
public class LoadPropertiesFile {

    public static String getPropertyFileValues(String proKey){
        String proStr = "";
        try {
            //讀取配置文件
            InputStream is = LoadPropertiesFile.class.getClassLoader().getResourceAsStream("application.properties");
            Properties properties = new Properties();
            properties.load(is);
            proStr = properties.getProperty(proKey);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
        return proStr;
    }

}

3.3 commonbase 目錄下創(chuàng)建抽象類 對(duì)接kafka的數(shù)據(jù),并解析關(guān)鍵字段,代碼架構(gòu)如下:
package com.nfdwsyy.commonbase;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.utils.LoadPropertiesFile;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.text.ParseException;
import java.util.Properties;
/**
 * @author feiniu
 * @create 2020-04-03 20:47
 */
public abstract class SourceCommonBase {

    public void getDataStream(String jobName) throws Exception {

        // 1. 環(huán)境的設(shè)置

       // 2.資源配置文件信息的獲取
      
       // 3.消費(fèi)者接收數(shù)據(jù)并做json的簡(jiǎn)要解析
       
       // 4.抽象方法的設(shè)置

}

  1. 環(huán)境的設(shè)置:
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 開啟 Checkpoint,每 1000毫秒進(jìn)行一次 Checkpoint
        env.enableCheckpointing(1000);
        // Checkpoint 語(yǔ)義設(shè)置為 EXACTLY_ONCE
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // CheckPoint 的超時(shí)時(shí)間
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 同一時(shí)間,只允許 有 1 個(gè) Checkpoint 在發(fā)生
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // 兩次 Checkpoint 之間的最小時(shí)間間隔為 500 毫秒
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // 當(dāng) Flink 任務(wù)取消時(shí),保留外部保存的 CheckPoint 信息
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 當(dāng)有較新的 Savepoint 時(shí),作業(yè)也會(huì)從 Checkpoint 處恢復(fù)
        env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
        // 作業(yè)最多允許 Checkpoint 失敗 1 次(flink 1.9 開始支持)
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);

2.資源配置文件信息的獲?。?/p>

        // 獲取資源配置文件信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", LoadPropertiesFile.getPropertyFileValues("PJbtServers1"));
        properties.setProperty("group.id", LoadPropertiesFile.getPropertyFileValues("PJgroupId1"));
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", LoadPropertiesFile.getPropertyFileValues("PJoffsetReset1")); //value 反序列化

3.消費(fèi)者接收數(shù)據(jù)并做json的簡(jiǎn)要解析:

        FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
                LoadPropertiesFile.getPropertyFileValues("PJtopicStr1"),
                new SimpleStringSchema(),
                properties);

        DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);

        // prase json
        DataStream<String> mStream = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                String table = jsonObject.getString("table");
                String op_type = jsonObject.getString("op_type");
                String op_ts = jsonObject.getString("op_ts");
                String before = jsonObject.getString("before");
                String after = jsonObject.getString("after");

                String resultStr = parseSourceKafkaJson(table,op_type,op_ts,before,after);
                return resultStr;
            }
        });

        // let chirld etl to kafka
        sendToSinkKafka(mStream);

        env.execute(jobName);

4.抽象方法的設(shè)置:

    // let chirld class do it
    public abstract String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException;

    // sink to kafka
    public abstract void sendToSinkKafka(DataStream<String> mStream);

3.4 achieve下創(chuàng)建實(shí)現(xiàn)類,用于對(duì)數(shù)據(jù)進(jìn)行 ETL 處理,類的架構(gòu)設(shè)計(jì)如下:
package com.nfdwsyy.achieve;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.commonbase.SourceCommonBase;
import com.nfdwsyy.entity.Employee;
import com.nfdwsyy.utils.LoadPropertiesFile;
import com.nfdwsyy.utils.MySinkKafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.io.Serializable;
import java.text.ParseException;

/**
 * @author feiniu
 * @create 2020-07-23 10:12
 */
public class EmpSourceAchi extends SourceCommonBase implements Serializable {

    @Override
    public String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException {

        // 1.數(shù)據(jù)的 ETL 處理   (這里根據(jù)實(shí)際情況而定)
   
    }

    @Override
    public void sendToSinkKafka(DataStream<String> mStream) {
        
           // 2.將處理完之后的數(shù)據(jù)發(fā)往 kafka 隊(duì)列 供下游計(jì)算使用
        
    }

    // 3. 調(diào)用父類的處理方法,供主類調(diào)用
    
}

1.數(shù)據(jù)的 ETL 處理:

        String eId = "";
        String eName = "";
        double eSal = 0;
        double after_money = 0;
        double before_money = 0;

        JSONObject jObjBefore = JSON.parseObject(before);
        JSONObject jObjAfter = JSON.parseObject(after);

        System.out.println("在 parseSourceKafkaJson 方法中,table -> "+ table +" , op_type -> "+ op_type +" , op_ts -> "+ op_ts +" , before -> "+ before + " , after -> "+ after);

        String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
        Employee employee = null;
        if (StringUtil.isNullOrEmpty(op_type) || StringUtil.isNullOrEmpty(table)){
            System.out.println("獲取的類型為空哦-> "+ op_type);
        }else if (table.equals(tb_name)){
            switch (op_type){
                case "I":
                    eId = jObjAfter.getString("EID");
                    eName = jObjAfter.getString("ENAME");
                    eSal = Double.parseDouble(jObjAfter.getString("ESAL"));
                    break;
                case "U":
                    eId = jObjAfter.getString("EID");
                    eName = jObjAfter.getString("ENAME");
                    after_money = Double.valueOf(jObjAfter.getString("ESAL"));
                    before_money = Double.valueOf(jObjBefore.getString("ESAL"));
                    eSal = after_money - before_money;
                    break;
                case "D":
                    eId = jObjBefore.getString("EID");
                    eName = jObjBefore.getString("ENAME");
                    eSal = Double.parseDouble("-"+ jObjBefore.getString("ESAL"));
                    break;
            }

            employee = new Employee(tb_name, op_type, op_ts, eId, eName, eSal);
        }

        // the entity must have tb_name
        return JSONObject.toJSONString(employee);

2.將處理完之后的數(shù)據(jù)發(fā)往 kafka :

        DataStream<String> mS = mStream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String s) throws Exception {
                if (StringUtil.isNullOrEmpty(s)){
                    return false;
                } else {
                    return true;

                }
            }
        });

        String broker_list = LoadPropertiesFile.getPropertyFileValues("pj-BtServers2");
        String topic = LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2");
        String groupId = LoadPropertiesFile.getPropertyFileValues("pj-GroupId2");
        String offsetReset = LoadPropertiesFile.getPropertyFileValues("pj-OffsetReset2");
        // the entity must have tb_name
        String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
       // 發(fā)往 Kafka 的自定義類
        mS.addSink(new MySinkKafka(broker_list, topic, groupId, offsetReset, tb_name)).name("employee_tb_name");

  1. 調(diào)用父類的處理方法,供主類調(diào)用 :
    // transfer the parent method
    public void successKafka2KafkaMethod(){

        try {
            String jobName = LoadPropertiesFile.getPropertyFileValues("employee_job_name");
            getDataStream(jobName +" Source");
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

??到這里整體上算是弄完了,但是要注意的一點(diǎn)是數(shù)據(jù)發(fā)往 kafka 的類是需要我們?nèi)プ远x的,接下來(lái)我們?cè)偃?chuàng)建一個(gè)數(shù)據(jù)發(fā)往 kafka 的工具類:

package com.nfdwsyy.utils;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * @author feiniu
 * @create 2020-04-04 10:29
 */
public class MySinkKafka extends RichSinkFunction<String> {

    private Properties props = null;
    private KafkaProducer producer = null;
    private ProducerRecord record = null;

    private String broker_list;
    private String topic;
    private String groupId;
    private String offsetReset;
    private String sourceTbName;

    public MySinkKafka(String broker_list, String topic, String groupId, String offsetReset, String sourceTbName) {
        this.broker_list = broker_list;
        this.topic = topic;
        this.groupId = groupId;
        this.offsetReset = offsetReset;
        this.sourceTbName = sourceTbName;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("group.id", groupId);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        props.put("auto.offset.reset", offsetReset); //value 反序列化

        producer = new KafkaProducer<String, String>(props);

    }

    @Override
    public void invoke(String value, Context context) {

        if(value.equals("") || value.equals("null")) {
            System.out.println("Sink 中 invoke 方法過來(lái)的字符串值-> "+ value);
        } else {
            JSONObject jObjNew = JSON.parseObject(value);
            String tb_name = jObjNew.getString("tb_name");

            System.out.println("表明對(duì)比 -> " + tb_name + " --- " + sourceTbName);

            if (tb_name.equals(sourceTbName)) {
                record = new ProducerRecord<String, String>(topic, null, null, value);
                producer.send(record);
                System.out.println("發(fā)送數(shù)據(jù): " + value);
                producer.flush();
            }
        }

    }

    @Override
    public void close() throws Exception {
        super.close();
    }

}

  1. 創(chuàng)建主類,調(diào)用ETL方法:
package com.nfdwsyy.mmain;

import com.nfdwsyy.achieve.EmpSourceAchi;

/**
 * @author feiniu
 * @create 2020-07-23 10:55
 */
public class EmployeeMain01 {

    public static void main(String[] args){

        EmpSourceAchi empAchi = new EmpSourceAchi();
        empAchi.successKafka2KafkaMethod();

    }

}

??好了,全部代碼都寫完了,接下來(lái)我們可以去測(cè)試使用咯。

四、本地測(cè)試 并 打包部署上 yarn
4.1 本地測(cè)試

??運(yùn)行程序之后對(duì)數(shù)據(jù)庫(kù)的源表進(jìn)行增刪改,即可在控制臺(tái)看到發(fā)往kafka的數(shù)據(jù),這里不做本地測(cè)試。

4.2 部署上 yarn 服務(wù)器

打包并上傳至服務(wù)器的指定目錄,然后執(zhí)行如下命令部署應(yīng)用:


bin/flink run -m yarn-cluster -ynm oggsyncflinkjob -d -c com.nfdwsyy.mmain.EmployeeMain01 /opt/mycdhflink/myjar/Kafka2FlinkETL2Kafka.jar

這時(shí)候我們可以在頁(yè)面上部署情況了:
image.png

image.png

接下來(lái)我們?cè)賳?dòng)接收ETL之后的消費(fèi)者:

bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic piaoju-to-kafka-topic --from-beginning

源庫(kù)中對(duì)表數(shù)據(jù)操作:
image.png

處理過后的數(shù)據(jù)如下圖:
image.png

??從處理結(jié)果的數(shù)據(jù)看來(lái),其實(shí)它已經(jīng)變成是一個(gè)處理過增刪改操作之后最簡(jiǎn)單的 json串了,那么至于如果對(duì)這些處理過后的數(shù)據(jù)進(jìn)行計(jì)算如聚合等那都是小菜一碟了;原創(chuàng)不易,轉(zhuǎn)載必須注明出處;欲知如何計(jì)算,請(qǐng)看下回分曉,哈哈哈哈。。。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容