主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/
Flink 提供了 Metric 系統(tǒng),允許收集 Metric 并暴露給外部系統(tǒng)。
注冊 Metrics
可以通過任何繼承了 RichFunction 的函數(shù)訪問 Metric 系統(tǒng)。調用 getRuntionContext().getMetricGroup() 方法,該方法返回一個 MetricGroup 對象,可以創(chuàng)建并注冊 Metric。
Metric 類型
Counter
Counter 用來計數(shù)。當前值可以使用 inc()/inc(long n) 或 dec()/dec(long n) 進行增減。
// 實現(xiàn) RichMapFunction 接口
public class MyMapper extends RichMapFunction<String, String> {
private transient Counter counter;
@Override
public void open(Configuration config) {
// 定義一個 Counter Metric
this.counter = getRuntimeContext()
.getMetricGroup()
.counter("myCounter");
}
@Override
public String map(String value) throws Exception {
// Counter 增加 1
this.counter.inc();
return value;
}
}
Gauge
Gauge 根據(jù)需要提供任何類型的值。需要先創(chuàng)建一個實現(xiàn) org.apache.flink.metrics.Gauge 的類,返回值的類形沒有限制。
Report 程序在暴露數(shù)據(jù)給外部系統(tǒng)時,會把對象轉換為字符串,這意味著需要一個有意義的 toString() 實現(xiàn)。
public class MyMapper extends RichMapFunction<String, String> {
private transient int valueToExpose = 0;
@Override
public void open(Configuration config) {
getRuntimeContext()
.getMetricGroup()
.gauge("MyGauge", new Gauge<Integer>() {
// 實現(xiàn) org.apache.flink.metrics.Gauge 接口
@Override
public Integer getValue() {
return valueToExpose;
}
});
}
@Override
public String map(String value) throws Exception {
valueToExpose++;
return value;
}
}
Histogram
Histogram 統(tǒng)計值的分布。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Histogram histogram;
@Override
public void open(Configuration config) {
this.histogram = getRuntimeContext()
.getMetricGroup()
.histogram("myHistogram", new MyHistogram());
}
@Override
public Long map(Long value) throws Exception {
// 加入一個新值
this.histogram.update(value);
return value;
}
}
Flink 沒有提供 Histogram 的默認實現(xiàn),可以添加依賴使用 DropwizardHistogramWrapper 實現(xiàn)
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-metrics-dropwizard</artifactId>
<version>1.13.0</version>
</dependency>
Meter
Meter 用來統(tǒng)計平均吞吐量。
public class MyMapper extends RichMapFunction<Long, Long> {
private transient Meter meter;
@Override
public void open(Configuration config) {
this.meter = getRuntimeContext()
.getMetricGroup()
.meter("myMeter", new MyMeter());
}
@Override
public Long map(Long value) throws Exception {
// 注冊事件
// markEvent(long n) 可以注冊同時發(fā)生多個時間
this.meter.markEvent();
return value;
}
}
同樣添加 flink-metrics-dropwizard 依賴,可以使用 DropwizardMeterWrapper 實現(xiàn)
Scope
每個 Metric 都會分配一個標識符和一組鍵值對,用來報告 Metric。
標識符基于3個組成部分:注冊時的用戶定義名稱、可選的用戶定義 Scope 和系統(tǒng)提供的 Scope。例如,如果 A.B 是系統(tǒng) Scope,C.D 是用戶 Scope,E 是名稱,那么標識符將是 A.B.C.D.E。
可以通過在 conf/flink-conf.yaml 中設置 metrics.scope.delimiter 鍵來配置用于標識符的分隔符(默認值:.)。
User Scope
定義 User Scope 的方法: 調用 MetricGroup#addGroup(String name),MetricGroup#addGroup(int name),MetricGroup#addGroup(String key, String value)。這些方法會影響 MetricGroup#getMetricIdentifier 和 MetricGroup#getScopeComponents 的返回值。
// 創(chuàng)建 Metric 時指定 Scope
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetrics")
.counter("myCounter");
counter = getRuntimeContext()
.getMetricGroup()
.addGroup("MyMetricsKey", "MyMetricsValue")
.counter("myCounter");
System Scope
System Scope 包含 Metric 的上下文信息,例如注冊在哪個 Task(<task_name>)或屬于哪個 Job(<job_name>)。
應該包含哪些上下文信息可以通過 conf/flink-conf.yaml 配置。
metrics.scope.jm- 默認值:
<host>.jobmanager - JobManager 的所有 Metric
- 默認值:
metrics.scope.jm.job- 默認值:
<host>.jobmanager.<job_name> - JobManager 和 Job 的所有 Metric
- 默認值:
metrics.scope.tm- 默認值:
<host>.taskmanager.<tm_id> - TaskManager 的所有 Metric
- 默認值:
metrics.scope.tm.job- 默認值:
<host>.taskmanager.<tm_id><job_name> - TaskManager 和 Job 的所有 Metric
- 默認值:
metrics.scope.task- 默認值:
<host>.taskmanager.<tm_id><job_name><task_name><subtask_index> - Task 的所有 Metric
- 默認值:
metrics.scope.operator- 默認值:
<host>.taskmanager.<tm_id><job_name><operator_name><subtask_index> - Operator 的所有 Metric
- 默認值:
<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作為變量使用。變量的數(shù)量或順序沒有限制,區(qū)分大小寫。
例如:Operator Metric 的默認 Scope 格式為 <host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>,生成的標識符類似 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的形式;如果希望包含 Task 名稱,并且忽略 TaskManager 信息,可以設置 metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,生成的標識符會變成 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric。
建議添加帶有 ID 的變量(如:<job_id>)保證唯一性,避免出現(xiàn)命名沖突的問題。所有可以使用的變量:
JobManager:
<host>TaskManager:
<host>,<tm_id>Job:
<job_id>,<job_name>Task:
<task_id>,<task_name>,<task_attempt_id>,<task_attempt_num>,<subtask_index>Operator:
<operator_id>,<operator_name>,<subtask_index>
Reporter
Flink 允許向外部系統(tǒng)報告 Metric。
通過在 conf/flink-conf.yaml 中配置一個或多個 Reporter,可以將 Metric 暴露給外部系統(tǒng)。這些 Reporter 在啟動時實例化。
-
metrics.reporter.<name>.<config>:Reporter 名稱 -
metrics.reporter.<name>.class:Reporter 實現(xiàn)類 -
metrics.reporter.<name>.factory.class:Reporter 工廠類 -
metrics.reporter.<name>.interval:Reporter 調用間隔 -
metrics.reporter.<name>.scope.delimiter:Scope 標識符的分隔符(默認使用metrics.scope.delimiter) -
metrics.reporter.<name>.scope.variables.excludes:可選項,以 “;” 分隔的變量列表,可以忽略這些變量 -
metrics.reporters:可選項,以 “,” 分隔的 Reporter 名稱列表,表示應用哪些 Reporter,默認會包含所有配置的 Reporter。
Reporter 必須至少配置 class 或 factory.class 屬性(使用哪個取決于 Reporter 的實現(xiàn))。
配置 Reporter 示例
metrics.reporters: my_jmx_reporter,my_other_reporter
metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num
metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000
自定義 Reporter:
- 實現(xiàn) org.apache.flink.metrics.reporter.MetricReporter 接口
- 如果要定時發(fā)送報告,實現(xiàn) Scheduled 接口
下面列出了一些支持的 Reporter
JMX
org.apache.flink.metrics.jmx.JMXReporter
參數(shù):
- port - JMX 監(jiān)聽端口,建議使用范圍:9250-9260。實際端口將顯示在相關 Job 或 Task Manager 日志中。
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.jmx.port: 8789
通過 JMX 公開的 Metric 由一個 domain 和一組 key 屬性組成標識。domain 總是以 org.apache.flink 開始,接一個通用 metric 標識(與一般的 metric 標識不同,不受 scope 格式的影響,不包含任何變量),例如:org.apache.flink.job.task.numBytesOut。
key 屬性列表包含與給定 Metric 關聯(lián)的所有變量的值(不受 scope 格式影響)。例如:host=localhost,job_name=MyJob,task_name=MyTask。
Prometheus
org.apache.flink.metrics.prometheus.PrometheusReporter
參數(shù):
- port - Prometheus exporter 偵聽的端口,默認為 9249,建議使用范圍:9250-9260。
- filterLabelValueCharacters - 可選項,過濾 label 值中的字符。如果啟用,不匹配 [a-zA-Z0-9:_] 的字符會被移除。默認開啟,在關閉前,確認 label 值是否符合 Premetheus 要求(Flink metric 變量都會作為 Prometheus label)。
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
Flink Metric 類型和 Prometheus Metric 類型映射
| Flink | Prometheus | Note |
|---|---|---|
| Counter | Gauge | Prometheus Counters 不能遞減 |
| Gauge | Gauge | 只支持數(shù)值和布爾 |
| Histogram | Summary | 分位數(shù)支持 .5, .75, .95, .98, .99, .999 |
| Meter | Gauge |
PrometheusPushGateway
org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
參數(shù)
| Key | Default | Type | Description |
|---|---|---|---|
| deleteOnShutdown | true | Boolean | 在關閉時,是否刪除 PushGateway 中的 Metric。 |
| filterLabelValueCharacters | true | Boolean | 是否過濾 label 值中的字符。如果啟用,不匹配 [a-zA-Z0-9:_] 的字符會被移除。默認開啟,在關閉前,確認 label 值是否符合 Premetheus 要求(Flink metric 變量都會作為 Prometheus label)。 |
| groupingKey | (none) | String | 指定 grouping key。格式:lable_name=label_value;lable_name=label_value; |
| host | (none) | String | PushGateway 服務地址 |
| jobName | (none) | String | 指定作業(yè),推送 metric |
| port | -1 | Integer | PushGateway 服務端口 |
| randomJobNameSuffix | true | Boolean | 作業(yè)名稱添加隨機后綴 |
PrometheusPushGatewayReporter 將 Metric 推到 Pushgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS
系統(tǒng) Metrics
默認情況下,F(xiàn)link 收集的指標
CPU

