Data structures
Kafka data structure
Building on the previous two chapters, Event-tracking design and SDK source code and Data collection and validation, we use Filebeat to ship container logs to Kafka and kafka-eagle to browse the Kafka data.

After pretty-printing the JSON (e.g. with bejson.com or json.cn), we get the structure below; note that the actual log payload lives in the message.request field.
{
  "@timestamp": "2022-01-20T03:10:55.155Z",
  "@metadata": {
    "beat": "filebeat",
    "type": "_doc",
    "version": "7.9.3"
  },
  "ecs": {
    "version": "1.5.0"
  },
  "host": {
    "name": "df6d1b047497"
  },
  "agent": {
    "type": "filebeat",
    "version": "7.9.3",
    "hostname": "df6d1b047497",
    "ephemeral_id": "c711a4c8-904a-4dfe-9696-9b54f9dde4a9",
    "id": "d4671f09-1ec3-4bfd-bb6d-1a08761926f9",
    "name": "df6d1b047497"
  },
  "log": {
    "offset": 196240916,
    "file": {
      "path": "/var/lib/docker/containers/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee-json.log"
    }
  },
  "stream": "stderr",
  "message": {
    "msg": "event",
    "request": "{\"agent\":\"python-requests/2.25.1\",\"event\":\"enter_party_group\",\"game_id\":10,\"ip\":\"192.168.90.90\",\"properties\":{\"#data_source\":\"來源3\",\"#os\":\"ios\",\"#vp@compared_with_now\":1,\"#vp@cost_channel\":\"渠道3\",\"#vp@revenue_amount\":\"9293\",\"#zone_offset\":\"694.0263211183825\",\"$city\":\"\",\"$country\":\"美國(guó)\",\"$iso\":\"US\",\"$latitude\":38.8868,\"$location_timezone\":\"America/Chicago\",\"$longitude\":-94.8223,\"$province\":\"\",\"channel\":\"渠道1\",\"group_id\":\"536\",\"life_time\":47},\"time\":1642644975070,\"timestamp\":\"1642648255126\",\"timezone\":\"Asia/Shanghai\",\"type\":\"action\",\"uid\":\"uid_44\",\"uid_type\":\"0\"}",
    "type": "hdfs",
    "app": "sdk_event",
    "level": "info",
    "ts": 1.6426482551549864e+09,
    "caller": "log/logger.go:71"
  },
  "input": {
    "type": "docker"
  }
}
The final data format is as follows:
{
  "agent": "python-requests/2.25.1",
  "event": "enter_party_group",
  "game_id": 10,
  "ip": "192.168.90.90",
  "properties": {
    "#data_source": "來源3",
    "#os": "ios",
    "#vp@compared_with_now": 1,
    "#vp@cost_channel": "渠道3",
    "#vp@revenue_amount": "9293",
    "#zone_offset": "694.0263211183825",
    "$city": "",
    "$country": "美國(guó)",
    "$iso": "US",
    "$latitude": 38.8868,
    "$location_timezone": "America/Chicago",
    "$longitude": -94.8223,
    "$province": "",
    "channel": "渠道1",
    "group_id": "536",
    "life_time": 47
  },
  "time": 1642644975070,
  "timestamp": "1642648255126",
  "timezone": "Asia/Shanghai",
  "type": "action",
  "uid": "uid_44",
  "uid_type": "0"
}
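As a sanity check on these fields: time is epoch milliseconds and is meant to be read under the event's own timezone field. A minimal java.time sketch using the sample values above (class name is mine, for illustration only):

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class EventTimeDemo {
    public static void main(String[] args) {
        long eventTimeMs = 1642644975070L;         // the "time" field (epoch millis)
        ZoneId zone = ZoneId.of("Asia/Shanghai");  // the "timezone" field
        // interpret the client epoch under the event's timezone
        ZonedDateTime zdt = Instant.ofEpochMilli(eventTimeMs).atZone(zone);
        String formatted = zdt.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
        System.out.println(formatted); // 2022-01-20 10:16:15.070
    }
}
```

The server-side timestamp field ("1642648255126") falls about an hour later, which matches the Filebeat @timestamp of 03:10:55Z in the envelope above.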
Hive table structure
In the Hive CLI, run show create table action (action is the table name in my setup).
The table's input/output format is ORC, which Presto is optimized to query; if you query with Impala, use Parquet instead.
CREATE TABLE `action`(
  `uid` string,
  `uid_type` string,
  `agent` string,
  `ip` string,
  `timestamp` timestamp,
  `time` timestamp,
  `year` string,
  `month` string,
  `week` string,
  `hour` string,
  `minute` string,
  `properties` map<string,string>)
PARTITIONED BY (
  `game_id` int,
  `timezone` string,
  `event` string,
  `day` date)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
  'colelction.delim'=',',
  'field.delim'='\t',
  'mapkey.delim'=':',
  'serialization.format'='\t')
STORED AS INPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://slaves01:8020/warehouse/tablespace/managed/hive/event.db/action'
TBLPROPERTIES (
  'auto-compaction'='true',
  'bucketing_version'='2',
  'compaction.file-size'='128MB',
  'sink.partition-commit.delay'='0s',
  'sink.partition-commit.policy.kind'='metastore,success-file',
  'sink.partition-commit.trigger'='process-time',
  'sink.shuffle-by-partition.enable'='true',
  'transient_lastDdlTime'='1642571371')
Flink processing logic and source code
Going from the business data to Hive, the first step is parsing the JSON; the time field is then split by timezone into year, day, and the other time-partition columns. Full code:
JSON parsing
public class JsonParser {
    public static FileHdfsBean ParseHdfs(String jsonStr) {
        FileHdfsBean bean = null;
        try {
            // the real payload sits in message.request as an escaped JSON string
            JSONObject jsonObject = JSON.parseObject(jsonStr).getJSONObject("message").getJSONObject("request");
            JSONObject properties = jsonObject.getJSONObject("properties");
            String deviceId = jsonObject.getString("device_no");
            String deviceType = jsonObject.getString("device_type");
            // prefer device_no/device_type when present, otherwise fall back to uid/uid_type
            String uid = deviceId == null ? jsonObject.getString("uid") : deviceId;
            String uid_type = deviceId == null ? jsonObject.getString("uid_type") : deviceType;
            String agent = jsonObject.getString("agent");
            String ip = jsonObject.getString("ip");
            String event = jsonObject.getString("event");
            String timestamp = jsonObject.getString("timestamp");
            String time = jsonObject.getString("time");
            int game_id = jsonObject.getIntValue("game_id");
            String timezone = jsonObject.getString("timezone");
            Map<String, String> map = JSONObject.parseObject(properties.toJSONString(), new TypeReference<Map<String, String>>() {
            });
            bean = new FileHdfsBean(event, uid, uid_type, agent, ip, timestamp, time, game_id, timezone);
            bean.setMap2Str(map);
        } catch (Exception e) {
            System.out.println("ParseHdfs Exception:" + e.getMessage());
        }
        return bean;
    }
}
Time splitting
private void setYearMonthWeekDayHourMin(String timestamp, String time) {
    long t = Long.parseLong(timestamp);
    long t2 = Long.parseLong(time);
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    sdf.setTimeZone(TimeZone.getTimeZone(this.timezone));
    String timestamp2 = sdf.format(t);
    String dateAndTime = sdf.format(t2);                   // e.g. 2022-01-20 10:16:15.070
    this.year = dateAndTime.substring(0, 4);               // 2022
    this.month = dateAndTime.substring(0, 7);              // 2022-01
    this.day = Date.valueOf(dateAndTime.substring(0, 10)); // 2022-01-20
    this.week = day + ":" + dateAndTime.substring(11, 13) + getWeekOfMonth(dateAndTime.substring(0, 10));
    this.hour = dateAndTime.substring(0, 13);              // 2022-01-20 10
    this.minute = dateAndTime.substring(0, 16);            // 2022-01-20 10:16
    this.timestamp = Timestamp.valueOf(timestamp2);
    this.time = Timestamp.valueOf(dateAndTime);
}
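The method above derives every time column by substring-slicing a single formatted string. A standalone sketch of the same slicing (the class and method names here are mine, not the project's), run on the sample event's time value:

```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class PartitionFields {
    // Standalone port of the substring logic: one formatted timestamp
    // yields year, month, day, hour and minute prefixes.
    public static String[] split(long timeMs, String timezone) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone(timezone));
        String s = sdf.format(timeMs);   // e.g. 2022-01-20 10:16:15.070
        return new String[] {
            s.substring(0, 4),   // year   -> 2022
            s.substring(0, 7),   // month  -> 2022-01
            s.substring(0, 10),  // day    -> 2022-01-20
            s.substring(0, 13),  // hour   -> 2022-01-20 10
            s.substring(0, 16),  // minute -> 2022-01-20 10:16
        };
    }

    public static void main(String[] args) {
        for (String f : split(1642644975070L, "Asia/Shanghai")) {
            System.out.println(f);
        }
    }
}
```

Storing the coarser granularities as string prefixes keeps range filters in SQL as simple string comparisons.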
sink2hive
private void sink2hive(StreamTableEnvironment tableEnv, DataStreamSource<String> sourceStream) {
    System.out.println("save2hive...");
    Configuration configuration = tableEnv.getConfig().getConfiguration();
    // true uses the MapReduce reader, false uses Flink's native one;
    // set on tableEnv, it takes effect for every sink
    configuration.setString("table.exec.hive.fallback-mapred-reader", "false");
    // create the Hive catalog
    String name = "kafka2hive";
    String defaultDatabase = "default";
    HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, HIVE_CONF_DIR);
    System.out.println("registering Hive catalog...");
    // register the Hive catalog
    tableEnv.registerCatalog(name, hiveCatalog);
    tableEnv.useCatalog(name);
    System.out.println("parsing fields into beans...");
    // parse the fields and wrap them in the bean class
    SingleOutputStreamOperator<FileHdfsBean> beanStream = sourceStream
            .map(JsonParser::ParseHdfs)
            .filter(Objects::nonNull);
    // turn beanStream into a temporary table
    System.out.println("creating temporary view...");
    tableEnv.executeSql("drop table if exists tmpTable");
    tableEnv.createTemporaryView("tmpTable", beanStream);
    System.out.println("creating Hive table...");
    // switch to the Hive SQL dialect
    tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
    // create the Hive table
    String table = HIVE_DB_NAME + "." + HIVE_TABLE_NAME;
    // tableEnv.executeSql("drop table if exists " + table);
    tableEnv.executeSql("create database IF NOT EXISTS " + HIVE_DB_NAME);
    tableEnv.executeSql("CREATE TABLE IF NOT EXISTS " + table + "\n" +
            "(uid string, uid_type string, agent string, ip string,\n" +
            "`timestamp` timestamp,`time` timestamp,`year` string,`month` string,`week` string,`hour` string, `minute` string,properties map<string,string>)\n" +
            "PARTITIONED BY(game_id int,timezone string,event string,day date)\n" +
            "ROW FORMAT DELIMITED\n" +
            "FIELDS TERMINATED BY '\\t'\n" +
            "COLLECTION ITEMS TERMINATED BY ','\n" +
            "MAP KEYS TERMINATED BY ':'\n" +
            "stored as orc TBLPROPERTIES (\n" +
            "'sink.partition-commit.trigger'='process-time',\n" +
            "'sink.partition-commit.delay'='0s',\n" +
            "'sink.partition-commit.policy.kind'='metastore,success-file',\n" +
            "'sink.shuffle-by-partition.enable'='true',\n" +
            "'auto-compaction'='true',\n" +
            "'compaction.file-size'='128MB'\n" +
            ")"
    );
    // switch back to the default dialect
    tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
    System.out.println("inserting into the Hive table...");
    // insert the stream into the Hive table
    tableEnv.executeSql("INSERT INTO " + table + "\n" +
            "SELECT uid, uid_type, agent, ip,`timestamp`,`time`,`year`,`month`,`week`,`hour`,`minute`," +
            "str_to_map(properties,'&',':') as properties,game_id ,timezone ,event ,`day` from tmpTable");
}
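The INSERT rebuilds the map column with str_to_map(properties, '&', ':'), which implies setMap2Str serialized the bean's map as key:value pairs joined by '&'. A sketch of that round trip (this codec is my reconstruction of the convention, not the project's actual setMap2Str source):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class PropertiesCodec {
    // serialize a map as "k1:v1&k2:v2" — the shape str_to_map(s, '&', ':') expects
    static String encode(Map<String, String> m) {
        return m.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining("&"));
    }

    // inverse of encode, mirroring Hive's str_to_map behavior
    static Map<String, String> decode(String s) {
        Map<String, String> m = new LinkedHashMap<>();
        for (String pair : s.split("&")) {
            String[] kv = pair.split(":", 2); // split on the first ':' only
            m.put(kv[0], kv.length > 1 ? kv[1] : "");
        }
        return m;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("channel", "channel1");
        props.put("life_time", "47");
        String encoded = encode(props);
        System.out.println(encoded); // channel:channel1&life_time:47
        System.out.println(decode(encoded)); // round-trips back to the same map
    }
}
```

Note the scheme's limitation: a property value containing '&' or ':' would be split incorrectly, so the delimiters must not occur in the data.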
Task execution modes
dolphin on yarn
Each submission spins up a new Flink cluster, so jobs are independent of one another, do not interfere, and are easy to manage; the cluster is torn down once the job finishes. (This mode is recommended.)
Create the workflow

View the ApplicationMaster from YARN

View the Flink job details

yarn-session
Initialize a Flink cluster in YARN with a fixed allocation of resources; all later jobs are submitted to it. This Flink cluster stays resident in YARN until it is stopped manually.
# startup command
$FLINK_HOME/bin/yarn-session.sh -tm xxx -s xx ... (specify the resources)
Once started, a resident "Flink session cluster" application appears in YARN; click through it to find the ApplicationMaster address.

Submit a job

View jobs and logs

Scheduled maintenance jobs
Merging small Hive partitions
When building a data warehouse on Hive, ETL jobs usually raise their parallelism for speed, and whether they run on MapReduce, Spark, or Flink, they leave many small files behind when writing to Hive tables. A small file here is one below the HDFS block size (128 MB by default). Reading and writing many small files is far slower than reading and writing a few large ones, because the frequent NameNode round trips inflate the NameNode's processing queue and GC time. As small files accumulate they seriously degrade NameNode performance and limit how far the cluster can scale. (A daily scheduled cleanup is recommended.)
#!/bin/bash
a='uid,uid_type,agent,ip,`timestamp`,`time`,`year`,`month`,`week`,`hour`,`minute`,properties'
for ((i=2; i>1; i--)); do
    # default to the partition from $i days ago unless a date is passed as $2
    if [ $# -lt 2 ]; then
        date=$(date -d"-$i day" +%Y-%m-%d)
    else
        date=$2
    fi
    table=$1
    echo "$date"
    echo "set hive.merge.mapfiles = true;" >> /root/${table}-${date}.sql
    echo "set hive.merge.mapredfiles = true;" >> /root/${table}-${date}.sql
    echo "set hive.merge.tezfiles = true;" >> /root/${table}-${date}.sql
    echo "set hive.merge.size.per.task = 256000000;" >> /root/${table}-${date}.sql
    echo "set hive.merge.smallfiles.avgsize = 16000000;" >> /root/${table}-${date}.sql
    hadoop fs -ls -R /warehouse/tablespace/managed/hive/event.db/${table}/ | awk '{print $8}' | awk -F "/${table}/" '{print $2}' | grep day=${date}$ | while read line; do
        array=(`echo ${line} | tr '/' ' '`)
        for var in ${array[@]}; do
            arr=(`echo ${var} | tr '=' ' '`)
            case ${arr[0]} in
                "game_id") game_id=${arr[1]}
                    ;;
                "timezone") timezone=$(echo "${arr[1]//%2F//}")  # decode URL-encoded '/' (e.g. Asia%2FShanghai)
                    ;;
                "event") event=${arr[1]}
                    ;;
                "day") day=${arr[1]}
                    ;;
                *) echo 'unknown partition key!'
                    ;;
            esac
        done
        echo "output: game_id=${game_id},timezone=${timezone},event=${event},day=${day}"
        ## first move the small-file data into a temporary partition
        echo "insert overwrite table event.${table} partition (game_id='${game_id}',timezone='${timezone}',event='${event}',day='1970-01-01') select $a from event.${table} where game_id=${game_id} and timezone='${timezone}' and event='${event}' and day=cast('${day}' as date);" >> /root/${table}-${date}.sql
        ## drop the original small-file partition
        echo "alter table event.${table} drop if exists partition(game_id='${game_id}',timezone='${timezone}',event='${event}',day='${day}');" >> /root/${table}-${date}.sql
        ## rename the temporary partition to the merged (large-file) partition
        echo "alter table event.${table} partition (game_id='${game_id}',timezone='${timezone}',event='${event}',day='1970-01-01') rename to partition(game_id='${game_id}',timezone='${timezone}',event='${event}',day='${day}');" >> /root/${table}-${date}.sql
    done
    ## run the generated merge SQL
    hive -f /root/${table}-${date}.sql > /dev/null 2>&1
    echo "partition ${date} of ${table} merged"
    ## remove the generated SQL file
    rm -rf /root/${table}-${date}.sql
done
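For a local illustration of what "small file" means in the paragraph above (anything below the 128 MB HDFS block size), here is a hedged sketch; the class name and paths are hypothetical, and on a real cluster you would inspect HDFS with hadoop fs -du rather than the local filesystem:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class SmallFileScan {
    static final long BLOCK_SIZE = 128L * 1024 * 1024; // default HDFS block size

    // count regular files under `root` that are smaller than the block size
    static long countSmallFiles(Path root) throws IOException {
        try (Stream<Path> stream = Files.walk(root)) {
            return stream.filter(Files::isRegularFile)
                         .filter(p -> p.toFile().length() < BLOCK_SIZE)
                         .count();
        }
    }

    public static void main(String[] args) throws IOException {
        // simulate a partition directory holding two tiny part files
        Path dir = Files.createTempDirectory("partition-demo");
        Files.write(dir.resolve("part-0"), new byte[1024]);
        Files.write(dir.resolve("part-1"), new byte[2048]);
        System.out.println(countSmallFiles(dir)); // 2
    }
}
```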
Run locally
bash xx.sh xx (pass in the name of the table to compact)
Run via dolphinscheduler

Scheduled cleanup of dolphin disk usage
Scheduled SQL deletion of workflow instances
As scheduled jobs accumulate, workflow-instance records keep growing; without periodic cleanup, the page will eventually fail to load.
The SQL below deletes completed workflow instances, keeping the most recent 3 days (a daily scheduled cleanup is recommended):
delete from t_ds_process_instance where state != 1 and date(end_time) < DATE_SUB(CURDATE(), INTERVAL 3 DAY)


Scheduled deletion of logs and resources
A long-running dolphinscheduler produces large volumes of logs and jar resources from executed tasks; left undeleted, they consume enormous disk space and can bring down the entire big-data cluster. (A daily scheduled cleanup is recommended.)
#!/bin/bash
# validate IPv4 address format
function isValidIp() {
    local ip=$1
    local ret=1
    if [[ $ip =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
        ip=(${ip//\./ })  # split on '.' into an array for the range check below
        [[ ${ip[0]} -le 255 && ${ip[1]} -le 255 && ${ip[2]} -le 255 && ${ip[3]} -le 255 ]]
        ret=$?
    fi
    return $ret
}
ips=$1;
dolphin_base_dir=$2;
# replace the separator with spaces
ip_array=(`echo ${ips} | tr ',' ' '`);
# iterate over the worker addresses passed in, ssh to each one and clean it up
echo "ip count: ${#ip_array[@]}";
for ip in ${ip_array[@]}; do
    isValidIp $ip
    if [ ! $? == 0 ]; then
        echo "Warning: param ${ip} is not a valid ip!"
        continue
    else
        # delete jar resources
        ssh -o "StrictHostKeyChecking no" ${ip} "find ${dolphin_base_dir}/exec/process -mindepth 3 -maxdepth 3 -name "[0-9]*" -mtime +1 -type d | xargs rm -rf;exit";
        # delete logs
        ssh -o "StrictHostKeyChecking no" ${ip} 'rm -rf /usr/hdp/current/dolphinscheduler/logs/dolphinscheduler-worker.2*.log'
        echo "${ip} has been cleaned up!"
    fi;
done;
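The script's isValidIp check (a four-octet pattern match plus a 0–255 range check on each octet) can be ported to Java in a few lines; the class name here is mine, for illustration:

```java
public class IpCheck {
    // Java equivalent of the script's isValidIp: four dot-separated
    // numeric groups, each between 0 and 255
    static boolean isValidIp(String ip) {
        if (!ip.matches("\\d{1,3}(\\.\\d{1,3}){3}")) {
            return false; // wrong shape: not four dot-separated number groups
        }
        for (String part : ip.split("\\.")) {
            if (Integer.parseInt(part) > 255) {
                return false; // octet out of range
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidIp("192.168.90.90")); // true
        System.out.println(isValidIp("300.1.1.1"));     // false
    }
}
```

As in the shell version, the regex alone is not enough: "300.1.1.1" matches the pattern, so the per-octet range check is what rejects it.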


Scheduled Presto memory release
If a long-running Presto cannot release memory and usage keeps climbing, a scheduled hourly GC can reclaim it.
#!/bin/bash
# validate IPv4 address format
function isValidIp() {
    local ip=$1
    local ret=1
    if [[ $ip =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
        ip=(${ip//\./ })  # split on '.' into an array for the range check below
        [[ ${ip[0]} -le 255 && ${ip[1]} -le 255 && ${ip[2]} -le 255 && ${ip[3]} -le 255 ]]
        ret=$?
    fi
    return $ret
}
ips=$1;
# replace the separator with spaces
ip_array=(`echo ${ips} | tr ',' ' '`);
# iterate over the worker addresses passed in, ssh to each one and trigger a manual Full GC
echo "ip count: ${#ip_array[@]}";
for ip in ${ip_array[@]}; do
    isValidIp $ip
    if [ ! $? == 0 ]; then
        echo "Warning: param ${ip} is not a valid ip!"
        continue
    else
        ssh -o "StrictHostKeyChecking no" ${ip} 'jmap -histo:live `jps | grep PrestoServer|cut -d " " -f 1` > /dev/null;exit;'
        echo "presto worker of ${ip} has been Full GC'd!"
    fi;
done;

Series articles
Part 1: Automated deployment with Ambari
Part 2: Event-tracking design and SDK source code
Part 3: Data collection and validation
Part 4: Real-time ETL: Kafka->Flink->Hive
Part 5: User-data ETL: kafka->spark->kudu
Part 6: Presto analytics-model SQL and UDFs