Flink CookBook-Flink 環(huán)境準備

一、Flink安裝

1.1 二進制安裝

? ? 在官網(wǎng)下載頁面下載二進制包,下載的壓縮包需要和服務(wù)器環(huán)境中的Hadoop、Scala版本相匹配,然后直接解壓。

1.2源碼編譯

????如果對Flink進行功能修改、或者為了支持特定版本的Hadoop,需要對Flink重新編譯。在maven 3、jdk 8.0以上環(huán)境下編譯,建議Maven3.0.x、3.1.x、 3.2.x。

源碼下載

git clone https://github.com/apache/flink

hadoop版本

Flink有依賴于HDFS、Yarn,而這又依賴于Hadoop,如果版本號錯誤,可能程序執(zhí)行過程會出異常。Flink鏡像庫中已經(jīng)編譯好的安裝包通常對應(yīng)的是Hadoop的主流版本,如果用戶需要指定Hadoop版本編譯安裝包,可以在編譯過程中使用-Dhadoop.version參數(shù)指定Hadoop版本,目前Flink支持2.4以上版本的Hadoop。如果用戶使用的是供應(yīng)商提供的Hadoop平臺,如Cloudera的CDH等,則需要根據(jù)供應(yīng)商的系統(tǒng)版本來編譯Flink,可以指定-Pvendor-repos參數(shù)來激活類似于Cloudera的Maven Repositories,會在編譯過程中下載依賴對應(yīng)版本的包。

hadoop version

為了提高build速度,可以用-DskipTests -Dfast跳過tests、QA Plugins、JavsDocs等過程。

mvn clean install -DskipTests -Dfast -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.0

cloudera的maven倉庫已經(jīng)在pom文件里配置過了,這樣在打包時就能在倉庫找到特定版本的jar包:

如果想在build flink發(fā)布版本里包括Hadoop lib,可以加-Pinclude-hadoop,

scala版本

如果用scala語言的Flink API、lib包,項目中scala版本需要和Flink中scala版本相匹配,因為scala版本不是嚴格向后兼容的,不同版本差異較大。flink1.8版本支持scala2.11、scala2.12,如果是純java語言開發(fā),可以忽略這項,默認的是2.11版本,但可以用-Pscala-2.12激活2.12版本:

mvn clean install -DskipTests -Dfast -Pscala-2.12 -Pinclude-hadoop -Pvendor-repos -Dhadoop.version=2.6.0-cdh5.14.0

pom文件里scala2.12版本的配置:

??? build后的二進制文件目錄:

flink/flink-dist/target/flink-1.9-SNAPSHOT-bin/flink-1.9-SNAPSHOT

二、Flink部署

??? Flink壓縮包在集群上解壓縮后,可以修改conf/flink-conf.yaml配置flink集群,F(xiàn)link支持多種部署模式,不同部署模式對應(yīng)不同配置方式。

2.1?Standalone Cluster

standalone模式下的集群,包括至少一個JobManager、至少一個TaskManager,如果節(jié)點異常,master節(jié)點和work節(jié)點不會自動重啟。當worker節(jié)點異常時,如果集群有足夠可用的solt數(shù),job還可以自動恢復(fù);如果master節(jié)點異常,只有master配置了HA,job才能恢復(fù)執(zhí)行。

組件交互

配置

conf/flink-conf.yam是Flink集群的配置文件。

1. 系統(tǒng)配置

jobmanager.rpc.address:指定master節(jié)點IP地址

jobmanager.rpc.port:指定master端口號

env.java.opts:節(jié)點默認的JVM參數(shù)

env.java.opts.jobmanager:JM的JVM參數(shù)

env.java.opts.taskmanager:TM的JVM參數(shù)

2.? 內(nèi)存配置

jobmanager.heap.size:JM節(jié)點可分配的最大內(nèi)存,master主要管理集群資源(RM)、協(xié)調(diào)應(yīng)用程序執(zhí)行(TM),所需資源不是很多;但是如果集群上了跑有很多應(yīng)用或者一個應(yīng)用有很多算子,就要考慮增加內(nèi)存;默認1024m

