History Server概述
Flink有一個History Server,可以用來在相應(yīng)的Flink集群關(guān)閉后查詢已完成作業(yè)的統(tǒng)計信息。例如有個批處理作業(yè)是凌晨才運行的,并且我們都知道只有當(dāng)作業(yè)處于運行中的狀態(tài),才能夠查看到相關(guān)的日志信息和統(tǒng)計信息。所以如果作業(yè)由于異常退出或者處理結(jié)果有問題,我們又無法及時查看(凌晨運行的)作業(yè)的相關(guān)日志信息。那么History Server就顯得十分重要了,因為通過History Server我們才能查詢這些已完成作業(yè)的統(tǒng)計信息,無論是正常退出還是異常退出。
此外,它對外提供了REST API,它接受HTTP請求并使用JSON數(shù)據(jù)進行響應(yīng)。Flink任務(wù)停止后,JobManager會將已經(jīng)完成任務(wù)的統(tǒng)計信息進行存檔,History Server進程則在任務(wù)停止后可以對任務(wù)統(tǒng)計信息進行查詢。比如:最后一次的checkpoint、任務(wù)運行時的相關(guān)配置。
官方文檔:
History Server的使用
History Server允許查詢由JobManager歸檔的已完成作業(yè)的狀態(tài)和統(tǒng)計信息。已完成作業(yè)的歸檔在JobManager上進行,JobManager會將歸檔的作業(yè)信息upload到文件系統(tǒng)目錄,這個文件系統(tǒng)可以是本地文件系統(tǒng)、HDFS、H3等,這個目錄是可以在配置文件中指定的。然后還需要配置History Server去掃描這個目錄,并且可以配置掃描的間隔時間。
因此,我們在使用History Server之前需要配置一下這幾個配置項:
[root@hadoop01 /usr/local/flink]# vim conf/flink-conf.yaml
# 指定由JobManager歸檔的作業(yè)信息所存放的目錄,這里使用的是HDFS
jobmanager.archive.fs.dir: hdfs://hadoop01:8020/completed-jobs/
# 指定History Server掃描哪些歸檔目錄,多個目錄使用逗號分隔
historyserver.archive.fs.dir: hdfs://hadoop01:8020/completed-jobs/
# 指定History Server間隔多少毫秒掃描一次歸檔目錄
historyserver.archive.fs.refresh-interval: 10000
# History Server所綁定的ip,0.0.0.0代表允許所有ip訪問
historyserver.web.address: 0.0.0.0
# 指定History Server所監(jiān)聽的端口號
historyserver.web.port: 8082
- 關(guān)于History Server的配置可以參考官方文檔:History Server Config
配置完成后,可以使用如下命令啟動History Server:
[root@hadoop01 /usr/local/flink]# ./bin/historyserver.sh start
Starting historyserver daemon on host hadoop01.
[root@hadoop01 /usr/local/flink]#
檢查一下是否啟動成功:
[root@hadoop01 /usr/local/flink]# netstat -lntp |grep 8082
tcp6 0 0 :::8082 :::* LISTEN 3200/java
[root@hadoop01 /usr/local/flink]# jps |grep HistoryServer
3200 HistoryServer
[root@hadoop01 /usr/local/flink]#
提交一個作業(yè)跑一下,看看完成后是否會生成歸檔信息:
[root@hadoop01 /usr/local/flink]# ./bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
作業(yè)跑完后,可以在HDFS中看到生成的歸檔目錄:
[root@hadoop01 /usr/local/flink]# hadoop fs -ls /
Found 1 items
drwxr-xr-x - root supergroup 0 2020-09-30 11:00 /completed-jobs
[root@hadoop01 /usr/local/flink]#
然后使用瀏覽器訪問8082端口可以在web界面上查看已運行完的作業(yè)信息:

點進去可以看到詳細(xì)的統(tǒng)計信息:

