Flink on Zeppelin問題四則(并沒有優(yōu)雅的解決方案

前言

最近我們正式調(diào)研Zeppelin作為Flink SQL開發(fā)套件的可能性,于是clone了最新的Zeppelin v0.10-SNAPSHOT源碼,自行編譯并部署到了預(yù)發(fā)布環(huán)境的新Flink集群中。Flink版本為1.13.0,Hadoop版本為CDH 6.3.2自帶的3.0.0。經(jīng)過兩天的探索,發(fā)現(xiàn)了一些問題,在百忙之中抽出點(diǎn)時(shí)間簡要記錄一下并不成功的troubleshooting過程。

Flink Interpreter不加載

安裝好Zeppelin并配置好Flink Interpreter的各項(xiàng)參數(shù)之后(采用生產(chǎn)環(huán)境推薦的Flink on YARN + Interpreter on YARN + Isolated Per Note模式),編寫Note無法執(zhí)行,提示找不到FlinkInterpreter類,如下圖所示。

排查:

  • 去${ZEPPELIN_HOME}/interpreter/flink目錄下觀察,可以發(fā)現(xiàn)名為zeppelin-flink-0.10.0-SNAPSHOT-2.11/2.12.jar的兩個(gè)JAR包,并且FlinkInterpreter已經(jīng)被正確地打進(jìn)了JAR包里。
  • 去zeppelin-env.sh中修改ZEPPELIN_JAVA_OPTS環(huán)境變量,添加-verbose:class參數(shù)打印類加載日志,從中未發(fā)現(xiàn)任何以org.apache.zeppelin.flink為前綴的類被加載。
  • 登錄Interpreter進(jìn)程所在的那臺(tái)NodeManager,查看Interpreter的臨時(shí)目錄,結(jié)構(gòu)如下圖。

但是,Interpreter進(jìn)程的classpath中并沒有zeppelin/interpreter/flink/*,自然無法加載Interpreter了。為什么會(huì)這樣?來到負(fù)責(zé)啟動(dòng)Interpreter的bin/interpreter.sh文件,第125行:

INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
if [[ "${INTERPRETER_ID}" != "flink" ]]; then
  # don't add interpreter jar for flink, FlinkInterpreterLauncher will choose the right interpreter jar based
  # on scala version of current FLINK_HOME.
  addJarInDirForIntp "${INTERPRETER_DIR}"
fi

可見這里對Flink做了一個(gè)特殊的處理。根據(jù)注釋的描述,F(xiàn)linkInterpreterLauncher會(huì)根據(jù)用戶的Flink版本選擇對應(yīng)Scala版本的JAR包。查看該類的源碼,確實(shí)如此(有一個(gè)chooseFlinkAppJar()方法,略去)。然而繼續(xù)向上追蹤FlinkInterpreterLauncher的調(diào)用鏈,發(fā)現(xiàn)它并沒有在任何與YARN有關(guān)的方法中被使用,也就是說上面選擇JAR包的動(dòng)作根本沒發(fā)生。

由于我們?nèi)匀粌H使用基于Scala 2.11的Flink,故可以將目錄中的2.12包刪掉,并修改interpreter.sh注釋掉if語句,問題臨時(shí)解決。更好的解決方法是將上述的選擇JAR包邏輯寫入YARN Launcher內(nèi),但侵入性較大,留待今后操作。

YARN Application模式無效

根據(jù)文檔描述,YARN Application模式與普通的YARN模式相比會(huì)更節(jié)省資源,因?yàn)镴obManager和Interpreter跑在一個(gè)Container內(nèi),如下圖所示。

我們確認(rèn)與Hadoop相關(guān)的各項(xiàng)參數(shù)、環(huán)境變量都設(shè)置好之后,將Note的flink.execution.mode參數(shù)改為yarn-application,運(yùn)行之,報(bào)出如下異常。

對比一下上節(jié)貼出的Interpreter臨時(shí)目錄結(jié)構(gòu),容易發(fā)現(xiàn)這里的路徑是錯(cuò)的。來到FlinkScalaInterpreter類,將flinkHomeflinkConfDir、hiveConfDir做如下的修改。

mode = ExecutionMode.withName(
  properties.getProperty("flink.execution.mode", "LOCAL")
    .replace("-", "_")
    .toUpperCase)
if (mode == ExecutionMode.YARN_APPLICATION) {
  if (flinkVersion.isFlink110) {
    throw new Exception("yarn-application mode is only supported after Flink 1.11")
  }
  // use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR
  val workingDirectory = new File(".").getAbsolutePath
  flinkHome = workingDirectory + "/flink"
  flinkConfDir = workingDirectory + "/flink/conf"
  hiveConfDir = workingDirectory + "/hive_conf"
}

重新編譯打包并替換掉原來的Interpreter包,再次執(zhí)行,又報(bào)出如下異常,提示Application ID為空。

話休絮煩,直接貼出對應(yīng)的源碼:

val (effectiveConfig, cluster) = fetchConnectionInfo(config, configuration, flinkShims)
this.configuration = effectiveConfig
cluster match {
  case Some(clusterClient) =>
    // local mode or yarn
    if (mode == ExecutionMode.LOCAL) {
      LOGGER.info("Starting FlinkCluster in local mode")
      this.jmWebUrl = clusterClient.getWebInterfaceURL
      this.displayedJMWebUrl = this.jmWebUrl
    } else if (mode == ExecutionMode.YARN) {
      LOGGER.info("Starting FlinkCluster in yarn mode")
      this.jmWebUrl = clusterClient.getWebInterfaceURL
      val yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
      this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
    } else {
      throw new Exception("Starting FlinkCluster in invalid mode: " + mode)
    }
  case None =>
    // yarn-application mode
    if (mode == ExecutionMode.YARN_APPLICATION) {
      // get yarnAppId from env `_APP_ID`
      val yarnAppId = System.getenv("_APP_ID")
      LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
      this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
      this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
    } else {
      LOGGER.info("Use FlinkCluster in remote mode")
      this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
      this.displayedJMWebUrl = getDisplayedJMWebUrl("")
    }
}

@Internal
def fetchConnectionInfo(
    config: Config,
    flinkConfig: Configuration,
    flinkShims: FlinkShims): (Configuration, Option[ClusterClient[_]]) = {
  config.executionMode match {
    case ExecutionMode.LOCAL => createLocalClusterAndConfig(flinkConfig)
    case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig)
    case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims)
    case ExecutionMode.YARN_APPLICATION => (flinkConfig, None)
    case ExecutionMode.UNDEFINED => // Wrong input
      throw new IllegalArgumentException("please specify execution mode:\n" +
        "[local | remote <host> <port> | yarn | yarn-application ]")
  }
}

上面的代碼有些令人迷惑:為什么YARN Application模式下沒有做任何操作,只是返回了一個(gè)空的ClusterClient?另外,_APP_ID是Flink ApplicationMaster啟動(dòng)時(shí)設(shè)置的環(huán)境變量,這樣操作一定可以拿得到么?

當(dāng)然這個(gè)問題比較復(fù)雜,筆者也尚未認(rèn)真研究過YARN Application模式相關(guān)的源碼,需要時(shí)間來處理。但可以肯定至少在我們的環(huán)境下,需要做較大的改動(dòng)才能讓它正常使用。在完全解決之前,仍然采用傳統(tǒng)YARN模式也無傷大雅。

配置Note只讀權(quán)限后無法切換視圖

為了保證安全,我們強(qiáng)制新建的Note都為私有(即Reader、Writer、Runner、Owner初始值都是用戶自己),然后按需對相關(guān)同學(xué)開放權(quán)限。

一般情況下,所有人都可以讀Note。但是只將Reader權(quán)限放開后,除Owner之外的人看到的都是白板。這是因?yàn)镹ote對只讀權(quán)限者變成了report視圖,只能看到結(jié)果,不展示SQL源碼,如下圖所示。

但是,如果嘗試切換成default視圖,就會(huì)提示需要Writer權(quán)限才可以:

這就有些匪夷所思了。將Zeppelin日志等級設(shè)為DEBUG,重復(fù)切換視圖操作,可以發(fā)現(xiàn)在NotebookServer的事件循環(huán)里產(chǎn)生了NOTE_UPDATE事件。

DEBUG [2021-07-28 19:03:52,097] ({qtp306612792-317} NotebookServer.java[onMessage]:255) - RECEIVE: NOTE_UPDATE, RECEIVE PRINCIPAL: bigdata_dev, RECEIVE TICKET: f9118802-14cd-40fc-8e60-caeb0267aac2, RECEIVE ROLES: ["role1"], RECEIVE DATA: {id=2GE65N3RS, name=WorkflowAliBinlog, config={isZeppelinNotebookCronEnable=false, looknfeel=default, personalizedMode=false}}
 WARN [2021-07-28 19:03:52,098] ({qtp306612792-317} SimpleServiceCallback.java[onFailure]:50) - HTTP 403 Forbidden

這是因?yàn)镹ote的視圖風(fēng)格直接存儲(chǔ)在.zpln文件內(nèi)(叫做looknfeel),所以修改它就相當(dāng)于修改Note了 = =

將NOTE_UPDATE的權(quán)限賦給Reader顯然不現(xiàn)實(shí),考慮到我們幾乎不會(huì)用到simple和report視圖,將simple視圖作為只讀的情況比較合適。

但是,來到zeppelin-web項(xiàng)目下之后,發(fā)現(xiàn)代碼只讀性、代碼編輯器的可見性和視圖之間的耦合過緊,改了數(shù)十處HTML和JS代碼之后仍然未能達(dá)到想要的效果。經(jīng)過試驗(yàn),只讀用戶還是可以看到非HEAD commit的代碼的,切換版本湊合也能用,此事暫時(shí)擱置。

Zeppelin日志被"Saving note"信息淹沒

我們采用的Notebook Repo是GitNotebookRepo(本地)+FileSystemNotebookRepo(遠(yuǎn)程HDFS)的組合。啟動(dòng)了幾個(gè)Flink SQL任務(wù)之后,在Zeppelin日志中看到如下格式的信息刷屏。

INFO [2021-07-30 19:32:10,160] ({pool-11-thread-16} VFSNotebookRepo.java[save]:144) - Saving note 2GDVAVC4W to etl-mq/analytics_access_log_app_2GDVAVC4W.zpln

這是因?yàn)樵诿總€(gè)Note對應(yīng)作業(yè)的JobManager中,都會(huì)啟動(dòng)一個(gè)名為FlinkJobProgressPoller的線程,以zeppelin.flink.job.check_interval的間隔(默認(rèn)1秒,我們改成了5秒)檢查并更新任務(wù)的狀態(tài)。如上一節(jié)所述,這些信息也都保存在.zpln文件內(nèi),所以會(huì)導(dǎo)致頻繁寫文件。并且這個(gè)線程做的事情非常多,代碼如下所示。

@Override
public void run() {
  while (!Thread.currentThread().isInterrupted() && running.get()) {
    JsonNode rootNode = null;
    try {
      synchronized (running) {
        running.wait(checkInterval);
      }
      rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
              .asJson().getBody();
      JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
      int totalTasks = 0;
      int finishedTasks = 0;
      for (int i = 0; i < vertices.length(); ++i) {
        JSONObject vertex = vertices.getJSONObject(i);
        totalTasks += vertex.getInt("parallelism");
        finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
      }
      LOGGER.debug("Total tasks:" + totalTasks);
      LOGGER.debug("Finished tasks:" + finishedTasks);
      if (finishedTasks != 0) {
        this.progress = finishedTasks * 100 / totalTasks;
        LOGGER.debug("Progress: " + this.progress);
      }
      String jobState = rootNode.getObject().getString("state");
      if (jobState.equalsIgnoreCase("finished")) {
        break;
      }

      long duration = rootNode.getObject().getLong("duration") / 1000;
      if (isStreamingInsertInto) {
        if (isFirstPoll) {
          StringBuilder builder = new StringBuilder("%angular ");
          builder.append("<h1>Duration: {{duration}} </h1>");
          builder.append("\n%text ");
          context.out.clear(false);
          context.out.write(builder.toString());
          context.out.flush();
          isFirstPoll = false;
        }
        context.getAngularObjectRegistry().add("duration",
                toRichTimeDuration(duration),
                context.getNoteId(),
                context.getParagraphId());
      }

      // fetch checkpoints info and save the latest checkpoint into paragraph's config.
      rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString() + "/checkpoints")
              .asJson().getBody();
      if (rootNode.getObject().has("latest")) {
        JSONObject latestObject = rootNode.getObject().getJSONObject("latest");
        if (latestObject.has("completed") && latestObject.get("completed") instanceof JSONObject) {
          JSONObject completedObject = latestObject.getJSONObject("completed");
          if (completedObject.has("external_path")) {
            String checkpointPath = completedObject.getString("external_path");
            LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
            if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)
              Map<String, String> config = new HashMap<>();
              config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
              context.getIntpEventClient().updateParagraphConfig(
                      context.getNoteId(), context.getParagraphId(), config);
              latestCheckpointPath = checkpointPath;
            }
          }
        }
      }
    } catch (Exception e) {
      LOGGER.error("Fail to poll flink job progress via rest api", e);
    }
  }
}

考慮到直接對它下手的復(fù)雜度,目前只能暫時(shí)在log4j配置中屏蔽掉VFSNotebookRepo的INFO日志輸出。隨著今后任務(wù)增多,會(huì)繼續(xù)評估Zeppelin Server和磁盤的壓力,并盡可能尋找優(yōu)化的方法。

The End

其實(shí)還有個(gè)Maven Repo解析與添加Nexus私服認(rèn)證方面的問題,但這個(gè)更復(fù)雜,并且與Flink無關(guān),就不廢話了。

民那周末快樂~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容