taskmanager.heap.size:worker節(jié)點負責(zé)實際任務(wù)的執(zhí)行并處理潛在的大量數(shù)據(jù),而且當使用內(nèi)存或文件格式的狀態(tài)后端,也會占用JVM堆內(nèi)存。默認1024m,根據(jù)實際應(yīng)用情況適當調(diào)整。

On Yarn模式下,內(nèi)存大小是被TM的Yarn資源容器自動分配的。

taskmanager.memory.size/taskmanager.memory.fraction:TaskManager為排序、hashtable、中間結(jié)果的緩存而分配的內(nèi)存。taskmanager.memory.size配置絕對值、taskmanager.memory.fraction按照比例分配內(nèi)存。如果內(nèi)存不夠

taskmanager.memory.off-heap:

3.? work節(jié)點資源配置

taskmanager.numberOfTaskSlots:work進程可以同時運行的task數(shù),即work節(jié)點的并發(fā)數(shù)。需要注意的是,flink不為每個task或solt指定分配內(nèi)存,這樣一個任務(wù)就可能消耗JVM大部分內(nèi)存,這點在上篇文章里也有特別提到。通常是物理cpu的數(shù)量、或是其一半,默認1。

4.? 網(wǎng)絡(luò)配置

TaskManager的網(wǎng)絡(luò)緩存區(qū)用于跨節(jié)點的數(shù)據(jù)交換(task接收、發(fā)送事件),是網(wǎng)絡(luò)交換的關(guān)鍵組件。TaskManager在發(fā)送事件之前先把事件收集到緩存區(qū)、在接收到數(shù)據(jù)之后并把數(shù)據(jù)傳遞給應(yīng)用之前緩存到緩沖區(qū)。這種方式有效的利用網(wǎng)絡(luò)資源實現(xiàn)高吞吐。緩存區(qū)的總數(shù)量和任務(wù)之間的網(wǎng)絡(luò)連接總數(shù)相同,比如通過分區(qū)或廣播連接起來的兩個算子,緩存區(qū)總數(shù)是發(fā)送和接收算子的并行度的乘積。網(wǎng)絡(luò)緩存區(qū)在JVM堆外內(nèi)存分配。

如果沒有足夠數(shù)量的緩存區(qū),可能不能為同時建立的所有網(wǎng)絡(luò)連接分配緩存區(qū),會降低吞吐量。

taskmanager.network.memory.fraction:該參數(shù)配置TaskManager節(jié)點為網(wǎng)絡(luò)緩沖區(qū)分配的總內(nèi)存,默認占JVM堆內(nèi)存10%

taskmanager.memory.segment-size:配置一個網(wǎng)絡(luò)堆棧分配的內(nèi)存大小,默認32k。

????減少網(wǎng)絡(luò)堆棧(一個網(wǎng)絡(luò)數(shù)據(jù)交換管道的網(wǎng)絡(luò)緩沖區(qū))會增加網(wǎng)絡(luò)緩沖區(qū)數(shù)量,但有可能降低整體的網(wǎng)絡(luò)傳輸效率。

5.? 應(yīng)用并行度

parallelism.default:Flink應(yīng)用默認并行度,即應(yīng)用內(nèi)每個算子的并行度,與整個集群的CPU數(shù)量有關(guān),增加parallelism可以提高任務(wù)并行的計算的實例數(shù),提升數(shù)據(jù)處理效率,但也會占用更多Slot。一個算子的并行度有多種配置方式,其優(yōu)先級是:

配置文件配置的(parallelism.default) < 啟動時-p參數(shù)指定 < 應(yīng)用代碼設(shè)置的并行度(env.setParallelism)<應(yīng)用代碼里為每個算子配的(setParallelism)

6. 磁盤配置

Flink在運行時會把臨時數(shù)據(jù)寫到本地文件系統(tǒng),比如flink接收到的JAR文件、應(yīng)用程序狀態(tài)(當用RocksDB存儲應(yīng)用程序狀態(tài)時),要避免目錄里的數(shù)據(jù)被服務(wù)器自動清空,否則job重啟時可能因找不到元數(shù)據(jù)導(dǎo)致恢復(fù)失敗

