Flink 指標(二)

報告(Reporter)

通過 conf/flink-conf.yaml 文件配置一個或多個 Reporters 來暴露度量值給外部系統(tǒng),這些 Reporter 將在作業(yè)和任務啟動的時候實例化。

  • metrics.reporter.<name>.<config>:名字為 <name> 的 Reporter 的通用設置
  • metrics.reporter.<name>.class:名字為 <name> 的 Reporter class
  • metrics.reporter.<name>.interval:名字為 <name> 的 Reporter 的間隔時間
  • metrics.reporter.<name>.scope.delimiter:名字為 <name> 的 Reporter 的標識符的分隔符(默認使用 metrics.scope.delimiter
  • metrics.reporters:(可選)以逗號分隔的包含報告名稱列表。默認情況下,將使用所有已配置的報告。

所有的 Reporter 配置至少需要配置 class 屬性,還有一些允許配置記錄間隔。下面是一些 Reporter 的配置實例:

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.my_jmx_reporter.port: 9020-9040

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

包含 Reporter 的 jar 必須放到 /lib 文件夾,這樣 Flink 就可以訪問到這些 jar。
可以通過繼承 org.apache.flink.metrics.reporter.MetricReporter 接口來實現自己的 Reporter,如果需要定期發(fā)送記錄,需要繼承 Scheduled 接口。

下面是一些支持的 Reporter:

JMX(org.apache.flink.metrics.jmx.JMXReporter)

不需要添加額外的依賴就可以支持 JMX Reporter,默認是不激活的。

參數:

  • port - (可選)JMX 連接監(jiān)聽的端口。為了能夠在一個主機上運行多個 Reporter 實例(例如,當一個 TaskManager 與 JobManager 共同使用時),建議端口范圍(如 9250-9260),實際端口將顯示在相關作業(yè)或 TaskManager 日志中。如果設置此設置,Flink 將為給定的端口/范圍啟動額外的 JMX 連接器。度量指標將在本地默認的JMX實例上顯示。

配置示例:

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8789

通過 JMX 公開的度量由域(domain)和鍵屬性列表(key-properties)標識,這些屬性一起構成對象名。

域始終以 org.apache.flink 開頭,后跟一個通用的度量標識符。與通常的標識符不同,它不受作用域格式的影響,不包含任何變量,并且在跨作業(yè)時也是常量。例子:org.apache.flink.job.task.numbytesout。

鍵屬性列表包含與給定指標關聯的所有變量的值,無論配置的作用域格式如何。例子:host=localhost,job_name=myjob,task_name=mytask。

因此,域標識一個度量類,鍵屬性列表標識該度量的一個(或多個)實例。

Ganglia(org.apache.flink.metrics.ganglia.GangliaReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-ganglia-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。

參數:

  • host - 在 gmond.conf 中的 udp_recv_channel.bind 下配置的 gmond 主機地址
  • port - 在 gmond.conf 的 udp_recv_channel.port 下配置的 gmond 端口
  • tmax - 舊指標應保留多長時間的軟限制
  • dmax - 舊指標應保留多長時間的硬限制
  • ttl - 傳輸的 UDP 包的生存時間
  • addressingMode - 要使用的 UDP 尋址模式(單播/多播)

配置示例:

metrics.reporter.gang.class: org.apache.flink.metrics.ganglia.GangliaReporter
metrics.reporter.gang.host: localhost
metrics.reporter.gang.port: 8649
metrics.reporter.gang.tmax: 60
metrics.reporter.gang.dmax: 0
metrics.reporter.gang.ttl: 1
metrics.reporter.gang.addressingMode: MULTICAST

Graphite(org.apache.flink.metrics.graphite.GraphiteReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-graphite-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。

參數:

  • host - Graphite 服務器主機地址
  • port - Graphite 服務器端口
  • protocol - 使用協(xié)議(TCP / UDP)

配置示例:

metrics.reporter.grph.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.grph.host: localhost
metrics.reporter.grph.port: 2003
metrics.reporter.grph.protocol: TCP

Prometheus (org.apache.flink.metrics.prometheus.PrometheusReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。

參數:

  • port - (可選)Prometheus exporter 監(jiān)聽的端口,默認為 9249。為了能夠在一個主機上運行多個報告實例(例如,當一個 TaskManager 與 JobManager 共同使用時),建議使用端口范圍(如:9250-9260)。

配置示例:

metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink 度量類型映射到 Prometheus 度量類型,如下所示:

Flink Prometheus Description
Counter Gauge Prometheus 計數器不能減
Gauge Gauge Prometheus 僅支持數字和布爾類型
Histogram Summary 分位數 .5,.75,.95,.98,.99 和 .999
Meter Gauge The gauge exports the meter’s rate

PrometheusPushGateway(org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-prometheus-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。

參數:

默認值 描述
deleteOnShutdown true 指定是否在關閉時從 PushGateway 中刪除指標。
Host (none) PushGateway 服務器主機。
jobName (none) 將推送指標的作業(yè)名稱。
port -1 PushGateway 服務器端口。
randomJobNameSuffix true 指定是否應將隨機后綴附加到作業(yè)名稱。

配置示例:

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false

PrometheusPushGatewayReporter 將指標推送到 Pushgateway,可由 Prometheus 抓取。

StatsD(org.apache.flink.metrics.statsd.StatsDReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-statsd-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
參數:

  • host - StatsD 服務器主機
  • port - StatsD 服務器端口

配置示例:

metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

Datadog(org.apache.flink.metrics.datadog.DatadogHttpReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-datadog-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。
Flink 指標,如任何變量 <host>,<job_name>,<tm_id>,<subtask_index>,<task_name><operator_name>,將被發(fā)送到 Datadog 作為標簽。標簽看起來像 host:localhostjob_name:myjobname。

參數:

  • apikey - Datadog APIKeys
  • tags - (可選)發(fā)送到 Datadog 時將應用于度量標準的全局標記。標簽應僅以逗號分隔

配置示例:

metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
metrics.reporter.dghttp.apikey: xxx
metrics.reporter.dghttp.tags: myflinkapp,prod

Slf4j(org.apache.flink.metrics.slf4j.Slf4jReporter)

要使用此 Reporter,必須復制 /opt/flink-metrics-slf4j-1.6.1-SNAPSHOT.jar 到 Flink 的 /lib 文件夾下。

配置示例:

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS

系統(tǒng)指標

Flink 默認會收集當前狀態(tài)的指標,下文的表格中包括以下5列:

  • “Scope”列描述了生成系統(tǒng)范圍的范圍格式,比如,如果表格里面的值為“Operator”,那么“metrics.scope.operator”將作為指標的范圍格式。如果表格包含使用斜線分割的多個值,那么系統(tǒng)將根據不同的值分別報告多個指標,比如同時包含 job- 和 taskmanagers 兩個。
  • “Infix”(可選)列描述了附加哪個中綴到系統(tǒng)范圍之后。
  • “Metrics” 列出了此系統(tǒng)范圍和中綴注冊的所有特性的名字。
  • “Description”列描述了指標測量的信息。
  • “Type”描述了指標的類型。

請注意,“infix” 和 “Metrics” 列中所有的點根據 “metrics.delimiter” 設置變化。

因此,為了推斷指標的標識符:

  1. 先從“Scope”列獲取范圍格式。
  2. 如果“Infix”列有值的話,附加到范圍格式后面,并根據“metrices.delimiter”設置附加相應的分隔符。
  3. 附加指標的名稱。

CPU

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.CPU Load JVM CPU使用情況。 Gauge
- - Time JVM CPU時間。 Gauge

Memory

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Memory Heap.Used 當前使用的堆內存量(bytes)。 Gauge
- - Heap.Committed 保證可供 JVM 使用的堆內存量(bytes)。 Gauge
- - Heap.Max 可用于內存管理的最大堆內存量(bytes)。 Gauge
- - NonHeap.Used 當前使用的非堆內存量(bytes)。 Gauge
- - NonHeap.Committed 保證 JVM 可用的非堆內存量(bytes)。 Gauge
- - NonHeap.Max 可用于內存管理的最大非堆內存量(bytes)。 Gauge
- - Direct.Count 直接緩沖池中的緩沖區(qū)數。 Gauge
- - Direct.MemoryUsed JVM 用于直接緩沖池的內存量(bytes)。 Gauge
- - Direct.TotalCapacity 直接緩沖池中所有緩沖區(qū)的總容量(bytes)。 Gauge
- - Mapped.Count 映射緩沖池中的緩沖區(qū)數。 Gauge
- - Mapped.MemoryUsed JVM 用于映射緩沖池的內存量(bytes)。 Gauge
- - Mapped.TotalCapacity 映射緩沖池中的緩沖區(qū)數(bytes)。 Gauge

Threads

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.Threads Count 活動線程總數。 Gauge

GarbageCollection

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.GarbageCollector <GarbageCollector>.Count 已發(fā)生的集合總數。 Gauge
- - <GarbageCollector>.Time 執(zhí)行垃圾收集所花費的總時間。 Gauge

ClassLoader

Scope Infix Metrics Description Type
Job-/TaskManager Status.JVM.ClassLoader ClassesLoaded JVM 啟動以來加載的類總數。 Gauge
- - ClassesUnloaded JVM 啟動以來卸載的類總數。 Gauge

Network

Scope Infix Metrics Description Type
TaskManager Status.Network AvailableMemorySegments 未使用的內存段數。 Gauge
- - TotalMemorySegments 分配的內存段數。 Gauge
Task buffers inputQueueLength 排隊的輸入緩沖區(qū)數。 Gauge
- - outputQueueLength 排隊輸出緩沖區(qū)的數量。 Gauge
- - inPoolUsage 估計輸入緩沖區(qū)的使用情況。 Gauge
- - outPoolUsage 估計輸出緩沖區(qū)的使用情況。 Gauge
- Network.
<Input/Output>.
<gate>
totalQueueLen 所有輸入/輸出通道中排隊緩沖區(qū)的總數。 Gauge
- - minQueueLen 所有輸入/輸出通道中的最小排隊緩沖區(qū)數。 Gauge
- - maxQueueLen 所有輸入/輸出通道中的最大排隊緩沖區(qū)數。 Gauge
- - avgQueueLen 所有輸入/輸出通道中的平均緩沖區(qū)數。 Gauge

Cluster

Scope Metrics Description Type
JobManager numRegisteredTaskManagers 注冊 TaskManager 的數量。 Gauge
- numRunningJobs 正在運行的作業(yè)數量。 Gauge
- taskSlotsAvailable 可用任務槽的數量。 Gauge
- taskSlotsTotal 任務槽的總數。 Gauge

Availability

Scope Metrics Description Type
Job restartingTime 重新啟動作業(yè)所花費的時間,或當前重新啟動的持續(xù)時間(ms)。 Gauge
- uptime 作業(yè)運行的時間不間斷。對于已完成的作業(yè),返回-1(ms)。 Gauge
- downtime 對于當前處于故障/恢復狀態(tài)的作業(yè),在此中斷期間經過的時間。對于正在運行的作業(yè)返回0,對于已完成的作業(yè)返回-1(ms)。 Gauge
- fullRestarts 自提交此作業(yè)以來完全重新啟動的總次數。 Gauge

Checkpointing

Scope Metrics Description Type
Job lastCheckpointDuration 完成最后一個檢查點所花費的時間(ms)。 Gauge
- lastCheckpointSize 最后一個檢查點的總大?。╞ytes)。 Gauge
- lastCheckpointExternalPath 存儲最后一個外部檢查點的路徑。 Gauge
- lastCheckpointRestoreTimestamp 在協(xié)調器上恢復最后一個檢查點時的時間戳(ms)。 Gauge
- lastCheckpointAlignmentBuffered 在最后一個檢查點的所有子任務上進行對齊期間的緩沖字節(jié)數(ms)。 Gauge
- numberOfInProgressCheckpoints 進行中檢查點的數量。 Gauge
- numberOfCompletedCheckpoints 成功完成檢查點的數量。 Gauge
- numberOfFailedCheckpoints 失敗檢查點的數量。 Gauge
- totalNumberOfCheckpoints 總檢查點的數量(正在進行,已完成,失?。?。 Gauge
Task checkpointAlignmentTime 最后一次屏障對齊完成所花費的時間(nanoseconds),或當前對齊到目前為止所用的時間(nanoseconds)。 Gauge

IO

Scope Metrics Description Type
Job <SOURCE_ID>.
<source_subtask_index>.
<operator_id>.
<operator_subtask_index>.
latency
從給定源子任務到算子子任務的延遲分布(ms)。 Histogram
Task numBytesInLocal 此任務從本地源讀取的總字節(jié)數。 Counter
- numBytesInLocalPerSecond 此任務每秒從本地源讀取的字節(jié)數。 Meter
- numBytesInRemote 此任務從遠程源讀取的總字節(jié)數。 Counter
- numBytesInRemotePerSecond 此任務每秒從遠程源讀取的字節(jié)數。 Meter
- numBuffersInLocal 此任務從本地源讀取的網絡緩沖區(qū)總數。 Counter
- numBuffersInLocalPerSecond 此任務每秒從本地源讀取的網絡緩沖區(qū)數。 Meter
- numBuffersInRemote 此任務從遠程源讀取的網絡緩沖區(qū)總數。 Counter
- numBuffersInRemotePerSecond 此任務每秒從遠程源讀取的網絡緩沖區(qū)數。 Meter
- numBytesOut 此任務已發(fā)出的總字節(jié)數。 Counter
- numBytesOutPerSecond 此任務每秒發(fā)出的字節(jié)數。 Meter
- numBuffersOut 此任務已發(fā)出的網絡緩沖區(qū)總數。 Counter
- numBuffersOutPerSecond 此任務每秒發(fā)出的網絡緩沖區(qū)數。 Meter
Task/Operator numRecordsIn 此算子/任務已收到的記錄總數。 Counter
- numRecordsInPerSecond 此算子/任務每秒接收的記錄數。 Meter
- numRecordsOut 此算子/任務已發(fā)出的記錄總數。 Counter
- numRecordsOutPerSecond 此算子/任務每秒發(fā)送的記錄數。 Meter
- numLateRecordsDropped 此算子/任務因遲到而丟失的記錄數。 Counter
- currentInputWatermark 此算子/任務收到的最后一個水?。╩s)。注意:對于具有2個輸入的算子/任務,這是最后收到的水印的最小值。 Gauge
Operator currentInput1Watermark 此算子在其第一個輸入(ms)中收到的最后一個水印。注意:僅適用于具有2個輸入的算子。 Gauge
- currentInput2Watermark 此算子在其第二個輸入中接收的最后一個水?。╩s)。注意:僅適用于具有2個輸入的算子。 Gauge
- currentOutputWatermark 此算子發(fā)出的最后一個水?。╩s)。 Gauge
- numSplitsProcessed 此數據源已處理的InputSplits總數。 Gauge

Connectors

Kafka 連接器

Scope Metrics User Variables Description Type
Operator commitsSucceeded N / A 如果啟用了偏移提交并且啟用了檢查點,則成功向 Kafka 提交的偏移提交總數。 Counter
- commitsFailed N / A 如果啟用了偏移提交并且啟用了檢查點,則 Kafka 的偏移提交失敗總數。請注意,將偏移量提交回 Kafka 只是暴露消費者進度的一種方法,因此提交失敗不會影響 Flink 的檢查點分區(qū)偏移的完整性。 Counter
- committedOffsets Topic,分區(qū) 對于每個分區(qū),最后成功提交到 Kafka 的偏移量??梢酝ㄟ^主題名稱和分區(qū)ID指定特定分區(qū)的度量標準。 Gauge
- currentOffsets Topic,分區(qū) 消費者對每個分區(qū)的當前讀取偏移量??梢酝ㄟ^主題名稱和分區(qū)ID指定特定分區(qū)的度量標準。 Gauge

Kinesis 連接器

Scope Metrics User Variables Description Type
Operator millisBehindLatest stream,shardId 對于每個 Kinesis 分片,消費者在流的頭部后面的毫秒數,表示消費者當前時間落后多少??梢酝ㄟ^流名稱和分片標識指定特定分片的度量標準。值為0表示記錄處理被捕獲,此時沒有要處理的新記錄。值-1表示該度量標準尚未報告。 Gauge
- sleepTimeMillis stream,shardId 消費者在從 Kinesis 獲取記錄之前花費的毫秒數??梢酝ㄟ^流名稱和分片標識指定特定分片的度量標準。 Gauge
- maxNumberOfRecordsPerFetch stream,shardId 消費者在單個 getRecords 調用 Kinesis 時請求的最大記錄數。 Gauge
- numberOfAggregatedRecordsPerFetch stream,shardId 消費者在單個 getRecords 調用 Kinesis 時獲取的聚合 Kinesis 記錄數。 Gauge
- numberOfDeggregatedRecordsPerFetch stream,shardId 消費者在單個 getRecords 調用 Kinesis 時獲取的分解 Kinesis 記錄的數量。 Gauge
- averageRecordSizeBytes stream,shardId Kinesis 記錄的平均大?。╞ytes),由消費者在單個 getRecords 調用中獲取。 Gauge
- runLoopTimeNanos stream,shardId 消費者在運行循環(huán)中花費的實際時間(ns)。 Gauge
- loopFrequencyHz stream,shardId 一秒鐘內調用 getRecords 的次數。 Gauge
- bytesRequestedPerFetch stream,shardId 在一次調用 getRecords 中請求的字節(jié)數。 Gauge

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/monitoring/metrics.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容