For each checkpoint we create new FlinkKafkaProducer so that new transactions will not clash with transactions created during previous checkpoints (producer.initTransactions() assures that we obtain new producerId and epoch counters).
Core idea:
The snapshot stores the pending transactions (pendingTxn).
On restore, every pendingTxn whose checkpoint id (ckid) is less than the restored checkpoint id must be committed.
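A minimal sketch of the restore rule as stated in these notes, using a plain map instead of Flink's real state classes. The class name `PendingTxnRecovery`, the method `mustCommit`, and the transaction names are hypothetical and only illustrate the "ckid < restore id" selection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model for illustration; these are not Flink's actual classes.
class PendingTxnRecovery {

    /**
     * Returns the pending transactions that must be committed on restore:
     * those whose checkpoint id is strictly below the restored checkpoint id.
     */
    static List<String> mustCommit(Map<Long, String> pendingByCheckpointId,
                                   long restoredCheckpointId) {
        // TreeMap.headMap(key) holds only entries with keys strictly less than key,
        // iterated in ascending checkpoint order.
        TreeMap<Long, String> sorted = new TreeMap<>(pendingByCheckpointId);
        return new ArrayList<>(sorted.headMap(restoredCheckpointId).values());
    }

    public static void main(String[] args) {
        Map<Long, String> pending = new TreeMap<>();
        pending.put(3L, "txn-3");
        pending.put(4L, "txn-4");
        pending.put(5L, "txn-5");
        // Restoring checkpoint 5: the transactions of checkpoints 3 and 4 must be committed.
        System.out.println(mustCommit(pending, 5L)); // prints [txn-3, txn-4]
    }
}
```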
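In the default implementation,
transactions span sessions: creating a new txn allocates a new PID.
If the job then restores and performs resume-and-commit, it may commit the wrong transaction.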
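In Kafka:
With the same TID, when a producer dies, the newly started producer aborts the records the old one had not yet committed. However, because the notify (checkpoint-complete notification) arrives after the snapshot, the txn has to be resumed on restore. Kafka does not support resuming a txn, so the only option is to create a new producer, set the PID and epoch via reflection, and then resume the txn.
However, forcibly resuming a transaction and committing it can cause a later transaction with the same TID to be committed.
The stored state lags behind what has actually been committed.
Under normal conditions, a commit is guaranteed to succeed once issued.
When a producer restored with a historical PID and epoch commits, the PID and epoch are not validated.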
initTransactions
Needs to be called before any other methods when the transactional.id is set in the configuration. This method does the following:
1. Ensures any transactions initiated by previous instances of the producer with the same transactional.id are completed. If the previous instance had failed with a transaction in progress, it will be aborted. If the last transaction had begun completion, but not yet finished, this method awaits its completion.
2. Gets the internal producer id and epoch, used in all future transactional messages issued by the producer.
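It aborts data written before the new epoch and fences zombie producers.
The PID is assigned by the broker when a new connection (session) is established and is used to guarantee idempotence within a single session and a single partition.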
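The fencing behaviour described for initTransactions can be pictured with a toy in-memory model. This is only a sketch under assumptions: `ToyTxnCoordinator`, `Handle`, `isFenced`, and `hasOpenTransaction` are hypothetical names, the model keeps the PID and bumps the epoch on re-initialization, and none of it is Kafka's actual coordinator code.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of epoch-based fencing. All names are hypothetical;
// this is not Kafka's coordinator implementation.
class ToyTxnCoordinator {

    /** Immutable (producerId, epoch) pair handed to a producer instance. */
    static final class Handle {
        final long producerId;
        final short epoch;
        Handle(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    private static final class State {
        long producerId;
        short epoch;
        boolean txnOpen;
    }

    private final Map<String, State> states = new HashMap<>();
    private long nextProducerId = 1000L;

    /** Models initTransactions(): abort the old instance's open txn, bump the epoch. */
    Handle initTransactions(String transactionalId) {
        State s = states.get(transactionalId);
        if (s == null) {
            s = new State();
            s.producerId = nextProducerId++;
            states.put(transactionalId, s);
        } else {
            s.txnOpen = false; // the unfinished transaction of the previous instance is aborted
            s.epoch++;         // previous instances now hold a stale epoch (zombies)
        }
        return new Handle(s.producerId, s.epoch);
    }

    /** A handle is fenced when it no longer matches the current PID and epoch. */
    boolean isFenced(String transactionalId, Handle h) {
        State s = states.get(transactionalId);
        return s == null || s.producerId != h.producerId || s.epoch != h.epoch;
    }

    void beginTransaction(String transactionalId, Handle h) {
        if (isFenced(transactionalId, h)) {
            throw new IllegalStateException("producer fenced");
        }
        states.get(transactionalId).txnOpen = true;
    }

    boolean hasOpenTransaction(String transactionalId) {
        State s = states.get(transactionalId);
        return s != null && s.txnOpen;
    }

    public static void main(String[] args) {
        ToyTxnCoordinator c = new ToyTxnCoordinator();
        Handle old = c.initTransactions("sink-0");
        c.beginTransaction("sink-0", old);
        Handle fresh = c.initTransactions("sink-0");        // restart with the same TID
        System.out.println(c.hasOpenTransaction("sink-0")); // prints false
        System.out.println(c.isFenced("sink-0", old));      // prints true
        System.out.println(c.isFenced("sink-0", fresh));    // prints false
    }
}
```

The model makes the problem in these notes visible: a restart with the same TID aborts exactly the transaction that a restored job may still need to commit, which is why the old PID and epoch have to be carried in state and resumed.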
public void resumeTransaction(long producerId, short epoch)
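Resumes the txn identified by the TID, PID, and epoch, and commits it.
If no new connection is created, the PID and epoch stay the same, and the wrong transaction gets committed.
The same transactional id must be used to fence the previous transactions.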
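Flow
recoverAndCommit
Commit the transactions that may have been left uncommitted at snapshot time (their state was saved, but the notify had not yet arrived).
Then abort across the whole pool (poolSize transactional ids), which covers transactions whose state was never saved.
Because snapshots can fail, more than poolSize transactions may exist even at a concurrency of 1.
If poolSize is enlarged, ids are actually reclaimed only after the notify, but they cannot be added to the state.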
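The restore flow above can be sketched as a two-step plan: commit what the state lists as pending, then abort every other transactional id in the pool. This is a simplified model under assumptions; `RestoreFlowSketch`, `plan`, and the `tid-N` names are hypothetical, and the real sink works against Kafka producers rather than strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the restore flow; not Flink's actual implementation.
class RestoreFlowSketch {

    /**
     * Builds the ordered list of actions on restore:
     * 1. commit every transaction recorded as pending in the restored state;
     * 2. abort every other transactional id in the pool (its state was never saved).
     */
    static List<String> plan(List<String> pendingInState, List<String> poolIds) {
        List<String> actions = new ArrayList<>();
        Set<String> pending = new LinkedHashSet<>(pendingInState);
        for (String tid : pending) {
            actions.add("commit " + tid);
        }
        for (String tid : poolIds) {
            if (!pending.contains(tid)) {
                actions.add("abort " + tid);
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<String> pending = Arrays.asList("tid-1");
        List<String> pool = Arrays.asList("tid-0", "tid-1", "tid-2");
        System.out.println(plan(pending, pool)); // prints [commit tid-1, abort tid-0, abort tid-2]
    }
}
```

Note that the abort step only reaches ids inside the pool, so a transaction opened under an id outside it (for example after repeated snapshot failures) would not be covered by this plan.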