報告(Reporter)
通過 conf/flink-conf.yaml 文件配置一個或多個 Reporters 來暴露度量值給外部系統(tǒng),這些 Reporter 將在作業(yè)和任務啟動的時候實例化。
-
metrics.reporter.<name>.<config>:名字為<name>的 Reporter 的通用設置 -
metrics.reporter.<name>.class:名字為<name>的 Reporter class -
metrics.reporter.<name>.interval:名字為<name>的 Reporter 的間隔時間 -
metrics.reporter.<name>.scope.delimiter:名字為<name>的 Reporter 的標識符的分隔符(默認使用metrics.scope.delimiter) -
metrics.reporters:(可選)以逗號分隔的包含報告名稱列表。默認情況下,將使用所有已配置的報告。
所有的 Reporter 配置至少需要配置 class 屬性,還有一些允許配置記錄間隔。下面是一些 Reporter 的配置實例:
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
包含 Reporter 的 jar 必須放到 /lib 文件夾,這樣 Flink 就可以訪問到這些 jar。
可以通過繼承 org.apache.flink.metrics.reporter.MetricReporter 接口來實現自己的 Reporter,如果需要定期發(fā)送記錄,需要繼承 Scheduled 接口。
下面是一些支持的 Reporter:
JMX(org.apache.flink.metrics.jmx.JMXReporter)
不需要添加額外的依賴就可以支持 JMX Reporter,默認是不激活的。
參數:
- port - (可選)JMX 連接監(jiān)聽的端口。為了能夠在一個主機上運行多個 Reporter 實例(例如,當一個 TaskManager 與 JobManager 共同使用時),建議端口范圍(如 9250-9260),實際端口將顯示在相關作業(yè)或 TaskManager 日志中。如果設置此設置,Flink 將為給定的端口/范圍啟動額外的 JMX 連接器。度量指標將在本地默認的JMX實例上顯示。
配置示例:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789
通過 JMX 公開的度量由域(domain)和鍵屬性列表(key-properties)標識,這些屬性一起構成對象名。
域始終以 org.apache.flink 開頭,后跟一個通用的度量標識符。與通常的標識符不同,它不受作用域格式的影響,不包含任何變量,并且在跨作業(yè)時也是常量。例子:org.apache.flink.job.task.numbytesout。
鍵屬性列表包含與給定指標關聯的所有變量的值,無論配置的作用域格式如何。例子:host=localhost,job_name=myjob,task_name=mytask。
因此,域標識一個度量類,鍵屬性列表標識該度量的一個(或多個)實例。
Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
- host - 在 gmond.conf 中的 udp_recv_channel.bind 下配置的 gmond 主機地址
- port - 在 gmond.conf 的 udp_recv_channel.port 下配置的 gmond 端口
- tmax - 舊指標應保留多長時間的軟限制
- dmax - 舊指標應保留多長時間的硬限制
- ttl - 傳輸的 UDP 包的生存時間
- addressingMode - 要使用的 UDP 尋址模式(單播/多播)
配置示例:
metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST
Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
- host - Graphite 服務器主機地址
- port - Graphite 服務器端口
- protocol - 使用協(xié)議(TCP / UDP)
配置示例:
metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP
Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
- port - (可選)Prometheus exporter 監(jiān)聽的端口,默認為 9249。為了能夠在一個主機上運行多個報告實例(例如,當一個 TaskManager 與 JobManager 共同使用時),建議使用端口范圍(如:9250-9260)。
配置示例:
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
Flink 度量類型映射到 Prometheus 度量類型,如下所示:
| Flink | Prometheus | Description |
|---|---|---|
| Counter | Gauge | Prometheus 計數器不能減 |
| Gauge | Gauge | Prometheus 僅支持數字和布爾類型 |
| Histogram | Summary | 分位數 .5,.75,.95,.98,.99 和 .999 |
| Meter | Gauge | The gauge exports the meter’s rate |
PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
| 鍵 | 默認值 | 描述 |
|---|---|---|
| deleteOnShutdown | true | 指定是否在關閉時從 PushGateway 中刪除指標。 |
| Host | (none) | PushGateway 服務器主機。 |
| jobName | (none) | 將推送指標的作業(yè)名稱。 |
| port | -1 | PushGateway 服務器端口。 |
| randomJobNameSuffix | true | 指定是否應將隨機后綴附加到作業(yè)名稱。 |
配置示例:
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
PrometheusPushGatewayReporter 將指標推送到 Pushgateway,可由 Prometheus 抓取。
StatsD(org.apache.flink.metrics.statsd.StatsDReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:
- host - StatsD 服務器主機
- port - StatsD 服務器端口
配置示例:
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
Flink 指標,如任何變量 <host>,<job_name>,<tm_id>,<subtask_index>,<task_name> 和 <operator_name>,將被發(fā)送到 Datadog 作為標簽。標簽看起來像 host:localhost 和 job_name:myjobname。
參數:
- apikey - Datadog APIKeys
- tags - (可選)發(fā)送到 Datadog 時將應用于度量標準的全局標記。標簽應僅以逗號分隔
配置示例:
metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod
Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)
要使用此 Reporter,必須復制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
配置示例:
metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS
系統(tǒng)指標
Flink 默認會收集當前狀態(tài)的指標,下文的表格中包括以下5列:
- “Scope”列描述了生成系統(tǒng)范圍的范圍格式,比如,如果表格里面的值為“Operator”,那么“metrics.scope.operator”將作為指標的范圍格式。如果表格包含使用斜線分割的多個值,那么系統(tǒng)將根據不同的值分別報告多個指標,比如同時包含 job- 和 taskmanagers 兩個。
- “Infix”(可選)列描述了附加哪個中綴到系統(tǒng)范圍之后。
- “Metrics” 列出了此系統(tǒng)范圍和中綴注冊的所有特性的名字。
- “Description”列描述了指標測量的信息。
- “Type”描述了指標的類型。
請注意,“infix” 和 “Metrics” 列中所有的點根據 “metrics.delimiter” 設置變化。
因此,為了推斷指標的標識符:
- 先從“Scope”列獲取范圍格式。
- 如果“Infix”列有值的話,附加到范圍格式后面,并根據“metrices.delimiter”設置附加相應的分隔符。
- 附加指標的名稱。
CPU
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| Job-/TaskManager | Status.JVM.CPU | Load | JVM CPU使用情況。 | Gauge |
| - | - | Time | JVM CPU時間。 | Gauge |
Memory
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| Job-/TaskManager | Status.JVM.Memory | Heap.Used | 當前使用的堆內存量(bytes)。 | Gauge |
| - | - | Heap.Committed | 保證可供 JVM 使用的堆內存量(bytes)。 | Gauge |
| - | - | Heap.Max | 可用于內存管理的最大堆內存量(bytes)。 | Gauge |
| - | - | NonHeap.Used | 當前使用的非堆內存量(bytes)。 | Gauge |
| - | - | NonHeap.Committed | 保證 JVM 可用的非堆內存量(bytes)。 | Gauge |
| - | - | NonHeap.Max | 可用于內存管理的最大非堆內存量(bytes)。 | Gauge |
| - | - | Direct.Count | 直接緩沖池中的緩沖區(qū)數。 | Gauge |
| - | - | Direct.MemoryUsed | JVM 用于直接緩沖池的內存量(bytes)。 | Gauge |
| - | - | Direct.TotalCapacity | 直接緩沖池中所有緩沖區(qū)的總容量(bytes)。 | Gauge |
| - | - | Mapped.Count | 映射緩沖池中的緩沖區(qū)數。 | Gauge |
| - | - | Mapped.MemoryUsed | JVM 用于映射緩沖池的內存量(bytes)。 | Gauge |
| - | - | Mapped.TotalCapacity | 映射緩沖池中的緩沖區(qū)數(bytes)。 | Gauge |
Threads
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| Job-/TaskManager | Status.JVM.Threads | Count | 活動線程總數。 | Gauge |
GarbageCollection
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| Job-/TaskManager | Status.JVM.GarbageCollector |
<GarbageCollector>.Count |
已發(fā)生的集合總數。 | Gauge |
| - | - |
<GarbageCollector>.Time |
執(zhí)行垃圾收集所花費的總時間。 | Gauge |
ClassLoader
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| Job-/TaskManager | Status.JVM.ClassLoader | ClassesLoaded | JVM 啟動以來加載的類總數。 | Gauge |
| - | - | ClassesUnloaded | JVM 啟動以來卸載的類總數。 | Gauge |
Network
| Scope | Infix | Metrics | Description | Type |
|---|---|---|---|---|
| TaskManager | Status.Network | AvailableMemorySegments | 未使用的內存段數。 | Gauge |
| - | - | TotalMemorySegments | 分配的內存段數。 | Gauge |
| Task | buffers | inputQueueLength | 排隊的輸入緩沖區(qū)數。 | Gauge |
| - | - | outputQueueLength | 排隊輸出緩沖區(qū)的數量。 | Gauge |
| - | - | inPoolUsage | 估計輸入緩沖區(qū)的使用情況。 | Gauge |
| - | - | outPoolUsage | 估計輸出緩沖區(qū)的使用情況。 | Gauge |
| - | Network.<Input/Output>.<gate>
|
totalQueueLen | 所有輸入/輸出通道中排隊緩沖區(qū)的總數。 | Gauge |
| - | - | minQueueLen | 所有輸入/輸出通道中的最小排隊緩沖區(qū)數。 | Gauge |
| - | - | maxQueueLen | 所有輸入/輸出通道中的最大排隊緩沖區(qū)數。 | Gauge |
| - | - | avgQueueLen | 所有輸入/輸出通道中的平均緩沖區(qū)數。 | Gauge |
Cluster
| Scope | Metrics | Description | Type |
|---|---|---|---|
| JobManager | numRegisteredTaskManagers | 注冊 TaskManager 的數量。 | Gauge |
| - | numRunningJobs | 正在運行的作業(yè)數量。 | Gauge |
| - | taskSlotsAvailable | 可用任務槽的數量。 | Gauge |
| - | taskSlotsTotal | 任務槽的總數。 | Gauge |
Availability
| Scope | Metrics | Description | Type |
|---|---|---|---|
| Job | restartingTime | 重新啟動作業(yè)所花費的時間,或當前重新啟動的持續(xù)時間(ms)。 | Gauge |
| - | uptime | 作業(yè)運行的時間不間斷。對于已完成的作業(yè),返回-1(ms)。 | Gauge |
| - | downtime | 對于當前處于故障/恢復狀態(tài)的作業(yè),在此中斷期間經過的時間。對于正在運行的作業(yè)返回0,對于已完成的作業(yè)返回-1(ms)。 | Gauge |
| - | fullRestarts | 自提交此作業(yè)以來完全重新啟動的總次數。 | Gauge |
Checkpointing
| Scope | Metrics | Description | Type |
|---|---|---|---|
| Job | lastCheckpointDuration | 完成最后一個檢查點所花費的時間(ms)。 | Gauge |
| - | lastCheckpointSize | 最后一個檢查點的總大?。╞ytes)。 | Gauge |
| - | lastCheckpointExternalPath | 存儲最后一個外部檢查點的路徑。 | Gauge |
| - | lastCheckpointRestoreTimestamp | 在協(xié)調器上恢復最后一個檢查點時的時間戳(ms)。 | Gauge |
| - | lastCheckpointAlignmentBuffered | 在最后一個檢查點的所有子任務上進行對齊期間的緩沖字節(jié)數(ms)。 | Gauge |
| - | numberOfInProgressCheckpoints | 進行中檢查點的數量。 | Gauge |
| - | numberOfCompletedCheckpoints | 成功完成檢查點的數量。 | Gauge |
| - | numberOfFailedCheckpoints | 失敗檢查點的數量。 | Gauge |
| - | totalNumberOfCheckpoints | 總檢查點的數量(正在進行,已完成,失?。?。 | Gauge |
| Task | checkpointAlignmentTime | 最后一次屏障對齊完成所花費的時間(nanoseconds),或當前對齊到目前為止所用的時間(nanoseconds)。 | Gauge |
IO
| Scope | Metrics | Description | Type |
|---|---|---|---|
| Job |
<SOURCE_ID>.<source_subtask_index>.<operator_id>.<operator_subtask_index>.latency |
從給定源子任務到算子子任務的延遲分布(ms)。 | Histogram |
| Task | numBytesInLocal | 此任務從本地源讀取的總字節(jié)數。 | Counter |
| - | numBytesInLocalPerSecond | 此任務每秒從本地源讀取的字節(jié)數。 | Meter |
| - | numBytesInRemote | 此任務從遠程源讀取的總字節(jié)數。 | Counter |
| - | numBytesInRemotePerSecond | 此任務每秒從遠程源讀取的字節(jié)數。 | Meter |
| - | numBuffersInLocal | 此任務從本地源讀取的網絡緩沖區(qū)總數。 | Counter |
| - | numBuffersInLocalPerSecond | 此任務每秒從本地源讀取的網絡緩沖區(qū)數。 | Meter |
| - | numBuffersInRemote | 此任務從遠程源讀取的網絡緩沖區(qū)總數。 | Counter |
| - | numBuffersInRemotePerSecond | 此任務每秒從遠程源讀取的網絡緩沖區(qū)數。 | Meter |
| - | numBytesOut | 此任務已發(fā)出的總字節(jié)數。 | Counter |
| - | numBytesOutPerSecond | 此任務每秒發(fā)出的字節(jié)數。 | Meter |
| - | numBuffersOut | 此任務已發(fā)出的網絡緩沖區(qū)總數。 | Counter |
| - | numBuffersOutPerSecond | 此任務每秒發(fā)出的網絡緩沖區(qū)數。 | Meter |
| Task/Operator | numRecordsIn | 此算子/任務已收到的記錄總數。 | Counter |
| - | numRecordsInPerSecond | 此算子/任務每秒接收的記錄數。 | Meter |
| - | numRecordsOut | 此算子/任務已發(fā)出的記錄總數。 | Counter |
| - | numRecordsOutPerSecond | 此算子/任務每秒發(fā)送的記錄數。 | Meter |
| - | numLateRecordsDropped | 此算子/任務因遲到而丟失的記錄數。 | Counter |
| - | currentInputWatermark | 此算子/任務收到的最后一個水?。╩s)。注意:對于具有2個輸入的算子/任務,這是最后收到的水印的最小值。 | Gauge |
| Operator | currentInput1Watermark | 此算子在其第一個輸入(ms)中收到的最后一個水印。注意:僅適用于具有2個輸入的算子。 | Gauge |
| - | currentInput2Watermark | 此算子在其第二個輸入中接收的最后一個水?。╩s)。注意:僅適用于具有2個輸入的算子。 | Gauge |
| - | currentOutputWatermark | 此算子發(fā)出的最后一個水?。╩s)。 | Gauge |
| - | numSplitsProcessed | 此數據源已處理的InputSplits總數。 | Gauge |
Connectors
Kafka 連接器
| Scope | Metrics | User Variables | Description | Type |
|---|---|---|---|---|
| Operator | commitsSucceeded | N / A | 如果啟用了偏移提交并且啟用了檢查點,則成功向 Kafka 提交的偏移提交總數。 | Counter |
| - | commitsFailed | N / A | 如果啟用了偏移提交并且啟用了檢查點,則 Kafka 的偏移提交失敗總數。請注意,將偏移量提交回 Kafka 只是暴露消費者進度的一種方法,因此提交失敗不會影響 Flink 的檢查點分區(qū)偏移的完整性。 | Counter |
| - | committedOffsets | Topic,分區(qū) | 對于每個分區(qū),最后成功提交到 Kafka 的偏移量??梢酝ㄟ^主題名稱和分區(qū)ID指定特定分區(qū)的度量標準。 | Gauge |
| - | currentOffsets | Topic,分區(qū) | 消費者對每個分區(qū)的當前讀取偏移量??梢酝ㄟ^主題名稱和分區(qū)ID指定特定分區(qū)的度量標準。 | Gauge |
Kinesis 連接器
| Scope | Metrics | User Variables | Description | Type |
|---|---|---|---|---|
| Operator | millisBehindLatest | stream,shardId | 對于每個 Kinesis 分片,消費者在流的頭部后面的毫秒數,表示消費者當前時間落后多少??梢酝ㄟ^流名稱和分片標識指定特定分片的度量標準。值為0表示記錄處理被捕獲,此時沒有要處理的新記錄。值-1表示該度量標準尚未報告。 | Gauge |
| - | sleepTimeMillis | stream,shardId | 消費者在從 Kinesis 獲取記錄之前花費的毫秒數??梢酝ㄟ^流名稱和分片標識指定特定分片的度量標準。 | Gauge |
| - | maxNumberOfRecordsPerFetch | stream,shardId | 消費者在單個 getRecords 調用 Kinesis 時請求的最大記錄數。 | Gauge |
| - | numberOfAggregatedRecordsPerFetch | stream,shardId | 消費者在單個 getRecords 調用 Kinesis 時獲取的聚合 Kinesis 記錄數。 | Gauge |
| - | numberOfDeggregatedRecordsPerFetch | stream,shardId | 消費者在單個 getRecords 調用 Kinesis 時獲取的分解 Kinesis 記錄的數量。 | Gauge |
| - | averageRecordSizeBytes | stream,shardId | Kinesis 記錄的平均大?。╞ytes),由消費者在單個 getRecords 調用中獲取。 | Gauge |
| - | runLoopTimeNanos | stream,shardId | 消費者在運行循環(huán)中花費的實際時間(ns)。 | Gauge |
| - | loopFrequencyHz | stream,shardId | 一秒鐘內調用 getRecords 的次數。 | Gauge |
| - | bytesRequestedPerFetch | stream,shardId | 在一次調用 getRecords 中請求的字節(jié)數。 | Gauge |
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html