flink 1.11 集成zeppelin實現(xiàn)簡易實時計算平臺

背景

隨著flink的蓬勃發(fā)展,zeppelin社區(qū)也大力推進(jìn)flink與zeppelin的集成.zeppelin的定位是一種使用sql或者scala等語言的一個交互式的分析查詢分析工具。

Web-based notebook that enables data-driven,
interactive data analytics and collaborative documents with SQL, Scala and more.

所以zeppelin與flink或者是其他的解釋器集成的時候,就會有這么一個架構(gòu)的特點,我需要啟動一個處理數(shù)據(jù)的服務(wù),相關(guān)的任務(wù)都提交到這個上面,拿flink來說,就是需要啟動一個flink的集群,比如local、remote、session模式的集群。 當(dāng)我們執(zhí)行一些flink sql的時候,都是提交到這個集群來執(zhí)行的。

zeppelin不提供per job模式

但是我們在生產(chǎn)環(huán)境中,對于一些flink的流式任務(wù),我們一般會采用per job的模式提交任務(wù),主要是為了任務(wù)資源的隔離,每個任務(wù)互不影響。目前zeppelin是不支持這種模式的。所以很多公司都會開發(fā)一個自己的實時流式任務(wù)計算平臺,可以實現(xiàn)使用sql或者jar的方式通過平臺來提交任務(wù)到集群,避免了底層一些復(fù)雜的操作,使一些只會sql的人也能開發(fā)flink任務(wù)。

實時平臺開發(fā)周期長

但是開發(fā)一個實時計算平臺其實是相對比較復(fù)雜的,它需要有前端的寫sql的頁面,后端的提交邏輯,以及前后端的交互等等。所以我的想法是既然zeppelin已經(jīng)提供了我們做一個實時平臺的很多的功能,比如寫sql的頁面、前后端交互、提交任務(wù)、獲取任務(wù)的狀態(tài)等等,那么我們是不是可以用zeppelin來開發(fā)一個簡化版的實時計算平臺呢。

基于zeppelin開發(fā)一個簡易實時平臺

今天我們談?wù)勗趺赐ㄟ^zeppelin來實現(xiàn)一個簡易的實時平臺,目的是可以把flink的sql和jar的流式任務(wù)以per job的方式提交到y(tǒng)arn集群。

我們簡單的看下zeppelin中flink 解釋器的源碼,他底層是使用了flink scala shell,具體相關(guān)內(nèi)容可以參考 Flink Scala REPL :https://ci.apache.org/projects/flink/flink-docs-stable/ops/scala_shell.html.

zeppelin在提交flink的任務(wù)的時候,會判斷下集群是否啟動,如果沒有啟動flink集群,會根據(jù)設(shè)置的模式(local、yarn)先啟動一個非隔離模式的flink集群(remote模式需要提前啟動好一個集群),然后客戶端保持著和服務(wù)器的連接,后續(xù)有用戶提交的任務(wù),就把任務(wù)提交到剛起啟動的集群。我研究了一下代碼覺得在這個上面加一個per job模式的話可能會破壞原來的架構(gòu),改動還會比較大,所以后來想自己做一個zepplin的解釋器,功能就是通過sql或者jar的方式專門用來提交flink的流式任務(wù)。

最后我們基于zeppelin開發(fā)的實時平臺可以提供以下功能:

  1. 以per job的方式提交flink流任務(wù)到y(tǒng)arn集群
  2. 支持sql和jar任務(wù)
  3. 支持跳轉(zhuǎn)到y(tǒng)arn集群的任務(wù)鏈接
  4. 可以停止flink任務(wù)
  5. zepplin集群或者Interpreter重啟之后,yarn上面的flink任務(wù)不會停止
  6. zepplin集群重啟之后,原來是running狀態(tài)的任務(wù)自動加載。

開發(fā)zeppelin Interpreter

具體zeppelin的Interpreter的開發(fā)可以參考這篇文章。

https://zeppelin.apache.org/docs/0.9.0-preview1/development/writing_zeppelin_interpreter.html

核心的代碼就是繼承抽象類Interpreter,實現(xiàn)其中的幾個方法,我們簡單來講講。

public abstract class Interpreter {
    
  /**
  * 初始化的時候調(diào)用,可以在這個里面加一些系統(tǒng)初始化的工作,這個方法只調(diào)用一次。
  * 寫過flink自定義source和sink的同學(xué)應(yīng)該不會陌生。
   */
  @ZeppelinApi
  public abstract void open() throws InterpreterException;

  /**
   * 
   * 釋放Interpreter資源,也只會被調(diào)用一次。
   */
  @ZeppelinApi
  public abstract void close() throws InterpreterException;
    
    /**
   * 異步的運行輸入框里面的代碼并返回結(jié)果。.
   *
   * @param st 就是頁面那個框里你輸入的東西
   */
  @ZeppelinApi
  public abstract InterpreterResult interpret(String st,
                                              InterpreterContext context)
      throws InterpreterException;    
    
}

除了上面列出來的這幾個,還有其他的幾個,我這里就不羅列代碼了,大家有興趣的可以自己看下。

底層我使用的是flink application模式來提交的任務(wù),在open里面做一些提交flink初始化的工作,比如構(gòu)造配置文件,啟動yarnClient等等。在interpret方法解析內(nèi)容,執(zhí)行提交任務(wù)的工作。

最終我們實現(xiàn)了可以通過jar包和sql的方式來提交任務(wù)到y(tǒng)arn集群。

提交sql任務(wù)

我們可以指定一些任務(wù)的參數(shù),比如jobname,并行度、checkpoint間隔等等,頁面大概長這個樣子,提交任務(wù)之后,可以在yarn集群看到相關(guān)的任務(wù)。

在這里插入圖片描述

提交jar任務(wù)

首先把相應(yīng)的jar上傳到hdfs相關(guān)路徑,然后提交任務(wù)之前,指定jar的路徑,以及jobname、并行度等等,正文就不需要寫什么了,然后把這個任務(wù)提交到y(tǒng)arn集群。

在這里插入圖片描述

目前只是實現(xiàn)了一些核心的功能,還有一些其他的功能需要后續(xù)完善。

注意的點

  • zeppelin.interpreter.close.cancel_job
    設(shè)置為false,這樣的話停止集群或者interpreter的時候就不會停止任務(wù),否則的話,zeppelin會在停止集群的時候把所有的任務(wù)都cancel掉。
  • zeppelin.recovery.storage.class

在zeppelin-site里配置,改成org.apache.zeppelin.interpreter.recovery.FileSystemRecoveryStorage,這樣的話,就可以使zeppelin在重啟的時候,自動加載那些running狀態(tài)的任務(wù)。

  • zeppelin.recovery.dir
    在zeppelin-site里配置,路徑是nodebook的存儲路徑,也就是zeppelin.notebook.dir的配置項。(如果你配置的是hdfs存儲的話)

更多內(nèi)容,歡迎關(guān)注我的公眾號【大數(shù)據(jù)技術(shù)與應(yīng)用實戰(zhàn)】

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容