關于Flink
Flink架構
Flink是一種流式計算框架,與Spark的“微批”設計理念不同,F(xiàn)link則將數(shù)據(jù)看作無限的和有限的數(shù)據(jù)流,支持對數(shù)據(jù)流進行逐條或者窗口式處理,從而保證數(shù)據(jù)處理延遲可以達到毫秒級。同時基于有限流的設想,F(xiàn)link還可以實現(xiàn)數(shù)據(jù)的批量處理,實現(xiàn)了流批一體。可以說完全基于數(shù)據(jù)流的設計理念,一舉將Spark打的體無完膚,從而可以說目前Flink成為各大廠商的流式數(shù)據(jù)處理的標配。
Flink集群采用的是主從架構,JobManager作為集群的主節(jié)點,負責集群任務調度以及資源管理的任務。TaskManager作為集群的從節(jié)點,負責接受JobManager分派的執(zhí)行任務,為集群提供計算資源。
Flink提供了多種部署模式,包括Standalone、YARN、Mesos、K8S、Docker等。針對不同的部署方式,F(xiàn)link集群的運行時態(tài)架構不太一樣,但是核心部署架構還是保持一致的,即JobManager+TaskManager的組合。
與YARN一致,為了解決主節(jié)點的單點問題,F(xiàn)link也需要解決JobManager的單點問題。針對Standalone的部署模式,需要部署多個JobManager節(jié)點通過主備模式實現(xiàn)高可用;如果是將Flink集群托管給YARN進行管理,則可以借助YARN集群的AM恢復機制來實現(xiàn)JobManager的高可用。
Flink on YARN
Flink可以運行在YARN集群中,也是基于Job Manager+TaskManager來對外提供服務。在YARN中,JobManager將內嵌在Flink應用的AM中,負責處理client端提交的Flink JOB。而TaskManager將運行在Yarn Container中,提供計算能力。
Flink在YARN集群上支持三種部署模式:Session模式、Per-job模式以及Application模式。
Session模式
Session模式的原理與Standalone模式相似,只不過是將Flink集群運行在YARN集群提供的Container中。在這種模式下,F(xiàn)link將基于client的請求動態(tài)進行TaskManager的申請和回收。
這種模式的優(yōu)勢在于與standalone模式保持了一致的體驗,同時Yarn集群又提供了Container節(jié)點的自動恢復能力,即TaskManager進程掛掉之后,AM(JobManager)能夠自動把它拉起來。劣勢也是standalone的劣勢,就是會產生資源競爭,所有任務也都在一個Flink集群實例當中,日志混淆在了一起,不太便于日志排查和分析。
啟動yarn-session
yarn-session.sh -jm <jm-memory> -tm <tm-memory> -s <slots-per-taskmanager> -z <zk-namespace> -nm <app-name> -d
提交Job
在yarn-session模式下,提交job的方式與standalone模式一致。只不過區(qū)別在于,遵循的是yarn-cluster模式的命令參數(shù)組合,即可以通過-yD進行參數(shù)傳遞。
flink run -yD <xxx>=<xxx> -c <classname> -d <jar-file> <arguments>
停止job
與standalone模式一致,可以通過以下命令:
flink cancel <job-id>
停止yarn-session
兩種方式:
- 命令退出:
echo "stop" | ./bin/yarn-session.sh -id <appId> - Attach方式:
yarn-session.sh -id <appId>然后通過ctrl+C來退出
高可用配置
high-availability: zookeeper
high-availability.zookeeper.quorum: localhost:2181
high-availability.storageDir: hdfs:///flink/recovery
high-availability.zookeeper.path.root: /flink
yarn.application-attempts: 10
探秘
經(jīng)過研讀源碼,在啟動yarn-session時,系統(tǒng)首先會檢測是否在${yarn.properties-file.location}文件夾下存在yarn配置文件,在配置文件中存放的是flink應用的applicationId信息。
這樣我們在執(zhí)行 flink run命令后,就可以通過appId連接到Flink集群,從而將應用提交到y(tǒng)arn-session的flink 集群中。
Per-Job 模式
Per-Job 模式是指每個Flink Job都是一組獨立集群,即有自己的JobManager和TaskManager。提交任務后,YARN首先會為該任務分派一個AM容器,該容器內會運行一個JobManager進程,之后JobManager會向Yarn申請運行TaskManager所需要的container,container的數(shù)量和container的配置(CPU、內存)會基于客戶端的需求來確定,當JobManager和TaskManager進程都拉起來之后,則會執(zhí)行相應的Flink Job。這樣,與Standalone以及yarn-session不同的是,我們不需要準備一個常駐的Flink 集群進程,只需要保證本地有一個Flink環(huán)境即可,F(xiàn)link集群是在我們提交Job時動態(tài)創(chuàng)建出來的。
這種方式的優(yōu)勢在于每個任務獨立運行,相互不會收到干擾,這個不干擾還包括了運行日志也是隔離的。另外,借助YARN集群的能力,提供了對Flink Job的全方位保障,包括JobManager的高可用,TaskManager的恢復,然后再結合Flink自身提供的健壯性,包括檢查點、保存點機制,從而能夠很好的保障Flink Job的高可用和健壯性。劣勢的話,就是保證了資源隔離的同時也占用了更多的資源,因為每個Job都需要一個JobManager,每個JobManager都會消耗一個AM進程資源。
提交Job
flink run -d -m yarn-cluster -ys <slots-per-taskmanager> -yD yarn.containers.vcores=2 -yD <xxx>=<xxx> -ynm <app-name> -ytm <tm-memory> -yjm <jm-memory> -c <class> <jar-file> <arguments>
其中 -m yarn-cluster 為固定寫法【針對1.11版本】,通過閱讀源碼,F(xiàn)link內部時通過提取-m中是否存在yarn-cluster標識來確定是否時per-job模式的。
停止Job
flink list -yid <appId>
flink cancel <jobId> -yid <appId>
探秘
之前一直好奇都是執(zhí)行flink run命令,flink是如何區(qū)分是standalone、yarn-session還是per-job的,研讀源碼之后發(fā)現(xiàn),相關的判斷邏輯主要是在flink-client的工程中做的,首先會篩選應用Client進行Flink Job的啟動,包括 GenericCLI、FlinkYarnSessionCli、DefaultCLI。如果在啟動命令中包含 -m yarn-cluster 參數(shù)的話,則FlinkYarnSessionCli將被啟用,從而順利的開始執(zhí)行后續(xù)的Per-job模式的啟動流程。
Application模式
application模式與per-job模式類似,也是每個Job一個獨立的Flink集群。與per-job的區(qū)別在于Flink Job的main函數(shù)的執(zhí)行位置。Per-job的執(zhí)行位置實在提交任務的客戶端所在機器,需要進行JobGragh的生成與提交;而Application模式是在JobManaer側執(zhí)行,也就是在Yarn的AM中執(zhí)行的,這樣就可以釋放clien側的資源占用。另外,application模式可以從HDFS加載flink job相關jar文件,這樣可以避免相關Jar文件的重復加載。
這種模式的優(yōu)勢的話與per-job是一致的,而且相比per-job還提供了遠端啟動job、jar包共享等優(yōu)勢,是一種更先進的部署模式。劣勢與perjob是一致的。
啟動Job
flink run-application -t yarn-application -Dtaskmanager.numberOfTaskSlots=2 -Dyarn.application.name=<xxx> -Dyarn.container.vcores=<xxx> -Dyarn.provided.lib.dirs=<hdfs-path> -Dtaskmanager.memory.process.size=2048m -Dstate.checkpoints.dir=<hdfs-path>/<job-name> -Dstate.saveponits.dir=<hdfs-path>/<job-name> -c <class> <jar-file> <arguments>
停止Job
flink list -yid=<appId>
flink cancel 00000000000000000000000000000000 -yid <appId>
探秘
application模式,flink client采用的是GenericCLI,也就是遵循的是flink run的Generic Mode的參數(shù)體系。相比與per-job的啟動參數(shù)配置,這里多出了checkpoints和savepoints路徑的配置,這是因為在application模式下,開啟了HA之后,所有任務的jobid都會固定為 0,這樣為了避免各個任務的checkpoints和savepoints存儲路徑發(fā)生沖突,所以需要針對每個Job進行定制。
另外,這種運行模式在運行時,如果依賴于系統(tǒng)變量,則需要在yarn-env中添加,并同時加入到y(tǒng)arn-site.xml的env-whitlist參數(shù)中。
HA場景驗證
本次驗證了各種場景下的Flink on YARN的高可用場景,包括kill TaskManager容器、kill JobManager容器、kill NodeManager進程、kill ResourceManager進程、停止NodeManager進程以及停止ResourceManager進程。針對這些場景,F(xiàn)link Job都能夠做到正常恢復。針對不同的場景,恢復的機制也不太相同。
kill TaskManager容器后,會導致Flink Job的中斷;而kill JobManager容器則不會。同樣kill NodeManager也會導致相應的job進行中斷恢復,而kill ResourceManager則不會(因為我們開啟了RM的recovery模式,并且開啟了RM的狀態(tài)存儲)。
總結
借助Yarn提供的資源管理能力,能夠很好的保護Flink Job的高可用,避免了對于Flink集群的維護。但是實際上針對我們這種全是long running的job,并且需要保證所有任務都得能夠啟動起來的場景看,YARN處理提供了可靠性保障外,并沒有發(fā)揮出Yarn的調度策略優(yōu)勢。針對我們單個部門的單個項目而言,引入YARN的成本有些高。畢竟雖然不用維護Flink集群了,但是卻引入了YARN集群,哈哈。
經(jīng)過討論,YARN集群還是適合在整個公司來應用,是公司的大數(shù)據(jù)基礎設施的一部分,借助Yarn的靈活的調度策略,來對公司的整個大數(shù)據(jù)資源池進行管控和調度,這樣我們在實行flink on yarn就更加輕松了。
當然,目前K8S發(fā)展勢頭很猛,未來Flink的最佳部署實踐是不是還是以Yarn為主,還是直接基于K8S還有待進一步學習才行。