??自從Flink出了FlinkCDC之后,我們對數據庫日志的采集就變得方便了許多了,除去了MaxWell、Cannel、OGG等第三方組件的繁瑣配置,目前實現CDC有兩種方式:HQL實現 和 DataStreamAPI實現(推薦)。
??想更深入的了解CDC可以通過此鏈接進行學習:
??1. 文檔 -> https://ververica.github.io/flink-cdc-connectors/master/
??2. 項目 -> https://github.com/ververica/flink-cdc-connectors
??廢話不多說,今天我們就使用 FlinkCDC 對業(yè)務數據進行動態(tài)的分流的實現。
一、動態(tài)分流
??由于FlinkCDC是把全部數據統一寫入一個Topic中, 這樣顯然不利于日后的數據處理。所以需要把各個表拆開處理。但是由于每個表有不同的特點,有些表是維度表,有些表是事實表。
??在實時計算中一般把維度數據寫入存儲容器,一般是方便通過主鍵查詢的數據庫比如HBase,Redis,MySQL等。一般把事實數據寫入流中,進行進一步處理,最終形成寬表。
??這樣的配置不適合寫在配置文件中,因為這樣的話,業(yè)務端隨著需求變化每增加一張表,就要修改配置重啟計算程序。所以這里需要一種動態(tài)配置方案,把這種配置長期保存起來,一旦配置有變化,實時計算可以自動感知。
二、實現流程圖

