(轉(zhuǎn)) Flink開發(fā)利器StreamPark

1. 背景

Hadoop體系雖然在目前應(yīng)用非常廣泛,但架構(gòu)繁瑣、運(yùn)維復(fù)雜度過高、版本升級(jí)困難,且由于部門原因,數(shù)據(jù)中臺(tái)需求排期較長(zhǎng),我們急需探索敏捷性開發(fā)的數(shù)據(jù)平臺(tái)模式。在目前云原生架構(gòu)的普及和湖倉(cāng)一體化的大背景下,我們已經(jīng)確定了將Doris作為離線數(shù)據(jù)倉(cāng)庫,將TiDB(目前已經(jīng)應(yīng)用于生產(chǎn))作為實(shí)時(shí)數(shù)據(jù)平臺(tái),同時(shí)因?yàn)镈oris具有 on MySQL 的odbc能力,所以又可以對(duì)外部數(shù)據(jù)庫資源進(jìn)行整合,統(tǒng)一對(duì)外輸出報(bào)表 [圖片上傳中...(image-1d98f3-1676355289636-9)]

image.png

2. 遇到的問題

在數(shù)據(jù)引擎上,我們確定使用Spark和Flink

  • 使用Spark on K8s client 客戶端模式做離線數(shù)據(jù)處理
  • 使用Flink on K8s Native-Application/Session 模式做實(shí)時(shí)任務(wù)流管理

在這里,實(shí)際上有一些問題我們一直沒有徹底解決。用過Native-Application模式的朋友都知道,每提交一個(gè)任務(wù),都需要打包新的鏡像,提交到私有倉(cāng)庫,然后再調(diào)用Flink Run 指令溝通K8s,去拉取鏡像運(yùn)行Pod。任務(wù)提交之后,還需要去K8s查看log, 但是:

  1. 任務(wù)運(yùn)行監(jiān)控怎么處理?
  2. 使用Cluster模式還是Nodeport暴露端口訪問Web UI?
  3. 提交任務(wù)能否簡(jiǎn)化打包鏡像的流程?
  4. 如何減少開發(fā)壓力?

3. 解決問題的過程

以上的這些其實(shí)都是需要解決的問題,如果單純的使用命令行去提交每個(gè)任務(wù),是不現(xiàn)實(shí)的,任務(wù)量大了,會(huì)變得不可維護(hù)。如何解決這些問題變成一個(gè)不得不面對(duì)的問題。

簡(jiǎn)化鏡像構(gòu)建

首先,針對(duì)Flink原生鏡像需要二次build的問題:我們利用了MINIO作為外部存儲(chǔ),并使用s3-fuse通過DaemonSet的方式直接掛載在了每個(gè)宿主節(jié)點(diǎn)上,我們所需要提交的jar包都可以放到上面統(tǒng)一管理,這樣的話,即使擴(kuò)縮容Flink節(jié)點(diǎn),也能實(shí)現(xiàn)s3掛載自動(dòng)伸縮。

image

Flink從1.13版本開始,就支持Pod Template,我們可以在Pod Template中利用數(shù)據(jù)卷掛載的方式再將宿主機(jī)目錄掛載到每個(gè)pod中。從而無需鏡像打包而直接在K8s上運(yùn)行Flink程序。如上圖,我們將s3先通過s3-fuse Pod掛載在Node1、Node2的/mnt/data-s3fs目錄下,然后再將/mnt/data-s3fs掛載到Pod A中。 但是,因?yàn)閷?duì)象存儲(chǔ)隨機(jī)寫入或追加文件需要重寫整個(gè)對(duì)象,導(dǎo)致這種方式僅適合于頻繁讀。而這剛好滿足我們現(xiàn)在的場(chǎng)景。

引入StreamPark

之前我們寫Flink Sql 基本上都是使用Java包裝Sql,打jar包,提交到s3平臺(tái)上,通過命令行方式提交代碼,但這種方式始終不友好,流程繁瑣,開發(fā)和運(yùn)維成本太大。我們希望能夠進(jìn)一步簡(jiǎn)化流程,將Flink TableEnvironment 抽象出來,有平臺(tái)負(fù)責(zé)初始化、打包運(yùn)行Flink任務(wù),實(shí)現(xiàn)Flink應(yīng)用程序的構(gòu)建、測(cè)試和部署自動(dòng)化。

這是個(gè)開源興起的時(shí)代,我們自然而然的將目光投向開源領(lǐng)域中,在一眾開源項(xiàng)目中,經(jīng)過對(duì)比各個(gè)項(xiàng)目綜合評(píng)估發(fā)現(xiàn) Zeppelin 和 StreamPark 這兩個(gè)項(xiàng)目對(duì)Flink的支持較為完善,都宣稱支持 Flink on K8s ,最終進(jìn)入到我們的目標(biāo)選擇范圍中,以下是兩者在K8s相關(guān)支持的簡(jiǎn)單比較(目前如果有更新,麻煩批評(píng)指正)。