Memory

Threads
| Scope | 中綴 | Metrics | 描述 | 類型 |
|---|---|---|---|---|
| Job-/TaskManager | Status.JVM.Threads | Count | 活動線程的總數(shù) | Gauge |
GC

ClassLoader

Default Shuffle Service
代替 Network/IO 部分 Metrics

Cluster

Availability
如果啟用了 Reactive Mode(1.13 MVP 特性),這些 Metric(除 numRestarts)不能正常工作。

Checkpointing
如果啟用了 Reactive Mode(1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。

IO

Connectors
Kafka Connector
| Scope | Metrics | 變量 | 描述 | 類型 |
|---|---|---|---|---|
| Operator | commitsSucceeded | n/a | 成功提交到 kafka 的 offset 總數(shù)。 <br />如果啟動了 offset commit 并且開啟 checkpointing | Counter |
| Operator | commitsFailed | n/a | 沒有成功提交到 Kafka 的 offset 總數(shù)。 <br />如果啟動了 offset commit 并且開啟 checkpointing | Counter |
| Operator | committedOffsets | topic, partition | 對于每個分區(qū),最后一次成功提交到 Kafka 的offset。 <br />可以指定 topic 和 partition | Gauge |
| Operator | currentOffsets | topic, partition | 對于每個分區(qū),當前讀取的 offset。 <br />可以指定 topic 和 partition | Gauge |
HBase Connector
| Scope | Metrics | User Variables | Description | Type |
|---|---|---|---|---|
| Operator | lookupCacheHitRate | n/a | Lookup 緩存命中率 | Gauge |
延遲跟蹤
Flink 允許跟蹤在系統(tǒng)中傳輸?shù)挠涗浀难舆t。默認情況下禁用此功能。要啟用延遲跟蹤,必須在 Flink 配置(conf/flink-conf.yaml)或 ExecutionConfig 中將 latencyTrackingInterval 設置為正數(shù)。
Source 會定期(latencyTrackingInterval)發(fā)出一個特殊的記錄,稱為 LatencyMarker。記錄包含一個時間戳,該時間戳從記錄在源處發(fā)出時算起。LatencyMarker 不能超過(overtake)正常記錄,因此如果正常記錄在 Operator 前排隊,將增加標記跟蹤的延遲。
延遲監(jiān)控的粒度,分為以下3檔:
single:每個算子單獨統(tǒng)計延遲;
operator(默認值):每個下游算子都統(tǒng)計自己與 Source 算子之間的延遲;
subtask:每個下游算子的 sub-task 都統(tǒng)計自己與 Source 算子的 sub-task 之間的延遲。
需要注意:
- LatencyMarker 記錄的時間戳最終是靠
System.currentTimeMillis()方法獲取本地時間,要保證 Flink 集群內所有節(jié)點的時區(qū)、時間是同步的,可以用 NTP 等工具來配置。 - 啟用延遲 metric 會影響集群的性能(特別是 subtask 粒度)。官方建議僅用于調試目的。
REST API Integration
Metrics 可以通過 REST API 查詢。下面列出一些可用的 Endpoint 和 JSON 返回格式。
Base URL:http://hostname:8081/jobmanager/metrics
查詢 Metric 未聚合值
/jobmanager/metrics/taskmanagers/<taskmanagerid>/metrics/jobs/<jobid>/metrics/jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>
查詢 Metric 聚合值
/taskmanagers/metrics/jobs/metrics/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics
查詢 Metric 部分值的聚合值
/taskmanagers/metrics?taskmanagers=A,B,C/jobs/metrics?jobs=D,E,F/jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3
特殊字符需要轉義(符合 URL 標準)
查看 Metric 列表
GET /jobmanager/metrics
[
{
"id": "metric1"
},
{
"id": "metric2"
}
]
請求特定 Metric 的值(未聚合)
GET taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2
[
{
"id": "metric1",
"value": "34"
},
{
"id": "metric2",
"value": "2"
}
]
請求特定 Metric 的聚合值
GET /taskmanagers/metrics?get=metric1,metric2
[
{
"id": "metric1",
"min": 1,
"max": 34,
"avg": 15,
"sum": 45
},
{
"id": "metric2",
"min": 2,
"max": 14,
"avg": 7,
"sum": 16
}
]
請求特定 Metric 的特定值的聚合值
GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max
[
{
"id": "metric1",
"min": 1,
"max": 34,
},
{
"id": "metric2",
"min": 2,
"max": 14,
}
]
Dashboard Integration
為 Task 或 Operator 收集的 Metric 也可以在儀表板中可視化。在作業(yè)的主頁面上,選擇 Metrics 選項卡。在 Graph 中選擇一個任務后,可以使用 Add Metric 下拉菜單選擇要顯示的 Metric。
- Task metrics 列表樣式
<subtask_index>.<metric_name> - Operator metrics 列表樣式
<subtask_index>.<operator_name>.<metric_name>
每個 Metric 可以被可視化為一個單獨的圖形,x軸表示時間,y軸表示測量值。圖表每10秒自動更新一次。