Flink Metrics

主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/ops/metrics/

Flink 提供了 Metric 系統(tǒng),允許收集 Metric 并暴露給外部系統(tǒng)。

注冊 Metrics

可以通過任何繼承了 RichFunction 的函數(shù)訪問 Metric 系統(tǒng)。調用 getRuntionContext().getMetricGroup() 方法,該方法返回一個 MetricGroup 對象,可以創(chuàng)建并注冊 Metric。

Metric 類型

Counter

Counter 用來計數(shù)。當前值可以使用 inc()/inc(long n)dec()/dec(long n) 進行增減。

// 實現(xiàn) RichMapFunction 接口
public class MyMapper extends RichMapFunction<String, String> {
  private transient Counter counter;

  @Override
  public void open(Configuration config) {
    // 定義一個 Counter Metric
    this.counter = getRuntimeContext()
      .getMetricGroup()
      .counter("myCounter");
  }

  @Override
  public String map(String value) throws Exception {
    // Counter 增加 1
    this.counter.inc();
    return value;
  }
}

Gauge

Gauge 根據(jù)需要提供任何類型的值。需要先創(chuàng)建一個實現(xiàn) org.apache.flink.metrics.Gauge 的類,返回值的類形沒有限制。

Report 程序在暴露數(shù)據(jù)給外部系統(tǒng)時,會把對象轉換為字符串,這意味著需要一個有意義的 toString() 實現(xiàn)。

public class MyMapper extends RichMapFunction<String, String> {
  private transient int valueToExpose = 0;

  @Override
  public void open(Configuration config) {
    getRuntimeContext()
      .getMetricGroup()
      .gauge("MyGauge", new Gauge<Integer>() {
        // 實現(xiàn) org.apache.flink.metrics.Gauge 接口
        @Override
        public Integer getValue() {
          return valueToExpose;
        }
      });
  }

  @Override
  public String map(String value) throws Exception {
    valueToExpose++;
    return value;
  }
}

Histogram

Histogram 統(tǒng)計值的分布。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Histogram histogram;

  @Override
  public void open(Configuration config) {
    this.histogram = getRuntimeContext()
      .getMetricGroup()
      .histogram("myHistogram", new MyHistogram());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 加入一個新值
    this.histogram.update(value);
    return value;
  }
}

Flink 沒有提供 Histogram 的默認實現(xiàn),可以添加依賴使用 DropwizardHistogramWrapper 實現(xiàn)

<dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-metrics-dropwizard</artifactId>
      <version>1.13.0</version>
</dependency>

Meter

Meter 用來統(tǒng)計平均吞吐量。

public class MyMapper extends RichMapFunction<Long, Long> {
  private transient Meter meter;

  @Override
  public void open(Configuration config) {
    this.meter = getRuntimeContext()
      .getMetricGroup()
      .meter("myMeter", new MyMeter());
  }

  @Override
  public Long map(Long value) throws Exception {
    // 注冊事件
    // markEvent(long n) 可以注冊同時發(fā)生多個時間
    this.meter.markEvent();
    return value;
  }
}

同樣添加 flink-metrics-dropwizard 依賴,可以使用 DropwizardMeterWrapper 實現(xiàn)

Scope

每個 Metric 都會分配一個標識符和一組鍵值對,用來報告 Metric。

標識符基于3個組成部分:注冊時的用戶定義名稱、可選的用戶定義 Scope 和系統(tǒng)提供的 Scope。例如,如果 A.B 是系統(tǒng) Scope,C.D 是用戶 Scope,E 是名稱,那么標識符將是 A.B.C.D.E。

可以通過在 conf/flink-conf.yaml 中設置 metrics.scope.delimiter 鍵來配置用于標識符的分隔符(默認值:.)。

User Scope

定義 User Scope 的方法: 調用 MetricGroup#addGroup(String name)MetricGroup#addGroup(int name),MetricGroup#addGroup(String key, String value)。這些方法會影響 MetricGroup#getMetricIdentifierMetricGroup#getScopeComponents 的返回值。

// 創(chuàng)建 Metric 時指定 Scope
counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetrics")
  .counter("myCounter");

