最新在線上更新了代碼之后導(dǎo)致了任務(wù)在消費(fèi)kafka數(shù)據(jù)的時(shí)候,突然就不消費(fèi)數(shù)據(jù)了,發(fā)現(xiàn)原因在公司的可視化界面中,看不到數(shù)據(jù)的更新,進(jìn)入flink監(jiān)控頁(yè)面中看到任務(wù)沒(méi)有failover過(guò)的記錄

雖然任務(wù)在正常的運(yùn)行中,但實(shí)際情況是已經(jīng)不消費(fèi)數(shù)據(jù)了,最開(kāi)始以為代碼有問(wèn)題,經(jīng)過(guò)檢查發(fā)現(xiàn)代碼沒(méi)有問(wèn)題,然后查看checkpoint的情況,發(fā)現(xiàn)了問(wèn)題,flink在做checkpoint的時(shí)候出現(xiàn)了失敗

在查看tm和jm的日志,tm日志中沒(méi)有任何報(bào)錯(cuò),但是在jm日志中看到一個(gè)有用的信息,說(shuō)在做checkpoint的時(shí)候checkpoint超時(shí),導(dǎo)致了checkpoint失敗,后來(lái)我通過(guò)

因?yàn)樾氯蝿?wù)增加了state的使用,之前checkpoint的超時(shí)時(shí)間設(shè)置為1s,我以為是因?yàn)闋顟B(tài)大了導(dǎo)致checkpoint超時(shí),然后我更改了超時(shí)時(shí)間,設(shè)置成了1分鐘,但運(yùn)行一段時(shí)間后也發(fā)現(xiàn)同樣的錯(cuò)誤,在錯(cuò)誤中發(fā)現(xiàn)了一個(gè)問(wèn)題,在KeyedProcess算子中,有一個(gè)subtask acknowledgement Time為N/A,導(dǎo)致了checkpoint的超時(shí),因?yàn)樗腶ck Time沒(méi)有,使barrier一直處于對(duì)齊的過(guò)程中,導(dǎo)致checkpoint超時(shí)而失敗

在解決這個(gè)問(wèn)題之前,我看一下kafka的分區(qū)的數(shù)據(jù)出現(xiàn)了不平均的情況,以為是因?yàn)閗afka分區(qū)的數(shù)據(jù)原因,導(dǎo)致了flink source消費(fèi)不到數(shù)據(jù),無(wú)法發(fā)送barrier,但是后來(lái)仔細(xì)想了一下,checkpoint的barrier是自動(dòng)生成的然后發(fā)送到下游,也就是跟消費(fèi)數(shù)據(jù)是沒(méi)有關(guān)系的,那么也就是我flink的任務(wù)鏈整個(gè)都堵塞了,使barrier無(wú)法正常發(fā)送到下游,使checkpoint失敗,那么也就是因?yàn)檎麄€(gè)任務(wù)鏈的堵塞導(dǎo)致了無(wú)法在消費(fèi)新的數(shù)據(jù),
根據(jù)上面的線索我開(kāi)始排查代碼,和測(cè)試代碼一共找到兩個(gè)位置出現(xiàn)了問(wèn)題

1.在異步IO的時(shí)候,因?yàn)槲姨砑恿诉^(guò)濾條件,導(dǎo)致asyncInvoke方法不能正常的返回結(jié)果,導(dǎo)致一直處于堵塞的狀態(tài),最終使超過(guò)了capacity(容量),從而導(dǎo)致整個(gè)鏈路堵塞,即不消費(fèi)kafka數(shù)據(jù)
紅色圈圈之前沒(méi)有注釋導(dǎo)致了數(shù)據(jù)來(lái)了無(wú)法正常的發(fā)送下游,最終導(dǎo)致整個(gè)鏈路堵塞
灰色圈圈是需要注意的地方 這是我后加上去的,因?yàn)樵谶@里有一個(gè)異常,如果出現(xiàn)了異常也無(wú)法正常調(diào)用到complete方法,也會(huì)導(dǎo)致鏈路的堵塞,所以當(dāng)出現(xiàn)異常通過(guò)completeExceptionally方法來(lái)異常結(jié)束掉

在這里提一下,異步IO中 resultFuture.completeExceptionally方法和resultFuture.complete,也是通過(guò)這次問(wèn)題學(xué)習(xí)到的地方
這兩個(gè)方法任何一個(gè)被調(diào)用的時(shí)候都會(huì)生成一個(gè) triggered 標(biāo)志,代表這個(gè)CompletableFuture狀態(tài)變?yōu)橥瓿?在源碼中可以看到,這里順便看一下源碼
調(diào)用過(guò)程
1.asyncInvoke方法中調(diào)用 resultFuture.completeExceptionally - > StreamRecordQueueEntry.completeExceptionally -> CompletableFuture.completeExceptionally , 完成最終的觸發(fā)
public boolean completeExceptionally(Throwable ex) {
if (ex == null) throw new NullPointerException();
boolean triggered = internalComplete(new AltResult(ex));
postComplete();
return triggered;
}
2.asyncInvoke方法中調(diào)用 resultFuture.complete -> StreamRecordQueueEntry.complete -> CompletableFuture.complete , 完成最終的觸發(fā)
public boolean complete(T value) {
boolean triggered = completeValue(value);
postComplete();
return triggered;
}
flink的異步IO 最終需要調(diào)用complete 或者completeExceptionally來(lái)完成一次觸發(fā),否則會(huì)因?yàn)槌^(guò)capacity而阻塞整個(gè)鏈路

2.在我的keyedProcess的onTimer方法中,錯(cuò)誤的刪除了所有定時(shí)器,導(dǎo)致無(wú)法在觸發(fā)新的定時(shí)器,是數(shù)據(jù)無(wú)法正常發(fā)送到下游,也間接的導(dǎo)致了checkpoint的失敗

改完了代碼之后重新執(zhí)行任務(wù)已經(jīng)可以正常的運(yùn)行了,并且checkpoint也沒(méi)有失敗過(guò)
