1. 背景
筆者的開發(fā)大數(shù)據(jù)平臺XSailboat中包含基于Flink的可視化計(jì)算管道開發(fā)和運(yùn)維功能。狀態(tài)存儲器中數(shù)據(jù)的查看和節(jié)點(diǎn)的日志查看功能是其重要的輔助支撐功能。它能使得在大數(shù)據(jù)平臺上就能完全實(shí)現(xiàn)計(jì)算管道的開發(fā)、調(diào)試、部署,逐漸擺脫Flink的原生界面。

Flink分JobManager和TaskManager,JobManager中的日志是總體性的,構(gòu)建計(jì)算管道的過程,就是在JobManager中完成的,而Job的執(zhí)行則是在TaskManager中。
就可以從中找失敗原因。而任務(wù)執(zhí)行過程中出現(xiàn)的異?;蛘咝畔ⅲ窃赥askManager中的它的日志是不需要分計(jì)算管道的,總體看就可以。只需要把日志數(shù)據(jù)集成進(jìn)來就行,不需要采集。關(guān)鍵是TaskManager中不同Task產(chǎn)生的平臺框架(非原生Flink底座)和計(jì)算管道開發(fā)者用Aviator語言編寫的調(diào)試日志。
2. 功能目標(biāo)
a. 計(jì)算管道部署過程中,如果出錯,希望能看到平臺框架構(gòu)建計(jì)算管道的過程日志信息及異常信息。
b. 點(diǎn)擊計(jì)算管道的節(jié)點(diǎn),可以查看計(jì)算管道的某個實(shí)例上此節(jié)點(diǎn)的日志;(一個計(jì)算管道可以部署在多個flink集群中,生成多個實(shí)例)
c. 可以主動開啟和關(guān)閉調(diào)試日志。這里的調(diào)試日志是計(jì)算管道開發(fā)時,開發(fā)者加的用Aviator語言寫的打印調(diào)試日志的代碼。
3. Flink的log相關(guān)接口
-
/jobmanager/logs Returns the list of log files on the JobManager.
返回:
{
"logs": [
{
"name": "prelaunch.out",
"size": 100,
"mtime": 1692856478000
},
{
"name": "prelaunch.err",
"size": 0,
"mtime": 1692856478000
},
{
"name": "launch_container.sh",
"size": 23160,
"mtime": 1692856478000
},
{
"name": "directory.info",
"size": 7496,
"mtime": 1692856478000
},
{
"name": "jobmanager.out",
"size": 19850,
"mtime": 1692856480000
},
{
"name": "jobmanager.err",
"size": 512,
"mtime": 1692856478000
},
{
"name": "jobmanager.log.1",
"size": 29374,
"mtime": 1692856479000
},
{
"name": "jobmanager.log",
"size": 87087,
"mtime": 1692857018000
}
]
}
- /jobs/:jobid/jobmanager/log-url Returns the log url of jobmanager of a specific job.
- 返回
{
"url": "http://192.168.0.152:34243/#/job-manager/logs"
}
這個url的界面圖

- /jobs/:jobid/taskmanagers/:taskmanagerid/log-url Returns the log url of jobmanager of a specific job.
- /taskmanagers/:taskmanagerid/logs Returns the list of log files on a TaskManager.
4. 總體技術(shù)方案
在這里我們只需要采集TaskMananger中,節(jié)點(diǎn)運(yùn)行相關(guān)的日志數(shù)據(jù),即Function中的日志數(shù)據(jù)。

4. 技術(shù)點(diǎn)
4.1 基礎(chǔ)知識
基礎(chǔ)準(zhǔn)備:
a. 大致了解一下Flink的日志配置。參考Flink官方文檔《logging》
b. 熟悉一下log4j2是怎么配置的。 參考Log4j官方文檔《Kafka Appender》和《PatternLayout》
這里記錄一下log4j的幾個配置知識點(diǎn),詳細(xì)參考官方文檔《lookup》:
a. 參數(shù)輸入相關(guān):
- ${env:變量名},注入System.getEnv中的參數(shù)
- ${sys:變量名},注入System.getProperties()中的參數(shù)
- %X{變量名} ,注入MDC中的參數(shù)
- ${ctx:變量名},注入ThreadContext中的參數(shù)
官方文檔中提到
- log4j-session.properties: used by the command line interface when starting a Kubernetes/Yarn session cluster (i.e., kubernetes-session.sh/yarn-session.sh)
雖然我們的Flink是Yarn Session(detached mode)模式,但是我們并不是使用原生的yarn-session.sh腳本去啟動,而是筆者分析了Flink Yarn Session的啟動模式之后,重寫了啟動代碼,以讓Flink更好地融入到大數(shù)據(jù)平臺,讓大數(shù)據(jù)平臺能掌控Flink集群的生命周期、啟動/停止,監(jiān)控,在指定資源隊(duì)列運(yùn)行多個特定標(biāo)簽的Flink集群等。在筆者啟動Flink級群的時候,使用的是log4j.properties,所以接下來將在這里面修改。
4.2 Flink的log4j.properties文件配置
# to Kafka
appender.kafka.type = Kafka
appender.kafka.name = Kafka
appender.kafka.topic = _log_flink
appender.kafka.key = ${env:CONTAINER_ID}
appender.kafka.property.type = Property
appender.kafka.property.name= bootstrap.servers
appender.kafka.property.value= XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
appender.kafka.filter.ctxmap.type=ContextMapFilter
appender.kafka.filter.ctxmap.onMismatch=DENY
appender.kafka.filter.ctxmap.kvp_1.type=KeyValuePair
appender.kafka.filter.ctxmap.kvp_1.key=collect_log
appender.kafka.filter.ctxmap.kvp_1.value=true
appender.kafka.layout.type = PatternLayout
# MDC參數(shù)where格式:runUid.nodeId ,subTaskIndex格式:subIndexId+1
appender.kafka.layout.pattern = %X{where}.${event:Timestamp}.%X{subTaskIndex} %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %-60c L:%L -%msg%n
rootLogger.appenderRef.kafka.ref = Kafka
這里我們只想采集JobManager中構(gòu)建Job過程的日志和TaskManager中執(zhí)行Task時Function內(nèi)的日志。所以這里用了一個ContextMapFilter過濾器,其中有一個MDC參數(shù)collect_log,如果要開始收集日志,帶代碼中將它設(shè)置為true
MDC.put("collect_log", true) ;
JobManager和TaskManager都是用這個log4j.propeties文件,JobMananger中構(gòu)建Job的過程不屬于任何一個節(jié)點(diǎn),它的where參數(shù)中nodeId為0,subIndexId+1部分也為0,以和節(jié)點(diǎn)區(qū)別。
因?yàn)樵贘obManager構(gòu)建Job的過程中,是沒有JobId的,所以在外部傳入一個runUid用來標(biāo)識此次計(jì)算管道提交運(yùn)行。
這樣就把日志采集存儲到了Kafka。

