一、Flink安裝
1.1 二進制安裝
? ? 在官網(wǎng)下載頁面下載二進制包,下載的壓縮包需要和服務(wù)器環(huán)境中的Hadoop、Scala版本相匹配,然后直接解壓。
1.2源碼編譯
????如果對Flink進行功能修改、或者為了支持特定版本的Hadoop,需要對Flink重新編譯。在maven 3、jdk 8.0以上環(huán)境下編譯,建議Maven3.0.x、3.1.x、 3.2.x。
源碼下載
git clone https://github.com/apache/flink
hadoop版本
Flink有依賴于HDFS、Yarn,而這又依賴于Hadoop,如果版本號錯誤,可能程序執(zhí)行過程會出異常。Flink鏡像庫中已經(jīng)編譯好的安裝包通常對應(yīng)的是Hadoop的主流版本,如果用戶需要指定Hadoop版本編譯安裝包,可以在編譯過程中使用-Dhadoop.version參數(shù)指定Hadoop版本,目前Flink支持2.4以上版本的Hadoop。如果用戶使用的是供應(yīng)商提供的Hadoop平臺,如Cloudera的CDH等,則需要根據(jù)供應(yīng)商的系統(tǒng)版本來編譯Flink,可以指定-Pvendor-repos參數(shù)來激活類似于Cloudera的Maven Repositories,會在編譯過程中下載依賴對應(yīng)版本的包。
hadoop version
為了提高build速度,可以用-DskipTests -Dfast跳過tests、QA Plugins、JavsDocs等過程。
mvn clean install -DskipTests -Dfast -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.0
cloudera的maven倉庫已經(jīng)在pom文件里配置過了,這樣在打包時就能在倉庫找到特定版本的jar包:
如果想在build flink發(fā)布版本里包括Hadoop lib,可以加-Pinclude-hadoop,
scala版本
如果用scala語言的Flink API、lib包,項目中scala版本需要和Flink中scala版本相匹配,因為scala版本不是嚴格向后兼容的,不同版本差異較大。flink1.8版本支持scala2.11、scala2.12,如果是純java語言開發(fā),可以忽略這項,默認的是2.11版本,但可以用-Pscala-2.12激活2.12版本:
mvn clean install -DskipTests -Dfast -Pscala-2.12 -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.0
pom文件里scala2.12版本的配置:
??? build后的二進制文件目錄:
flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT
二、Flink部署
??? Flink壓縮包在集群上解壓縮后,可以修改conf/flink-conf.yaml配置flink集群,F(xiàn)link支持多種部署模式,不同部署模式對應(yīng)不同配置方式。
2.1?Standalone Cluster
standalone模式下的集群,包括至少一個JobManager、至少一個TaskManager,如果節(jié)點異常,master節(jié)點和work節(jié)點不會自動重啟。當worker節(jié)點異常時,如果集群有足夠可用的solt數(shù),job還可以自動恢復(fù);如果master節(jié)點異常,只有master配置了HA,job才能恢復(fù)執(zhí)行。
組件交互
配置
conf/flink-conf.yam是Flink集群的配置文件。
1. 系統(tǒng)配置
jobmanager.rpc.address:指定master節(jié)點IP地址
jobmanager.rpc.port:指定master端口號
env.java.opts:節(jié)點默認的JVM參數(shù)
env.java.opts.jobmanager:JM的JVM參數(shù)
env.java.opts.taskmanager:TM的JVM參數(shù)
2.? 內(nèi)存配置
jobmanager.heap.size:JM節(jié)點可分配的最大內(nèi)存,master主要管理集群資源(RM)、協(xié)調(diào)應(yīng)用程序執(zhí)行(TM),所需資源不是很多;但是如果集群上了跑有很多應(yīng)用或者一個應(yīng)用有很多算子,就要考慮增加內(nèi)存;默認1024m
taskmanager.heap.size:worker節(jié)點負責(zé)實際任務(wù)的執(zhí)行并處理潛在的大量數(shù)據(jù),而且當使用內(nèi)存或文件格式的狀態(tài)后端,也會占用JVM堆內(nèi)存。默認1024m,根據(jù)實際應(yīng)用情況適當調(diào)整。
On Yarn模式下,內(nèi)存大小是被TM的Yarn資源容器自動分配的。
taskmanager.memory.size/taskmanager.memory.fraction:TaskManager為排序、hashtable、中間結(jié)果的緩存而分配的內(nèi)存。taskmanager.memory.size配置絕對值、taskmanager.memory.fraction按照比例分配內(nèi)存。如果內(nèi)存不夠
taskmanager.memory.off-heap:
3.? work節(jié)點資源配置
taskmanager.numberOfTaskSlots:work進程可以同時運行的task數(shù),即work節(jié)點的并發(fā)數(shù)。需要注意的是,flink不為每個task或solt指定分配內(nèi)存,這樣一個任務(wù)就可能消耗JVM大部分內(nèi)存,這點在上篇文章里也有特別提到。通常是物理cpu的數(shù)量、或是其一半,默認1。
4.? 網(wǎng)絡(luò)配置
TaskManager的網(wǎng)絡(luò)緩存區(qū)用于跨節(jié)點的數(shù)據(jù)交換(task接收、發(fā)送事件),是網(wǎng)絡(luò)交換的關(guān)鍵組件。TaskManager在發(fā)送事件之前先把事件收集到緩存區(qū)、在接收到數(shù)據(jù)之后并把數(shù)據(jù)傳遞給應(yīng)用之前緩存到緩沖區(qū)。這種方式有效的利用網(wǎng)絡(luò)資源實現(xiàn)高吞吐。緩存區(qū)的總數(shù)量和任務(wù)之間的網(wǎng)絡(luò)連接總數(shù)相同,比如通過分區(qū)或廣播連接起來的兩個算子,緩存區(qū)總數(shù)是發(fā)送和接收算子的并行度的乘積。網(wǎng)絡(luò)緩存區(qū)在JVM堆外內(nèi)存分配。
如果沒有足夠數(shù)量的緩存區(qū),可能不能為同時建立的所有網(wǎng)絡(luò)連接分配緩存區(qū),會降低吞吐量。
taskmanager.network.memory.fraction:該參數(shù)配置TaskManager節(jié)點為網(wǎng)絡(luò)緩沖區(qū)分配的總內(nèi)存,默認占JVM堆內(nèi)存10%
taskmanager.memory.segment-size:配置一個網(wǎng)絡(luò)堆棧分配的內(nèi)存大小,默認32k。
????減少網(wǎng)絡(luò)堆棧(一個網(wǎng)絡(luò)數(shù)據(jù)交換管道的網(wǎng)絡(luò)緩沖區(qū))會增加網(wǎng)絡(luò)緩沖區(qū)數(shù)量,但有可能降低整體的網(wǎng)絡(luò)傳輸效率。
5.? 應(yīng)用并行度
parallelism.default:Flink應(yīng)用默認并行度,即應(yīng)用內(nèi)每個算子的并行度,與整個集群的CPU數(shù)量有關(guān),增加parallelism可以提高任務(wù)并行的計算的實例數(shù),提升數(shù)據(jù)處理效率,但也會占用更多Slot。一個算子的并行度有多種配置方式,其優(yōu)先級是:
配置文件配置的(parallelism.default) < 啟動時-p參數(shù)指定 < 應(yīng)用代碼設(shè)置的并行度(env.setParallelism)<應(yīng)用代碼里為每個算子配的(setParallelism)
6. 磁盤配置
Flink在運行時會把臨時數(shù)據(jù)寫到本地文件系統(tǒng),比如flink接收到的JAR文件、應(yīng)用程序狀態(tài)(當用RocksDB存儲應(yīng)用程序狀態(tài)時),要避免目錄里的數(shù)據(jù)被服務(wù)器自動清空,否則job重啟時可能因找不到元數(shù)據(jù)導(dǎo)致恢復(fù)失敗
io.tmp.dirs :指定默認的本地臨時目錄
7. checkpoint
state.backend:維護狀態(tài)數(shù)據(jù)的狀態(tài)后端,會影響應(yīng)用程序的性能??蛇x的配置項是jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
state.backend.async:是否異步checkpoint
state.backend.incremental:是否創(chuàng)建增量checkpoint,增量checkpoint僅存儲和之前checkpoint的差異,而不是完整的checkpoint。但不是所有的后端狀態(tài)都支持異步和增量checkpoint。
state.checkpoints.dir:checkpoint存儲數(shù)據(jù)文件、元數(shù)據(jù)的默認根目錄,這個目錄必須能被所有的進程、節(jié)點訪問到
state.savepoints.dir:savepoint的根目錄,用于狀態(tài)存儲寫文件時使用(MemoryStateBackend, FsStateBackend, RocksDBStateBackend)
conf/slaves:配置集群中所有的工作節(jié)點
啟動
????在JM節(jié)點執(zhí)行下面命令,完成的工作:本地啟動JM、并通過SSH命令啟動所有的work節(jié)點上的TM,work節(jié)點列表配置在slaves文件,要求JM節(jié)點可用免密碼ssh到TM節(jié)點。:
bin/start-cluster.sh
????也可以在集群的每個節(jié)點上單獨執(zhí)行命令,分別對JM、TM進行啟動、停止操作:
bin/jobmanager.sh((start|start-foreground)[host][webui-port])|stop|stop-all
bin/taskmanager.sh start|start-foreground|stop|stop-all
默認JobManager配置Rest綁定的配置項rest.port(默認值8081),集群的JM、TM啟動后,可以在JM的web頁面上看到相關(guān)信息:
Run Example
執(zhí)行內(nèi)置的demo:
./flink run ../examples/batch/WordCount.jar
-m 指定DispatcherRestEndpoint的ip和端口,即master節(jié)點的WEB UI
這些默認配置項會在集群啟動的時候加載到Flink集群中,當用戶提交任務(wù)時,可以通過-D符號來動態(tài)設(shè)定系統(tǒng)參數(shù),此時flink-conf.yaml配置文件中的參數(shù)就會被覆蓋掉
2.2 on Yarn
yarn是Hadoop上的一個資源管理組件,管理集群上的計算資源,比如集群的CPU、內(nèi)存,并把資源封裝成container分配給應(yīng)用。Flink在Yarn上兩種運行模式:job模式、session模式,job模式是以單個job的方式啟動Flink集群,一旦作業(yè)執(zhí)行完畢,F(xiàn)link集群就停止并釋放所有資源;session模式在Yarn上啟動一個長期運行的Flink集群,可以執(zhí)行多個Job,但要手動管理集群的停止。不論是哪種on Yarn模式,當Flink TaskManager失敗后,任務(wù)都會被自動轉(zhuǎn)移到其他TM。
Yarn主要由ResourceManager、ApplicationMaster、NodeManager組成,RM是一個全局的資源管理器,負責(zé)整個系統(tǒng)的資源管理和分配;每個往Yarn上提交的應(yīng)用程序都包含一個AM,主要是向RM請求資源,將獲得的資源進一步分配給內(nèi)部的任務(wù),監(jiān)控所有任務(wù)的運行狀態(tài)并在任務(wù)失敗時重新申請資源以重啟任務(wù);NM定時向RM匯報本節(jié)點上資源利用情況,接收并處理來自AM的任務(wù)啟動、停止等各種請求;資源分配的單位被抽象成container。
環(huán)境前置條件:設(shè)置HADOOP_CLASSPATH環(huán)境變量,F(xiàn)link依賴于hadoop lib,啟動Flink組件(如Client、JM、TM),如果flink發(fā)布包里已經(jīng)打包了hadoop lib,可不用再配置該變量;設(shè)置YARN_CONF_DIR或者HADOOP_CONF_DIR環(huán)境變量,以便可以讀取yarn和hdfs的配置,如果啟動的節(jié)點沒有相應(yīng)的配置文件,F(xiàn)link在啟動過程中無法正常連接到Y(jié)arn集群。需要注意的是,如果只是把hadoop相關(guān)配置文件(core-site.xml、yarn-site.xml等)放到flink配置文件目錄下,是不起作用的。
配置
yarn.appmaster.vcores:Yarn?ApplicationMaster分配的虛擬內(nèi)核(vcores),默認1.
yarn.containers.vcores:Yarn?containers分配的虛擬內(nèi)核,默認情況下,虛擬內(nèi)核數(shù)和TaskManager配置的slots數(shù)相同,默認是1.
2.2.1 Session Mode
當啟動yarn session時,Yarn會為ApplicationMaster分配一個container,這個ApplicationMaster運行Flink JobManager;用戶提交任務(wù)后,yarn session會向yarn申請資源container、啟動Flink所有必須服務(wù),比如JM、TM,即客戶端直接向yarn session請求,而不是向Yarn進行交互,session會自動分配container,用于運行Task Manager(一個TM占用一個container)。Flink Session是長期運行的服務(wù),可以同時運行多個flink 應(yīng)用。
Start
./bin/yarn-session.sh-jm1024m-tm4096m
?-jm:JobManager(Yarn Application Master)申請1G內(nèi)存資源、-tm:TM分配4G內(nèi)存
當session運行后,可在yarn管理界面上監(jiān)控到,點擊ApplicationMaster會打開JobManager頁面:
-d參數(shù):當Flink Yarn客戶的把flink提交到y(tǒng)arn集群后,就自動停止。
停止Yarn session可以用yarn的命令行:yarn application -kill <appId>
Yarn session提交以后,就可以用flink命令提交job到集群。
Submit Job
提交程序到y(tǒng)arn集群上:
./flink run ../examples/batch/WordCount.jar? --input hdfs:///xxxx --output hdfs:///xxx
可選參數(shù):-yid :指定Flink yarn-session的Application ID
job提交后,因為并沒有啟動新的Yarn Application Maser,所以在在Yarn ResourceManager Web上監(jiān)控不到,可以在Flink Web UI上看到。
2.2.2 Job Mode
? ? Flink任務(wù)直接以單個應(yīng)用提交到y(tǒng)arn上,每次提交的Flink任務(wù)都會啟動獨立的Yarn Aplication,每個任務(wù)都會有自己的JM、TM,所有的資源都獨立使用。啟動作業(yè)提交大致過程:客戶端向Yarn RM提交應(yīng)用,Yarn RM為該應(yīng)用申請一定數(shù)量的container(container數(shù)是可配置的),并在相應(yīng)的NM上啟動應(yīng)用程序的AM。每次提交任務(wù)時都會在Yarn上跑一個完整的Flink集群,這樣任務(wù)之間相互獨立、互不影響,任務(wù)跑完后創(chuàng)建的集群也隨之關(guān)閉。
Submit Job
./bin/flink run-myarn-cluster ./examples/batch/WordCount.jar
????在Yarn ResourceManager Web頁面上,能看到啟動了一個Flink類型的Application Master,當任務(wù)執(zhí)行完成后,該Application Master也自動停止。
其他可選參數(shù):
-yqu 指定Yarn資源隊列
-yjm Flink Master(Yarn ApplicationMaster)分配一個資源container,-yjm指定該資源擁有的內(nèi)存,怎么配置申請的資源虛擬內(nèi)存?
-ytm 指定為每個Flink TaskManager分配的內(nèi)存,即為每個TaskManager申請到的資源container內(nèi)存
-ys 配置每個TaskManager上的solt數(shù),默認一個solt占用一個虛擬內(nèi)核。
-yn:任務(wù)啟動的TM數(shù)。
-p 設(shè)置job的全局并行度,即使用Flink多少solt,默認并行度1