Flink最佳實踐

解析命令行參數(shù)和在Flink應用程序中傳遞參數(shù)

幾乎所有的Flink應用程序,包括批處理和流處理,都依賴于外部配置參數(shù),這些參數(shù)被用來指定輸入和輸出源(如路徑或者地址),系統(tǒng)參數(shù)(并發(fā)數(shù),運行時配置)和應用程序的可配參數(shù)(通常用在自定義函數(shù)中)。

Flink提供了一個簡單的叫做ParameterToolutility,ParameterTool提供了一些基礎的工具來解決這些問題,當然你也可以不用這里所有描述的ParameterTool,其他框架如:Commons CLIargparse4j在Flink中也是支持的。

獲取你的配置值并傳入ParameterTool中

ParameterTool提供了一系列預定義的靜態(tài)方法來讀取配置信息,ParameterTool內(nèi)部是一個Map<String, String>,所以很容易與你自己的配置形式相集成。

從.properties文件中獲取

下面方法將去讀取一個Properties文件,并返回若干key/value對:

String propertiesFile = "/home/sam/flink/myjob.properties";
ParameterTool parameter = ParameterTool.fromPropertiesFile(propertiesFile);
從命令行參數(shù)中獲取

下面會從命令行中獲取像--input hdfs:///mydata --elements 42這種形式的參數(shù):

public static void main(String[] args) {
    ParameterTool parameter = ParameterTool.fromArgs(args);
    // .. regular code ..
從系統(tǒng)屬性中獲取

當啟動一個JVM時,你可以給它傳遞一些系統(tǒng)屬性如:-Dinput=hdfs:///mydata,你也可以用這些系統(tǒng)屬性來初始化ParameterTool:

ParameterTool parameter = ParameterTool.fromSystemProperties();

在程序中使用ParameterTool的參數(shù)

既然我們已經(jīng)從其他地方(方法如上)拿到了配置參數(shù),我們就可以以各種形式來使用它們了。

直接從ParameterTool中獲取

ParameterTool本身有方法來獲取這些值:

ParameterTool parameters = // ...
parameter.getRequired("input");
parameter.get("output", "myDefaultValue");
parameter.getLong("expectedCount", -1L);
parameter.getNumberOfParameters()
// .. there are more methods available.

你可以在提交應用程序的客戶端main()方法中直接使用這些方法返回的值,例如:你可以按如下方法來設置一個算子的并發(fā)度:

ParameterTool parameters = ParameterTool.fromArgs(args);
int parallelism = parameters.get("mapParallelism", 2);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).setParallelism(parallelism);

因為ParameterTool是可序列化的,所以你可以將它傳遞給函數(shù)本身;

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer(parameters));

之后在函數(shù)內(nèi)部使用ParameterTool來獲取命令行參數(shù)。

將參數(shù)以Configuration對象的形式傳遞給函數(shù)

下面的例子展示了如何將參數(shù)以Configuration對象的形式傳遞給用戶自定義函數(shù)。

ParameterTool parameters = ParameterTool.fromArgs(args);
DataSet<Tuple2<String, Integer>> counts = text
        .flatMap(new Tokenizer()).withParameters(parameters.getConfiguration())

在Tokenizer內(nèi)部,Configuration對象可以通過open(Configuration conf)方法來獲?。?/p>

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void open(Configuration parameters) throws Exception {
  parameters.getInteger("myInt", -1);
  // .. do
注冊全局參數(shù)

在ExecutionConfig中注冊為全作業(yè)參數(shù)的參數(shù),可以被JobManager的web端以及用戶自定義函數(shù)中以配置值的形式訪問.
注冊全局參數(shù):

ParameterTool parameters = ParameterTool.fromArgs(args);

// 創(chuàng)建一個執(zhí)行環(huán)境
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameters);

在用戶自定義的富函數(shù)中獲取它們:

public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
  ParameterTool parameters = (ParameterTool)
      getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
  parameters.getRequired("input");
  // .. do more ..

命名大型的TupleX類型