這些信息都是以JSON的格式存放在歸檔目錄下的文件中,文件以作業(yè)的id命名:
[root@hadoop01 /usr/local/flink]# hadoop fs -ls /completed-jobs
Found 1 items
-rw-r--r-- 1 root supergroup 31606 2020-09-30 11:00 /completed-jobs/3f9f7ec2a7a765660bdc09922d0b7d0f
[root@hadoop01 /usr/local/flink]#
History Server REST API使用
根據(jù)官方文檔的描述,History Server提供了如下REST API,所有API的響應(yīng)數(shù)據(jù)都是JSON格式:
/config/jobs/overview/jobs/<jobid>/jobs/<jobid>/vertices/jobs/<jobid>/config/jobs/<jobid>/exceptions/jobs/<jobid>/accumulators/jobs/<jobid>/vertices/<vertexid>/jobs/<jobid>/vertices/<vertexid>/subtasktimes/jobs/<jobid>/vertices/<vertexid>/taskmanagers/jobs/<jobid>/vertices/<vertexid>/accumulators/jobs/<jobid>/vertices/<vertexid>/subtasks/accumulators/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtasknum>/attempts/<attempt>/accumulators/jobs/<jobid>/plan
/config接口可以獲取基礎(chǔ)配置信息,請求示例:
[root@hadoop01 ~]# curl http://localhost:8082/config
{"refresh-interval":10000,"timezone-name":"中國時間","timezone-offset":28800000,"flink-version":"1.11.2","flink-revision":"DeadD0d0 @ 1970-01-01T01:00:00+01:00","features":{"web-submit":false}}
[root@hadoop01 ~]#
/config接口可以獲取已完成的job信息列表,請求示例:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/overview
{"jobs":[{"jid":"3f9f7ec2a7a765660bdc09922d0b7d0f","name":"Flink Java Job at Wed Sep 30 11:00:11 CST 2020","state":"FINISHED","start-time":1601434820548,"end-time":1601434826749,"duration":6201,"last-modification":1601434826749,"tasks":{"total":3,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":3,"canceling":0,"canceled":0,"failed":0,"reconciling":0}}]}
[root@hadoop01 ~]#
/jobs/<jobid>接口可以獲取指定Job的詳細(xì)信息,我們可以基于上一個接口返回的Job ID獲取指定Job的詳細(xì)信息,由于內(nèi)容太多就不貼出來了:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f
/jobs/<jobid>/config接口可以獲取指定Job的配置信息:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/config
{"jid":"3f9f7ec2a7a765660bdc09922d0b7d0f","name":"Flink Java Job at Wed Sep 30 11:00:11 CST 2020","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{}}}
[root@hadoop01 ~]#
/jobs/<jobid>/exceptions接口可以獲取指定Job的異常信息:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/exceptions
{"root-exception":null,"timestamp":null,"all-exceptions":[],"truncated":false}
[root@hadoop01 ~]#
/jobs/<jobid>/accumulators可以獲取指定Job的計數(shù)器信息:
[root@hadoop01 ~]# curl http://localhost:8082/jobs/3f9f7ec2a7a765660bdc09922d0b7d0f/accumulators
其余API也是類似的,這里就不逐一演示了。
Monitoring REST API
除了History Server REST API,F(xiàn)link還提供了Monitoring REST API,該API也是RESTFul風(fēng)格,接受HTTP請求,響應(yīng)JSON數(shù)據(jù)。監(jiān)控API可以用來查詢正在運行的作業(yè)以及最近完成的作業(yè)的狀態(tài)和統(tǒng)計信息。Flink自己的dashboard就是使用的這個監(jiān)控API,并且該監(jiān)控API也可以被自定義的監(jiān)控工具使用,例如我們可以自己基于這些API開發(fā)屬于自己的監(jiān)控工具。官方文檔:
監(jiān)控API由web服務(wù)器支持作為 Dispatcher 的一部分運行。默認(rèn)情況下,此服務(wù)監(jiān)聽在8081端口,可以在flink-conf.yaml通過rest.port進行配置。需要注意的是,目前監(jiān)控API的web服務(wù)和儀表板的web服務(wù)是相同的,因此在同一端口上一起運行。不過,它們響應(yīng)不同的HTTP Url。
在有多個 Dispatcher 的情況下(為了高可用性),每個 Dispatcher 將運行其自己的監(jiān)控API實例,該實例提供有關(guān)已完成和正在運行的作業(yè)的信息,而該 Dispatcher 會被選為集群的leader。
官方文檔中有詳細(xì)列出所有的監(jiān)控API,如果需要開發(fā)自己的監(jiān)控平臺,就可以深入了解下:
Flink Metrics
Flink對外提供了一個度量(Metrics)系統(tǒng),它允許收集和向外部系統(tǒng)提供度量信息。官方文檔:
可以在任何繼承了RichFunction的用戶函數(shù)內(nèi)部調(diào)用 getRuntimeContext().getMetricGroup() 方法來訪問度量系統(tǒng)。此方法返回一個MetricGroup對象,你可以在該對象上創(chuàng)建和注冊新的度量。如下示例:
class MyMapper extends RichMapFunction[String,String] {
@transient private var counter: Counter = _
override def open(parameters: Configuration): Unit = {
counter = getRuntimeContext()
.getMetricGroup()
// 注冊一個計數(shù)器度量
.counter("myCounter")
}
override def map(value: String): String = {
counter.inc()
value
}
}
默認(rèn)情況下,F(xiàn)link收集了幾個可以深入了解當(dāng)前狀態(tài)的指標(biāo)。官方文檔對所有指標(biāo)都有相應(yīng)的描述: