Flink日志采集、集中存儲、可視化查詢實(shí)踐

1. 背景

筆者的開發(fā)大數(shù)據(jù)平臺XSailboat中包含基于Flink的可視化計(jì)算管道開發(fā)和運(yùn)維功能。狀態(tài)存儲器中數(shù)據(jù)的查看和節(jié)點(diǎn)的日志查看功能是其重要的輔助支撐功能。它能使得在大數(shù)據(jù)平臺上就能完全實(shí)現(xiàn)計(jì)算管道的開發(fā)、調(diào)試、部署,逐漸擺脫Flink的原生界面。

DAG開發(fā)計(jì)算管道.png

Flink分JobManager和TaskManager,JobManager中的日志是總體性的,構(gòu)建計(jì)算管道的過程,就是在JobManager中完成的,而Job的執(zhí)行則是在TaskManager中。
就可以從中找失敗原因。而任務(wù)執(zhí)行過程中出現(xiàn)的異?;蛘咝畔ⅲ窃赥askManager中的它的日志是不需要分計(jì)算管道的,總體看就可以。只需要把日志數(shù)據(jù)集成進(jìn)來就行,不需要采集。關(guān)鍵是TaskManager中不同Task產(chǎn)生的平臺框架(非原生Flink底座)和計(jì)算管道開發(fā)者用Aviator語言編寫的調(diào)試日志。

2. 功能目標(biāo)

a. 計(jì)算管道部署過程中,如果出錯,希望能看到平臺框架構(gòu)建計(jì)算管道的過程日志信息及異常信息。
b. 點(diǎn)擊計(jì)算管道的節(jié)點(diǎn),可以查看計(jì)算管道的某個實(shí)例上此節(jié)點(diǎn)的日志;(一個計(jì)算管道可以部署在多個flink集群中,生成多個實(shí)例)
c. 可以主動開啟和關(guān)閉調(diào)試日志。這里的調(diào)試日志是計(jì)算管道開發(fā)時,開發(fā)者加的用Aviator語言寫的打印調(diào)試日志的代碼。

3. Flink的log相關(guān)接口

  • /jobmanager/logs Returns the list of log files on the JobManager.
    返回:
{
    "logs": [
        {
            "name": "prelaunch.out",
            "size": 100,
            "mtime": 1692856478000
        },
        {
            "name": "prelaunch.err",
            "size": 0,
            "mtime": 1692856478000
        },
        {
            "name": "launch_container.sh",
            "size": 23160,
            "mtime": 1692856478000
        },
        {
            "name": "directory.info",
            "size": 7496,
            "mtime": 1692856478000
        },
        {
            "name": "jobmanager.out",
            "size": 19850,
            "mtime": 1692856480000
        },
        {
            "name": "jobmanager.err",
            "size": 512,
            "mtime": 1692856478000
        },
        {
            "name": "jobmanager.log.1",
            "size": 29374,
            "mtime": 1692856479000
        },
        {
            "name": "jobmanager.log",
            "size": 87087,
            "mtime": 1692857018000
        }
    ]
}
  • /jobs/:jobid/jobmanager/log-url Returns the log url of jobmanager of a specific job.
  • 返回
{
    "url": "http://192.168.0.152:34243/#/job-manager/logs"
}

這個url的界面圖


JobManager日志圖.png
  • /jobs/:jobid/taskmanagers/:taskmanagerid/log-url Returns the log url of jobmanager of a specific job.
  • /taskmanagers/:taskmanagerid/logs Returns the list of log files on a TaskManager.

4. 總體技術(shù)方案

在這里我們只需要采集TaskMananger中,節(jié)點(diǎn)運(yùn)行相關(guān)的日志數(shù)據(jù),即Function中的日志數(shù)據(jù)。


日志流轉(zhuǎn)過程.png

4. 技術(shù)點(diǎn)

4.1 基礎(chǔ)知識

基礎(chǔ)準(zhǔn)備:
a. 大致了解一下Flink的日志配置。參考Flink官方文檔《logging》
b. 熟悉一下log4j2是怎么配置的。 參考Log4j官方文檔《Kafka Appender》《PatternLayout》

這里記錄一下log4j的幾個配置知識點(diǎn),詳細(xì)參考官方文檔《lookup》
a. 參數(shù)輸入相關(guān):

  • ${env:變量名},注入System.getEnv中的參數(shù)
  • ${sys:變量名},注入System.getProperties()中的參數(shù)
  • %X{變量名} ,注入MDC中的參數(shù)
  • ${ctx:變量名},注入ThreadContext中的參數(shù)

官方文檔中提到

  • log4j-session.properties: used by the command line interface when starting a Kubernetes/Yarn session cluster (i.e., kubernetes-session.sh/yarn-session.sh)

雖然我們的Flink是Yarn Session(detached mode)模式,但是我們并不是使用原生的yarn-session.sh腳本去啟動,而是筆者分析了Flink Yarn Session的啟動模式之后,重寫了啟動代碼,以讓Flink更好地融入到大數(shù)據(jù)平臺,讓大數(shù)據(jù)平臺能掌控Flink集群的生命周期、啟動/停止,監(jiān)控,在指定資源隊(duì)列運(yùn)行多個特定標(biāo)簽的Flink集群等。在筆者啟動Flink級群的時候,使用的是log4j.properties,所以接下來將在這里面修改。

4.2 Flink的log4j.properties文件配置

