Real-Time ETL: Kafka->Flink->Hive

Data structures

Kafka message structure

Following the previous two chapters, which covered the event tracking design with SDK source code and the data collection and validation scheme, we use filebeat to ship container logs into Kafka and kafka-eagle to inspect the Kafka data.


After formatting the JSON (e.g. with bejson.com or json.cn), we get the structure below; note that the actual log payload sits in the message.request field.

{
    "@timestamp": "2022-01-20T03:10:55.155Z",
    "@metadata": {
        "beat": "filebeat",
        "type": "_doc",
        "version": "7.9.3"
    },
    "ecs": {
        "version": "1.5.0"
    },
    "host": {
        "name": "df6d1b047497"
    },
    "agent": {
        "type": "filebeat",
        "version": "7.9.3",
        "hostname": "df6d1b047497",
        "ephemeral_id": "c711a4c8-904a-4dfe-9696-9b54f9dde4a9",
        "id": "d4671f09-1ec3-4bfd-bb6d-1a08761926f9",
        "name": "df6d1b047497"
    },
    "log": {
        "offset": 196240916,
        "file": {
            "path": "/var/lib/docker/containers/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee/36847a16c7c8e029744475172847cd14dda0dc28d1c33199df9f8c443e2798ee-json.log"
        }
    },
    "stream": "stderr",
    "message": {
        "msg": "event",
        "request": "{\"agent\":\"python-requests/2.25.1\",\"event\":\"enter_party_group\",\"game_id\":10,\"ip\":\"192.168.90.90\",\"properties\":{\"#data_source\":\"來源3\",\"#os\":\"ios\",\"#vp@compared_with_now\":1,\"#vp@cost_channel\":\"渠道3\",\"#vp@revenue_amount\":\"9293\",\"#zone_offset\":\"694.0263211183825\",\"$city\":\"\",\"$country\":\"美國(guó)\",\"$iso\":\"US\",\"$latitude\":38.8868,\"$location_timezone\":\"America/Chicago\",\"$longitude\":-94.8223,\"$province\":\"\",\"channel\":\"渠道1\",\"group_id\":\"536\",\"life_time\":47},\"time\":1642644975070,\"timestamp\":\"1642648255126\",\"timezone\":\"Asia/Shanghai\",\"type\":\"action\",\"uid\":\"uid_44\",\"uid_type\":\"0\"}",
        "type": "hdfs",
        "app": "sdk_event",
        "level": "info",
        "ts": 1.6426482551549864e+09,
        "caller": "log/logger.go:71"
    },
    "input": {
        "type": "docker"
    }
}

Unwrapped from message.request, the final event payload looks like this:

{
    "agent": "python-requests/2.25.1",
    "event": "enter_party_group",
    "game_id": 10,
    "ip": "192.168.90.90",
    "properties": {
        "#data_source": "來源3",
        "#os": "ios",
        "#vp@compared_with_now": 1,
        "#vp@cost_channel": "渠道3",
        "#vp@revenue_amount": "9293",
        "#zone_offset": "694.0263211183825",
        "$city": "",
        "$country": "美國(guó)",
        "$iso": "US",
        "$latitude": 38.8868,
        "$location_timezone": "America/Chicago",
        "$longitude": -94.8223,
        "$province": "",
        "channel": "渠道1",
        "group_id": "536",
        "life_time": 47
    },
    "time": 1642644975070,
    "timestamp": "1642648255126",
    "timezone": "Asia/Shanghai",
    "type": "action",
    "uid": "uid_44",
    "uid_type": "0"
}

Hive table schema

Running show create table action in the hive CLI (action is the table name used here) prints the DDL below.

The input/output format is ORC, which Presto has optimized reads for; if you query with Impala, use Parquet instead.

CREATE TABLE `action`(                             
   `uid` string,                                    
   `uid_type` string,                               
   `agent` string,                                  
   `ip` string,                                     
   `timestamp` timestamp,                           
   `time` timestamp,                                
   `year` string,                                   
   `month` string,                                  
   `week` string,                                   
   `hour` string,                                   
   `minute` string,                                 
   `properties` map<string,string>)                 
 PARTITIONED BY (                                   
   `game_id` int,                                   
   `timezone` string,                               
   `event` string,                                  
   `day` date)                                      
 ROW FORMAT SERDE                                   
   'org.apache.hadoop.hive.ql.io.orc.OrcSerde'      
 WITH SERDEPROPERTIES (                             
   'colelction.delim'=',',                          
   'field.delim'='\t',                              
   'mapkey.delim'=':',                              
   'serialization.format'='\t')                     
 STORED AS INPUTFORMAT                              
   'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'  
 OUTPUTFORMAT                                      
   'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' 
 LOCATION                                           
   'hdfs://slaves01:8020/warehouse/tablespace/managed/hive/event.db/action' 
 TBLPROPERTIES (                                   
   'auto-compaction'='true',                       
   'bucketing_version'='2',                        
   'compaction.file-size'='128MB',                 
   'sink.partition-commit.delay'='0s',             
   'sink.partition-commit.policy.kind'='metastore,success-file',  
   'sink.partition-commit.trigger'='process-time',  
   'sink.shuffle-by-partition.enable'='true',       
   'transient_lastDdlTime'='1642571371')

Flink processing logic and source code

From the business data to Hive, the job first parses the JSON, then splits the time field by the event's time zone into year, day, and the other time partition fields. Full code:
JSON parsing

public class JsonParser {
    public static FileHdfsBean ParseHdfs(String jsonStr){
        FileHdfsBean bean = null;
        try {
            // message.request holds the real payload as a JSON string; fastjson parses it into an object here
            JSONObject jsonObject = JSON.parseObject(jsonStr).getJSONObject("message").getJSONObject("request");
            JSONObject properties = jsonObject.getJSONObject("properties");
            // device_no/device_type (not present in the sample payload) take precedence over uid/uid_type
            String deviceId = jsonObject.getString("device_no");
            String deviceType = jsonObject.getString("device_type");
            String uid = deviceId == null ? jsonObject.getString("uid") : deviceId;
            String uid_type = deviceId == null ? jsonObject.getString("uid_type") : deviceType;
            String agent = jsonObject.getString("agent");
            String ip = jsonObject.getString("ip");
            String event = jsonObject.getString("event");
            String timestamp = jsonObject.getString("timestamp");
            String time = jsonObject.getString("time");
            int game_id = jsonObject.getIntValue("game_id");
            String timezone = jsonObject.getString("timezone");
            Map<String, String> map = JSONObject.parseObject(properties.toJSONString(), new TypeReference<Map<String, String>>() {
            });
            bean = new FileHdfsBean(event, uid, uid_type, agent, ip, timestamp, time, game_id, timezone);
            bean.setMap2Str(map);
        } catch (Exception e) {
            System.out.println("ParseHdfs Exception:" + e.getMessage());
        }
        return bean;
    }
}
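
The article never shows the bean's setMap2Str helper. Judging from the str_to_map(properties,'&',':') call in the INSERT statement later on, it most likely flattens the properties map into a "k1:v1&k2:v2" string; here is a minimal sketch under that assumption (class and method names are mine, not the project's):

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class MapFlattenSketch {
    // Flatten the properties map into "k1:v1&k2:v2" so the SQL side can
    // rebuild it with Hive's str_to_map(properties, '&', ':').
    public static String map2Str(Map<String, String> props) {
        return props.entrySet().stream()
                .map(e -> e.getKey() + ":" + e.getValue())
                .collect(Collectors.joining("&"));
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("channel", "channel1");
        props.put("life_time", "47");
        System.out.println(map2Str(props)); // channel:channel1&life_time:47
    }
}
```

Note that property values containing ':', '&', or the table's '\t' field delimiter would break this encoding, so the real helper may need escaping.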

Time splitting

    private void setYearMonthWeekDayHourMin(String timestamp, String time) {
        long t = Long.parseLong(timestamp);
        long t2 = Long.parseLong(time);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone(this.timezone));
        String timestamp2 = sdf.format(t);
        String dateAndTime = sdf.format(t2);
        this.year = dateAndTime.substring(0,4);
        this.month = dateAndTime.substring(0,7);
        this.day = Date.valueOf(dateAndTime.substring(0,10));
        this.week = day +":"+ dateAndTime.substring(11,13) + getWeekOfMonth(dateAndTime.substring(0,10)) ;
        this.hour = dateAndTime.substring(0,13);
        this.minute = dateAndTime.substring(0,16);
        this.timestamp = Timestamp.valueOf(timestamp2);
        this.time = Timestamp.valueOf(dateAndTime);
    }
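
To make the substring positions concrete, this standalone sketch (class name is mine) derives the same prefix-based partition fields from the sample payload's time value of 1642644975070:

```java
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class PartitionFieldsSketch {
    // Format the epoch-millis event time in the event's own time zone, then
    // take prefixes of "yyyy-MM-dd HH:mm:ss.SSS" exactly as the bean does:
    // 4 chars = year, 7 = month, 10 = day, 13 = hour, 16 = minute.
    public static String[] split(long epochMillis, String timezone) {
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        sdf.setTimeZone(TimeZone.getTimeZone(timezone));
        String s = sdf.format(epochMillis);
        return new String[] {
                s.substring(0, 4),   // year
                s.substring(0, 7),   // month
                s.substring(0, 10),  // day
                s.substring(0, 13),  // hour
                s.substring(0, 16)   // minute
        };
    }

    public static void main(String[] args) {
        // The sample event's time, rendered in Asia/Shanghai
        for (String p : split(1642644975070L, "Asia/Shanghai")) {
            System.out.println(p);
        }
    }
}
```

SimpleDateFormat is not thread-safe; if the formatter were shared across Flink subtasks, java.time.format.DateTimeFormatter would be the safer choice.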

sink2hive

    private void sink2hive(StreamTableEnvironment tableEnv, DataStreamSource<String> sourceStream) {
        System.out.println("save2hive...");
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // true = read Hive via MapReduce, false = use Flink's native reader; set on tableEnv, this applies to all sinks
        configuration.setString("table.exec.hive.fallback-mapred-reader", "false");
        // Create the Hive catalog
        String name = "kafka2hive";
        String defaultDatabase = "default";
        HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, HIVE_CONF_DIR);
        System.out.println("Registering Hive catalog...");
        // Register the Hive catalog
        tableEnv.registerCatalog(name, hiveCatalog);
        tableEnv.useCatalog(name);
        System.out.println("Parsing fields into the bean...");
        // Parse fields and wrap them in the bean class
        SingleOutputStreamOperator<FileHdfsBean> beanStream = sourceStream
                .map(JsonParser::ParseHdfs)
                .filter(Objects::nonNull);
        // Register beanStream as a temporary view
        System.out.println("Creating temporary view...");
        tableEnv.executeSql("drop table if exists tmpTable");
        tableEnv.createTemporaryView("tmpTable", beanStream);
        System.out.println("Creating Hive table...");
        // Switch to the Hive SQL dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
        // Create the Hive table
        String table = HIVE_DB_NAME + "." + HIVE_TABLE_NAME;
        // tableEnv.executeSql("drop table if exists " + table);
        tableEnv.executeSql("create database IF NOT EXISTS " + HIVE_DB_NAME);
        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS " + table + "\n" +
                "(uid string, uid_type string, agent string, ip string,\n" +
                "`timestamp` timestamp,`time` timestamp,`year` string,`month` string,`week` string,`hour` string, `minute` string,properties map<string,string>)\n" +
                "PARTITIONED BY(game_id int,timezone string,event string,day date)\n" +
                "ROW FORMAT DELIMITED\n" +
                "FIELDS TERMINATED BY '\\t'\n" +
                "COLLECTION ITEMS TERMINATED BY ','\n" +
                "MAP KEYS TERMINATED BY ':'\n" +
                "stored as orc TBLPROPERTIES (\n" +
                "'sink.partition-commit.trigger'='process-time',\n" +
                "'sink.partition-commit.delay'='0s',\n" +
                "'sink.partition-commit.policy.kind'='metastore,success-file',\n" +
                "'sink.shuffle-by-partition.enable'='true',\n" +
                "'auto-compaction'='true',\n" +
                "'compaction.file-size'='128MB'\n" +
                ")"
        );
        // Switch back to the default dialect
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        System.out.println("Inserting into the Hive table...");
        // Insert data into the Hive table
        tableEnv.executeSql("INSERT INTO  " + table + "\n" +
                "SELECT uid, uid_type, agent, ip,`timestamp`,`time`,`year`,`month`,`week`,`hour`,`minute`," +
                "str_to_map(properties,'&',':') as properties,game_id ,timezone ,event ,`day` from tmpTable");
    }

Job execution modes

DolphinScheduler on YARN (per-job)

Each submission creates a fresh Flink cluster on YARN, so jobs are independent of one another and easy to manage; the cluster disappears once the job finishes. (This mode is recommended.)

Create the workflow


View the ApplicationMaster in YARN


View the Flink job details


yarn-session

This initializes a Flink cluster inside YARN with a dedicated pool of resources, and all later jobs are submitted to it. The cluster stays resident in YARN until it is stopped manually.

# Start command; pass the resource options you need
$FLINK_HOME/bin/yarn-session.sh -tm xxx  -s xx ...

Once started, a resident application named Flink session cluster appears in YARN; open it to find the ApplicationMaster address.



Submit a job


View jobs and logs


Scheduled maintenance jobs

Merging small Hive files

When building a data warehouse on Hive, ETL jobs usually raise parallelism for speed, and whether the engine is MapReduce, Spark, or Flink, writing into Hive then produces many small files, i.e. files smaller than the HDFS block size (128 MB by default). Reading and writing many small files is far slower than handling a few large ones, because the frequent NameNode round trips lengthen its processing queue and its GC pauses. As small files accumulate they seriously degrade NameNode performance and limit how far the cluster can scale, so a daily scheduled compaction is recommended.

#!/bin/bash
a='uid,uid_type,agent,ip,`timestamp`,`time`,`year`,`month`,`week`,`hour`,`minute`,properties'
for((i=2;i>1;i--));
do
 if [ $# -lt 2 ]
 then
    date=$(date -d"-$i day" +%Y-%m-%d)
 else
    date=$2
 fi
 table=$1
 echo "$date"
 echo "set hive.merge.mapfiles = true;" >> /root/${table}-${date}.sql
 echo "set hive.merge.mapredfiles = true;" >> /root/${table}-${date}.sql
 echo "set hive.merge.tezfiles = true;" >> /root/${table}-${date}.sql
 echo "set hive.merge.size.per.task = 256000000;" >> /root/${table}-${date}.sql
 echo "set hive.merge.smallfiles.avgsize = 16000000;" >> /root/${table}-${date}.sql
 hadoop fs -ls -R  /warehouse/tablespace/managed/hive/event.db/${table}/ | awk '{print $8}'| awk -F "/${table}/" '{print $2}'| grep day=${date}$ | while read line
 do
   array=(`echo ${line} | tr '/' ' '`)
   for var in ${array[@]}
   do
      arr=(`echo ${var}|tr '=' ' '`)
      case ${arr[0]} in
        "game_id") game_id=${arr[1]}
        ;;
        "timezone") timezone=$(echo "${arr[1]//%2F//}") # decode %2F in the dir name back to '/'
        ;;
        "event") event=${arr[1]}
        ;;
        "day") day=${arr[1]}
        ;;
        *) echo 'unexpected path component!'
        ;;
      esac
   done
   echo "parsed: game_id=${game_id},timezone=${timezone},event=${event},day=${day}"
   ## First move the small-file data into a temporary partition
   echo "insert overwrite table event.${table} partition (game_id='${game_id}',timezone='${timezone}',event='${event}',day='1970-01-01') select $a from event.${table} where game_id=${game_id} and timezone='${timezone}' and event='${event}' and day=cast('${day}' as date);" >> /root/${table}-${date}.sql
   ## Drop the original small-file partition
   echo "alter table event.${table} drop if exists partition(game_id='${game_id}',timezone='${timezone}',event='${event}',day='${day}');" >> /root/${table}-${date}.sql
   ## Rename the temporary partition to replace the original one
   echo "alter table event.${table} partition (game_id='${game_id}',timezone='${timezone}',event='${event}',day='1970-01-01') rename to partition(game_id='${game_id}',timezone='${timezone}',event='${event}',day='${day}');" >> /root/${table}-${date}.sql
 done
 ## Run the generated merge SQL file
 hive -f /root/${table}-${date}.sql > /dev/null 2>&1
 echo "partition ${date} of table ${table} merged"
 ## Remove the generated SQL file
 rm -rf /root/${table}-${date}.sql
done

Run locally
bash xx.sh xx  (pass the name of the table to compact)

Run via DolphinScheduler


Scheduled cleanup of DolphinScheduler disk usage

Scheduled SQL deletion of workflow instances
As scheduled jobs accumulate, the workflow-instance table keeps growing; without periodic cleanup the UI pages can become unusable.
The SQL below deletes finished workflow instances, keeping the last 3 days (a daily schedule is recommended).

delete from t_ds_process_instance where state != 1 and date(end_time) < DATE_SUB(CURDATE(), INTERVAL 3 DAY)

Scheduled deletion of logs and resources
A long-running DolphinScheduler produces large volumes of logs and jar resources for executed tasks; without periodic deletion they consume enormous disk space and can eventually bring down the whole big-data cluster. (A daily schedule is recommended.)

#!/bin/bash

# Validate IPv4 address format
function isValidIp() {
  local ip=$1
  local ret=1

  if [[ $ip =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
    ip=(${ip//\./ }) # split on '.' into an array for the range checks below
    [[ ${ip[0]} -le 255 && ${ip[1]} -le 255 && ${ip[2]} -le 255 && ${ip[3]} -le 255 ]]
    ret=$?
  fi

  return $ret
}

ips=$1;
dolphin_base_dir=$2;

# Replace the comma separators with spaces
ip_array=(`echo ${ips} | tr ',' ' '`);


# Iterate over the worker IPs, ssh to each, and delete stale task resources and logs
echo "ip count: ${#ip_array[@]}";
for ip in ${ip_array[@]}; do
    isValidIp $ip
    if [ ! $? == 0 ]; then
        echo "Warning: param ${ip} is not a valid ip!"
        continue
    else 
        # Delete jar resources from executed tasks
        ssh -o "StrictHostKeyChecking no" ${ip} "find ${dolphin_base_dir}/exec/process -mindepth 3 -maxdepth 3 -name "[0-9]*" -mtime +1 -type d | xargs rm -rf;exit";
        # Delete old worker logs
        ssh -o "StrictHostKeyChecking no" ${ip} 'rm -rf /usr/hdp/current/dolphinscheduler/logs/dolphinscheduler-worker.2*.log'
        echo "${ip} has been cleaned up!"
    fi;
done;

Scheduled Presto memory release

If a long-running Presto instance cannot release memory and usage keeps climbing, an hourly scheduled GC can reclaim it.

#!/bin/bash

# Validate IPv4 address format
function isValidIp() {
  local ip=$1
  local ret=1

  if [[ $ip =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]]; then
    ip=(${ip//\./ }) # split on '.' into an array for the range checks below
    [[ ${ip[0]} -le 255 && ${ip[1]} -le 255 && ${ip[2]} -le 255 && ${ip[3]} -le 255 ]]
    ret=$?
  fi

  return $ret
}

ips=$1;

# Replace the comma separators with spaces
ip_array=(`echo ${ips} | tr ',' ' '`);


# Iterate over the worker IPs, ssh to each, and trigger a manual Full GC (jmap -histo:live forces one)
echo "ip count: ${#ip_array[@]}";
for ip in ${ip_array[@]}; do
    isValidIp $ip
    if [ ! $? == 0 ]; then
        echo "Warning: param ${ip} is not a valid ip!"
        continue
    else 
        ssh -o "StrictHostKeyChecking no" ${ip} 'jmap -histo:live `jps | grep PrestoServer|cut -d " " -f 1` > /dev/null;exit;'
        echo "presto worker on ${ip} has been Full GC'd!"
    fi;
done;

Series

Part 1: Automated deployment with Ambari
Part 2: Event tracking design and SDK source code
Part 3: Data collection and validation
Part 4: Real-time ETL: Kafka->Flink->Hive (this article)
Part 5: User-data ETL: kafka->spark->kudu
Part 6: Presto analytics-model SQL and UDFs