從圖中我們可以看出,把分好的流保存到對應表、主題中:
1)業(yè)務數據保存到Kafka的主題中
2)維度數據保存到HBase的表中
三、代碼實現
3.1)引入 pom.xml 主要的依賴
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>2.0.2</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
</dependency>
<!-- 如果保存檢查點到 hdfs上,需要引入此依賴 -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!--phoenix-->
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.7.0-HBase-1.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>5.0.0-HBase-2.0</version>
<exclusions>
<exclusion>
<groupId>org.glassfish</groupId>
<artifactId>javax.el</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
3.2)主要邏輯代碼
public static void main(String[] args) throws Exception {
//TODO 1.獲取執(zhí)行環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//1.1 設置CK&狀態(tài)后端
//env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/gmall-flink-210325/ck"));
//env.enableCheckpointing(5000L);
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//env.getCheckpointConfig().setCheckpointTimeout(10000L);
//env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
//env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000);
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart());
//TODO 2.消費Kafka ods_base_db 主題數據創(chuàng)建流
String sourceTopic = "ods_base_db";
String groupId = "base_db_app_211212";
DataStreamSource<String> kafkaDS = env.addSource(MyKafkaUtil.getKafkaConsumer(sourceTopic, groupId));
//TODO 3.將每行數據轉換為JSON對象并過濾(delete) 主流
SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject)
.filter(new FilterFunction<JSONObject>() {
@Override
public boolean filter(JSONObject value) throws Exception {
//取出數據的操作類型
String type = value.getString("type");
return !"delete".equals(type);
}
});
//TODO 4.使用FlinkCDC消費配置表并處理成 廣播流
SourceFunction<String> mySqlSource = MySqlSource.<String>builder()
.hostname("your hostname")
.port(3306)
.databaseList("lcy_db") // monitor all tables under inventory database
.tableList("lcy_db.table_process")
.username("luchangyin")
.password("123456")
.deserializer(new MyStringDeserializationSchema()) // 自定義:converts SourceRecord to JSON String
.startupOptions(StartupOptions.initial()) // .initial() latest
.build();
DataStream<String> tbProcessDStream = env.addSource(mySqlSource).name("source-cdc-table_process");
MapStateDescriptor<String, TableProcess> mapStateDescriptor = new MapStateDescriptor<>("map-state", String.class, TableProcess.class);
BroadcastStream<String> broadcastStream = tbProcessDStream.broadcast(mapStateDescriptor);
//TODO 5.連接主流和廣播流
BroadcastConnectedStream<JSONObject, String> connectedStream = jsonObjDS.connect(broadcastStream);
//TODO 6.分流 處理數據 廣播流數據,主流數據(根據廣播流數據進行處理)
OutputTag<JSONObject> hbaseTag = new OutputTag<JSONObject>("hbase-tag") {};
SingleOutputStreamOperator<JSONObject> kafka = connectedStream.process(new MyTableProcessFunction(hbaseTag, mapStateDescriptor));
//TODO 7.提取Kafka流數據和HBase流數據
DataStream<JSONObject> hbase = kafka.getSideOutput(hbaseTag);
//TODO 8.將Kafka數據寫入Kafka主題,將HBase數據寫入Phoenix表
kafka.print("Kafka >>>>>>>>");
hbase.print("HBase >>>>>>>");
// ------- 入 HBASE 和 Kafka 的自定義函數自個去實現,這里不做講述 ---------
// hbase.addSink(new DimSinkFunction());
// kafka.addSink(MyKafkaUtil.getKafkaProducer(new KafkaSerializationSchema<JSONObject>() {
// @Override
// public ProducerRecord<byte[], byte[]> serialize(JSONObject jsonObject, @Nullable Long ts) {
// return new ProducerRecord<>(jsonObject.getString("sinkTable"),jsonObject.getString("after").getBytes());
// }
// }));
//TODO 9.啟動任務
env.execute("BaseDBApp");
}
3.3)自定義反序列化類(這里接觸過FlinkCDC都知道,接受過來的日志數據格式非常多,因此我們需要自定義獲取我們需要的具體通用數據),在 utils 包下:
public class MyStringDeserializationSchema implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//構建結果對象
JSONObject result = new JSONObject();
//獲取數據庫名稱&表名稱
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
//獲取數據
Struct value = (Struct) sourceRecord.value();
//After
Struct after = value.getStruct("after");
JSONObject data = new JSONObject();
if (after != null) { //delete數據,則after為null
Schema schema = after.schema();
List<Field> fieldList = schema.fields();
for (int i = 0; i < fieldList.size(); i++) {
Field field = fieldList.get(i);
Object fieldValue = after.get(field);
data.put(field.name(), fieldValue);
}
}
//Before
Struct before = value.getStruct("before");
JSONObject beforeData = new JSONObject();
if (before != null) { //delete數據,則after為null
Schema schema = before.schema();
List<Field> fieldList = schema.fields();
for (int i = 0; i < fieldList.size(); i++) {
Field field = fieldList.get(i);
Object fieldValue = before.get(field);
beforeData.put(field.name(), fieldValue);
}
}
//獲取時間
// long ts = Long.parseLong(value.get("ts_ms").toString());
// String dt = MyDateUtils.getTimestamp2Fm(ts);
//獲取操作類型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//封裝數據
result.put("database", database);
result.put("tableName", tableName);
result.put("after", data);
result.put("before", beforeData);
result.put("type", type);
// result.put("ts", ts);
// result.put("dt", dt);
//輸出封裝好的數據
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
3.4)創(chuàng)建實體類 TableProcess (對應我們的通用配置表):
@Data
public class TableProcess {
//動態(tài)分流Sink常量
public static final String SINK_TYPE_HBASE = "hbase";
public static final String SINK_TYPE_KAFKA = "kafka";
public static final String SINK_TYPE_CK = "clickhouse";
//來源表
String sourceTable;
//操作類型 insert,update,delete
String operateType;
//輸出類型 hbase kafka
String sinkType;
//輸出表(主題)
String sinkTable;
//輸出字段
String sinkColumns;
//主鍵字段
String sinkPk;
//建表擴展
String sinkExtend;
}
3.5)創(chuàng)建合并流的處理類 MyTableProcessFunction :
public class MyTableProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
private OutputTag<JSONObject> objectOutputTag;
private MapStateDescriptor<String, TableProcess> mapStateDescriptor;
private Connection connection;
public MyTableProcessFunction(OutputTag<JSONObject> objectOutputTag, MapStateDescriptor<String, TableProcess> mapStateDescriptor) {
this.objectOutputTag = objectOutputTag;
this.mapStateDescriptor = mapStateDescriptor;
}
@Override
public void open(Configuration parameters) {
try {
// System.out.println("鳳凰驅動:"+ ShoppingConfig.PHOENIX_DRIVER +" , 鳳凰服務:"+ ShoppingConfig.PHOENIX_SERVER);
Class.forName(ShoppingConfig.PHOENIX_DRIVER);
connection = DriverManager.getConnection(ShoppingConfig.PHOENIX_SERVER);
// System.out.println("鏈接鳳凰的對象為 -> "+ connection);
} catch (Exception e) {
System.out.println("鏈接鳳凰失敗-> "+ e.toString());
e.printStackTrace();
}
}
@Override
public void processBroadcastElement(String s, Context context, Collector<JSONObject> collector) throws Exception {
//TODO 1.獲取并解析數據
JSONObject jsonObject = JSON.parseObject(s);
String data = jsonObject.getString("after");
TableProcess tableProcess = JSON.parseObject(data, TableProcess.class);
//TODO 2.建表
if(TableProcess.SINK_TYPE_HBASE.equals(tableProcess.getSinkType())){
checkTable(tableProcess.getSinkTable(),
tableProcess.getSinkColumns(),
tableProcess.getSinkPk(),
tableProcess.getSinkExtend());
}
//TODO 3.寫入狀態(tài),廣播出去
BroadcastState<String,TableProcess> broadcastState = context.getBroadcastState(mapStateDescriptor);
String key = tableProcess.getSourceTable() +"-"+ tableProcess.getOperateType();
broadcastState.put(key, tableProcess);
}
@Override
public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
//TODO 1.獲取狀態(tài)數據
ReadOnlyBroadcastState<String, TableProcess> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
String key = value.getString("tableName") +"-"+ value.getString("type");
TableProcess tableProcess = broadcastState.get(key);
if (tableProcess != null){
//TODO 2.過濾字段
filterColumn(value.getJSONObject("after"), tableProcess.getSinkColumns());
//TODO 3.分流
//將輸出表/主題信息寫入Value
value.put("sinkTable",tableProcess.getSinkTable());
String sinkType = tableProcess.getSinkType();
if(TableProcess.SINK_TYPE_KAFKA.equals(sinkType)){
//Kafka數據,寫入主流
out.collect(value);
}else if(TableProcess.SINK_TYPE_HBASE.equals(sinkType)){
//HBase數據,寫入側輸出流
ctx.output(objectOutputTag, value);
}
}else {
System.out.println("該組合key:"+ key +"不存在!");
}
}
private void filterColumn(JSONObject data, String sinkColumns) {
String[] fields = sinkColumns.split(",");
List<String> columns = Arrays.asList(fields);
data.entrySet().removeIf(next -> !columns.contains(next.getKey()));
}
//建表語句 : create table if not exists db.tn(id varchar primary key,tm_name varchar) xxx;
private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
PreparedStatement preparedStatement = null;
try{
if (sinkPk == null){
sinkPk = "id";
}
if(sinkExtend == null){
sinkExtend = "";
}
StringBuffer createTableSQL = new StringBuffer("create table if not exists ")
.append(ShoppingConfig.HBASE_SCHEMA)
.append(".")
.append(sinkTable)
.append("(");
String[] fields = sinkColumns.split(",");
for(int i = 0; i < fields.length; i++){
String field = fields[i];
//判斷是否為主鍵
if (sinkPk.equals(field)){
createTableSQL.append(field).append(" varchar primary key ");
}else{
createTableSQL.append(field).append(" varchar ");
}
//判斷是否為最后一個字段,如果不是,則添加","
if (i < fields.length - 1){
createTableSQL.append(",");
}
}
createTableSQL.append(")").append(sinkExtend);
//打印建表語句
System.out.println(createTableSQL);
//預編譯
preparedStatement = connection.prepareStatement(createTableSQL.toString());
//執(zhí)行
preparedStatement.execute();
}catch (SQLException e){
throw new RuntimeException("Phoenix表"+ sinkTable +"建表失??!");
}finally {
if (preparedStatement != null){
try{
preparedStatement.close();
}catch (SQLException e){
e.printStackTrace();
}
}
}
}
}
3.6)在MySQL中創(chuàng)建我們的配置表
-- 配置表
CREATE TABLE `table_process` (
`source_table` varchar(200) NOT NULL COMMENT '來源表 ',
`operate_type` varchar(200) NOT NULL COMMENT '操作類型 insert,update,delete',
`sink_type` varchar(200) DEFAULT NULL COMMENT '輸出類型 hbase kafka',
`sink_table` varchar(200) DEFAULT NULL COMMENT '輸出表 (主題 )',
`sink_columns` varchar(2000) DEFAULT NULL COMMENT '輸出字段 ',
`sink_pk` varchar(200) DEFAULT NULL COMMENT '主鍵字段 ',
`sink_extend` varchar(200) DEFAULT NULL COMMENT '建表擴展 ',
PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('order_detail','insert','hbase','order_detail','id,order_id,sku_name,sku_num,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('user_info','read','hbase','user_info','id,name,phone_num,user_level,create_time');
insert into table_process(source_table,operate_type,sink_type,sink_table,sink_columns) values ('base_attr_info','read','kafka','base_attr_info','id,attr_name,category_id');
select * from table_process;
四、聯調測試
4.1)部分kafka源數據:
kafka-source 的數據 :> {"database":"lcy_db","before":{},"after":{"name":"鐘表","id":10},"type":"read","tableName":"base_category1"}
kafka-source 的數據 :> {"database":"lcy_db","before":{},"after":{"name":"鞋靴","id":11},"type":"read","tableName":"base_category1"}
kafka-source 的數據 :> {"database":"lcy_db","before":{},"after":{"name":"母嬰","id":12},"type":"read","tableName":"base_category1"}
kafka-source 的數據 :> {"database":"lcy_db","before":{},"after":{"name":"禮品箱包","id":13},"type":"read","tableName":"base_category1"}
kafka-source 的數據 :> {"database":"lcy_db","before":{},"after":{"name":"食品飲料、保健食品","id":14},"type":"read","tableName":"base_category1"}
kafka-source 的數據 :> {"database":"lcy_db","before":{},"after":{"name":"珠寶","id":15},"type":"read","tableName":"base_category1"}
4.2)配置表部分數據:
配置表的數據 : > {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"kafka","sink_table":"base_attr_info","source_table":"base_attr_info","sink_columns":"id,attr_name,category_id"},"type":"read","tableName":"table_process"}
配置表的數據 : > {"database":"lcy_db","before":{},"after":{"operate_type":"insert","sink_type":"hbase","sink_table":"order_detail","source_table":"order_detail","sink_columns":"id,order_id,sku_name,sku_num,create_time"},"type":"read","tableName":"table_process"}
配置表的數據 : > {"database":"lcy_db","before":{},"after":{"operate_type":"read","sink_type":"hbase","sink_table":"user_info","source_table":"user_info","sink_columns":"id,name,phone_num,user_level,create_time"},"type":"read","tableName":"table_process"}
4.3)分流結果數據:
# 入hbase的維表數據
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"時心邦","user_level":"1","phone_num":"13794339138","id":3998},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"宋厚慶","user_level":"1","phone_num":"13274778653","id":3999},"type":"read","tableName":"user_info"}
HBase >>>>>>>> {"sinkTable":"user_info","database":"lcy_db","before":{},"after":{"create_time":"2020-12-04 23:28:45","name":"康素云","user_level":"1","phone_num":"13739911376","id":4000},"type":"read","tableName":"user_info"}
# 入kafka的事實表數據
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"香水彩妝","id":112},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":477,"attr_name":"面部護膚","id":113},"type":"read","tableName":"base_attr_info"}
Kafka >>>>>>>>> {"sinkTable":"base_attr_info","database":"lcy_db","before":{},"after":{"category_id":473,"attr_name":"香調","id":114},"type":"read","tableName":"base_attr_info"}
五、總結
??到這里我們已經完成了動態(tài)分流的主題實現,之前要是沒接觸過,說容易也不容易,但是說難也不是很難;之所以總結這個動態(tài)分流主要是在項目中還是挺重要的,畢竟原業(yè)務系統日志數據過來之后會統一放在同一個topic中,即使你在代碼中使用判斷有多少個業(yè)務表然后在發(fā)不作業(yè)也行,不過這樣的弊端是如果源業(yè)務系統有新增表的話必須要添加判斷然后再重新發(fā)布作業(yè),這樣是不利于我們在生產上的操作的,那么我們的動態(tài)分流技術就可以很好的避免了此類的弊端,如果使用了動態(tài)分流,那么如果業(yè)務表中有新增數據,我們只需要在配置表中添加新表的信息即可,即我們只需要維護這個配置表即可,程序不需要動,這樣大大的提升了開發(fā)成本和維護效率。over了。。。