4.3 使用Flume將日志數(shù)據(jù)從Kafka同步到HBase
日志存儲到HBase之后,需要有一個rowKey,這里對Kafka中的消息做一下拆分,使用第一個空格前的內(nèi)容作為rowKey,第一個空格之后的
內(nèi)容作為日志內(nèi)容。即
- %X{where}.${event:Timestamp} 作為rowKey
- %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %-60c L:%L -%msg%n 作為消息內(nèi)容
當(dāng)一個任務(wù)節(jié)點(diǎn)或者構(gòu)建Job過程中,1毫秒內(nèi)輸出多條日志,則多條日志的rokwKey會相同,為此在Flume將日志從Kafka遷移到HBase的過程中,rowKey需要在后面再加上一個Kafka偏移量。
flume的配置如下:
#### 配置名為_log_flink的代理,用于采集kafka的_log_flink主題數(shù)據(jù),寫入hbase
# 指定代理的組件名稱
_log_flink.sources = r1
_log_flink.sinks = k1
_log_flink.channels = c1
# ==============================配置sources組件========
# sources組件類型
_log_flink.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
_log_flink.sources.r1.batchSize = 10
_log_flink.sources.r1.kafka.bootstrap.servers = XCloud150:9092,XCloud151:9092,XCloud152:9092,XCloud153:9092,XCloud154:9092
_log_flink.sources.r1.topic = _log_flink
# 組件要綁定的通道
_log_flink.sources.r1.channels = c1
_log_flink.sources.r1.consumer.timeout.ms = 1000
_log_flink.sources.r1.kafka.consumer.group.id = group33
_log_flink.sources.r1.kafka.consumer.auto.offset.reset = earliest
_log_flink.sources.r1.interceptors = i1
_log_flink.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.extend.AviatorInterceptorBuilder
_log_flink.sources.r1.interceptors.i1.handler = let i = string.indexOf(body , " ") ; \
if(i>0){ \
let key = string.substring(body , 0 , i) + "." + seq.get(headers , "offset") ; \
let msg = string.substring(body , i+1) ; \
seq.put(headers , "hbase_rowkey" , key) ; \
return msg ; \
} \
return nil ;
# =============================配置sink組件======
# _log_flink.sinks.k1.type = logger
# _log_flink.sinks.k1.channel = c1
# sink組件類型
_log_flink.sinks.k1.type = hbase2
# HBase中要寫入的表的名稱
_log_flink.sinks.k1.table = xz:_log_flink
# process不要返回backoff,讓循環(huán)加快
_log_flink.sinks.k1.backoffEnabled = false
# batchsize 減小到50,缺省100
_log_flink.sinks.k1.batchSize = 50
# HBase中要寫入的列簇
_log_flink.sinks.k1.columnFamily = logInfo
# 自定義的序列化工具
_log_flink.sinks.k1.serializer = org.apache.flume.sink.hbase2.HeaderHBase2EventSerializer
# 指定列簇下的列
_log_flink.sinks.k1.serializer.payloadColumn = body
# 組件要綁定的通道
_log_flink.sinks.k1.channel = c1
# =============================配置channel組件====
# channel 組件的類型
_log_flink.channels.c1.type = memory
# 將沒有數(shù)據(jù)時的take等待時間減小到0.2秒,缺省是3秒
_log_flink.channels.c1.keep-alive = 200
_log_flink.channels.c1.keep-alive-timeunit = ms
# 容量,通道中存儲的最大事件數(shù)
_log_flink.channels.c1.capacity = 10000
# 每次交易,channel從source獲取,或匯給sink的最大事件數(shù)
_log_flink.channels.c1.transactionCapacity = 10000
_log_flink.channels.c1.byteCapacityBufferPercentage = 20
_log_flink.channels.c1.byteCapacity = 800000
這里使用了自己開發(fā)的一個flume攔截器:org.apache.flume.interceptor.extend.AviatorInterceptorBuilder。關(guān)于它參看《flume攔截器--使用Aviator擴(kuò)展攔截器功能》
如此就把數(shù)據(jù)存儲到了HBase中