counter = getRuntimeContext()
  .getMetricGroup()
  .addGroup("MyMetricsKey", "MyMetricsValue")
  .counter("myCounter");

System Scope

System Scope 包含 Metric 的上下文信息,例如注冊在哪個 Task(<task_name>)或屬于哪個 Job(<job_name>)。

應該包含哪些上下文信息可以通過 conf/flink-conf.yaml 配置。

  • metrics.scope.jm

    • 默認值:<host>.jobmanager
    • JobManager 的所有 Metric
  • metrics.scope.jm.job

    • 默認值:<host>.jobmanager.<job_name>
    • JobManager 和 Job 的所有 Metric
  • metrics.scope.tm

    • 默認值:<host>.taskmanager.<tm_id>
    • TaskManager 的所有 Metric
  • metrics.scope.tm.job

    • 默認值:<host>.taskmanager.<tm_id><job_name>
    • TaskManager 和 Job 的所有 Metric
  • metrics.scope.task

    • 默認值:<host>.taskmanager.<tm_id><job_name><task_name><subtask_index>
    • Task 的所有 Metric
  • metrics.scope.operator

    • 默認值:<host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>
    • Operator 的所有 Metric

<host> | <job_name> | <tm_id> | <task_name> | <operator_name> | <subtask_index> 可以作為變量使用。變量的數(shù)量或順序沒有限制,區(qū)分大小寫。

例如:Operator Metric 的默認 Scope 格式為 <host>.taskmanager.<tm_id><job_name><operator_name><subtask_index>,生成的標識符類似 localhost.taskmanager.1234.MyJob.MyOperator.0.MyMetric 的形式;如果希望包含 Task 名稱,并且忽略 TaskManager 信息,可以設置 metrics.scope.operator: <host>.<job_name>.<task_name>.<operator_name>.<subtask_index>,生成的標識符會變成 localhost.MyJob.MySource_->_MyOperator.MyOperator.0.MyMetric。

建議添加帶有 ID 的變量(如:<job_id>)保證唯一性,避免出現(xiàn)命名沖突的問題。所有可以使用的變量:

  • JobManager: <host>

  • TaskManager: <host>, <tm_id>

  • Job: <job_id>, <job_name>

  • Task: <task_id>, <task_name>, <task_attempt_id>, <task_attempt_num>, <subtask_index>

  • Operator: <operator_id>, <operator_name>, <subtask_index>

Reporter

Flink 允許向外部系統(tǒng)報告 Metric。