io.tmp.dirs :指定默認的本地臨時目錄

7. checkpoint

state.backend:維護狀態(tài)數(shù)據(jù)的狀態(tài)后端,會影響應(yīng)用程序的性能??蛇x的配置項是jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

state.backend.async:是否異步checkpoint

state.backend.incremental:是否創(chuàng)建增量checkpoint,增量checkpoint僅存儲和之前checkpoint的差異,而不是完整的checkpoint。但不是所有的后端狀態(tài)都支持異步和增量checkpoint。

state.checkpoints.dir:checkpoint存儲數(shù)據(jù)文件、元數(shù)據(jù)的默認根目錄,這個目錄必須能被所有的進程、節(jié)點訪問到

state.savepoints.dir:savepoint的根目錄,用于狀態(tài)存儲寫文件時使用(MemoryStateBackend, FsStateBackend, RocksDBStateBackend)

conf/slaves:配置集群中所有的工作節(jié)點

啟動

????在JM節(jié)點執(zhí)行下面命令,完成的工作:本地啟動JM、并通過SSH命令啟動所有的work節(jié)點上的TM,work節(jié)點列表配置在slaves文件,要求JM節(jié)點可用免密碼ssh到TM節(jié)點。:

bin/start-cluster.sh

????也可以在集群的每個節(jié)點上單獨執(zhí)行命令,分別對JM、TM進行啟動、停止操作:

bin/jobmanager.sh((start|start-foreground)[host][webui-port])|stop|stop-all

bin/taskmanager.sh start|start-foreground|stop|stop-all

默認JobManager配置Rest綁定的配置項rest.port(默認值8081),集群的JM、TM啟動后,可以在JM的web頁面上看到相關(guān)信息:

Run Example

執(zhí)行內(nèi)置的demo:

./flink run ../examples/batch/WordCount.jar

-m 指定DispatcherRestEndpoint的ip和端口,即master節(jié)點的WEB UI

這些默認配置項會在集群啟動的時候加載到Flink集群中,當用戶提交任務(wù)時,可以通過-D符號來動態(tài)設(shè)定系統(tǒng)參數(shù),此時flink-conf.yaml配置文件中的參數(shù)就會被覆蓋掉

2.2 on Yarn

yarn是Hadoop上的一個資源管理組件,管理集群上的計算資源,比如集群的CPU、內(nèi)存,并把資源封裝成container分配給應(yīng)用。Flink在Yarn上兩種運行模式:job模式、session模式,job模式是以單個job的方式啟動Flink集群,一旦作業(yè)執(zhí)行完畢,F(xiàn)link集群就停止并釋放所有資源;session模式在Yarn上啟動一個長期運行的Flink集群,可以執(zhí)行多個Job,但要手動管理集群的停止。不論是哪種on Yarn模式,當Flink TaskManager失敗后,任務(wù)都會被自動轉(zhuǎn)移到其他TM。

Yarn主要由ResourceManager、ApplicationMaster、NodeManager組成,RM是一個全局的資源管理器,負責(zé)整個系統(tǒng)的資源管理和分配;每個往Yarn上提交的應(yīng)用程序都包含一個AM,主要是向RM請求資源,將獲得的資源進一步分配給內(nèi)部的任務(wù),監(jiān)控所有任務(wù)的運行狀態(tài)并在任務(wù)失敗時重新申請資源以重啟任務(wù);NM定時向RM匯報本節(jié)點上資源利用情況,接收并處理來自AM的任務(wù)啟動、停止等各種請求;資源分配的單位被抽象成container。