功能 Zeppelin StreamPark
任務(wù)狀態(tài)監(jiān)控 稍低 ,不能作為任務(wù)狀態(tài)監(jiān)控工具 較高
任務(wù)資源管理 有 ,但目前版本還不是很健全
本地化部署 稍低 ,on K8s模式只能將Zeppelin部署在K8s中,否則就需要打通pod和外部網(wǎng)絡(luò),但是這在生產(chǎn)環(huán)境中很少這樣做的 可以本地化部署
多語言支持 較高 ,支持Python/Scala/Java多語言 一般 ,目前K8s模式和YARN模式同時(shí)支持FlinkSql,并可以根據(jù)自身需求,使用Java/Scala開發(fā)DataStream
Flink WebUI代理 目前還支持的不是很完整 ,主開發(fā)大佬目前是考慮整合ingress 較好 ,目前支持ClusterIp/NodePort/LoadBalance模式
學(xué)習(xí)成本 成本較低 ,需要增加額外的參數(shù)學(xué)習(xí),這個(gè)和原生的FlinkSql在參數(shù)上有點(diǎn)區(qū)別 無成本 ,K8s模式下FlinkSql為原生支持的sql格式;同時(shí)支持Custome-Code(用戶編寫代碼開發(fā)Datastream/FlinkSql任務(wù))
Flink多版本支持 支持 支持
Flink原生鏡像侵入 有侵入 ,需要在Flink鏡像中提前部署jar包,會(huì)同jobmanage啟動(dòng)在同一個(gè)pod中,和zeppelin-server通信 無侵入 ,但是會(huì)產(chǎn)生較多鏡像,需要定時(shí)清理
代碼多版本管理 支持 支持

<center style="box-sizing: border-box; font-family: Poppins, sans-serif, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji"; font-size: 16px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; color: gray;">(PS: 此處僅從調(diào)研用戶角度出發(fā),我們對(duì)雙方開發(fā)都保持極大的尊重)</center>

調(diào)研過程中,我們與兩者的主開發(fā)人員都進(jìn)行了多次溝通。經(jīng)過我們反復(fù)研究之后,還是決定將 StreamPark 作為我們目前的Flink開發(fā)工具來使用。

<center style="box-sizing: border-box; font-family: Poppins, sans-serif, -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, "Noto Sans", "Apple Color Emoji", "Segoe UI Emoji", "Segoe UI Symbol", "Noto Color Emoji"; font-size: 16px; font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; background-color: rgb(255, 255, 255); text-decoration-thickness: initial; text-decoration-style: initial; text-decoration-color: initial; color: gray;">(StreamPark 官網(wǎng)的閃屏)</center>

經(jīng)過開發(fā)同學(xué)長(zhǎng)時(shí)間開發(fā)測(cè)試,StreamPark 目前已經(jīng)具備:

  • 完善的Sql校驗(yàn)功能
  • 實(shí)現(xiàn)了自動(dòng)build/push鏡像
  • 使用自定義類加載器,通過Child-first 加載方式 解決了YARN和K8s兩種運(yùn)行模式、支持了自由切換Flink多版本
  • 與Flink-Kubernetes進(jìn)行深度整合,提交任務(wù)后返回WebUI,通過remote rest api + remote K8s ,追蹤任務(wù)執(zhí)行狀態(tài)
  • 同時(shí)支持了 Flink1.12、1.13、1.14 等版本

以上基本解決了我們目前開發(fā)和運(yùn)維中存在的大部分問題。

在目前最新發(fā)布的1.2.0版本中,StreamPark較為完善的支持了K8s-Native-Application和K8s-session-Application模式。

K8s Native Application 模式

在StreamPark中,我們只需要配置相應(yīng)的參數(shù),并在Maven pom中填寫相應(yīng)的依賴,或者上傳依賴jar包,點(diǎn)擊Apply,相應(yīng)的依賴就會(huì)生成。這就意味著我們也可以將所有使用的Udf打成jar包 and 各種 connector.jar,直接在sql中使用。如下圖:

[圖片上傳中...(image-9903e2-1676355289636-7)]

Sql校驗(yàn)?zāi)芰?Zeppelin基本一致:

image

我們也可以指定資源,指定Flink Run中的動(dòng)態(tài)參數(shù)Dynamic Option,甚至參數(shù)可以整合pod template

image

程序保存后,點(diǎn)擊運(yùn)行時(shí),也可以指定savepoint。任務(wù)提交成功后,StreamPark會(huì)根據(jù)FlinkPod網(wǎng)絡(luò)Exposed Type(loadBalancer/Nodeport/ClusterIp),返回相應(yīng)的WebURL,從而自然的實(shí)現(xiàn)WebUI跳轉(zhuǎn),但是目前因?yàn)榫€上私有K8s集群出于安全性考慮,尚未打通Pod與客戶端節(jié)點(diǎn)網(wǎng)絡(luò)(目前也沒有這個(gè)規(guī)劃),所以我們只使用Nodeport。如果后續(xù)任務(wù)數(shù)過多,有使用ClusterIP的需求的話,我們可能會(huì)將StreamPark 部署在K8s,或者同ingress做進(jìn)一步整合。