通過在 conf/flink-conf.yaml 中配置一個或多個 Reporter,可以將 Metric 暴露給外部系統(tǒng)。這些 Reporter 在啟動時實例化。

  • metrics.reporter.<name>.<config>:Reporter 名稱
  • metrics.reporter.<name>.class:Reporter 實現(xiàn)類
  • metrics.reporter.<name>.factory.class:Reporter 工廠類
  • metrics.reporter.<name>.interval:Reporter 調用間隔
  • metrics.reporter.<name>.scope.delimiter:Scope 標識符的分隔符(默認使用 metrics.scope.delimiter
  • metrics.reporter.<name>.scope.variables.excludes:可選項,以 “;” 分隔的變量列表,可以忽略這些變量
  • metrics.reporters:可選項,以 “,” 分隔的 Reporter 名稱列表,表示應用哪些 Reporter,默認會包含所有配置的 Reporter。

Reporter 必須至少配置 classfactory.class 屬性(使用哪個取決于 Reporter 的實現(xiàn))。

配置 Reporter 示例

metrics.reporters: my_jmx_reporter,my_other_reporter

metrics.reporter.my_jmx_reporter.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory
metrics.reporter.my_jmx_reporter.port: 9020-9040
metrics.reporter.my_jmx_reporter.scope.variables.excludes:job_id;task_attempt_num

metrics.reporter.my_other_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_other_reporter.host: 192.168.1.1
metrics.reporter.my_other_reporter.port: 10000

自定義 Reporter:

  • 實現(xiàn) org.apache.flink.metrics.reporter.MetricReporter 接口
  • 如果要定時發(fā)送報告,實現(xiàn) Scheduled 接口

下面列出了一些支持的 Reporter

JMX

org.apache.flink.metrics.jmx.JMXReporter

參數(shù):

  • port - JMX 監(jiān)聽端口,建議使用范圍:9250-9260。實際端口將顯示在相關 Job 或 Task Manager 日志中。
metrics.reporter.jmx.factory.class: org.apache.flink.metrics.jmx.JMXReporterFactory 
metrics.reporter.jmx.port: 8789

通過 JMX 公開的 Metric 由一個 domain 和一組 key 屬性組成標識。domain 總是以 org.apache.flink 開始,接一個通用 metric 標識(與一般的 metric 標識不同,不受 scope 格式的影響,不包含任何變量),例如:org.apache.flink.job.task.numBytesOut。

key 屬性列表包含與給定 Metric 關聯(lián)的所有變量的值(不受 scope 格式影響)。例如:host=localhost,job_name=MyJob,task_name=MyTask

Prometheus

org.apache.flink.metrics.prometheus.PrometheusReporter

參數(shù):

  • port - Prometheus exporter 偵聽的端口,默認為 9249,建議使用范圍:9250-9260。
  • filterLabelValueCharacters - 可選項,過濾 label 值中的字符。如果啟用,不匹配 [a-zA-Z0-9:_] 的字符會被移除。默認開啟,在關閉前,確認 label 值是否符合 Premetheus 要求(Flink metric 變量都會作為 Prometheus label)。
metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter

Flink Metric 類型和 Prometheus Metric 類型映射

Flink Prometheus Note
Counter Gauge Prometheus Counters 不能遞減
Gauge Gauge 只支持數(shù)值和布爾
Histogram Summary 分位數(shù)支持 .5, .75, .95, .98, .99, .999
Meter Gauge

PrometheusPushGateway

org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter

參數(shù)

Key Default Type Description
deleteOnShutdown true Boolean 在關閉時,是否刪除 PushGateway 中的 Metric。
filterLabelValueCharacters true Boolean 是否過濾 label 值中的字符。如果啟用,不匹配 [a-zA-Z0-9:_] 的字符會被移除。默認開啟,在關閉前,確認 label 值是否符合 Premetheus 要求(Flink metric 變量都會作為 Prometheus label)。
groupingKey (none) String 指定 grouping key。格式:lable_name=label_value;lable_name=label_value;
host (none) String PushGateway 服務地址
jobName (none) String 指定作業(yè),推送 metric
port -1 Integer PushGateway 服務端口
randomJobNameSuffix true Boolean 作業(yè)名稱添加隨機后綴

PrometheusPushGatewayReporter 將 Metric 推到 Pushgateway

metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: localhost
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.jobName: myJob
metrics.reporter.promgateway.randomJobNameSuffix: true
metrics.reporter.promgateway.deleteOnShutdown: false
metrics.reporter.promgateway.groupingKey: k1=v1;k2=v2
metrics.reporter.promgateway.interval: 60 SECONDS

系統(tǒng) Metrics

默認情況下,F(xiàn)link 收集的指標

CPU

CPU

Memory

Memory

Threads

Scope 中綴 Metrics 描述 類型
Job-/TaskManager Status.JVM.Threads Count 活動線程的總數(shù) Gauge

GC

GC

ClassLoader

ClassLoader

Default Shuffle Service

代替 Network/IO 部分 Metrics

Shuffle

Cluster

Cluster

Availability

如果啟用了 Reactive Mode(1.13 MVP 特性),這些 Metric(除 numRestarts)不能正常工作。

Availability

Checkpointing

如果啟用了 Reactive Mode(1.13 MVP 特性),Job Scope 的 Metric 不能正常工作。

Checkpoint

IO

IO

Connectors

Kafka Connector

Scope Metrics 變量 描述 類型
Operator commitsSucceeded n/a 成功提交到 kafka 的 offset 總數(shù)。 <br />如果啟動了 offset commit 并且開啟 checkpointing Counter
Operator commitsFailed n/a 沒有成功提交到 Kafka 的 offset 總數(shù)。 <br />如果啟動了 offset commit 并且開啟 checkpointing Counter
Operator committedOffsets topic, partition 對于每個分區(qū),最后一次成功提交到 Kafka 的offset。 <br />可以指定 topic 和 partition Gauge
Operator currentOffsets topic, partition 對于每個分區(qū),當前讀取的 offset。 <br />可以指定 topic 和 partition Gauge

HBase Connector

Scope Metrics User Variables Description Type
Operator lookupCacheHitRate n/a Lookup 緩存命中率 Gauge

延遲跟蹤

Flink 允許跟蹤在系統(tǒng)中傳輸?shù)挠涗浀难舆t。默認情況下禁用此功能。要啟用延遲跟蹤,必須在 Flink 配置(conf/flink-conf.yaml)或 ExecutionConfig 中將 latencyTrackingInterval 設置為正數(shù)。