環(huán)境前置條件:設(shè)置HADOOP_CLASSPATH環(huán)境變量,F(xiàn)link依賴于hadoop lib,啟動Flink組件(如Client、JM、TM),如果flink發(fā)布包里已經(jīng)打包了hadoop lib,可不用再配置該變量;設(shè)置YARN_CONF_DIR或者HADOOP_CONF_DIR環(huán)境變量,以便可以讀取yarn和hdfs的配置,如果啟動的節(jié)點沒有相應(yīng)的配置文件,F(xiàn)link在啟動過程中無法正常連接到Y(jié)arn集群。需要注意的是,如果只是把hadoop相關(guān)配置文件(core-site.xml、yarn-site.xml等)放到flink配置文件目錄下,是不起作用的。

配置

yarn.appmaster.vcores:Yarn?ApplicationMaster分配的虛擬內(nèi)核(vcores),默認1.

yarn.containers.vcores:Yarn?containers分配的虛擬內(nèi)核,默認情況下,虛擬內(nèi)核數(shù)和TaskManager配置的slots數(shù)相同,默認是1.

2.2.1 Session Mode

當啟動yarn session時,Yarn會為ApplicationMaster分配一個container,這個ApplicationMaster運行Flink JobManager;用戶提交任務(wù)后,yarn session會向yarn申請資源container、啟動Flink所有必須服務(wù),比如JM、TM,即客戶端直接向yarn session請求,而不是向Yarn進行交互,session會自動分配container,用于運行Task Manager(一個TM占用一個container)。Flink Session是長期運行的服務(wù),可以同時運行多個flink 應(yīng)用。

Start

./bin/yarn-session.sh-jm1024m-tm4096m

?-jm:JobManager(Yarn Application Master)申請1G內(nèi)存資源、-tm:TM分配4G內(nèi)存

當session運行后,可在yarn管理界面上監(jiān)控到,點擊ApplicationMaster會打開JobManager頁面:

-d參數(shù):當Flink Yarn客戶的把flink提交到y(tǒng)arn集群后,就自動停止。

停止Yarn session可以用yarn的命令行:yarn application -kill <appId>

Yarn session提交以后,就可以用flink命令提交job到集群。

Submit Job

提交程序到y(tǒng)arn集群上:

./flink run ../examples/batch/WordCount.jar? --input hdfs:///xxxx --output hdfs:///xxx

可選參數(shù):-yid :指定Flink yarn-session的Application ID

job提交后,因為并沒有啟動新的Yarn Application Maser,所以在在Yarn ResourceManager Web上監(jiān)控不到,可以在Flink Web UI上看到。

2.2.2 Job Mode

? ? Flink任務(wù)直接以單個應(yīng)用提交到y(tǒng)arn上,每次提交的Flink任務(wù)都會啟動獨立的Yarn Aplication,每個任務(wù)都會有自己的JM、TM,所有的資源都獨立使用。啟動作業(yè)提交大致過程:客戶端向Yarn RM提交應(yīng)用,Yarn RM為該應(yīng)用申請一定數(shù)量的container(container數(shù)是可配置的),并在相應(yīng)的NM上啟動應(yīng)用程序的AM。每次提交任務(wù)時都會在Yarn上跑一個完整的Flink集群,這樣任務(wù)之間相互獨立、互不影響,任務(wù)跑完后創(chuàng)建的集群也隨之關(guān)閉。

Submit Job

./bin/flink run-myarn-cluster ./examples/batch/WordCount.jar

????在Yarn ResourceManager Web頁面上,能看到啟動了一個Flink類型的Application Master,當任務(wù)執(zhí)行完成后,該Application Master也自動停止。

其他可選參數(shù):

-yqu 指定Yarn資源隊列

-yjm Flink Master(Yarn ApplicationMaster)分配一個資源container,-yjm指定該資源擁有的內(nèi)存,怎么配置申請的資源虛擬內(nèi)存?

-ytm 指定為每個Flink TaskManager分配的內(nèi)存,即為每個TaskManager申請到的資源container內(nèi)存

-ys 配置每個TaskManager上的solt數(shù),默認一個solt占用一個虛擬內(nèi)核。

-yn:任務(wù)啟動的TM數(shù)。

-p 設(shè)置job的全局并行度,即使用Flink多少solt,默認并行度1

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容