I. Overview
In the previous hands-on article we used OGG to sync MySQL data into Kafka as JSON, so the tracking data on the source side is ready. Next we can use Flink to process and compute over that data source. One thing is worth pointing out: the JSON that OGG delivers is nested. Our data is also not as simple as ordinary website logs. Because it comes from a database, it includes inserts, updates, and deletes, so we need to run some simple ETL on the data as soon as it arrives from the source database. Without further ado, here is what the data looks like:
# Insert
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.796000","pos":"00000000010000036968","after":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
{"table":"bms_st.employees","op_type":"I","op_ts":"2012-04-12 14:23:13.177344","current_ts":"2012-04-12T14:23:19.797000","pos":"00000000010000037147","after":{"EID":"102","ENAME":"siling","ESAL":1234.12}}
# Update
{"table":"bms_st.employees","op_type":"U","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.247000","pos":"00000000010000037501","before":{"EID":"102","ENAME":"siling","ESAL":1234.12},"after":{"EID":"102","ENAME":"sunsiling","ESAL":1000.00}}
# Delete
{"table":"bms_st.employees","op_type":"D","op_ts":"2012-04-12 14:24:37.789810","current_ts":"2012-04-12T14:24:44.248000","pos":"00000000010000037636","before":{"EID":"101","ENAME":"changyin","ESAL":6666.66}}
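The format shows that op_type marks whether a source row was inserted (I), updated (U), or deleted (D). The actual row data is in the after field, the before field, or both. Next we use Flink to run ETL on these records and send the results to Kafka, where the next data-warehouse layer will compute on them: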
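The rule above can be sketched in plain Java. Inserts and updates take the current row from after, and deletes take it from before, since a delete carries only the old row. This is a minimal sketch with some assumptions: the nested objects are taken to be already parsed into a Map (the project itself uses fastjson for that), and OggChange and rowImage are hypothetical names.

```java
import java.util.Map;

// Hypothetical holder for the OGG fields used in this article (not part of the original project).
final class OggChange {
    final String table;
    final String opType;
    final Map<String, String> before; // null for inserts
    final Map<String, String> after;  // null for deletes

    OggChange(String table, String opType, Map<String, String> before, Map<String, String> after) {
        this.table = table;
        this.opType = opType;
        this.before = before;
        this.after = after;
    }

    /** Returns the row image that carries the real data for this operation. */
    Map<String, String> rowImage() {
        switch (opType) {
            case "I": // insert: only the new row is present
            case "U": // update: new row in "after", old row in "before"
                return after;
            case "D": // delete: only the old row is present
                return before;
            default:
                throw new IllegalArgumentException("Unsupported op_type: " + opType);
        }
    }
}
```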
II. Project Structure

mmain: program entry point
utils: utility classes
entity: entity classes
commonbase: abstract parent classes
achieve: implementation classes
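The entity package holds the Employee class that the ETL code in 3.4 serializes to JSON, but the article does not show it. A minimal hypothetical version might look like the sketch below. It is consistent with the constructor call new Employee(tb_name, op_type, op_ts, eId, eName, eSal) used there and with the field list in the commented-out employee_row_col line of application.properties. The detail that matters is the tb_name property, because MySinkKafka reads that key to decide whether to forward a record. This assumes fastjson's default getter-based serialization.

```java
// Hypothetical sketch of entity/Employee; the article never shows the real class.
// Field names follow the commented-out employee_row_col line in application.properties.
class Employee {
    private final String tb_name; // source table name; MySinkKafka routes on this key
    private final String op_type; // I / U / D
    private final String ts;      // op_ts taken from the OGG record
    private final String eId;
    private final String eName;
    private final double eSal;    // signed salary change produced by the ETL

    public Employee(String tb_name, String op_type, String ts, String eId, String eName, double eSal) {
        this.tb_name = tb_name;
        this.op_type = op_type;
        this.ts = ts;
        this.eId = eId;
        this.eName = eName;
        this.eSal = eSal;
    }

    // fastjson serializes through getters, so getTb_name() is what yields the "tb_name" key
    public String getTb_name() { return tb_name; }
    public String getOp_type() { return op_type; }
    public String getTs() { return ts; }
    public String getEId() { return eId; }
    public String getEName() { return eName; }
    public double getESal() { return eSal; }
}
```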
III. Implementation
3.1 The static resource file that holds the configuration, application.properties:
# source kafka config
PJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092
PJgroupId1: test
PJoffsetReset1: latest
PJtopicStr1: piaoju-topic
# sink kafka config
pj-BtServers2: cdh101:9092,cdh102:9092,cdh103:9092
pj-ZkStr2: cdh101:2181,cdh102:2181,cdh103:2181
pj-GroupId2: test
pj-OffsetReset2: latest
pj-TopicStr2: piaoju-to-kafka-topic
# ---------------------------------------------------------------------------------------------------------
# Daily salary increase of employees
employee_tb_name: bms_st.employees
employee_job_name: EmployeeSource
#employee_create_table: employee_money
#employee_row_col: tb_name VARCHAR, op_type VARCHAR, ts VARCHAR, eId VARCHAR, eName VARCHAR, eSal VARCHAR
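application.properties uses key: value lines and # comments. java.util.Properties accepts ':' as a key/value separator in the same way as '='. Only the first unescaped separator splits the line, so the colons inside the host:port lists stay in the value. Commented-out lines such as employee_create_table are simply absent. The small demo below checks this against a few lines copied from the file. PropertiesFormatDemo is a hypothetical class name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Demo: how java.util.Properties reads the "key: value" lines and "#" comments of application.properties.
class PropertiesFormatDemo {
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text)); // ':' works as a separator just like '='
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected with an in-memory reader
        }
        return props;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "# source kafka config",
                "PJbtServers1: cdh101:9092,cdh102:9092,cdh103:9092",
                "pj-TopicStr2: piaoju-to-kafka-topic",
                "#employee_create_table: employee_money");
        Properties p = parse(sample);
        System.out.println(p.getProperty("PJbtServers1"));          // prints cdh101:9092,cdh102:9092,cdh103:9092
        System.out.println(p.getProperty("employee_create_table")); // prints null (the line is commented out)
    }
}
```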
3.2 In the utils directory, create the class LoadPropertiesFile.java, which reads values from the file above:
package com.nfdwsyy.utils;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* @author feiniu
* @create 2020-03-26 9:37
*/
public class LoadPropertiesFile {
// Reads one value from application.properties on the classpath
public static String getPropertyFileValues(String proKey){
// try-with-resources closes the stream even if loading fails
try (InputStream is = LoadPropertiesFile.class.getClassLoader().getResourceAsStream("application.properties")) {
if (is == null) {
throw new IllegalStateException("application.properties not found on the classpath");
}
Properties properties = new Properties();
properties.load(is);
return properties.getProperty(proKey);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
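getPropertyFileValues opens and parses application.properties again on every call. That includes once per record inside parseSourceKafkaJson in 3.4. One alternative is to load the file once and reuse the result. The sketch below is a hypothetical helper that is not part of the original project, and CachedConfig, fromClasspath, and get are made-up names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical load-once configuration holder (not part of the original project).
final class CachedConfig {
    private final Properties props = new Properties();

    CachedConfig(InputStream in) {
        try (InputStream is = in) { // closes the stream after a single parse
            props.load(is);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static CachedConfig fromClasspath(String resource) {
        InputStream is = CachedConfig.class.getClassLoader().getResourceAsStream(resource);
        if (is == null) {
            throw new IllegalStateException(resource + " not found on the classpath");
        }
        return new CachedConfig(is);
    }

    // Fails fast on a missing key instead of silently returning null
    String get(String key) {
        String value = props.getProperty(key);
        if (value == null) {
            throw new IllegalArgumentException("Missing config key: " + key);
        }
        return value;
    }
}
```

For example, LoadPropertiesFile could hold a private static final CachedConfig built with CachedConfig.fromClasspath("application.properties") and delegate to it. Unlike the original, get throws on a missing key rather than returning null.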
3.3 In the commonbase directory, create an abstract class that consumes the Kafka data and parses the key fields. Its skeleton looks like this:
package com.nfdwsyy.commonbase;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.utils.LoadPropertiesFile;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import java.text.ParseException;
import java.util.Properties;
/**
* @author feiniu
* @create 2020-04-03 20:47
*/
public abstract class SourceCommonBase {
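public void getDataStream(String jobName) throws Exception {
// 1. Set up the execution environment
// 2. Read the settings from the resource configuration file
// 3. Consume the data from Kafka and do a first-pass JSON parse
}
// 4. Declare the abstract methods that the subclass implements
}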
1. Setting up the environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing: one checkpoint every 1000 ms
env.enableCheckpointing(1000);
// EXACTLY_ONCE checkpointing mode (this covers Flink state; the custom Kafka sink used later is not transactional)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Checkpoint timeout: 60 s
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow only 1 checkpoint in progress at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Minimum pause of 500 ms between two checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Keep the externalized checkpoint data when the Flink job is cancelled
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Recover from the latest checkpoint even when a more recent savepoint exists
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
// Tolerate at most 1 checkpoint failure before failing the job (supported since Flink 1.9)
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
2. Reading the resource configuration:
// Read the settings from the resource configuration file
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", LoadPropertiesFile.getPropertyFileValues("PJbtServers1"));
properties.setProperty("group.id", LoadPropertiesFile.getPropertyFileValues("PJgroupId1"));
// key / value deserializers (the Flink consumer actually decodes records with the SimpleStringSchema passed to it below)
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// where to start reading when the consumer group has no committed offset yet
properties.setProperty("auto.offset.reset", LoadPropertiesFile.getPropertyFileValues("PJoffsetReset1"));
3. Consuming the data and doing a first-pass JSON parse:
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer011<>(
LoadPropertiesFile.getPropertyFileValues("PJtopicStr1"),
new SimpleStringSchema(),
properties);
DataStream<String> stream = env.addSource(myConsumer).setParallelism(1);
// parse the top-level JSON fields
DataStream<String> mStream = stream.map(new MapFunction<String, String>() {
@Override
public String map(String s) throws Exception {
JSONObject jsonObject = JSON.parseObject(s);
String table = jsonObject.getString("table");
String op_type = jsonObject.getString("op_type");
String op_ts = jsonObject.getString("op_ts");
String before = jsonObject.getString("before");
String after = jsonObject.getString("after");
String resultStr = parseSourceKafkaJson(table,op_type,op_ts,before,after);
return resultStr;
}
});
// let the subclass send the ETL results to Kafka
sendToSinkKafka(mStream);
env.execute(jobName);
4. Declaring the abstract methods:
// implemented by the subclass: ETL on one parsed record
public abstract String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException;
// sink to kafka
public abstract void sendToSinkKafka(DataStream<String> mStream);
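SourceCommonBase follows the template method pattern. The base class fixes the order of the pipeline (read, parse, sink) and leaves the two steps that vary abstract. The Flink-free sketch below shows only that structure. MiniPipelineBase, TrimAndDropEmpty, run, parse, and sink are hypothetical names, and a List stands in for the Kafka source and sink.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free sketch of the template method structure used by SourceCommonBase (hypothetical names).
abstract class MiniPipelineBase {
    // Template method: final, so subclasses cannot change the order of the steps
    final List<String> run(List<String> input) {
        List<String> parsed = new ArrayList<>();
        for (String record : input) {
            parsed.add(parse(record)); // plays the role of parseSourceKafkaJson
        }
        return sink(parsed);           // plays the role of sendToSinkKafka
    }

    abstract String parse(String record);

    abstract List<String> sink(List<String> records);
}

// Example subclass: "parses" by trimming and "sinks" by dropping empty results.
class TrimAndDropEmpty extends MiniPipelineBase {
    @Override
    String parse(String record) {
        return record.trim();
    }

    @Override
    List<String> sink(List<String> records) {
        List<String> out = new ArrayList<>();
        for (String r : records) {
            if (!r.isEmpty()) {
                out.add(r);
            }
        }
        return out;
    }
}
```

The design choice is the same as in the article: each new source table only needs a subclass with its own parse and sink logic, while the shared setup stays in one place.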
3.4 In the achieve directory, create the implementation class that runs the ETL on the data. Its skeleton looks like this:
package com.nfdwsyy.achieve;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.nfdwsyy.commonbase.SourceCommonBase;
import com.nfdwsyy.entity.Employee;
import com.nfdwsyy.utils.LoadPropertiesFile;
import com.nfdwsyy.utils.MySinkKafka;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
import org.apache.flink.streaming.api.datastream.DataStream;
import java.io.Serializable;
import java.text.ParseException;
/**
* @author feiniu
* @create 2020-07-23 10:12
*/
public class EmpSourceAchi extends SourceCommonBase implements Serializable {
@Override
public String parseSourceKafkaJson(String table, String op_type, String op_ts, String before, String after) throws ParseException {
// 1. ETL processing of the data (adapt this to your actual requirements)
}
@Override
public void sendToSinkKafka(DataStream<String> mStream) {
// 2. Send the processed data to a Kafka topic for downstream computation
}
// 3. Call the parent class's processing method so the main class can invoke it
}
1. ETL processing of the data:
String eId = "";
String eName = "";
double eSal = 0;
double after_money = 0;
double before_money = 0;
JSONObject jObjBefore = JSON.parseObject(before);
JSONObject jObjAfter = JSON.parseObject(after);
System.out.println("In parseSourceKafkaJson: table -> "+ table +" , op_type -> "+ op_type +" , op_ts -> "+ op_ts +" , before -> "+ before + " , after -> "+ after);
String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
Employee employee = null;
if (StringUtil.isNullOrEmpty(op_type) || StringUtil.isNullOrEmpty(table)){
System.out.println("op_type or table is empty -> "+ op_type);
}else if (table.equals(tb_name)){
switch (op_type){
case "I":
// insert: the new row is in "after"; the whole salary counts as an increase
eId = jObjAfter.getString("EID");
eName = jObjAfter.getString("ENAME");
eSal = Double.parseDouble(jObjAfter.getString("ESAL"));
break;
case "U":
// update: the change is the new salary minus the old one
eId = jObjAfter.getString("EID");
eName = jObjAfter.getString("ENAME");
after_money = Double.parseDouble(jObjAfter.getString("ESAL"));
before_money = Double.parseDouble(jObjBefore.getString("ESAL"));
eSal = after_money - before_money;
break;
case "D":
// delete: only "before" exists; the old salary counts as a decrease
eId = jObjBefore.getString("EID");
eName = jObjBefore.getString("ENAME");
eSal = -Double.parseDouble(jObjBefore.getString("ESAL"));
break;
default:
// unknown operation type: return an empty string, which the sink-side filter drops
System.out.println("Unsupported op_type -> "+ op_type);
return "";
}
employee = new Employee(tb_name, op_type, op_ts, eId, eName, eSal);
}
// the entity must have tb_name
return JSONObject.toJSONString(employee);
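The core of this ETL is turning every change into a signed salary delta. An insert adds the new salary, an update adds after minus before, and a delete subtracts the old salary, so summing the deltas gives the net salary change that the "daily salary increase" config comment refers to. The stdlib-only sketch below isolates that rule. SalaryDelta and of are hypothetical names, and the method takes the already-extracted ESAL strings instead of fastjson objects. It negates the parsed number instead of prefixing "-" to the string, so a negative source value cannot turn into an unparsable "--" string.

```java
// Hypothetical helper isolating the salary-delta rule of parseSourceKafkaJson.
final class SalaryDelta {
    private SalaryDelta() { }

    /**
     * @param opType    OGG op_type: "I", "U" or "D"
     * @param beforeSal ESAL from "before" (null for inserts)
     * @param afterSal  ESAL from "after" (null for deletes)
     */
    static double of(String opType, String beforeSal, String afterSal) {
        switch (opType) {
            case "I": // the whole new salary is an increase
                return Double.parseDouble(afterSal);
            case "U": // only the difference counts
                return Double.parseDouble(afterSal) - Double.parseDouble(beforeSal);
            case "D": // the whole old salary is a decrease
                return -Double.parseDouble(beforeSal);
            default:
                throw new IllegalArgumentException("Unsupported op_type: " + opType);
        }
    }
}
```

With the sample records, employee 102's update gives roughly -234.12 (with floating-point noise). If exact money values matter downstream, BigDecimal would avoid that rounding noise.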
2. Sending the processed data to Kafka:
DataStream<String> mS = mStream.filter(new FilterFunction<String>() {
@Override
public boolean filter(String s) throws Exception {
// drop empty results and the literal "null" produced when the record belongs to another table
return !StringUtil.isNullOrEmpty(s) && !"null".equals(s);
}
});
String broker_list = LoadPropertiesFile.getPropertyFileValues("pj-BtServers2");
String topic = LoadPropertiesFile.getPropertyFileValues("pj-TopicStr2");
String groupId = LoadPropertiesFile.getPropertyFileValues("pj-GroupId2");
String offsetReset = LoadPropertiesFile.getPropertyFileValues("pj-OffsetReset2");
// the entity must have tb_name
String tb_name = LoadPropertiesFile.getPropertyFileValues("employee_tb_name");
// custom sink class that writes to Kafka
mS.addSink(new MySinkKafka(broker_list, topic, groupId, offsetReset, tb_name)).name("employee_tb_name");
3. Calling the parent class's processing method so the main class can invoke it:
// invoke the parent class's template method
public void successKafka2KafkaMethod(){
try {
String jobName = LoadPropertiesFile.getPropertyFileValues("employee_job_name");
getDataStream(jobName +" Source");
} catch (Exception e) {
e.printStackTrace();
}
}
The main work is now done. Note, however, that the class that sends data to Kafka is a custom one we have to write ourselves, so next we create a utility class for sending data to Kafka:
package com.nfdwsyy.utils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
* @author feiniu
* @create 2020-04-04 10:29
*/
public class MySinkKafka extends RichSinkFunction<String> {
// created in open() on the TaskManager; transient so it is never serialized with the function
private transient KafkaProducer<String, String> producer;
private final String broker_list;
private final String topic;
private final String groupId;
private final String offsetReset;
private final String sourceTbName;
public MySinkKafka(String broker_list, String topic, String groupId, String offsetReset, String sourceTbName) {
this.broker_list = broker_list;
this.topic = topic;
this.groupId = groupId;
this.offsetReset = offsetReset;
this.sourceTbName = sourceTbName;
}
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
Properties props = new Properties();
props.put("bootstrap.servers", broker_list);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
// group.id and auto.offset.reset are consumer settings; the producer ignores them and only logs a warning
props.put("group.id", groupId);
props.put("auto.offset.reset", offsetReset);
producer = new KafkaProducer<>(props);
}
@Override
public void invoke(String value, Context context) {
if (value == null || value.isEmpty() || "null".equals(value)) {
System.out.println("Empty value received in Sink invoke -> "+ value);
return;
}
JSONObject jObjNew = JSON.parseObject(value);
String tb_name = jObjNew.getString("tb_name");
System.out.println("Table name check -> " + tb_name + " --- " + sourceTbName);
// forward only records whose tb_name matches the configured source table (null-safe)
if (sourceTbName.equals(tb_name)) {
producer.send(new ProducerRecord<>(topic, value));
System.out.println("Sent data: " + value);
// flushing after every record keeps the demo simple but limits throughput
producer.flush();
}
}
@Override
public void close() throws Exception {
// release the producer's buffers and network threads when the task shuts down
if (producer != null) {
producer.close();
}
super.close();
}
}
- Create the main class that calls the ETL method:
package com.nfdwsyy.mmain;
import com.nfdwsyy.achieve.EmpSourceAchi;
/**
* @author feiniu
* @create 2020-07-23 10:55
*/
public class EmployeeMain01 {
public static void main(String[] args){
EmpSourceAchi empAchi = new EmpSourceAchi();
empAchi.successKafka2KafkaMethod();
}
}
That is all the code. Next we can test it.
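IV. Local Testing and Deployment on YARN
4.1 Local testing
After starting the program, insert, update, or delete rows in the source table, and the data sent to Kafka appears in the console. We skip the local test here.
4.2 Deploying to the YARN cluster
Package the project, upload the jar to the target directory on the server, and run the following command to deploy the application: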
bin/flink run -m yarn-cluster -ynm oggsyncflinkjob -d -c com.nfdwsyy.mmain.EmployeeMain01 /opt/mycdhflink/myjar/Kafka2FlinkETL2Kafka.jar
這時(shí)候我們可以在頁(yè)面上部署情況了:

Next, start a console consumer that reads the data after ETL:
bin/kafka-console-consumer.sh --bootstrap-server cdh101:9092,cdh102:9092,cdh103:9092 --topic piaoju-to-kafka-topic --from-beginning
Then insert, update, and delete rows of the table in the source database:

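The output shows that each record is now a minimal JSON string with the insert, update, and delete semantics already resolved, so further computation on it, such as aggregation, is straightforward. This is original work; please credit the source when reposting. How to do that computation will be covered in the next post, so stay tuned!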