記錄Flink1.9線上checkpoint失敗的問(wèn)題

最新在線上更新了代碼之后導(dǎo)致了任務(wù)在消費(fèi)kafka數(shù)據(jù)的時(shí)候,突然就不消費(fèi)數(shù)據(jù)了,發(fā)現(xiàn)原因在公司的可視化界面中,看不到數(shù)據(jù)的更新,進(jìn)入flink監(jiān)控頁(yè)面中看到任務(wù)沒(méi)有failover過(guò)的記錄

任務(wù)界面

雖然任務(wù)在正常的運(yùn)行中,但實(shí)際情況是已經(jīng)不消費(fèi)數(shù)據(jù)了,最開(kāi)始以為代碼有問(wèn)題,經(jīng)過(guò)檢查發(fā)現(xiàn)代碼沒(méi)有問(wèn)題,然后查看checkpoint的情況,發(fā)現(xiàn)了問(wèn)題,flink在做checkpoint的時(shí)候出現(xiàn)了失敗

checkpoint失敗界面

在查看tm和jm的日志,tm日志中沒(méi)有任何報(bào)錯(cuò),但是在jm日志中看到一個(gè)有用的信息,說(shuō)在做checkpoint的時(shí)候checkpoint超時(shí),導(dǎo)致了checkpoint失敗,后來(lái)我通過(guò)

JM日志

因?yàn)樾氯蝿?wù)增加了state的使用,之前checkpoint的超時(shí)時(shí)間設(shè)置為1s,我以為是因?yàn)闋顟B(tài)大了導(dǎo)致checkpoint超時(shí),然后我更改了超時(shí)時(shí)間,設(shè)置成了1分鐘,但運(yùn)行一段時(shí)間后也發(fā)現(xiàn)同樣的錯(cuò)誤,在錯(cuò)誤中發(fā)現(xiàn)了一個(gè)問(wèn)題,在KeyedProcess算子中,有一個(gè)subtask acknowledgement Time為N/A,導(dǎo)致了checkpoint的超時(shí),因?yàn)樗腶ck Time沒(méi)有,使barrier一直處于對(duì)齊的過(guò)程中,導(dǎo)致checkpoint超時(shí)而失敗

問(wèn)題

在解決這個(gè)問(wèn)題之前,我看一下kafka的分區(qū)的數(shù)據(jù)出現(xiàn)了不平均的情況,以為是因?yàn)閗afka分區(qū)的數(shù)據(jù)原因,導(dǎo)致了flink source消費(fèi)不到數(shù)據(jù),無(wú)法發(fā)送barrier,但是后來(lái)仔細(xì)想了一下,checkpoint的barrier是自動(dòng)生成的然后發(fā)送到下游,也就是跟消費(fèi)數(shù)據(jù)是沒(méi)有關(guān)系的,那么也就是我flink的任務(wù)鏈整個(gè)都堵塞了,使barrier無(wú)法正常發(fā)送到下游,使checkpoint失敗,那么也就是因?yàn)檎麄€(gè)任務(wù)鏈的堵塞導(dǎo)致了無(wú)法在消費(fèi)新的數(shù)據(jù),

根據(jù)上面的線索我開(kāi)始排查代碼,和測(cè)試代碼一共找到兩個(gè)位置出現(xiàn)了問(wèn)題

kafka監(jiān)控頁(yè)面

1.在異步IO的時(shí)候,因?yàn)槲姨砑恿诉^(guò)濾條件,導(dǎo)致asyncInvoke方法不能正常的返回結(jié)果,導(dǎo)致一直處于堵塞的狀態(tài),最終使超過(guò)了capacity(容量),從而導(dǎo)致整個(gè)鏈路堵塞,即不消費(fèi)kafka數(shù)據(jù)
紅色圈圈之前沒(méi)有注釋導(dǎo)致了數(shù)據(jù)來(lái)了無(wú)法正常的發(fā)送下游,最終導(dǎo)致整個(gè)鏈路堵塞
灰色圈圈是需要注意的地方 這是我后加上去的,因?yàn)樵谶@里有一個(gè)異常,如果出現(xiàn)了異常也無(wú)法正常調(diào)用到complete方法,也會(huì)導(dǎo)致鏈路的堵塞,所以當(dāng)出現(xiàn)異常通過(guò)completeExceptionally方法來(lái)異常結(jié)束掉

異步IO問(wèn)題

在這里提一下,異步IO中 resultFuture.completeExceptionally方法和resultFuture.complete,也是通過(guò)這次問(wèn)題學(xué)習(xí)到的地方
這兩個(gè)方法任何一個(gè)被調(diào)用的時(shí)候都會(huì)生成一個(gè) triggered 標(biāo)志,代表這個(gè)CompletableFuture狀態(tài)變?yōu)橥瓿?在源碼中可以看到,這里順便看一下源碼

調(diào)用過(guò)程

1.asyncInvoke方法中調(diào)用 resultFuture.completeExceptionally - > StreamRecordQueueEntry.completeExceptionally -> CompletableFuture.completeExceptionally , 完成最終的觸發(fā)

    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = internalComplete(new AltResult(ex));
        postComplete();
        return triggered;
    }

2.asyncInvoke方法中調(diào)用 resultFuture.complete -> StreamRecordQueueEntry.complete -> CompletableFuture.complete , 完成最終的觸發(fā)

    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }

flink的異步IO 最終需要調(diào)用complete 或者completeExceptionally來(lái)完成一次觸發(fā),否則會(huì)因?yàn)槌^(guò)capacity而阻塞整個(gè)鏈路

image.png

2.在我的keyedProcess的onTimer方法中,錯(cuò)誤的刪除了所有定時(shí)器,導(dǎo)致無(wú)法在觸發(fā)新的定時(shí)器,是數(shù)據(jù)無(wú)法正常發(fā)送到下游,也間接的導(dǎo)致了checkpoint的失敗

onTimer代碼

改完了代碼之后重新執(zhí)行任務(wù)已經(jīng)可以正常的運(yùn)行了,并且checkpoint也沒(méi)有失敗過(guò)

flink web

記錄一下自己的解決過(guò)程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容