image

注意:K8s master 如果使用vip做均衡代理的情況下,F(xiàn)link 1.13版本會(huì)返回vip的ip地址,1.14版本已經(jīng)修復(fù)。 下面是K8s Application模式下具體提交流程

K8s Native Session 模式

StreamPark還較好的支持了 K8s Native-Sesson模式 ,這為我們后續(xù)做離線FlinkSql開發(fā)或部分資源隔離做了較好的技術(shù)支持。

Native-session模式需要事先使用Flink命令創(chuàng)建一個(gè)運(yùn)行在K8s中的Flink集群,如下:

./kubernetes-session.sh \-Dkubernetes.cluster-id=flink-on-k8s-flinkSql-test \-Dkubernetes.context=XXX \-Dkubernetes.namespace=XXXX \-Dkubernetes.service-account=XXXX \-Dkubernetes.container.image=XXXX \-Dkubernetes.container.image.pull-policy=Always \-Dkubernetes.taskmanager.node-selector=XXXX \-Dkubernetes.rest-service.exposed.type=Nodeport
image

如上圖,使用該ClusterId作為StreamPark的任務(wù)參數(shù)Kubernetes ClusterId。保存提交任務(wù)后,任務(wù)會(huì)很快處于Running狀態(tài)

image

我們順著application info的WebUI點(diǎn)擊跳轉(zhuǎn)

image

可以看到,其實(shí)StreamPark是將jar包通過Rest Api上傳到Flink集群上,并調(diào)度執(zhí)行任務(wù)的。

Custom Code模式

另我們驚喜的是,StreamPark 還支持代碼編寫DataStream/FlinkSql任務(wù)。對(duì)于特殊需求,我們可以自己寫Java/Scala實(shí)現(xiàn)。可以根據(jù)StreamPark推薦的腳手架方式編寫任務(wù),也可以編寫一個(gè)標(biāo)準(zhǔn)普通的Flink任務(wù),通過這種方式我們可以將代碼管理交由Git實(shí)現(xiàn),平臺(tái)可以用來自動(dòng)化編譯打包與部署。當(dāng)然,如果能用Sql實(shí)現(xiàn)的功能,我們會(huì)盡量避免自定義DataStream,減少不必要的運(yùn)維麻煩。

4. 意見和規(guī)劃

改進(jìn)意見

當(dāng)然StreamPark還有很多需要改進(jìn)的地方,就目前測(cè)試來看:

  • 資源管理還有待加強(qiáng) 多文件系統(tǒng)jar包等資源管理功能尚未添加,任務(wù)版本功能有待加強(qiáng)。
  • 前端buttern 功能還不夠豐富 比如任務(wù)添加后續(xù)可以增加復(fù)制等功能按鈕。
  • 任務(wù)提交日志也需要可視化展示 任務(wù)提交伴隨著加載class文件,打jar包,build鏡像,提交鏡像,提交任務(wù)等過程,每一個(gè)環(huán)節(jié)出錯(cuò),都會(huì)導(dǎo)致任務(wù)的失敗,但是失敗日志往往不明確,或者因?yàn)槟撤N原因?qū)е庐惓N凑伋?,沒有轉(zhuǎn)換任務(wù)狀態(tài),用戶會(huì)無從下手改進(jìn)。

眾所周知,一個(gè)新事物的出現(xiàn)一開始總會(huì)不是那么完美。盡管有些許問題和需要改進(jìn)的point,但是瑕不掩瑜,我們?nèi)匀贿x擇StreamPark作為我們的Flink DevOps,我們也將會(huì)和主開發(fā)人員一道共同完善StreamPark,也歡迎更多的人來使用,為StreamPark帶來更多進(jìn)步。

未來規(guī)劃

  • 我們會(huì)繼續(xù)跟進(jìn)doris,并將業(yè)務(wù)數(shù)據(jù) + 日志數(shù)據(jù)統(tǒng)一入doris,通過Flink實(shí)現(xiàn)湖倉(cāng)一體;

  • 我們也會(huì)逐步將探索StreamPark同dolphinscheduler 2.x進(jìn)行整合,完善dolphinscheduler離線任務(wù),逐步用Flink 替換掉Spark,實(shí)現(xiàn)真正的流批一體;

  • 基于我們自身在s3上的探索積累,fat-jar包 build 完成之后不再構(gòu)建鏡像,直接利用Pod Tempelet掛載pvc到Flink pod中的目錄,進(jìn)一步優(yōu)化代碼提交流程;

  • 將StreamPark持續(xù)應(yīng)用到我們生產(chǎn)中,并匯同社區(qū)開發(fā)人員,共同努力,增強(qiáng)StreamPark在Flink流上的開發(fā)部署能力與運(yùn)行監(jiān)控能力,努力把StreamPark打造成一個(gè)功能完善的流數(shù)據(jù) DevOps。

附:

StreamPark Github: https://github.com/apache/incubator-streampark
Doris Github: https://github.com/apache/incubator-doris

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容