前言
最近我們正式調(diào)研Zeppelin作為Flink SQL開發(fā)套件的可能性,于是clone了最新的Zeppelin v0.10-SNAPSHOT源碼,自行編譯并部署到了預(yù)發(fā)布環(huán)境的新Flink集群中。Flink版本為1.13.0,Hadoop版本為CDH 6.3.2自帶的3.0.0。經(jīng)過兩天的探索,發(fā)現(xiàn)了一些問題,在百忙之中抽出點(diǎn)時(shí)間簡要記錄一下并不成功的troubleshooting過程。

Flink Interpreter不加載
安裝好Zeppelin并配置好Flink Interpreter的各項(xiàng)參數(shù)之后(采用生產(chǎn)環(huán)境推薦的Flink on YARN + Interpreter on YARN + Isolated Per Note模式),編寫Note無法執(zhí)行,提示找不到FlinkInterpreter類,如下圖所示。

排查:
- 去${ZEPPELIN_HOME}/interpreter/flink目錄下觀察,可以發(fā)現(xiàn)名為zeppelin-flink-0.10.0-SNAPSHOT-2.11/2.12.jar的兩個(gè)JAR包,并且FlinkInterpreter已經(jīng)被正確地打進(jìn)了JAR包里。
- 去zeppelin-env.sh中修改
ZEPPELIN_JAVA_OPTS環(huán)境變量,添加-verbose:class參數(shù)打印類加載日志,從中未發(fā)現(xiàn)任何以org.apache.zeppelin.flink為前綴的類被加載。 - 登錄Interpreter進(jìn)程所在的那臺(tái)NodeManager,查看Interpreter的臨時(shí)目錄,結(jié)構(gòu)如下圖。

