Apache Flink 可以使用多用方式在不同的環(huán)境中部署,相對于部署環(huán)境的多樣性而言,F(xiàn)link集群的基本構(gòu)建方式仍是相同的,所應(yīng)用的操作原則也是相似的。
在本教程中,你將會學(xué)習(xí)如何管理和運行Flink任務(wù),了解如何部署和監(jiān)控應(yīng)用程序,F(xiàn)link如何從失敗的任務(wù)中恢復(fù),并且進行一些日常操作任務(wù)的演練,比如升級和擴容
場景說明
本場景需要的操作環(huán)境為:一個長期運行的Flink Session Cluster 和一個kafka集群。
Flink集群由一個JobManager以及一個或多個TaskManager組成。JobManager負責(zé)Job提交、監(jiān)控和資源的管理。TaskManagers中運行Worker進程,它負責(zé)執(zhí)行那些構(gòu)成Flink Job 的實際任務(wù)的執(zhí)行。在本教程中,將會從啟動一個單一的TaskManager開始,但是之后會擴展到運行多個TaskManager。在這里我們使用一個專用的Client Container用于提交Flink Job,并在之后執(zhí)行各種操作。Flink集群本身并不需要Client Container,只是為了使用方便,才引入了它。
kafka集群由一個 Zookeeper Server 和 一個 Kafka Broker組成。

開始的時候,向JobManager提交一個名為 Flink Event Count的 Flink Job 。此外,還創(chuàng)建了兩個 kafka Topic input和output。