Source 會定期(latencyTrackingInterval)發(fā)出一個特殊的記錄,稱為 LatencyMarker。記錄包含一個時間戳,該時間戳從記錄在源處發(fā)出時算起。LatencyMarker 不能超過(overtake)正常記錄,因此如果正常記錄在 Operator 前排隊,將增加標記跟蹤的延遲。

延遲監(jiān)控的粒度,分為以下3檔:

  • single:每個算子單獨統(tǒng)計延遲;

  • operator(默認值):每個下游算子都統(tǒng)計自己與 Source 算子之間的延遲;

  • subtask:每個下游算子的 sub-task 都統(tǒng)計自己與 Source 算子的 sub-task 之間的延遲。

需要注意:

  • LatencyMarker 記錄的時間戳最終是靠 System.currentTimeMillis() 方法獲取本地時間,要保證 Flink 集群內所有節(jié)點的時區(qū)、時間是同步的,可以用 NTP 等工具來配置。
  • 啟用延遲 metric 會影響集群的性能(特別是 subtask 粒度)。官方建議僅用于調試目的。

REST API Integration

Metrics 可以通過 REST API 查詢。下面列出一些可用的 Endpoint 和 JSON 返回格式。

Base URL:http://hostname:8081/jobmanager/metrics

查詢 Metric 未聚合值

  • /jobmanager/metrics
  • /taskmanagers/<taskmanagerid>/metrics
  • /jobs/<jobid>/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/<subtaskindex>

查詢 Metric 聚合值

  • /taskmanagers/metrics
  • /jobs/metrics
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics

查詢 Metric 部分值的聚合值

  • /taskmanagers/metrics?taskmanagers=A,B,C
  • /jobs/metrics?jobs=D,E,F
  • /jobs/<jobid>/vertices/<vertexid>/subtasks/metrics?subtask=1,2,3

特殊字符需要轉義(符合 URL 標準)

查看 Metric 列表

GET /jobmanager/metrics

[
  {
    "id": "metric1"
  },
  {
    "id": "metric2"
  }
]

請求特定 Metric 的值(未聚合)

GET taskmanagers/<taskmanagerid>/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "value": "34"
  },
  {
    "id": "metric2",
    "value": "2"
  }
]

請求特定 Metric 的聚合值

GET /taskmanagers/metrics?get=metric1,metric2

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
    "avg": 15,
    "sum": 45
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
    "avg": 7,
    "sum": 16
  }
]

請求特定 Metric 的特定值的聚合值

GET /taskmanagers/metrics?get=metric1,metric2&agg=min,max

[
  {
    "id": "metric1",
    "min": 1,
    "max": 34,
  },
  {
    "id": "metric2",
    "min": 2,
    "max": 14,
  }
]

Dashboard Integration

為 Task 或 Operator 收集的 Metric 也可以在儀表板中可視化。在作業(yè)的主頁面上,選擇 Metrics 選項卡。在 Graph 中選擇一個任務后,可以使用 Add Metric 下拉菜單選擇要顯示的 Metric。

  • Task metrics 列表樣式 <subtask_index>.<metric_name>
  • Operator metrics 列表樣式 <subtask_index>.<operator_name>.<metric_name>

每個 Metric 可以被可視化為一個單獨的圖形,x軸表示時間,y軸表示測量值。圖表每10秒自動更新一次。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容