對于有很多字段的數(shù)據(jù)類型我們建議采用POJOs(普通Java對象),而不是TupleX;同時,POJOs還可以用來為大型的Tuple命名。、
例如:
不使用:Tuple11<String, String, ..., String> var = new ...;
因為繼承大型Tuple類來創(chuàng)建一個自定義的類型會比直接使用大型Tuple簡單得多:

CustomType var = new ...;

public static class CustomType extends Tuple11<String, String, ..., String> {
    // constructor matching super
}

使用Logback而不是Log4j

注意:本文檔適用于Flink 0.10之后的版本
Apache Flink使用slf4j來作為logging 抽象,也建議用戶在他們自定義的方法中也使用slf4j。Slf4j是一個編譯時的logging接口,在運行時可以使用不同的logging實現(xiàn),例如:log4j或者Logback。
默認情況下Flink依賴Log4j,本也描述了如何在Flink中使用Logback。有用戶反饋它們通過本指南,可以使用Graylog來建立集中式日志收集。
使用下面的代碼來獲取一個logger實例:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyClass implements MapFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
    // ...
在IDE之外運行Flink時使用Logback或者在一個Java應用程序中使用Logback

在所有情況下類會在一個由依賴管理器如Maven創(chuàng)建的classpath中執(zhí)行,F(xiàn)link會將log4j推到classpath中。
因此,你需要將log4j從Flink的依賴中剔除,下面的描述假定有一個跟Maven工程:
按如下方式來修改你的工程:

<dependencies>
  <!-- Add the two required logback dependencies -->
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-core</artifactId>
    <version>1.1.3</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.1.3</version>
  </dependency>

  <!-- Add the log4j -> sfl4j (-> logback) bridge into the classpath
   Hadoop is logging to log4j! -->
  <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.7</version>
  </dependency>

  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.3.0</version>
    <exclusions>
      <exclusion>
        <groupId>log4j</groupId>
        <artifactId>*</artifactId>
      </exclusion>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>

在<dependencies>部分進行如下修改;
  1、將所有的log4j依賴從所有的Flink dependencies中剔除:這會導致Maven忽略Flink對log4j的傳遞依賴。
  2、將slf4j-log4j12 artifact從Flink依賴中剔除:因為我們將用slf4j到logback的綁定,所以我們需要刪除slf4j到log4j的綁定。
  3、添加Logback依賴:logback-core和logback-classic
  4、添加log4j-over-slf4j依賴:log4j-over-slf4j是一種允許舊應用程序直接使用Log4j API來調(diào)用Slf4j接口的工具。Flink依賴Hadoop,而Hadoop是使用Log4j來記錄日志的

請注意:你需要手動添加exclusion到所有你添加到pom文件中的Flink依賴。
你可能還需要檢查一下其他非Flink依賴是否也是log4j的綁定,你可以使用mvn dependency:true來分析你的工程依賴。

當Flink運行在集群中是使用Logback

本指南適用于當Flink運行在YARN或者standalong集群時。

為了在Flink中使用Logback而不是Log4j,你需要將 log4j-1.2.xx.jar 和 sfl4j-log4j12-xxx.jar從 lib/目錄中刪除
接下來,你需要將下面的jar文件添加到 lib/目錄下:
  logback-classic.jar
  logback-core.jar
  log4j-over-slf4j.jar

注意:當使用單任務的YARN集群時,你需要明確的設置 lib/目錄!

將自定義logger的Flink提交到YARN中的命令是:./bin/flink run -yt $FLINK_HOME/lib <... remaining arguments ...>

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評論 19 139
  • 歷史 log4j可以當之無愧地說是Java日志框架的元老,1999年發(fā)布首個版本,2012年發(fā)布最后一個版本,20...
    kelgon閱讀 10,288評論 3 53
  • Spring Boot 參考指南 介紹 轉載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,253評論 6 342
  • 概述 在項目開發(fā)中,為了跟蹤代碼的運行情況,常常要使用日志來記錄信息。在Java世界,有很多的日志工具庫來實現(xiàn)日志...
    靜默虛空閱讀 1,975評論 1 9
  • spring官方文檔:http://docs.spring.io/spring/docs/current/spri...
    牛馬風情閱讀 1,842評論 0 3

友情鏈接更多精彩內(nèi)容