該Job從 kafka 主題 input中消費點擊事件clickEvents, 每個點擊事件都包含一個時間戳 timestamp和一個page屬性。這些事件以page為key進行分組,并且基于 15s的時間窗口進行計數(shù)。結(jié)果數(shù)據(jù)被寫入到kafka主題output。
There are six different pages and we generate 1000 click events per page and 15 seconds. Hence, the output of the Flink job should show 1000 views per page and window.(這段后半句不知如何翻譯更加妥當(dāng)姑且在下面給出自己的理解)
總共有6種不同的 page,每個page每 15s 會產(chǎn)生 1000個點擊事件。因此,F(xiàn)link Job的輸出對于每個 page 的每個時間窗口 應(yīng)該顯示 1000個視圖。
環(huán)境搭建
操作的環(huán)境只需要簡單的幾個步驟就可以搭建起來。我們會引導(dǎo)你完成必要的操作命令,并且展示如何驗證所有正在運行的 是正確的。
我們假定你已經(jīng)在機器上部署了 Docker(1.12+) 和 dokcer-compose(2.1+)。
所需要的配置文件在flink-playgrounds倉庫中,拉取到本地并啟動Docker 環(huán)境:
git clone --branch release-1.12 https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose build
docker-compose up -d
之后,你可以運行下面的命令來查看正在運行的 Docker容器:
docker-compose ps
Name Command State Ports
-----------------------------------------------------------------------------------------------------------------------------
operations-playground_clickevent-generator_1 /docker-entrypoint.sh java ... Up 6123/tcp, 8081/tcp
operations-playground_client_1 /docker-entrypoint.sh flin ... Exit 0
operations-playground_jobmanager_1 /docker-entrypoint.sh jobm ... Up 6123/tcp, 0.0.0.0:8081->8081/tcp
operations-playground_kafka_1 start-kafka.sh Up 0.0.0.0:9094->9094/tcp
operations-playground_taskmanager_1 /docker-entrypoint.sh task ... Up 6123/tcp, 8081/tcp
operations-playground_zookeeper_1 /bin/sh -c /usr/sbin/sshd ... Up 2181/tcp, 22/tcp, 2888/tcp, 3888/tcp
上面的信息顯示Client container已經(jīng)成功提交 Flink job 并退出,和數(shù)據(jù)生成器一樣集群所有的組件都處于正在運行的狀態(tài)。
你可以通過下面的命令 停止 Docker正在運行的環(huán)境:
docker-compose down -v
環(huán)境講解
在搭建好的環(huán)境中你可以嘗試和驗證很多事情。在下面兩部分內(nèi)容中,我們將向你展示如何與Flink集群進行交互,并演示Flink的一些關(guān)鍵特性。
Flink WebUI
觀察Flink集群自然是以 Flink WebUI作為出發(fā)點,默認鏈接是 [http://localhost:8081](http://localhost:8081)。如果一切運行正常,你可以看到Flink WebUI 的首頁包含一個TaskManager和正在運行的Job Click Event Count。
Flink WebUI 包含了 關(guān)于Flink集群及集群上的Job 的 大量有用且有趣的信息,例如
JobGraph``Metrics``Checkpointing Statistics``TaskManager Status等
日志
JobManager
JobManager日志可以通過 docker -compose 命令進行查看
docker-compose logs -f jobmanager
在JobManager初始化啟動完成之后,你應(yīng)該主要查看每一個檢查點完成的日志信息。
TaskManager
TaskManager日志的查看方式和JobManager相同。
docker-compose logs -f taskmanager
Flink CLI
Flink CLI 相關(guān)命令可以在 Client Container容器內(nèi)使用。例如,你可以使用下面的命令打印出關(guān)于 Flink CLI的幫助信息
docker-compose run --no-deps client flink --help
Flink REST API
Flink REST API 可以通過本機的 localhost:8081 進行訪問,也可以在Client Container中通過 jobmanager:8081進行訪問。例如,使用下面的命令獲取所有正在運行的Job:
curl localhost:8081/jobs
Kafka Topics
你可以運行以下命令查看寫入kafka 的數(shù)據(jù)記錄:
//input topic (1000 records/s)
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic input
//output topic (24 records/min)
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
核心特性探索
現(xiàn)在,你已經(jīng)學(xué)會了如何與Flink以及Docker容器進行交互,讓我們來看一看常見的操作命令。所有的這些命令都是相互獨立的,也就是說你可以按照任何順序去執(zhí)行它們。大多數(shù)的命令都可以通過 CLI 和 REST API 去執(zhí)行。
列出正在運行的Job
- CLI
命令
docker-compose run --no-deps client flink list
預(yù)期輸出
Waiting for response...
------------------ Running/Restarting Jobs -------------------
16.07.2019 16:37:55 : <job-id> : Click Event Count (RUNNING)
--------------------------------------------------------------
No scheduled jobs.
- REST API
請求
curl localhost:8081/jobs
預(yù)期響應(yīng)結(jié)果
{
"jobs": [
{
"id": "<job-id>",
"status": "RUNNING"
}
]
}
作業(yè)在提交時會被分配一個 JobID,通過CLI或者REST API 進行操作時需要攜帶它。
Job 失敗與恢復(fù)
在Job(部分)失敗的情況下,Flink對事件處理依然能夠提供精確一次的保障。在本次練習(xí)中你可以觀察到并在一定程度上榜驗證這種行為。
步驟1:觀察輸出
如上所述,在本次練習(xí)中事件正好按照每個窗口包含1000條記錄的規(guī)則生成。因此,你可以跟蹤輸出的主題,并檢查恢復(fù)后 所有的窗口都存在并且計數(shù)是正確的,以證明Flink成功地從Taskmanager故障中恢復(fù)并且沒有造成數(shù)據(jù)的丟失或重復(fù)。
為此,需要從開始讀取output主題的數(shù)據(jù)直到故障恢復(fù)之后:
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
步驟2:模擬失敗
可以通過殺死TaskManager進程來模擬局部故障,這與在生產(chǎn)環(huán)境中,TaskManager進程的丟失、運行Taskmanager的機器故障,或者僅僅是用戶代碼或框架本身拋出的暫時性異常等相對應(yīng)。
docker-compose kill taskmanager
幾秒鐘之后,JobManager將會注意到TaskManager進程的丟失,然后取消受影響的Job,立馬重新提交以恢復(fù)該Job。當(dāng)Job重啟后,它包含的任務(wù)仍然處于 SCHEDULED狀態(tài)。如下圖中紫色方框所示:

注意:雖然Job的所有任務(wù)都處于SCHEDULED狀態(tài),但整個Job的狀態(tài)卻顯示為 RUNNING。
此時,J由于沒有可使用的資源(TaskManager提供的 TaskSlots),Job中任務(wù)的狀態(tài)不會從 SCHEDULED切換到 RUNNING。在直到有新的TaskManager可用之前,Job將會經(jīng)歷一個不斷取消和重新提交的循環(huán)。
與此同時,數(shù)據(jù)生成器會保持向 input主題寫入 ClickEvents數(shù)據(jù)。在哪生產(chǎn)環(huán)境中也經(jīng)常出現(xiàn)Job掛掉但源頭還在不斷產(chǎn)生數(shù)據(jù)的情況。
步驟3: 失敗恢復(fù)
一旦TaskManager重啟成功,它將會自動重新連接 JobManager。
docker-compose up -d taskmanager
當(dāng) TaskManager注冊成功之后,JobManager 就會將處于 SCHEDULED 狀態(tài)的所有任務(wù)調(diào)度到該 TaskManager的可用TaskSlots中運行,此時所有的任務(wù)將會從失敗前最近一次成功的 checkpoint進行恢復(fù),一旦恢復(fù)成功,它們的狀態(tài)將轉(zhuǎn)變?yōu)?RUNNING。
接下來該Job將快速處理 kafka input事件的全部積壓(在Job中斷1期間累積的數(shù)據(jù)),并且以更快的速度(> 24條記錄/分鐘)產(chǎn)生輸出,直到它追上 kafka 的 lag 延遲為止。此時觀察 output主題的輸出,對于所有 key (page) 的 每一個時間窗口的計數(shù)剛好是1000.由于我們使用的 是 FlinkKafkaProducer的至少一次的 模式,所以有一定的幾率看到一些重復(fù)的 結(jié)果數(shù)據(jù)出現(xiàn)。
注意:在大部分生產(chǎn)環(huán)境中都需要一個資源管理器 (Kubernetes、Yarn,、Mesos)對 失敗的 Job 進行自動重啟
Job升級與擴容
升級Flink Job 一般需要兩步:
第一,使用 保存點 savepoint優(yōu)雅地停止 Flink Job。Savepoint是在定義明確、全局一致的的時間點生成的 應(yīng)用程序的完整一致性快照。
第二,從 Savepoint 啟動升級后的 Flink Job。 在此,“升級”包含如下幾種含義:
- 配置升級(比如 Job 并行度修改)
- Job 拓撲升級(比如添加或者刪除算子)
- Job 的用戶自定義函數(shù)升級
在開始升級之前,你可能需要實時查看_Output_topic 輸出, 以便觀察在升級過程中沒有數(shù)據(jù)丟失或損壞。
docker-compose exec kafka kafka-console-consumer.sh \
--bootstrap-server localhost:9092 --topic output
步驟1:停止Job
要優(yōu)雅停止 Job,需要使用 JobID 通過 CLI 或 REST API 調(diào)用 stop 命令。 你可以通過 l列出所有正在運行的Job的相關(guān)命令或 Flink WebUI 界面獲取JobID,然后可以使用JobID來停止對應(yīng)的Job進程:
- CLI
命令
docker-compose run --no-deps client flink stop <job-id>
**預(yù)期輸出**
Suspending job "<job-id>" with a savepoint.
Savepoint completed. Path: file:<savepoint-path>
- RESTAPI
請求
# triggering stop
curl -X POST localhost:8081/jobs/<job-id>/stop -d '{"drain": false}'
**預(yù)期響應(yīng)結(jié)果**
{
"request-id": "<trigger-id>"
}
請求
# check status of stop action and retrieve savepoint path
curl localhost:8081/jobs/<job-id>/savepoints/<trigger-id>
預(yù)期響應(yīng)結(jié)果
Savepoint 已保存在state.savepoint.dir指定的路徑中,該配置在flink-conf.yaml中定義,flink-conf.yaml在本機的/tmp/flink-savepoints-directory/目錄下。 在下一步操作中我們會用到這個 Savepoint 路徑,如果我們是通過 REST API 操作的, 那么 Savepoint 路徑會隨著響應(yīng)結(jié)果一起返回,我們可以直接查看文件系統(tǒng)來確認 Savepoint 保存情況。
步驟2a: 重啟Job(不做任何變更)
現(xiàn)在你可以從Savepoint重啟升級后的Job。這里為了簡單起見在重啟之前并沒有對程序做任何修改。
- CLI
命令
docker-compose run --no-deps client flink run -s <savepoint-path> \
-d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 --checkpointing --event-time
預(yù)期輸出
Job has been submitted with JobID <job-id>
- REST API
請求
# Uploading the JAR from the Client container
docker-compose run --no-deps client curl -X POST -H "Expect:" \
-F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload
預(yù)期響應(yīng)結(jié)果
{
"filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
"status": "success"
}
請求
# Submitting the Job
curl -X POST http://localhost:8081/jars/<jar-id>/run \
-d '{"programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'
預(yù)期響應(yīng)結(jié)果
{
"jobid": "<job-id>"
}
一旦該 Job 再次處于RUNNING狀態(tài),你將從output主題中看到數(shù)據(jù)在以更高的速率產(chǎn)生, 這是由于剛啟動的 Job 正在處理停止期間積壓的大量數(shù)據(jù)。另外,你還會看到在升級期間 沒有產(chǎn)生任何數(shù)據(jù)丟失:所有窗口的計數(shù)都剛好是1000。
步驟2b: 重啟Job(修改并行度)
在從 Savepoint 重啟 Job 之前,你還可以通過修改并行度來達到擴容 Job 的目的。
- CLI
命令
docker-compose run --no-deps client flink run -p 3 -s <savepoint-path> \
-d /opt/ClickCountJob.jar \
--bootstrap.servers kafka:9092 --checkpointing --event-time
預(yù)期輸出
Starting execution of program
Job has been submitted with JobID <job-id>
- REST API
請求
# Uploading the JAR from the Client container
docker-compose run --no-deps client curl -X POST -H "Expect:" \
-F "jarfile=@/opt/ClickCountJob.jar" http://jobmanager:8081/jars/upload
預(yù)期響應(yīng)結(jié)果
{
"filename": "/tmp/flink-web-<uuid>/flink-web-upload/<jar-id>",
"status": "success"
}
請求
# Submitting the Job
curl -X POST http://localhost:8081/jars/<jar-id>/run \
-d '{"parallelism": 3, "programArgs": "--bootstrap.servers kafka:9092 --checkpointing --event-time", "savepointPath": "<savepoint-path>"}'
預(yù)期響應(yīng)結(jié)果
{
"jobid": "<job-id>"
}
現(xiàn)在 Job 已重新提交,但由于我們提高了并行度所以導(dǎo)致 TaskSlots 不夠用(1 個 TaskSlot 可用,總共需要 3 個),最終 Job 會重啟失敗。通過如下命令:
docker-compose scale taskmanager=2
你可以向 Flink 集群添加第二個 TaskManager(為 Flink 集群提供 2 個 TaskSlots 資源), 它會自動向 JobManager 注冊,TaskManager 注冊完成后,Job 會再次處于 “RUNNING” 狀態(tài)。
一旦 Job 再次運行起來,從output主題 的輸出中你會看到在擴容期間數(shù)據(jù)依然沒有丟失: 所有窗口的計數(shù)都正好是 1000。
查詢Job指標(biāo)
可以通過 JobManager 提供的 REST API 來獲取系統(tǒng)和用戶指標(biāo)
具體請求方式取決于我們想查詢哪類指標(biāo),Job 相關(guān)的指標(biāo)分類可通過jobs/<job-id>/metrics獲得,而要想查詢某類指標(biāo)的具體值則可以在請求地址后跟上get參數(shù)。
請求
curl "localhost:8081/jobs/<jod-id>/metrics?get=lastCheckpointSize"
預(yù)期響應(yīng)結(jié)果
[
{
"id": "lastCheckpointSize",
"value": "9378"
}
]
REST API 不僅可以用于查詢指標(biāo),還可以用于獲取正在運行中的 Job 詳細信息
請求
# 可以從結(jié)果中獲取感興趣的 vertex-id
curl localhost:8081/jobs/<jod-id>
預(yù)期響應(yīng)結(jié)果
{
"jid": "<job-id>",
"name": "Click Event Count",
"isStoppable": false,
"state": "RUNNING",
"start-time": 1564467066026,
"end-time": -1,
"duration": 374793,
"now": 1564467440819,
"timestamps": {
"CREATED": 1564467066026,
"FINISHED": 0,
"SUSPENDED": 0,
"FAILING": 0,
"CANCELLING": 0,
"CANCELED": 0,
"RECONCILING": 0,
"RUNNING": 1564467066126,
"FAILED": 0,
"RESTARTING": 0
},
"vertices": [
{
"id": "<vertex-id>",
"name": "ClickEvent Source",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066423,
"end-time": -1,
"duration": 374396,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 0,
"read-bytes-complete": true,
"write-bytes": 5033461,
"write-bytes-complete": true,
"read-records": 0,
"read-records-complete": true,
"write-records": 166351,
"write-records-complete": true
}
},
{
"id": "<vertex-id>",
"name": "Timestamps/Watermarks",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066441,
"end-time": -1,
"duration": 374378,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 5066280,
"read-bytes-complete": true,
"write-bytes": 5033496,
"write-bytes-complete": true,
"read-records": 166349,
"read-records-complete": true,
"write-records": 166349,
"write-records-complete": true
}
},
{
"id": "<vertex-id>",
"name": "ClickEvent Counter",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066469,
"end-time": -1,
"duration": 374350,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 5085332,
"read-bytes-complete": true,
"write-bytes": 316,
"write-bytes-complete": true,
"read-records": 166305,
"read-records-complete": true,
"write-records": 6,
"write-records-complete": true
}
},
{
"id": "<vertex-id>",
"name": "ClickEventStatistics Sink",
"parallelism": 2,
"status": "RUNNING",
"start-time": 1564467066476,
"end-time": -1,
"duration": 374343,
"tasks": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 2,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"metrics": {
"read-bytes": 20668,
"read-bytes-complete": true,
"write-bytes": 0,
"write-bytes-complete": true,
"read-records": 6,
"read-records-complete": true,
"write-records": 0,
"write-records-complete": true
}
}
],
"status-counts": {
"CREATED": 0,
"FINISHED": 0,
"DEPLOYING": 0,
"RUNNING": 4,
"CANCELING": 0,
"FAILED": 0,
"CANCELED": 0,
"RECONCILING": 0,
"SCHEDULED": 0
},
"plan": {
"jid": "<job-id>",
"name": "Click Event Count",
"nodes": [
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "ClickEventStatistics Sink",
"inputs": [
{
"num": 0,
"id": "<vertex-id>",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "ClickEvent Counter",
"inputs": [
{
"num": 0,
"id": "<vertex-id>",
"ship_strategy": "HASH",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "Timestamps/Watermarks",
"inputs": [
{
"num": 0,
"id": "<vertex-id>",
"ship_strategy": "FORWARD",
"exchange": "pipelined_bounded"
}
],
"optimizer_properties": {}
},
{
"id": "<vertex-id>",
"parallelism": 2,
"operator": "",
"operator_strategy": "",
"description": "ClickEvent Source",
"optimizer_properties": {}
}
]
}
}
請查閱REST API 參考,該參考上有完整的指標(biāo)查詢接口信息,包括如何查詢不同種類的指標(biāo)(例如 TaskManager 指標(biāo))。
延伸擴展
你可能已經(jīng)注意到了`Click Event Count`這個 Job 在啟動時總是會帶上`--checkpointing`和 `--event-time`兩個參數(shù)。如果我們?nèi)コ@兩個參數(shù),那么 Job 的行為也會隨之改變。
--checkpointing參數(shù)開啟了checkpoint配置,checkpoint是 Flink 容錯機制的重要保證。 如果你沒有開啟checkpoint,那么當(dāng)Job失敗并重啟后,你將會看到數(shù)據(jù)丟失現(xiàn)象發(fā)生。-
--event-time參數(shù)開啟了 Job 的事件時間機制,該機制會使用ClickEvent自帶的時間戳進行統(tǒng)計。 如果不指定該參數(shù),F(xiàn)link 將結(jié)合當(dāng)前機器時間使用事件處理時間進行統(tǒng)計。如此一來,每個窗口計數(shù)將不再是準確的 1000 了。Click Event Count這個 Job 還有另外一個選項,該選項默認是關(guān)閉的,你可以在client container的docker-compose.yaml文件中添加該選項從而觀察該 Job 在反壓下的表現(xiàn),該選項描述如下: --backpressure將一個額外算子添加到 Job 中,該算子會在偶數(shù)分鐘內(nèi)產(chǎn)生嚴重的反壓(比如:10:12 期間,而 10:13 期間不會)。這種現(xiàn)象可以通過多種網(wǎng)絡(luò)指標(biāo)觀察到,比如:outputQueueLength和outPoolUsage指標(biāo),通過 WebUI 上的反壓監(jiān)控也可以觀察到。