周末了,不想搞長篇大論,就寫寫這樣的流水賬吧。
Flink的常見異常眾多,不可能面面俱到,所以想到哪兒寫到哪兒,有漏掉的之后再補充。

部署和資源問題
(0) JDK版本過低
這不是個顯式錯誤,但是JDK版本過低很有可能會導(dǎo)致Flink作業(yè)出現(xiàn)各種莫名其妙的問題,因此在生產(chǎn)環(huán)境中建議采用JDK 8的較高update(我們使用的是181)。
(1) Could not build the program from JAR file
該信息不甚準(zhǔn)確,因為絕大多數(shù)情況下都不是JAR包本身有毛病,而是在作業(yè)提交過程中出現(xiàn)異常退出了。因此需要查看本次提交產(chǎn)生的客戶端日志(默認位于$FLINK_HOME/logs目錄下),再根據(jù)其中的信息定位并解決問題。
(2) ClassNotFoundException/NoSuchMethodError/IncompatibleClassChangeError/...
一般都是因為用戶依賴第三方包的版本與Flink框架依賴的版本有沖突導(dǎo)致。如果是采用Maven做項目管理的話,可參照我之前寫的這篇文章來解決沖突。
(3) Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster
就是字面意思,YARN集群內(nèi)沒有足夠的資源啟動Flink作業(yè)。檢查一下當(dāng)前YARN集群的狀態(tài)、正在運行的YARN App以及Flink作業(yè)所處的隊列,釋放一些資源或者加入新的資源。
(4) java.util.concurrent.TimeoutException: Slot allocation request timed out
slot分配請求超時,是因為TaskManager申請資源時無法正常獲得,按照上一條的思路檢查即可。
(5) org.apache.flink.util.FlinkException: The assigned slot <container_id> was removed
TaskManager的Container因為使用資源超限被kill掉了。首先需要保證每個slot分配到的內(nèi)存量足夠,特殊情況下可以手動配置SlotSharingGroup來減少單個slot中共享Task的數(shù)量。如果資源沒問題,那么多半就是程序內(nèi)部發(fā)生了內(nèi)存泄露。建議仔細查看TaskManager日志,并按處理JVM OOM問題的常規(guī)操作來排查。
(6) java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id <tm_id>timed out
TaskManager心跳超時。有可能是TaskManager已經(jīng)失敗,如果沒有失敗,那么有可能是因為網(wǎng)絡(luò)不好導(dǎo)致JobManager沒能收到心跳信號,或者TaskManager忙于GC,無法發(fā)送心跳信號。JobManager會重啟心跳超時的TaskManager,如果頻繁出現(xiàn)此異常,應(yīng)該通過日志進一步定位問題所在。
Flink on YARN的其他問題,還可以參考這篇,非常有幫助。
作業(yè)問題
(1) org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
該異常幾乎都是由于程序業(yè)務(wù)邏輯有誤,或者數(shù)據(jù)流里存在未處理好的臟數(shù)據(jù)導(dǎo)致的,繼續(xù)向下追溯異常棧一般就可以看到具體的出錯原因,比較常見的如POJO內(nèi)有空字段,或者抽取事件時間的時間戳為null等。
(2) java.lang.IllegalStateException: Buffer pool is destroyed || Memory manager has been shut down
很多童鞋拿著這兩條異常信息來求助,但實際上它們只是表示BufferPool、MemoryManager這些Flink運行時組件被銷毀,亦即作業(yè)已經(jīng)失敗。具體的原因多種多樣,根據(jù)經(jīng)驗,一般是上一條描述的情況居多(即Could not forward element to next operator錯誤會伴隨出現(xiàn)),其次是JDK版本問題。具體情況還是要根據(jù)TaskManager日志具體分析。
(3) akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://...]] after [10000 ms]
Akka超時導(dǎo)致,一般有兩種原因:一是集群負載比較大或者網(wǎng)絡(luò)比較擁塞,二是業(yè)務(wù)邏輯同步調(diào)用耗時的外部服務(wù)。如果負載或網(wǎng)絡(luò)問題無法徹底緩解,需考慮調(diào)大akka.ask.timeout參數(shù)的值(默認只有10秒);另外,調(diào)用外部服務(wù)時盡量異步操作(Async I/O)。
(4) java.io.IOException: Too many open files
這個異常我們應(yīng)該都不陌生,首先檢查系統(tǒng)ulimit -n的文件描述符限制,再注意檢查程序內(nèi)是否有資源(如各種連接池的連接)未及時釋放。值得注意的是,F(xiàn)link使用RocksDB狀態(tài)后端也有可能會拋出這個異常,此時需修改flink-conf.yaml中的state.backend.rocksdb.files.open參數(shù),如果不限制,可以改為-1。
關(guān)于文件描述符的一些有趣知識,可以參見之前我寫的這一篇。
(5) org.apache.flink.api.common.function.InvalidTypesException: The generic type parameters of '<class>' are missing
在Flink內(nèi)使用Java Lambda表達式時,由于類型擦除造成的副作用(詳情見這篇文章),注意調(diào)用returns()方法指定被擦除的類型。
檢查點和狀態(tài)問題
(1) Received checkpoint barrier for checkpoint <cp_id> before completing current checkpoint <cp_id>. Skipping current checkpoint
在當(dāng)前檢查點還未做完時,收到了更新的檢查點的barrier,表示當(dāng)前檢查點不再需要而被取消掉,一般不需要特殊處理。
(2) Checkpoint <cp_id> expired before completing
首先應(yīng)檢查CheckpointConfig.setCheckpointTimeout()方法設(shè)定的檢查點超時,如果設(shè)的太短,適當(dāng)改長一點。另外就是考慮發(fā)生了反壓或數(shù)據(jù)傾斜,或者barrier對齊太慢。具體思路不再贅述,看官可以參考這篇文章,非常詳細。
(3) org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible
我們知道Flink的狀態(tài)是按key組織并保存的,如果程序邏輯內(nèi)改了keyBy()邏輯或者key的序列化邏輯,就會導(dǎo)致檢查點/保存點的數(shù)據(jù)無法正確恢復(fù)。所以如果必須要改key相關(guān)的東西,就棄用之前的狀態(tài)數(shù)據(jù)吧。
(4) org.apache.flink.util.StateMigrationException: The new serializer for a MapState requires state migration in order for the job to proceed. However, migration for MapState currently isn't supported
在1.9之前的Flink版本中,如果我們使用RocksDB狀態(tài)后端,并且更改了自用MapState的schema,恢復(fù)作業(yè)時會拋出此異常,表示不支持更改schema。這個問題已經(jīng)在FLINK-11947解決,升級版本即可。
The End
就醬吧,民那晚安(不是