Since Flink released Flink CDC, capturing database change logs has become much easier: we no longer need the tedious configuration of third-party components such as Maxwell, Canal, or OGG. There are currently two ways to implement CDC: the SQL API, and the DataStream API (recommended).
To learn more about CDC, see:
1. Documentation -> https://ververica.github.io/flink-cdc-connectors/master/
2. Project -> https://github.com/ververica/flink-cdc-connectors
Without further ado, today we will use Flink CDC to implement dynamic splitting of business data.
1. Dynamic Stream Splitting
Flink CDC writes all captured data into a single topic, which is clearly inconvenient for downstream processing, so each table needs to be split out and handled separately. However, the tables have different characteristics: some are dimension tables and some are fact tables.
In real-time computing, dimension data is generally written to a storage system that supports fast primary-key lookups, such as HBase, Redis, or MySQL. Fact data is generally kept in the stream for further processing, eventually forming wide tables.
This routing configuration should not live in a static config file, because then every new table added by the business side would require editing the config and restarting the job. What we need instead is a dynamic configuration scheme: the configuration is persisted long-term, and whenever it changes, the streaming job picks up the change automatically.
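The core idea can be sketched with plain Java collections standing in for Flink's broadcast state (the map, method names, and sample values here are purely illustrative): each config row is keyed by `sourceTable-operateType`, and each incoming record looks up its route at runtime, so supporting a new table only means adding a config row, not redeploying the job.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of dynamic routing: a plain map stands in for Flink's
// broadcast state. Adding a table means adding a map entry at runtime,
// never touching the code.
public class ConfigLookupSketch {
    // Stand-in for broadcast state: route key -> sink type
    static final Map<String, String> configState = new HashMap<>();

    // Same key format the real job uses: "sourceTable-operateType"
    static String routeKey(String sourceTable, String operateType) {
        return sourceTable + "-" + operateType;
    }

    // Returns the configured sink type, or null if no config row matches
    static String lookupSink(String table, String type) {
        return configState.get(routeKey(table, type));
    }

    public static void main(String[] args) {
        configState.put(routeKey("order_detail", "insert"), "hbase");
        configState.put(routeKey("base_attr_info", "read"), "kafka");
        System.out.println(lookupSink("order_detail", "insert"));
    }
}
```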
2. Flow Diagram

As the diagram shows, each split stream is written to its corresponding table or topic:
1) Business (fact) data is written to Kafka topics
2) Dimension data is written to HBase tables
3. Code Implementation
3.1) Key dependencies in pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.2</version>
    </dependency>
    <!-- Needed if checkpoints are saved to HDFS -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- Phoenix -->
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-core</artifactId>
        <version>4.7.0-HBase-1.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>5.0.0-HBase-2.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.glassfish</groupId>
                <artifactId>javax.el</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
3.2) Main application logic
public static void main(String[] args) throws Exception {
    //TODO 1. Set up the execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    //1.1 Checkpointing & state backend (enable these in production)
    //env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
    //env.enableCheckpointing(5000L);
    //env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    //env.getCheckpointConfig().setCheckpointTimeout(10000L);
    //env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
    //env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
    //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000L));
    //TODO 2. Consume the ods_base_db topic from Kafka as the main stream
    String sourceTopic = "ods_base_db";
    String groupId = "base_db_app_211212";
    DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
    //TODO 3. Parse each line into a JSONObject and filter out deletes (main stream)
    SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
            .filter(new FilterFunction<JSONObject>() {
                @Override
                public boolean filter(JSONObject value) throws Exception {
                    //Extract the operation type of the record
                    String type = value.getString("type");
                    return !"delete".equals(type);
                }
            });
    //TODO 4. Consume the config table with Flink CDC and turn it into a broadcast stream
    SourceFunction<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("your hostname")
            .port(3306)
            .databaseList("lcy_db")
            .tableList("lcy_db.table_process") // only monitor the config table
            .username("luchangyin")
            .password("123456")
            .deserializer(new MyStringDeserializationSchema()) // custom: converts SourceRecord to a JSON string
            .startupOptions(StartupOptions.initial()) // initial(): snapshot first, then binlog; latest() skips the snapshot
            .build();
    DataStream<String> tbProcessDStream = env.addSource(mySqlSource).name("source-cdc-table_process");
    MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
    BroadcastStream<String> broadcastStream = tbProcessDStream.broadcast(mapStateDescriptor);
    //TODO 5. Connect the main stream with the broadcast stream
    BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
    //TODO 6. Split the stream: route each main-stream record according to the broadcast config
    OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
    SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new MyTableProcessFunction(hbaseTag, mapStateDescriptor));
    //TODO 7. Extract the HBase stream as a side output of the Kafka stream
    DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
    //TODO 8. Write Kafka records to Kafka topics and HBase records to Phoenix tables
    kafka.print("Kafka >>>>>>>>");
    hbase.print("HBase >>>>>>>");
    // ------- The actual HBase and Kafka sink functions are left for you to implement; they are not covered here -------
    // hbase.addSink(new DimSinkFunction());
    // kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
    //     @Override
    //     public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long ts) {
    //         return new ProducerRecord<>(jsonObject.getString("sinkTable"), jsonObject.getString("after").getBytes());
    //     }
    // }));
    //TODO 9. Launch the job
    env.execute("BaseDBApp");
}
3.3) Custom deserialization class. As anyone who has used Flink CDC knows, the incoming log records come in many shapes, so we define our own deserializer to extract just the common fields we need. It lives in the utils package:
public class MyStringDeserializationSchema implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        //Build the result object
        JSONObject result = new JSONObject();
        //Extract the database and table name from the topic ("serverName.database.table")
        String topic = sourceRecord.topic();
        String[] fields = topic.split("\\.");
        String database = fields[1];
        String tableName = fields[2];
        //Extract the payload
        Struct value = (Struct) sourceRecord.value();
        //"after" image (null for delete records)
        Struct after = value.getStruct("after");
        JSONObject data = new JSONObject();
        if (after != null) {
            for (Field field : after.schema().fields()) {
                data.put(field.name(), after.get(field));
            }
        }
        //"before" image (null for insert/read records)
        Struct before = value.getStruct("before");
        JSONObject beforeData = new JSONObject();
        if (before != null) {
            for (Field field : before.schema().fields()) {
                beforeData.put(field.name(), before.get(field));
            }
        }
        //Timestamp (optional)
        // long ts = Long.parseLong(value.get("ts_ms").toString());
        // String dt = MyDateUtils.getTimestamp2Fm(ts);
        //Operation type: CREATE / UPDATE / DELETE (or READ for snapshot records)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type)) {
            type = "insert";
        }
        //Assemble the output record
        result.put("database", database);
        result.put("tableName", tableName);
        result.put("after", data);
        result.put("before", beforeData);
        result.put("type", type);
        // result.put("ts", ts);
        // result.put("dt", dt);
        //Emit the assembled record
        collector.collect(result.toJSONString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
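The topic parsing at the top of `deserialize` relies on Debezium naming record topics `serverName.databaseName.tableName`. A quick standalone check of that split (the server name `mysql_binlog_source` here is just an illustrative example):

```java
// Debezium topics follow the pattern "serverName.databaseName.tableName",
// so splitting on "." yields the database at index 1 and the table at index 2.
public class TopicParseDemo {
    static String[] parse(String topic) {
        String[] fields = topic.split("\\.");
        return new String[]{fields[1], fields[2]};
    }

    public static void main(String[] args) {
        String[] dbAndTable = parse("mysql_binlog_source.lcy_db.table_process");
        System.out.println(dbAndTable[0] + " / " + dbAndTable[1]);
    }
}
```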
3.4) Create the entity class TableProcess (mapping to our generic configuration table):
@Data
public class TableProcess {
    //Sink-type constants for dynamic splitting
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //Source table
    String sourceTable;
    //Operation type: insert, update, delete
    String operateType;
    //Sink type: hbase or kafka
    String sinkType;
    //Sink table (or topic)
    String sinkTable;
    //Sink columns
    String sinkColumns;
    //Primary-key column
    String sinkPk;
    //Table-creation extension clause
    String sinkExtend;
}
3.5) Create the processing class MyTableProcessFunction for the connected streams:
public class MyTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private OutputTag<JSONObject> objectOutputTag;
    private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
    private Connection connection;

    public MyTableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
        this.objectOutputTag = objectOutputTag;
        this.mapStateDescriptor = mapStateDescriptor;
    }

    @Override
    public void open(Configuration parameters) {
        try {
            Class.forName(ShoppingConfig.PHOENIX_DRIVER);
            connection = DriverManager.getConnection(ShoppingConfig.PHOENIX_SERVER);
        } catch (Exception e) {
            System.out.println("Failed to connect to Phoenix -> " + e);
            e.printStackTrace();
        }
    }

    @Override
    public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
        //TODO 1. Parse the config record
        JSONObject jsonObject = JSON.parseObject(s);
        String data = jsonObject.getString("after");
        TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
        //TODO 2. Create the Phoenix table if the sink is HBase
        if (TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())) {
            checkTable(tableProcess.getSinkTable(),
                    tableProcess.getSinkColumns(),
                    tableProcess.getSinkPk(),
                    tableProcess.getSinkExtend());
        }
        //TODO 3. Put the config into broadcast state so it reaches every task
        BroadcastState<String, TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
        String key = tableProcess.getSourceTable() + "-" + tableProcess.getOperateType();
        broadcastState.put(key, tableProcess);
    }

    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1. Look up the config in broadcast state
        ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
        String key = value.getString("tableName") + "-" + value.getString("type");
        TableProcess tableProcess = broadcastState.get(key);
        if (tableProcess != null) {
            //TODO 2. Keep only the configured sink columns
            filterColumn(value.getJSONObject("after"), tableProcess.getSinkColumns());
            //TODO 3. Route the record
            //Attach the sink table/topic name to the record
            value.put("sinkTable", tableProcess.getSinkTable());
            String sinkType = tableProcess.getSinkType();
            if (TableProcess.SINK_TYPE_KAFKA.equals(sinkType)) {
                //Kafka data goes to the main stream
                out.collect(value);
            } else if (TableProcess.SINK_TYPE_HBASE.equals(sinkType)) {
                //HBase data goes to the side output
                ctx.output(objectOutputTag, value);
            }
        } else {
            System.out.println("No config found for key: " + key + "!");
        }
    }

    private void filterColumn(JSONObject data, String sinkColumns) {
        String[] fields = sinkColumns.split(",");
        List<String> columns = Arrays.asList(fields);
        data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
    }

    //DDL template: create table if not exists db.tn(id varchar primary key, tm_name varchar) xxx;
    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        PreparedStatement preparedStatement = null;
        try {
            if (sinkPk == null) {
                sinkPk = "id";
            }
            if (sinkExtend == null) {
                sinkExtend = "";
            }
            StringBuilder createTableSQL = new StringBuilder("create table if not exists ")
                    .append(ShoppingConfig.HBASE_SCHEMA)
                    .append(".")
                    .append(sinkTable)
                    .append("(");
            String[] fields = sinkColumns.split(",");
            for (int i = 0; i < fields.length; i++) {
                String field = fields[i];
                //The primary-key column gets "primary key"
                if (sinkPk.equals(field)) {
                    createTableSQL.append(field).append(" varchar primary key ");
                } else {
                    createTableSQL.append(field).append(" varchar ");
                }
                //Append "," after every column except the last
                if (i < fields.length - 1) {
                    createTableSQL.append(",");
                }
            }
            createTableSQL.append(")").append(sinkExtend);
            //Print the DDL
            System.out.println(createTableSQL);
            //Prepare and execute
            preparedStatement = connection.prepareStatement(createTableSQL.toString());
            preparedStatement.execute();
        } catch (SQLException e) {
            throw new RuntimeException("Failed to create Phoenix table " + sinkTable + "!", e);
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
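The `filterColumn` method above is just a set-difference on keys. The same logic can be exercised in isolation with a plain `java.util.Map` standing in for fastjson's `JSONObject` (the field names here are made up for the demo):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Same key-filtering logic as filterColumn, using a plain Map so it runs
// without fastjson: keep only the keys listed in sinkColumns.
public class FilterColumnDemo {
    static void filterColumn(Map<String, Object> data, String sinkColumns) {
        List<String> columns = Arrays.asList(sinkColumns.split(","));
        data.entrySet().removeIf(e -> !columns.contains(e.getKey()));
    }

    // Builds a sample row, filters it, returns the result
    static Map<String, Object> sample() {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);
        row.put("name", "a");
        row.put("internal_flag", true); // not in sink_columns, will be dropped
        filterColumn(row, "id,name");
        return row;
    }

    public static void main(String[] args) {
        System.out.println(sample().keySet());
    }
}
```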
3.6) Create the configuration table in MySQL
-- Configuration table
CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT 'source table',
  `operate_type` varchar(200) NOT NULL COMMENT 'operation type: insert, update, delete',
  `sink_type` varchar(200) DEFAULT NULL COMMENT 'sink type: hbase or kafka',
  `sink_table` varchar(200) DEFAULT NULL COMMENT 'sink table (or topic)',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT 'sink columns',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT 'primary-key column',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT 'table-creation extension',
  PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('order_detail','insert','hbase','order_detail','id,order_id,sku_name,sku_num,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('user_info','read','hbase','user_info','id,name,phone_num,user_level,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('base_attr_info','read','kafka','base_attr_info','id,attr_name,category_id');
select * from table_process;
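For the `order_detail` row inserted above, `checkTable` would generate Phoenix DDL like the following. This snippet reproduces only the string-building part of `checkTable` (no JDBC); the schema name `GMALL_REALTIME` is a placeholder standing in for `ShoppingConfig.HBASE_SCHEMA`:

```java
// Reproduces checkTable's DDL string-building so the generated Phoenix
// statement can be inspected without a live connection.
public class DdlBuildDemo {
    static String buildDdl(String schema, String sinkTable, String sinkColumns,
                           String sinkPk, String sinkExtend) {
        if (sinkPk == null) sinkPk = "id";          // default primary key
        if (sinkExtend == null) sinkExtend = "";    // default: no extension
        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(schema).append(".").append(sinkTable).append("(");
        String[] fields = sinkColumns.split(",");
        for (int i = 0; i < fields.length; i++) {
            sql.append(fields[i])
               .append(fields[i].equals(sinkPk) ? " varchar primary key " : " varchar ");
            if (i < fields.length - 1) sql.append(",");
        }
        return sql.append(")").append(sinkExtend).toString();
    }

    public static void main(String[] args) {
        // Matches the first config row inserted above
        System.out.println(buildDdl("GMALL_REALTIME", "order_detail",
                "id,order_id,sku_name,sku_num,create_time", null, null));
    }
}
```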
4. Integration Testing
4.1) Sample Kafka source data:
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"鐘表","id":10},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"鞋靴","id":11},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"母嬰","id":12},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"禮品箱包","id":13},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"食品飲料、保健食品","id":14},"type":"read","tableName":"base_category1"}
kafka-source data :> {"database":"lcy_db","before":{},"after":{"name":"珠寶","id":15},"type":"read","tableName":"base_category1"}
4.2) Sample configuration-table data:
config-table data :> {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"kafka","sink_table":"base_attr_info","source_table":"base_attr_info","sink_columns":"id,attr_name,category_id"},"type":"read","tableName":"table_process"}
config-table data :> {"database":"lcy_db","before":{},"after":{"operate_type":"insert","sink_type":"hbase","sink_table":"order_detail","source_table":"order_detail","sink_columns":"id,order_id,sku_name,sku_num,create_time"},"type":"read","tableName":"table_process"}
config-table data :> {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"hbase","sink_table":"user_info","source_table":"user_info","sink_columns":"id,name,phone_num,user_level,create_time"},"type":"read","tableName":"table_process"}
4.3) Split stream output:
# Dimension data routed to HBase
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"時(shí)心邦","user_level":"1","phone_num":"13794339138","id":3998},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"宋厚慶","user_level":"1","phone_num":"13274778653","id":3999},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"康素云","user_level":"1","phone_num":"13739911376","id":4000},"type":"read","tableName":"user_info"}
# Fact data routed to Kafka
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"香水彩妝","id":112},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"面部護(hù)膚","id":113},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":473,"attr_name":"香調(diào)","id":114},"type":"read","tableName":"base_attr_info"}
5. Summary
That completes our implementation of dynamic stream splitting. If you have never touched it before, it is neither trivial nor especially hard. The reason I wrote this up is that it matters a lot in real projects: the source business system's change logs all land in a single topic. You could hard-code a branch for every business table, but then every table added to the source system would mean editing those branches and redeploying the job, which is impractical in production. Dynamic splitting avoids exactly that: when the business adds a new table, we only add a row to the configuration table. The configuration table is all we maintain; the program itself never changes, which greatly reduces development and maintenance cost. And that's a wrap.