但是,Interpreter進(jìn)程的classpath中并沒有zeppelin/interpreter/flink/*,自然無法加載Interpreter了。為什么會(huì)這樣?來到負(fù)責(zé)啟動(dòng)Interpreter的bin/interpreter.sh文件,第125行:
INTERPRETER_ID=$(basename "${INTERPRETER_DIR}")
if [[ "${INTERPRETER_ID}" != "flink" ]]; then
# don't add interpreter jar for flink, FlinkInterpreterLauncher will choose the right interpreter jar based
# on scala version of current FLINK_HOME.
addJarInDirForIntp "${INTERPRETER_DIR}"
fi
可見這里對Flink做了一個(gè)特殊的處理。根據(jù)注釋的描述,F(xiàn)linkInterpreterLauncher會(huì)根據(jù)用戶的Flink版本選擇對應(yīng)Scala版本的JAR包。查看該類的源碼,確實(shí)如此(有一個(gè)chooseFlinkAppJar()方法,略去)。然而繼續(xù)向上追蹤FlinkInterpreterLauncher的調(diào)用鏈,發(fā)現(xiàn)它并沒有在任何與YARN有關(guān)的方法中被使用,也就是說上面選擇JAR包的動(dòng)作根本沒發(fā)生。
由于我們?nèi)匀粌H使用基于Scala 2.11的Flink,故可以將目錄中的2.12包刪掉,并修改interpreter.sh注釋掉if語句,問題臨時(shí)解決。更好的解決方法是將上述的選擇JAR包邏輯寫入YARN Launcher內(nèi),但侵入性較大,留待今后操作。
YARN Application模式無效
根據(jù)文檔描述,YARN Application模式與普通的YARN模式相比會(huì)更節(jié)省資源,因?yàn)镴obManager和Interpreter跑在一個(gè)Container內(nèi),如下圖所示。

我們確認(rèn)與Hadoop相關(guān)的各項(xiàng)參數(shù)、環(huán)境變量都設(shè)置好之后,將Note的flink.execution.mode參數(shù)改為yarn-application,運(yùn)行之,報(bào)出如下異常。

對比一下上節(jié)貼出的Interpreter臨時(shí)目錄結(jié)構(gòu),容易發(fā)現(xiàn)這里的路徑是錯(cuò)的。來到FlinkScalaInterpreter類,將flinkHome、flinkConfDir、hiveConfDir做如下的修改。
mode = ExecutionMode.withName(
properties.getProperty("flink.execution.mode", "LOCAL")
.replace("-", "_")
.toUpperCase)
if (mode == ExecutionMode.YARN_APPLICATION) {
if (flinkVersion.isFlink110) {
throw new Exception("yarn-application mode is only supported after Flink 1.11")
}
// use current yarn container working directory as FLINK_HOME, FLINK_CONF_DIR and HIVE_CONF_DIR
val workingDirectory = new File(".").getAbsolutePath
flinkHome = workingDirectory + "/flink"
flinkConfDir = workingDirectory + "/flink/conf"
hiveConfDir = workingDirectory + "/hive_conf"
}
重新編譯打包并替換掉原來的Interpreter包,再次執(zhí)行,又報(bào)出如下異常,提示Application ID為空。

話休絮煩,直接貼出對應(yīng)的源碼:
val (effectiveConfig, cluster) = fetchConnectionInfo(config, configuration, flinkShims)
this.configuration = effectiveConfig
cluster match {
case Some(clusterClient) =>
// local mode or yarn
if (mode == ExecutionMode.LOCAL) {
LOGGER.info("Starting FlinkCluster in local mode")
this.jmWebUrl = clusterClient.getWebInterfaceURL
this.displayedJMWebUrl = this.jmWebUrl
} else if (mode == ExecutionMode.YARN) {
LOGGER.info("Starting FlinkCluster in yarn mode")
this.jmWebUrl = clusterClient.getWebInterfaceURL
val yarnAppId = HadoopUtils.getYarnAppId(clusterClient)
this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
} else {
throw new Exception("Starting FlinkCluster in invalid mode: " + mode)
}
case None =>
// yarn-application mode
if (mode == ExecutionMode.YARN_APPLICATION) {
// get yarnAppId from env `_APP_ID`
val yarnAppId = System.getenv("_APP_ID")
LOGGER.info("Use FlinkCluster in yarn application mode, appId: {}", yarnAppId)
this.jmWebUrl = "http://localhost:" + HadoopUtils.getFlinkRestPort(yarnAppId)
this.displayedJMWebUrl = getDisplayedJMWebUrl(yarnAppId)
} else {
LOGGER.info("Use FlinkCluster in remote mode")
this.jmWebUrl = "http://" + config.host.get + ":" + config.port.get
this.displayedJMWebUrl = getDisplayedJMWebUrl("")
}
}
@Internal
def fetchConnectionInfo(
config: Config,
flinkConfig: Configuration,
flinkShims: FlinkShims): (Configuration, Option[ClusterClient[_]]) = {
config.executionMode match {
case ExecutionMode.LOCAL => createLocalClusterAndConfig(flinkConfig)
case ExecutionMode.REMOTE => createRemoteConfig(config, flinkConfig)
case ExecutionMode.YARN => createYarnClusterIfNeededAndGetConfig(config, flinkConfig, flinkShims)
case ExecutionMode.YARN_APPLICATION => (flinkConfig, None)
case ExecutionMode.UNDEFINED => // Wrong input
throw new IllegalArgumentException("please specify execution mode:\n" +
"[local | remote <host> <port> | yarn | yarn-application ]")
}
}
上面的代碼有些令人迷惑:為什么YARN Application模式下沒有做任何操作,只是返回了一個(gè)空的ClusterClient?另外,_APP_ID是Flink ApplicationMaster啟動(dòng)時(shí)設(shè)置的環(huán)境變量,這樣操作一定可以拿得到么?
當(dāng)然這個(gè)問題比較復(fù)雜,筆者也尚未認(rèn)真研究過YARN Application模式相關(guān)的源碼,需要時(shí)間來處理。但可以肯定至少在我們的環(huán)境下,需要做較大的改動(dòng)才能讓它正常使用。在完全解決之前,仍然采用傳統(tǒng)YARN模式也無傷大雅。
配置Note只讀權(quán)限后無法切換視圖
為了保證安全,我們強(qiáng)制新建的Note都為私有(即Reader、Writer、Runner、Owner初始值都是用戶自己),然后按需對相關(guān)同學(xué)開放權(quán)限。
一般情況下,所有人都可以讀Note。但是只將Reader權(quán)限放開后,除Owner之外的人看到的都是白板。這是因?yàn)镹ote對只讀權(quán)限者變成了report視圖,只能看到結(jié)果,不展示SQL源碼,如下圖所示。

但是,如果嘗試切換成default視圖,就會(huì)提示需要Writer權(quán)限才可以:

這就有些匪夷所思了。將Zeppelin日志等級設(shè)為DEBUG,重復(fù)切換視圖操作,可以發(fā)現(xiàn)在NotebookServer的事件循環(huán)里產(chǎn)生了NOTE_UPDATE事件。
DEBUG [2021-07-28 19:03:52,097] ({qtp306612792-317} NotebookServer.java[onMessage]:255) - RECEIVE: NOTE_UPDATE, RECEIVE PRINCIPAL: bigdata_dev, RECEIVE TICKET: f9118802-14cd-40fc-8e60-caeb0267aac2, RECEIVE ROLES: ["role1"], RECEIVE DATA: {id=2GE65N3RS, name=WorkflowAliBinlog, config={isZeppelinNotebookCronEnable=false, looknfeel=default, personalizedMode=false}}
WARN [2021-07-28 19:03:52,098] ({qtp306612792-317} SimpleServiceCallback.java[onFailure]:50) - HTTP 403 Forbidden
這是因?yàn)镹ote的視圖風(fēng)格直接存儲(chǔ)在.zpln文件內(nèi)(叫做looknfeel),所以修改它就相當(dāng)于修改Note了 = =

將NOTE_UPDATE的權(quán)限賦給Reader顯然不現(xiàn)實(shí),考慮到我們幾乎不會(huì)用到simple和report視圖,將simple視圖作為只讀的情況比較合適。
但是,來到zeppelin-web項(xiàng)目下之后,發(fā)現(xiàn)代碼只讀性、代碼編輯器的可見性和視圖之間的耦合過緊,改了數(shù)十處HTML和JS代碼之后仍然未能達(dá)到想要的效果。經(jīng)過試驗(yàn),只讀用戶還是可以看到非HEAD commit的代碼的,切換版本湊合也能用,此事暫時(shí)擱置。
Zeppelin日志被"Saving note"信息淹沒
我們采用的Notebook Repo是GitNotebookRepo(本地)+FileSystemNotebookRepo(遠(yuǎn)程HDFS)的組合。啟動(dòng)了幾個(gè)Flink SQL任務(wù)之后,在Zeppelin日志中看到如下格式的信息刷屏。
INFO [2021-07-30 19:32:10,160] ({pool-11-thread-16} VFSNotebookRepo.java[save]:144) - Saving note 2GDVAVC4W to etl-mq/analytics_access_log_app_2GDVAVC4W.zpln
這是因?yàn)樵诿總€(gè)Note對應(yīng)作業(yè)的JobManager中,都會(huì)啟動(dòng)一個(gè)名為FlinkJobProgressPoller的線程,以zeppelin.flink.job.check_interval的間隔(默認(rèn)1秒,我們改成了5秒)檢查并更新任務(wù)的狀態(tài)。如上一節(jié)所述,這些信息也都保存在.zpln文件內(nèi),所以會(huì)導(dǎo)致頻繁寫文件。并且這個(gè)線程做的事情非常多,代碼如下所示。
@Override
public void run() {
while (!Thread.currentThread().isInterrupted() && running.get()) {
JsonNode rootNode = null;
try {
synchronized (running) {
running.wait(checkInterval);
}
rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
.asJson().getBody();
JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
int totalTasks = 0;
int finishedTasks = 0;
for (int i = 0; i < vertices.length(); ++i) {
JSONObject vertex = vertices.getJSONObject(i);
totalTasks += vertex.getInt("parallelism");
finishedTasks += vertex.getJSONObject("tasks").getInt("FINISHED");
}
LOGGER.debug("Total tasks:" + totalTasks);
LOGGER.debug("Finished tasks:" + finishedTasks);
if (finishedTasks != 0) {
this.progress = finishedTasks * 100 / totalTasks;
LOGGER.debug("Progress: " + this.progress);
}
String jobState = rootNode.getObject().getString("state");
if (jobState.equalsIgnoreCase("finished")) {
break;
}
long duration = rootNode.getObject().getLong("duration") / 1000;
if (isStreamingInsertInto) {
if (isFirstPoll) {
StringBuilder builder = new StringBuilder("%angular ");
builder.append("<h1>Duration: {{duration}} </h1>");
builder.append("\n%text ");
context.out.clear(false);
context.out.write(builder.toString());
context.out.flush();
isFirstPoll = false;
}
context.getAngularObjectRegistry().add("duration",
toRichTimeDuration(duration),
context.getNoteId(),
context.getParagraphId());
}
// fetch checkpoints info and save the latest checkpoint into paragraph's config.
rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString() + "/checkpoints")
.asJson().getBody();
if (rootNode.getObject().has("latest")) {
JSONObject latestObject = rootNode.getObject().getJSONObject("latest");
if (latestObject.has("completed") && latestObject.get("completed") instanceof JSONObject) {
JSONObject completedObject = latestObject.getJSONObject("completed");
if (completedObject.has("external_path")) {
String checkpointPath = completedObject.getString("external_path");
LOGGER.debug("Latest checkpoint path: {}", checkpointPath);
if (!StringUtils.isBlank(checkpointPath) && !checkpointPath.equals(latestCheckpointPath)
Map<String, String> config = new HashMap<>();
config.put(LATEST_CHECKPOINT_PATH, checkpointPath);
context.getIntpEventClient().updateParagraphConfig(
context.getNoteId(), context.getParagraphId(), config);
latestCheckpointPath = checkpointPath;
}
}
}
}
} catch (Exception e) {
LOGGER.error("Fail to poll flink job progress via rest api", e);
}
}
}
考慮到直接對它下手的復(fù)雜度,目前只能暫時(shí)在log4j配置中屏蔽掉VFSNotebookRepo的INFO日志輸出。隨著今后任務(wù)增多,會(huì)繼續(xù)評估Zeppelin Server和磁盤的壓力,并盡可能尋找優(yōu)化的方法。
The End
其實(shí)還有個(gè)Maven Repo解析與添加Nexus私服認(rèn)證方面的問題,但這個(gè)更復(fù)雜,并且與Flink無關(guān),就不廢話了。
民那周末快樂~