Building a Highly Available Flink Cluster

Preparation before deploying Flink

  • The Flink version must match the Hadoop version. This cluster runs Hadoop 2.7.4, so the build chosen is flink-1.7.2-bin-hadoop27-scala_2.11.tgz.
  • A highly available Flink cluster depends on ZooKeeper.

Installation

1. Edit Flink's configuration file flink-conf.yaml

# Configure the master (JobManager) node here
jobmanager.rpc.address: node1

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123


# The heap size for the JobManager JVM
# JobManager heap size
jobmanager.heap.size: 8192m


# The heap size for the TaskManager JVM
# TaskManager heap size
taskmanager.heap.size: 8192m


# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
# Slots per TaskManager; each VM in this cluster has 6 vcores
taskmanager.numberOfTaskSlots: 6

# The parallelism used for programs that did not specify and other parallelism.
# Total slots across the five-node cluster (5 nodes × 6 slots)
parallelism.default: 30

# The default file system scheme and authority.
# 
# By default file paths without scheme are interpreted relative to the local
# root file system 'file:///'. Use this to override the default and interpret
# relative paths relative to a different file system,
# for example 'hdfs://mynamenode:12345'
#
# fs.default-scheme

#==============================================================================
# High Availability
#==============================================================================

# The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
# High-availability mode: zookeeper
high-availability: zookeeper

# The path where metadata for master recovery is persisted. While ZooKeeper stores
# the small ground truth for checkpoint and leader election, this location stores
# the larger objects, like persisted dataflow graphs.
# 
# Must be a durable file system that is accessible from all nodes
# (like HDFS, S3, Ceph, nfs, ...) 
# Storage directory for HA cluster metadata
high-availability.storageDir: hdfs://leo/flink/ha/
# Maximum number of YARN application attempts
yarn.application-attempts: 10

# The list of ZooKeeper quorum peers that coordinate the high-availability
# setup. This must be a list of the form:
# "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
# ZooKeeper quorum addresses for the HA cluster
high-availability.zookeeper.quorum: node3:2181,node4:2181,node5:2181
# Flink root path in ZooKeeper
high-availability.zookeeper.path.root: /flink

# ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
# It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
# The default value is "open" and it can be changed to "creator" if ZK security is enabled
#
# high-availability.zookeeper.client.acl: open

#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================

# The backend that will be used to store operator state checkpoints if
# checkpointing is enabled.
#
# Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
# <class-name-of-factory>.
#
state.backend: filesystem

# Directory for checkpoints filesystem, when using any of the default bundled
# state backends.
# HDFS path for checkpoints
state.checkpoints.dir: hdfs://leo/flink-checkpoints

# Default target directory for savepoints, optional.
# 
state.savepoints.dir: hdfs://leo/flink-checkpoints

# Flag to enable/disable incremental checkpoints for backends that
# support incremental checkpoints (like the RocksDB state backend). 
#
# state.backend.incremental: false

#==============================================================================
# Web Frontend
#==============================================================================

# The address under which the web-based runtime monitor listens.
#
#web.address: 0.0.0.0

# The port under which the web-based runtime monitor listens.
# A value of -1 deactivates the web server.
# Web UI port
rest.port: 8081

# Flag to specify whether job submission is enabled from the web-based
# runtime monitor. Uncomment to disable.

web.submit.enable: true

#==============================================================================
# Advanced
#==============================================================================

# Override the directories for temporary files. If not specified, the
# system-specific Java temporary directory (java.io.tmpdir property) is taken.
#
# For framework setups on Yarn or Mesos, Flink will automatically pick up the
# containers' temp directories without any need for configuration.
#
# Add a delimited list for multiple directories, using the system directory
# delimiter (colon ':' on unix) or a comma, e.g.:
#     /data1/tmp:/data2/tmp:/data3/tmp
#
# Note: Each directory entry is read from and written to by a different I/O
# thread. You can include the same directory multiple times in order to create
# multiple I/O threads against that directory. This is for example relevant for
# high-throughput RAIDs.
#
# io.tmp.dirs: /tmp

# Specify whether TaskManager's managed memory should be allocated when starting
# up (true) or when memory is requested.
#
# We recommend to set this value to 'true' only in setups for pure batch
# processing (DataSet API). Streaming setups currently do not use the TaskManager's
# managed memory: The 'rocksdb' state backend uses RocksDB's own memory management,
# while the 'memory' and 'filesystem' backends explicitly keep data as objects
# to save on serialization cost.
#
# taskmanager.memory.preallocate: false

# The classloading resolve order. Possible values are 'child-first' (Flink's default)
# and 'parent-first' (Java's default).
#
# Child first classloading allows users to use different dependency/library
# versions in their application than those in the classpath. Switching back
# to 'parent-first' may help with debugging dependency issues.
#
# classloader.resolve-order: child-first

# The amount of memory going to the network stack. These numbers usually need 
# no tuning. Adjusting them may be necessary in case of an "Insufficient number
# of network buffers" error. The default min is 64MB, the default max is 1GB.
# 
# taskmanager.network.memory.fraction: 0.1
# taskmanager.network.memory.min: 64mb
# taskmanager.network.memory.max: 1gb

#==============================================================================
# Flink Cluster Security Configuration
#==============================================================================

# Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
# may be enabled in four steps:
# 1. configure the local krb5.conf file
# 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
# 3. make the credentials available to various JAAS login contexts
# 4. configure the connector to use JAAS/SASL

# The below configure how Kerberos credentials are provided. A keytab will be used instead of
# a ticket cache if the keytab path and principal are set.

# security.kerberos.login.use-ticket-cache: true
# security.kerberos.login.keytab: /path/to/kerberos/keytab
# security.kerberos.login.principal: flink-user

# The configuration below defines which JAAS login contexts

# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# ZK Security Configuration
#==============================================================================

# Below configurations are applicable if ZK ensemble is configured for security

# Override below configuration to provide custom ZK service name if configured
# zookeeper.sasl.service-name: zookeeper

# The configuration below must match one of the values set in "security.kerberos.login.contexts"
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# HistoryServer
#==============================================================================

# The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

# Directory to upload completed jobs to. Add this directory to the list of
# monitored directories of the HistoryServer as well (see below).
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# The address under which the web-based HistoryServer listens.
#historyserver.web.address: 0.0.0.0

# The port under which the web-based HistoryServer listens.
historyserver.web.port: 8082

# Comma separated list of directories to monitor for completed jobs.
#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# Interval in milliseconds for refreshing the monitored directories.
#historyserver.archive.fs.refresh-interval: 10000
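The slot and parallelism settings above hang together: taskmanager.numberOfTaskSlots is 6 and there are five worker nodes, so parallelism.default is set to the total slot count of the cluster. A quick check of that arithmetic:

```shell
# Sanity check: 5 worker nodes x 6 slots per TaskManager = 30 total slots,
# which is why parallelism.default is set to 30 above.
nodes=5
slots_per_tm=6
echo "total slots: $((nodes * slots_per_tm))"
```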

2. Edit bin/yarn-session.sh

export HADOOP_USER_NAME=hadoop

JVM_ARGS="$JVM_ARGS -Xmx512m"

Note: HADOOP_USER_NAME is set here to prevent permission errors.

3. Edit the masters file

node1:8081
node2:8082

4. Edit the slaves file

node1
node2
node3
node4
node5

5. Edit the zoo.cfg file and append the following at the end

# ZooKeeper quorum peers
#server.1=localhost:2888:3888
# server.2=host:peer-port:leader-port
server.1=node3:2888:3888
server.2=node4:2888:3888
server.3=node5:2888:3888
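Each server.N line must be matched by a myid file on that host containing just the number N in ZooKeeper's dataDir, or that quorum member will not start. A sketch of the provisioning commands, where $ZK_DATA_DIR is a placeholder for the dataDir configured in your zoo.cfg:

```shell
# Print the myid provisioning command for each quorum member.
# ZK_DATA_DIR is a placeholder; substitute the dataDir from zoo.cfg.
for entry in node3:1 node4:2 node5:3; do
  host=${entry%%:*}
  id=${entry##*:}
  echo "ssh $host \"echo $id > \$ZK_DATA_DIR/myid\""
done
```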

6. Flink logging configuration

Flink ships with two logging setups by default: log4j and logback.

If both are left in place, starting the cluster or running a Flink job prints a warning suggesting that one of them be removed.

Either deleting or renaming one of them works, for example:

mv logback.xml logback.xml_bak

7. Quick start

  • Standalone mode

[hadoop@node1 flink-1.7.2]$ cd /home/hadoop/flink-1.7.2/
[hadoop@node1 flink-1.7.2]$ ./bin/start-cluster.sh 
Starting HA cluster with 2 masters.
Starting standalonesession daemon on host node1.
Starting standalonesession daemon on host node2.
Starting taskexecutor daemon on host node1.
Starting taskexecutor daemon on host node2.
Starting taskexecutor daemon on host node3.
Starting taskexecutor daemon on host node4.
Starting taskexecutor daemon on host node5.

Visit node1:8081 in a browser.

[screenshot: Flink web UI]
[hadoop@node1 flink-1.7.2]$ ./bin/stop-cluster.sh 
Stopping taskexecutor daemon (pid: 12133) on host node1.
Stopping taskexecutor daemon (pid: 8607) on host node2.
Stopping taskexecutor daemon (pid: 31771) on host node3.
Stopping taskexecutor daemon (pid: 32408) on host node4.
Stopping taskexecutor daemon (pid: 18846) on host node5.
Stopping standalonesession daemon (pid: 11640) on host node1.
Stopping standalonesession daemon (pid: 8140) on host node2.
  • Flink on YARN mode

[1] First approach: yarn-session.sh (allocate resources) + flink run (submit jobs)

Start a long-running Flink cluster:

# The command below requests 5 TaskManagers, each with 2048 MB of memory and 2 slots; if this exceeds the cluster's total resources, startup will fail.
./bin/yarn-session.sh -n 5 -tm 2048 -s 2 --nm leo-flink -d

-n,--container <arg>            Number of YARN containers to allocate (= number of TaskManagers)
-D <arg>                        Dynamic properties
-d,--detached                   Run detached
-jm,--jobManagerMemory <arg>    Memory for the JobManager [in MB]
-nm,--name                      Custom name for the application on YARN
-q,--query                      Display available YARN resources (memory, cores)
-qu,--queue <arg>               Specify a YARN queue
-s,--slots <arg>                Number of slots (vcores) per TaskManager
-tm,--taskManagerMemory <arg>   Memory per TaskManager [in MB]
-z,--zookeeperNamespace <arg>   Namespace to create in ZooKeeper for HA mode
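For the session started above (-n 5 -tm 2048 -s 2), the totals requested from YARN work out as follows:

```shell
# Resource totals for: yarn-session.sh -n 5 -tm 2048 -s 2
containers=5
tm_mem_mb=2048
slots_per_tm=2
echo "TaskManager memory: $((containers * tm_mem_mb)) MB, slots: $((containers * slots_per_tm))"
```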

Note:

<p style="color:red">
The client must have the YARN_CONF_DIR or HADOOP_CONF_DIR environment variable set; it is used to locate the YARN and HDFS configuration, and without it startup fails.
In practice, having only HADOOP_HOME set also works (a warning is printed). Any one of HADOOP_HOME, YARN_CONF_DIR, or HADOOP_CONF_DIR is enough.
</p>
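A minimal way to satisfy that requirement before launching the session, assuming a hypothetical Hadoop install at /home/hadoop/hadoop-2.7.4 (adjust the path to your installation):

```shell
# Point the Flink client at the Hadoop/YARN configuration.
# The path below is an assumption; substitute your actual Hadoop install.
export HADOOP_CONF_DIR=/home/hadoop/hadoop-2.7.4/etc/hadoop
export YARN_CONF_DIR="$HADOOP_CONF_DIR"
echo "HADOOP_CONF_DIR=$HADOOP_CONF_DIR"
```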

The result of the command looks like:

[screenshot: yarn-session startup output]

Visit http://node4:45559 in a browser

[screenshot: Flink web UI running on YARN]

In the YARN web UI:

[screenshot: YARN web UI]

<p style="color:red">
After deploying a long-running Flink-on-YARN session, the Flink web UI shows 0 TaskManagers and 0 Slots; resources are only allocated to jobs when they are submitted.</p>

Submit a job to the long-running Flink-on-YARN session:

./bin/flink run ./examples/batch/WordCount.jar -input hdfs://leo/test/test.txt -output hdfs://leo/flink-word-count

The completed job can be seen in the web UI:

[screenshot: completed job in the Flink web UI]

[2] Second approach: flink run -m yarn-cluster (allocate resources and submit the job in one step)

 ./bin/flink run -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024   ./examples/batch/WordCount.jar -input hdfs://leo/test/test.txt -output hdfs://leo/test/flink-word-count2.txt

The YARN web UI shows that the job just submitted finished successfully:

[screenshot: job status in the YARN web UI]

Closing notes

The development-environment cluster setups for Hadoop, HBase, Hive, Spark, Kafka, and Flink are now complete. There may be shortcomings or misunderstandings here; corrections are welcome. The upcoming series will move from basics to advanced topics, documenting the practical use of these components.

