Configuration

For single-node setups, Flink is ready to go out of the box, and you do not need to change the default configuration to get started.

The out-of-the-box configuration uses your default Java installation. To override the Java runtime manually, set the JAVA_HOME environment variable or the env.java.home key in conf/flink-conf.yaml.
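
As a minimal sketch, overriding the Java runtime through the configuration file might look like the following. The installation path is a hypothetical example, not a Flink default.

```yaml
# conf/flink-conf.yaml
# Hypothetical install path: point this at the Java installation Flink should use.
env.java.home: /usr/lib/jvm/java-8-openjdk-amd64
```

If the key is left out, the startup scripts fall back to the JAVA_HOME environment variable, as described above.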

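This page lists the most common options that are typically needed to set up a well-performing (distributed) installation. In addition, the Full Reference section below lists all available configuration parameters.

All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key-value pairs in the format key: value.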
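
To illustrate the flat key: value layout, a small conf/flink-conf.yaml could look roughly like this. The values shown are simply the defaults documented on this page.

```yaml
# Each entry is one "key: value" line. Keys are flat, dotted names,
# not nested YAML mappings.
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
taskmanager.numberOfTaskSlots: 1
parallelism.default: 1
```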

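The system and run scripts parse the configuration at startup. Changes to the configuration file require restarting the Flink JobManager and TaskManagers.

The configuration files for the TaskManagers can differ from one another. Flink does not assume uniform machines in the cluster.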

Common Options

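  • env.java.home: The path to the Java installation to use (DEFAULT: the system's default Java installation, if found). Needs to be specified if the startup scripts fail to automatically resolve the Java home directory. Can be set to point to a specific Java installation or version. If this option is not specified, the startup scripts also evaluate the $JAVA_HOME environment variable.

  • env.java.opts: Sets custom JVM options. This value is respected by Flink's start scripts, the JobManager, the TaskManagers, and Flink's YARN client. It can be used to select a different garbage collector or to add remote debugging to the JVMs running Flink's services. Enclosing options in double quotes delays parameter substitution, which allows access to variables from Flink's startup scripts. Use env.java.opts.jobmanager and env.java.opts.taskmanager for JobManager-specific and TaskManager-specific options, respectively.

  • env.java.opts.jobmanager: JobManager-specific JVM options. These are used in addition to the regular env.java.opts.

  • env.java.opts.taskmanager: TaskManager-specific JVM options. These are used in addition to the regular env.java.opts.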

  • jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.

  • jobmanager.rpc.port: The port number of the JobManager (DEFAULT: 6123).

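  • jobmanager.heap.mb: JVM heap size (in megabytes) for the JobManager. You may have to increase the JobManager's heap size if you are running very large applications (with many operators), or if you are keeping a long history of them.

  • taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible. If the cluster is exclusively running Flink, the total amount of available memory per machine minus some memory for the operating system (maybe 1-2 GB) is a good value. On YARN setups, this value is automatically configured to the size of the TaskManager's YARN container, minus a certain tolerance value.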

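  • taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager's machine has (e.g., equal to the number of cores, or half the number of cores). See "Configuring TaskManager processing slots" at the end of this page for more about slots.

  • parallelism.default: The default parallelism to use for programs that have no parallelism specified (DEFAULT: 1). For setups that have no concurrent jobs running, setting this value to NumTaskManagers * NumSlotsPerTaskManager causes the system to use all available execution resources for the program's execution. Note: The default parallelism can be overridden for an entire job by calling setParallelism(int parallelism) on the ExecutionEnvironment or by passing -p <parallelism> to the Flink command-line frontend. It can be overridden for single transformations by calling setParallelism(int parallelism) on an operator. See the Parallel Execution documentation for more information about parallelism.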

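  • fs.default-scheme: The default filesystem scheme to be used, with the necessary authority to contact, e.g. the host:port of the NameNode in the case of HDFS (if needed). By default, this is set to file:///, which points to the local filesystem. This means that the local filesystem is used to search for user-specified files that have no explicit scheme definition. As another example, if this is set to hdfs://localhost:9000/, then a user-specified file path without an explicit scheme definition, such as /user/USERNAME/in.txt, is transformed into hdfs://localhost:9000/user/USERNAME/in.txt. This scheme is used only if no other scheme is specified (explicitly) in the user-provided URI.

  • fs.hdfs.hadoopconf: The absolute path to the Hadoop File System's (HDFS) configuration directory (OPTIONAL VALUE). Specifying this value allows programs to reference HDFS files using short URIs (hdfs:///path/to/files, without including the address and port of the NameNode in the file URI). Without this option, HDFS files can still be accessed, but require fully qualified URIs like hdfs://address:port/path/to/files. This option also causes file writers to pick up the HDFS default values for block size and replication factor. Flink looks for the "core-site.xml" and "hdfs-site.xml" files in the specified directory.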

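  • classloader.resolve-order: Whether Flink should use a child-first ClassLoader or a parent-first ClassLoader when loading user-code classes. Can be one of parent-first or child-first. (DEFAULT: child-first)

  • classloader.parent-first-patterns: A (semicolon-separated) list of patterns that specifies which classes should always be resolved through the parent ClassLoader first. A pattern is a simple prefix that is checked against the fully qualified class name. By default, this is set to java.;org.apache.flink.;javax.annotation;org.slf4j;org.apache.log4j;org.apache.logging.log4j;ch.qos.logback. If you want to change this setting and keep the default behaviour, you have to make sure to include the default patterns in your list of patterns.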
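
The common options above could be combined along the following lines. This is only a sketch for a hypothetical standalone cluster: the host names, memory sizes, and the garbage collector flag are assumptions chosen for illustration, not recommendations from the Flink documentation.

```yaml
# Address that every worker and client must be able to reach (hypothetical host).
jobmanager.rpc.address: flink-master.example.com
jobmanager.rpc.port: 6123

# Assumed sizing: workers with 16 GB (16384 MB) of RAM dedicated to Flink,
# leaving about 2 GB (2048 MB) for the operating system: 16384 - 2048 = 14336.
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 14336

# Paths without a scheme, such as /user/USERNAME/in.txt, are resolved
# against this file system (hypothetical NameNode address).
fs.default-scheme: hdfs://namenode.example.com:9000/

# Example JVM option selecting a different garbage collector.
env.java.opts: -XX:+UseG1GC

# Documented default, written out explicitly.
classloader.resolve-order: child-first
```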

Advanced Options

Compute

  • taskmanager.compute.numa: When enabled, a TaskManager is started on each NUMA node for each worker listed in conf/slaves (DEFAULT: false). Note: only supported when deploying Flink as a standalone cluster.

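Managed Memory

By default, Flink allocates a fraction of 0.7 of the free memory (total memory configured via taskmanager.heap.mb minus memory used for network buffers) for its managed memory. Managed memory helps Flink run batch operators efficiently. It prevents OutOfMemoryExceptions because Flink knows how much memory it can use to execute operations. If Flink runs out of managed memory, it utilizes disk space. Using managed memory, some operations can be performed directly on the raw data without having to deserialize the data into Java objects. All in all, managed memory improves the robustness and speed of the system.

The default fraction for managed memory can be adjusted using the taskmanager.memory.fraction parameter. An absolute value may be set using taskmanager.memory.size (this overrides the fraction parameter). If desired, the managed memory may be allocated outside the JVM heap. This may improve performance in setups with large memory sizes.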

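  • taskmanager.memory.size: The amount of memory (in megabytes) that the TaskManager reserves on-heap or off-heap (depending on taskmanager.memory.off-heap) for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager takes a fixed ratio of the TaskManager JVM's heap size, as specified by taskmanager.memory.fraction. (DEFAULT: -1)

  • taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the TaskManager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a TaskManager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated if taskmanager.memory.size is not set.

  • taskmanager.memory.off-heap: If set to true, the TaskManager allocates the memory used for sorting, hash tables, and caching of intermediate results outside of the JVM heap. For setups with larger quantities of memory, this can improve the efficiency of the operations performed on the memory (DEFAULT: false).

  • taskmanager.memory.segment-size: The size in bytes of the memory buffers used by the memory manager and the network stack (DEFAULT: 32768 (= 32 KB)).

  • taskmanager.memory.preallocate: Can be either true or false. Specifies whether TaskManagers should allocate all managed memory when starting up. (DEFAULT: false) When taskmanager.memory.off-heap is set to true, it is advised to set this option to true as well. If this option is set to false, the allocated off-heap memory is cleaned up only when the configured JVM parameter MaxDirectMemorySize is reached and a full GC is triggered. Note: For streaming setups, we highly recommend setting this value to false, as the core state backends currently do not use managed memory.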
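
The following sketch shows how the managed memory options interact. The heap size and fraction are illustrative assumptions, and the arithmetic in the comments is a rough estimate only, since the actual figure depends on how much free memory Flink measures at startup.

```yaml
# Illustrative TaskManager heap size.
taskmanager.heap.mb: 4096

# Use 60% instead of the default 70% of the free memory as managed memory.
# Rough estimate, assuming about 0.1 * 4096 MB = ~410 MB is taken by
# network buffers:
#   (4096 MB - 410 MB) * 0.6 = ~2212 MB of managed memory
taskmanager.memory.fraction: 0.6

# Alternatively, an absolute size in MB overrides the fraction:
# taskmanager.memory.size: 2048

# Keep managed memory on the JVM heap and allocate it lazily (both defaults).
taskmanager.memory.off-heap: false
taskmanager.memory.preallocate: false
```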

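Memory and Performance Debugging

These options are useful for debugging memory and garbage collection related issues of a Flink application, such as performance problems and processes that are killed or fail with out-of-memory errors.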

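  • taskmanager.debug.memory.startLogThread: Causes the TaskManagers to periodically log memory and garbage collection statistics. The statistics include the current utilization of the heap, off-heap, and other memory pools, as well as the time spent on garbage collection, by heap memory pool.

  • taskmanager.debug.memory.logIntervalMs: The interval (in milliseconds) at which the TaskManagers log the memory and garbage collection statistics. Only has an effect if taskmanager.debug.memory.startLogThread is set to true.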
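
A minimal sketch of turning the memory logger on; the 10-second interval is an arbitrary example value.

```yaml
# Periodically log memory pool utilization and garbage collection times.
taskmanager.debug.memory.startLogThread: true
# Log every 10000 ms (10 seconds). Ignored unless the option above is true.
taskmanager.debug.memory.logIntervalMs: 10000
```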

Other

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round-robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.log.path: The config parameter defining the taskmanager log file location

  • jobmanager.web.address: Address of the JobManager’s web interface (DEFAULT: anyLocalAddress()).

  • jobmanager.web.port: Port of the JobManager’s web interface (DEFAULT: 8081).

  • jobmanager.web.tmpdir: This configuration parameter allows defining the Flink web directory to be used by the web interface. The web interface will copy its static files into the directory. Also uploaded job jars are stored in the directory if not overridden. By default, the temporary directory is used.

  • jobmanager.web.upload.dir: The config parameter defining the directory for uploading the job jars. If not specified a dynamic directory will be used under the directory specified by jobmanager.web.tmpdir.

  • fs.overwrite-files: Specifies whether file output writers should overwrite existing files by default. Set to true to overwrite by default, false otherwise. (DEFAULT: false)

  • fs.output.always-create-directory: File writers running with a parallelism larger than one create a directory for the output file path and put the different result files (one per parallel writer task) into that directory. If this option is set to true, writers with a parallelism of 1 will also create a directory and place a single result file into it. If the option is set to false, the writer will create the file directly at the output path, without creating a containing directory. (DEFAULT: false)

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value or the min/max values below. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB)

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB)

  • state.backend: The backend that will be used to store operator state checkpoints if checkpointing is enabled. Supported backends:

    • jobmanager: In-memory state, backup to JobManager’s/ZooKeeper’s memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
    • filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
  • state.backend.fs.checkpointdir: Directory for storing checkpoints in a Flink supported filesystem. Note: State backend must be accessible from the JobManager, use file:// only for local setups.

  • state.backend.rocksdb.checkpointdir: The local directory for storing RocksDB files, or a list of directories separated by the systems directory delimiter (for example ‘:’ (colon) on Linux/Unix). (DEFAULT value is taskmanager.tmp.dirs)

  • state.checkpoints.dir: The target directory for meta data of externalized checkpoints.

  • state.checkpoints.num-retained: The number of completed checkpoint instances to retain. Having more than one allows recovery to fall back to an earlier checkpoint if the latest checkpoint is corrupt. (Default: 1)

  • high-availability.zookeeper.storageDir: Required for HA. Directory for storing JobManager metadata; this is persisted in the state backend and only a pointer to this state is stored in ZooKeeper. Exactly like the checkpoint directory it must be accessible from the JobManager and a local filesystem should only be used for local deployments. Previously this key was named recovery.zookeeper.storageDir.

  • blob.storage.directory: Directory for storing blobs (such as user JARs) on the TaskManagers.

  • blob.service.cleanup.interval: Cleanup interval (in seconds) of transient blobs at server and caches as well as permanent blobs at the caches (DEFAULT: 1 hour). Whenever a job is not referenced at the cache anymore, we set a TTL for its permanent blob files and let the periodic cleanup task (executed every blob.service.cleanup.interval seconds) remove them after this TTL has passed. We do the same for transient blob files at both server and caches, but immediately after accessing them, i.e. after a put or get operation. This means that a blob will be retained at most 2 * blob.service.cleanup.interval seconds after not being referenced anymore (permanent blobs) or after its last access (transient blobs). For permanent blobs, this means that a recovery still has the chance to use existing files rather than downloading them again.

  • blob.server.port: Port definition for the blob server (serving user JARs) on the TaskManagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.

  • blob.service.ssl.enabled: Flag to enable ssl for the blob client/server communication. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true).

  • restart-strategy: Default restart strategy to use in case no restart strategy has been specified for the job. The options are:

    • fixed delay strategy: fixed-delay.
    • failure rate strategy: failure-rate.
    • no restarts: none

    Default value is none unless checkpointing is enabled for the job in which case the default is fixed-delay with Integer.MAX_VALUE restart attempts and 10s delay.

  • restart-strategy.fixed-delay.attempts: Number of restart attempts, used if the default restart strategy is set to “fixed-delay”. Default value is 1, unless “fixed-delay” was activated by enabling checkpoints, in which case the default is Integer.MAX_VALUE.

  • restart-strategy.fixed-delay.delay: Delay between restart attempts, used if the default restart strategy is set to “fixed-delay”. (default: 1 s)

  • restart-strategy.failure-rate.max-failures-per-interval: Maximum number of restarts in given time interval before failing a job in “failure-rate” strategy. Default value is 1.

  • restart-strategy.failure-rate.failure-rate-interval: Time interval for measuring failure rate in “failure-rate” strategy. Default value is 1 minute.

  • restart-strategy.failure-rate.delay: Delay between restart attempts, used if the default restart strategy is set to “failure-rate”. Default value is the akka.ask.timeout.
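
As a sketch of how several of the options in this list fit together, the fragment below configures file-system checkpoints, a fixed-delay restart strategy, and a blob server port range. All paths, host names, counts, and delays are hypothetical values picked for illustration.

```yaml
# Keep working state in TaskManager memory and snapshot it to a file system.
state.backend: filesystem
# Hypothetical HDFS locations; they must be reachable from the JobManager.
state.backend.fs.checkpointdir: hdfs://namenode.example.com:9000/flink/checkpoints
state.checkpoints.dir: hdfs://namenode.example.com:9000/flink/externalized-checkpoints
# Retain three completed checkpoints so recovery can fall back to an older one.
state.checkpoints.num-retained: 3

# Restart a failed job up to 5 times, waiting 10 seconds between attempts.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 5
restart-strategy.fixed-delay.delay: 10 s

# A port range instead of the default 0 (ephemeral port) to avoid collisions.
blob.server.port: 50100-50200
```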

Full Reference

HDFS

These parameters configure the default HDFS used by Flink. Setups that do not specify an HDFS configuration have to specify the full path to HDFS files (hdfs://address:port/path/to/files). Files will also be written with default HDFS parameters (block size, replication factor).

  • fs.hdfs.hadoopconf: The absolute path to the Hadoop configuration directory. The system will look for the “core-site.xml” and “hdfs-site.xml” files in that directory (DEFAULT: null).

  • fs.hdfs.hdfsdefault: The absolute path of Hadoop’s own configuration file “hdfs-default.xml” (DEFAULT: null).

  • fs.hdfs.hdfssite: The absolute path of Hadoop’s own configuration file “hdfs-site.xml” (DEFAULT: null).
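
For example, pointing Flink at a Hadoop configuration directory might look like this. The directory is a hypothetical location and is assumed to contain core-site.xml and hdfs-site.xml.

```yaml
# Hypothetical Hadoop configuration directory.
fs.hdfs.hadoopconf: /etc/hadoop/conf
```

With this set, programs can refer to files with short URIs such as hdfs:///path/to/files instead of the fully qualified hdfs://address:port/path/to/files form.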

JobManager & TaskManager

The following parameters configure Flink’s JobManager and TaskManagers.

  • jobmanager.rpc.address: The external address of the JobManager, which is the master/coordinator of the distributed system (DEFAULT: localhost). Note: The address (host name or IP) should be accessible by all nodes including the client.

  • jobmanager.rpc.port: The port number of the JobManager (DEFAULT: 6123).

  • taskmanager.hostname: The hostname of the network interface that the TaskManager binds to. By default, the TaskManager searches for network interfaces that can connect to the JobManager and other TaskManagers. This option can be used to define a hostname if that strategy fails for some reason. Because different TaskManagers need different values for this option, it usually is specified in an additional non-shared TaskManager-specific config file.

  • taskmanager.rpc.port: The task manager’s IPC port (DEFAULT: 0, which lets the OS choose a free port). Flink also accepts a list of ports (“50100,50101”), ranges (“50100-50200”) or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple TaskManagers are running on the same machine.

  • taskmanager.data.port: The task manager’s port used for data exchange operations (DEFAULT: 0, which lets the OS choose a free port).

  • taskmanager.data.ssl.enabled: Enable SSL support for the taskmanager data transport. This is applicable only when the global ssl flag security.ssl.enabled is set to true (DEFAULT: true)

  • jobmanager.heap.mb: JVM heap size (in megabytes) for the JobManager (DEFAULT: 256).

  • taskmanager.heap.mb: JVM heap size (in megabytes) for the TaskManagers, which are the parallel workers of the system. In contrast to Hadoop, Flink runs operators (e.g., join, aggregate) and user-defined functions (e.g., Map, Reduce, CoGroup) inside the TaskManager (including sorting/hashing/caching), so this value should be as large as possible (DEFAULT: 512). On YARN setups, this value is automatically configured to the size of the TaskManager’s YARN container, minus a certain tolerance value.

  • taskmanager.numberOfTaskSlots: The number of parallel operator or user function instances that a single TaskManager can run (DEFAULT: 1). If this value is larger than 1, a single TaskManager takes multiple instances of a function or operator. That way, the TaskManager can utilize multiple CPU cores, but at the same time, the available memory is divided between the different operator or function instances. This value is typically proportional to the number of physical CPU cores that the TaskManager’s machine has (e.g., equal to the number of cores, or half the number of cores).

  • taskmanager.tmp.dirs: The directory for temporary files, or a list of directories separated by the system’s directory delimiter (for example ‘:’ (colon) on Linux/Unix). If multiple directories are specified, then the temporary files will be distributed across the directories in a round robin fashion. The I/O manager component will spawn one reading and one writing thread per directory. A directory may be listed multiple times to have the I/O manager use multiple threads for it (for example if it is physically stored on a very fast disc or RAID) (DEFAULT: The system’s tmp dir).

  • taskmanager.network.memory.fraction: Fraction of JVM memory to use for network buffers. This determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system does not have enough buffers available, increase this value or the min/max values below. Also note that taskmanager.network.memory.min and taskmanager.network.memory.max may override this fraction. (DEFAULT: 0.1)

  • taskmanager.network.memory.min: Minimum memory size for network buffers in bytes (DEFAULT: 64 MB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.

  • taskmanager.network.memory.max: Maximum memory size for network buffers in bytes (DEFAULT: 1 GB). Previously, this was determined from taskmanager.network.numberOfBuffers and taskmanager.memory.segment-size.

  • taskmanager.network.numberOfBuffers (deprecated, replaced by the three parameters above): The number of buffers available to the network stack. This number determines how many streaming data exchange channels a TaskManager can have at the same time and how well buffered the channels are. If a job is rejected or you get a warning that the system has not enough buffers available, increase this value (DEFAULT: 2048). If set, it will be mapped to taskmanager.network.memory.min and taskmanager.network.memory.max based on taskmanager.memory.segment-size.

  • taskmanager.memory.size: The amount of memory (in megabytes) that the task manager reserves on the JVM’s heap space for sorting, hash tables, and caching of intermediate results. If unspecified (-1), the memory manager will take a fixed ratio of the heap memory available to the JVM, as specified by taskmanager.memory.fraction. (DEFAULT: -1)

  • taskmanager.memory.fraction: The relative amount of memory (with respect to taskmanager.heap.mb, after subtracting the amount of memory used by network buffers) that the task manager reserves for sorting, hash tables, and caching of intermediate results. For example, a value of 0.8 means that a task manager reserves 80% of its memory (on-heap or off-heap depending on taskmanager.memory.off-heap) for internal data buffers, leaving 20% of free memory for the task manager’s heap for objects created by user-defined functions. (DEFAULT: 0.7) This parameter is only evaluated, if taskmanager.memory.size is not set.

  • taskmanager.debug.memory.startLogThread: Causes the TaskManagers to periodically log memory and Garbage collection statistics. The statistics include current heap-, off-heap, and other memory pool utilization, as well as the time spent on garbage collection, by heap memory pool.

  • taskmanager.debug.memory.logIntervalMs: The interval (in milliseconds) in which the TaskManagers log the memory and garbage collection statistics. Only has an effect, if taskmanager.debug.memory.startLogThread is set to true.

  • taskmanager.maxRegistrationDuration: Defines the maximum time it can take for the TaskManager registration. If the duration is exceeded without a successful registration, then the TaskManager terminates. The max registration duration requires a time unit specifier (ms/s/min/h/d) (e.g. “10 min”). (DEFAULT: Inf)

  • taskmanager.initial-registration-pause: The initial registration pause between two consecutive registration attempts. The pause is doubled for each new registration attempt until it reaches the maximum registration pause. The initial registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 500 ms)

  • taskmanager.max-registration-pause: The maximum registration pause between two consecutive registration attempts. The max registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 30 s)

  • taskmanager.refused-registration-pause: The pause after a registration has been refused by the job manager before retrying to connect. The refused registration pause requires a time unit specifier (ms/s/min/h/d) (e.g. “5 s”). (DEFAULT: 10 s)

  • taskmanager.jvm-exit-on-oom: Indicates that the TaskManager should immediately terminate the JVM if the task thread throws an OutOfMemoryError (DEFAULT: false).

  • blob.fetch.retries: The number of retries for the TaskManager to download BLOBs (such as JAR files) from the JobManager (DEFAULT: 50).

  • blob.fetch.num-concurrent: The number of concurrent BLOB fetches (such as JAR file downloads) that the JobManager serves (DEFAULT: 50).

  • blob.fetch.backlog: The maximum number of queued BLOB fetches (such as JAR file downloads) that the JobManager allows (DEFAULT: 1000).

  • task.cancellation-interval: Time interval between two successive task cancellation attempts in milliseconds (DEFAULT: 30000).

  • taskmanager.exit-on-fatal-akka-error: Whether the TaskManager shall be terminated in case of a fatal Akka error (quarantining event). (DEFAULT: false)

  • jobmanager.tdd.offload.minsize: Maximum size of the TaskDeploymentDescriptor’s serialized task and job information to still transmit them via RPC. Larger blobs may be offloaded to the BLOB server. (DEFAULT: 1 KiB).
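
A sketch of TaskManager port and registration settings follows. The port range and the 10-minute limit are illustrative assumptions; the three pause values are the documented defaults written out explicitly. The backoff sequence in the comment assumes plain doubling with a cap, as described above.

```yaml
# A port range avoids collisions when several TaskManagers share one machine.
taskmanager.rpc.port: 50100-50200

# Terminate the TaskManager if it cannot register within 10 minutes
# (the default is Inf, i.e. keep trying forever).
taskmanager.maxRegistrationDuration: 10 min

# Pauses between registration attempts double from the initial value up to
# the maximum: 500 ms, 1 s, 2 s, 4 s, 8 s, 16 s, then 30 s for later attempts.
taskmanager.initial-registration-pause: 500 ms
taskmanager.max-registration-pause: 30 s
# Wait 10 s before retrying after the JobManager refused a registration.
taskmanager.refused-registration-pause: 10 s

# Terminate the JVM right away if a task thread throws an OutOfMemoryError.
taskmanager.jvm-exit-on-oom: true
```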

High Availability (HA)

  • high-availability: Defines the high availability mode used for the cluster execution. Currently, Flink supports the following modes:
    • none (default): No high availability. A single JobManager runs and no JobManager state is checkpointed.
    • zookeeper: Supports the execution of multiple JobManagers and JobManager state checkpointing. Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution. In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state. In order to use the ‘zookeeper’ mode, it is mandatory to also define the high-availability.zookeeper.quorum configuration value.
  • high-availability.cluster-id: (Default /default_ns in standalone cluster mode, or the <yarn-application-id> under YARN) Defines the subdirectory under the root dir where the ZooKeeper HA mode will create znodes. This makes it possible to isolate multiple applications on the same ZooKeeper. Previously this key was named recovery.zookeeper.path.namespace and high-availability.zookeeper.path.namespace.

Previously the high-availability key was named recovery.mode, and its default value was standalone.

ZooKeeper-based HA Mode

  • high-availability.zookeeper.quorum: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the ‘zookeeper’ HA mode is selected. Previously this key was named recovery.zookeeper.quorum.

  • high-availability.zookeeper.path.root: (Default /flink) Defines the root dir under which the ZooKeeper HA mode will create namespace directories. Previously this key was named recovery.zookeeper.path.root.

  • high-availability.zookeeper.path.latch: (Default /leaderlatch) Defines the znode of the leader latch which is used to elect the leader. Previously this key was named recovery.zookeeper.path.latch.

  • high-availability.zookeeper.path.leader: (Default /leader) Defines the znode of the leader which contains the URL to the leader and the current leader session ID. Previously this key was named recovery.zookeeper.path.leader.

  • high-availability.storageDir: Defines the directory in the state backend where the JobManager metadata will be stored (ZooKeeper only keeps pointers to it). Required for HA. Previously this key was named recovery.zookeeper.storageDir and high-availability.zookeeper.storageDir.

  • high-availability.zookeeper.client.session-timeout: (Default 60000) Defines the session timeout for the ZooKeeper session in ms. Previously this key was named recovery.zookeeper.client.session-timeout.

  • high-availability.zookeeper.client.connection-timeout: (Default 15000) Defines the connection timeout for ZooKeeper in ms. Previously this key was named recovery.zookeeper.client.connection-timeout.

  • high-availability.zookeeper.client.retry-wait: (Default 5000) Defines the pause between consecutive retries in ms. Previously this key was named recovery.zookeeper.client.retry-wait.

  • high-availability.zookeeper.client.max-retry-attempts: (Default 3) Defines the number of connection retries before the client gives up. Previously this key was named recovery.zookeeper.client.max-retry-attempts.

  • high-availability.job.delay: (Default akka.ask.timeout) Defines the delay before persisted jobs are recovered in case of a master recovery situation. Previously this key was named recovery.job.delay.

  • high-availability.zookeeper.client.acl: (Default open) Defines the ACL (open|creator) to be configured on ZK node. The configuration value can be set to “creator” if the ZooKeeper server configuration has the “authProvider” property mapped to use SASLAuthenticationProvider and the cluster is configured to run in secure mode (Kerberos). The ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
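
Putting the HA keys together, a ZooKeeper-based setup could be sketched as follows. The ZooKeeper hosts, the storage directory, and the cluster id are hypothetical, and the comma-separated host:port form of the quorum value is an assumption based on the usual ZooKeeper connection string.

```yaml
# Enable ZooKeeper-based high availability (the default mode is none).
high-availability: zookeeper
# Mandatory in zookeeper mode: the ZooKeeper quorum to connect to.
high-availability.zookeeper.quorum: zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181
# Required for HA: JobManager metadata lives here, and ZooKeeper only keeps pointers.
high-availability.storageDir: hdfs://namenode.example.com:9000/flink/ha
# Root znode (documented default) and a per-cluster subdirectory beneath it.
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /cluster_one
```

Giving each cluster its own high-availability.cluster-id keeps several Flink clusters that share one ZooKeeper ensemble from writing to the same znodes.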

ZooKeeper Security

  • zookeeper.sasl.disable: (Default: true) Defines if SASL based authentication needs to be enabled or disabled. The configuration value can be set to “true” if ZooKeeper cluster is running in secure mode (Kerberos).

  • zookeeper.sasl.service-name: (Default: zookeeper) If the ZooKeeper server is configured with a different service name (default:”zookeeper”) then it can be supplied using this configuration. A mismatch in service name between client and server configuration will cause the authentication to fail.

Environment

  • env.log.dir: (Defaults to the log directory under Flink’s home) Defines the directory where the Flink logs are saved. It has to be an absolute path.

  • env.log.max: (Default: 5) The maximum number of old log files to keep.

  • env.ssh.opts: Additional command line options passed to SSH clients when starting or stopping JobManager, TaskManager, and Zookeeper services (start-cluster.sh, stop-cluster.sh, start-zookeeper-quorum.sh, stop-zookeeper-quorum.sh).
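
A short sketch of the environment options; the log directory, the retention count, and the SSH port are hypothetical example values.

```yaml
# Must be an absolute path.
env.log.dir: /var/log/flink
# Keep up to 10 old log files instead of the default 5.
env.log.max: 10
# Extra options handed to the SSH client by the start and stop scripts,
# here assuming the workers run sshd on a non-standard port.
env.ssh.opts: -p 2222
```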

Configuring TaskManager processing slots

Flink executes a program in parallel by splitting it into subtasks and scheduling these subtasks to processing slots.

Each Flink TaskManager provides processing slots in the cluster. The number of slots is typically proportional to the number of available CPU cores of each TaskManager. As a general recommendation, the number of available CPU cores is a good default for taskmanager.numberOfTaskSlots.

When starting a Flink application, users can supply the default number of slots to use for that job. The corresponding command-line option is called -p (for parallelism). In addition, it is possible to set the number of slots in the programming APIs for the whole application and for individual operators.
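
As a worked example of the slot recommendation, assume a hypothetical cluster of 4 TaskManagers, each running on a machine with 8 available CPU cores:

```yaml
# One processing slot per available CPU core on each TaskManager.
taskmanager.numberOfTaskSlots: 8
# 4 TaskManagers * 8 slots each = 32 slots in the cluster. With this default,
# a job submitted without -p occupies every slot.
parallelism.default: 32
```

On a cluster that runs several jobs at once, a smaller parallelism.default would leave slots free for the other jobs.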