# to Kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.topic = _log_flink
appender.kafka.key = ${env:CONTAINER_ID}
appender.kafka.property.type = Property
appender.kafka.property.name= bootstrap.servers
appender.kafka.property.value= XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
appender.kafka.filter.ctxmap.type=ContextMapFilter
appender.kafka.filter.ctxmap.onMismatch=DENY
appender.kafka.filter.ctxmap.kvp_1.type=KeyValuePair
appender.kafka.filter.ctxmap.kvp_1.key=collect_log
appender.kafka.filter.ctxmap.kvp_1.value=true
appender.kafka.layout.type = PatternLayout
# MDC參數(shù)where格式:runUid.nodeId  ,subTaskIndex格式:subIndexId+1
appender.kafka.layout.pattern = %X{where}.${event:Timestamp}.%X{subTaskIndex} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level  %-60c L:%L -%msg%n
rootLogger.appenderRef.kafka.ref = Kafka

這里我們只想采集JobManager中構(gòu)建Job過程的日志和TaskManager中執(zhí)行Task時Function內(nèi)的日志。所以這里用了一個ContextMapFilter過濾器,其中有一個MDC參數(shù)collect_log,如果要開始收集日志,帶代碼中將它設(shè)置為true

MDC.put("collect_log", true) ;

JobManager和TaskManager都是用這個log4j.propeties文件,JobMananger中構(gòu)建Job的過程不屬于任何一個節(jié)點(diǎn),它的where參數(shù)中nodeId為0,subIndexId+1部分也為0,以和節(jié)點(diǎn)區(qū)別。

因?yàn)樵贘obManager構(gòu)建Job的過程中,是沒有JobId的,所以在外部傳入一個runUid用來標(biāo)識此次計(jì)算管道提交運(yùn)行。

這樣就把日志采集存儲到了Kafka。


Kafka中的日志數(shù)據(jù).png

4.3 使用Flume將日志數(shù)據(jù)從Kafka同步到HBase

日志存儲到HBase之后,需要有一個rowKey,這里對Kafka中的消息做一下拆分,使用第一個空格前的內(nèi)容作為rowKey,第一個空格之后的
內(nèi)容作為日志內(nèi)容。即

  • %X{where}.${event:Timestamp} 作為rowKey
  • %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %-60c L:%L -%msg%n 作為消息內(nèi)容

當(dāng)一個任務(wù)節(jié)點(diǎn)或者構(gòu)建Job過程中,1毫秒內(nèi)輸出多條日志,則多條日志的rokwKey會相同,為此在Flume將日志從Kafka遷移到HBase的過程中,rowKey需要在后面再加上一個Kafka偏移量。
flume的配置如下:

#### 配置名為_log_flink的代理,用于采集kafka的_log_flink主題數(shù)據(jù),寫入hbase

# 指定代理的組件名稱
_log_flink.sources = r1
_log_flink.sinks = k1
_log_flink.channels = c1

# ==============================配置sources組件========
# sources組件類型
_log_flink.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
_log_flink.sources.r1.batchSize = 10
_log_flink.sources.r1.kafka.bootstrap.servers = XCloud150:9092,XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
_log_flink.sources.r1.topic = _log_flink
# 組件要綁定的通道
_log_flink.sources.r1.channels = c1
_log_flink.sources.r1.consumer.timeout.ms = 1000
_log_flink.sources.r1.kafka.consumer.group.id = group33
_log_flink.sources.r1.kafka.consumer.auto.offset.reset = earliest

_log_flink.sources.r1.interceptors = i1
_log_flink.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.extend.AviatorInterceptorBuilder
_log_flink.sources.r1.interceptors.i1.handler = let i = string.indexOf(body , " ") ; \
if(i>0){ \
    let key = string.substring(body , 0 , i) + "." + seq.get(headers , "offset") ; \
    let msg = string.substring(body , i+1) ; \
    seq.put(headers , "hbase_rowkey" , key) ; \
    return msg ; \
} \
return nil ;

# =============================配置sink組件======
# _log_flink.sinks.k1.type = logger
# _log_flink.sinks.k1.channel = c1

# sink組件類型
_log_flink.sinks.k1.type = hbase2
# HBase中要寫入的表的名稱
_log_flink.sinks.k1.table = xz:_log_flink
# process不要返回backoff,讓循環(huán)加快
_log_flink.sinks.k1.backoffEnabled = false
# batchsize 減小到50,缺省100
_log_flink.sinks.k1.batchSize = 50
# HBase中要寫入的列簇
_log_flink.sinks.k1.columnFamily = logInfo
# 自定義的序列化工具
_log_flink.sinks.k1.serializer = org.apache.flume.sink.hbase2.HeaderHBase2EventSerializer
# 指定列簇下的列
_log_flink.sinks.k1.serializer.payloadColumn = body
# 組件要綁定的通道
_log_flink.sinks.k1.channel = c1

# =============================配置channel組件====
# channel 組件的類型
_log_flink.channels.c1.type = memory
# 將沒有數(shù)據(jù)時的take等待時間減小到0.2秒,缺省是3秒
_log_flink.channels.c1.keep-alive = 200
_log_flink.channels.c1.keep-alive-timeunit = ms
# 容量,通道中存儲的最大事件數(shù)
_log_flink.channels.c1.capacity = 10000
# 每次交易,channel從source獲取,或匯給sink的最大事件數(shù)
_log_flink.channels.c1.transactionCapacity = 10000 
_log_flink.channels.c1.byteCapacityBufferPercentage = 20
_log_flink.channels.c1.byteCapacity = 800000

這里使用了自己開發(fā)的一個flume攔截器:org.apache.flume.interceptor.extend.AviatorInterceptorBuilder。關(guān)于它參看《flume攔截器--使用Aviator擴(kuò)展攔截器功能》

如此就把數(shù)據(jù)存儲到了HBase中


HBase中的flink日志.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容