上一篇中提到flink+kafka如何做到任務(wù)級(jí)順序保證,而端到端一致性即為實(shí)現(xiàn)用戶數(shù)據(jù)目標(biāo)端與源端的準(zhǔn)確一致,當(dāng)源端數(shù)據(jù)發(fā)生更改時(shí),保證目標(biāo)端及時(shí)、正確、持久的寫入更改數(shù)據(jù)。為實(shí)現(xiàn)端到端一致性應(yīng)在順序保證的基礎(chǔ)上,實(shí)現(xiàn)一致性語(yǔ)義exactly once的保證??v觀各底層組件:Debezium、Kafka、Flink構(gòu)成了端到端一致性中至關(guān)重要的每一環(huán),應(yīng)充分考慮、分析各組件的一致性語(yǔ)義特性的支持。
為實(shí)現(xiàn)exactly once語(yǔ)義的一致性,必須提供處理過(guò)程的容錯(cuò)性以及處理結(jié)果的冪等性。處理過(guò)程的容錯(cuò)性是指當(dāng)計(jì)算過(guò)程中數(shù)據(jù)還沒(méi)插入目標(biāo)端就發(fā)生出現(xiàn)網(wǎng)絡(luò)異常等情況導(dǎo)致程序重啟,會(huì)出現(xiàn)數(shù)據(jù)丟失的問(wèn)題,為此source端必須可重設(shè)數(shù)據(jù)的讀取位置,同時(shí)配合Flink內(nèi)部的 checkpoint機(jī)制來(lái)解決這個(gè)問(wèn)題。Kafka可以設(shè)置讀取的offset,每次做 checkpoint 的時(shí)候,會(huì)把當(dāng)前消費(fèi) Kafka 的 offset、計(jì)算結(jié)果等寫入到狀態(tài)后端中。任務(wù)異?;謴?fù)的時(shí)候,只需要從最近的一次成功的checkpoint 中拿到 offset 和計(jì)算結(jié)果,從這個(gè)地方接著開(kāi)始消費(fèi)和計(jì)算即可。冪等性指的是要求sink端從故障恢復(fù)時(shí),數(shù)據(jù)不會(huì)因?yàn)橹卦O(shè)讀取位置和重新計(jì)算,而被重復(fù)寫入到外部系統(tǒng)。
flink為不同數(shù)據(jù)源提供了不同的一致性保證:
AT-MOST-ONCE(最多一次):當(dāng)任務(wù)故障時(shí),最簡(jiǎn)單的做法是什么都不干,既不恢復(fù)丟失的狀態(tài),也不重播丟失的數(shù)據(jù)。At-most-once 語(yǔ)義的含義是最多處理一次事件。
AT-LEAST-ONCE(至少一次):在大多數(shù)的真實(shí)應(yīng)用場(chǎng)景,我們希望不丟失事件。這種類型的保障稱為 at least-once,意思是所有的事件都得到了處理,而一些事件還可能被處理多次。
EXACTLY-ONCE(精確一次):恰好處理一次是最嚴(yán)格的保證,也是最難實(shí)現(xiàn)的。恰好處理一次語(yǔ)義不僅僅意味著沒(méi)有事件丟失,還意味著針對(duì)每一個(gè)數(shù)據(jù),內(nèi)部狀態(tài)僅僅更新一次。
<table class="wrapped fixed-table confluenceTable"><colgroup><col style="width: 154.0px;" /><col style="width: 280.0px;" /><col style="width: 194.0px;" /><col style="width: 335.0px;" /></colgroup><thead><tr><th class="confluenceTh" colspan="1">component</th><th class="confluenceTh" style="text-align: left;">Source</th><th class="confluenceTh" style="text-align: left;">Sink</th><th class="confluenceTh" colspan="1">Notes</th></tr></thead><tbody><tr><td class="confluenceTd" colspan="1">Apache Kafka</td><td class="confluenceTd">exactly once</td><td class="confluenceTd">at least once / exactly once</td><td class="confluenceTd" colspan="1">exactly once with transactional producers (v 0.11+)</td></tr><tr><td class="confluenceTd" colspan="1">AWS Kinesis Streams</td><td class="confluenceTd">exactly once</td><td class="confluenceTd">at least once</td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">RabbitMQ</td><td class="confluenceTd">at most once (v 0.10) / exactly once (v 1.0)</td><td class="confluenceTd"><br /></td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Twitter Streaming API</td><td class="confluenceTd">at most once</td><td class="confluenceTd"><br /></td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Google PubSub</td><td class="confluenceTd">at least once</td><td class="confluenceTd"><br /></td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Collections</td><td class="confluenceTd">exactly once</td><td class="confluenceTd"><br /></td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Files</td><td class="confluenceTd">exactly once</td><td class="confluenceTd">exactly once</td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Sockets</td><td class="confluenceTd">at most once</td><td class="confluenceTd">at least once</td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Cassandra?</td><td class="confluenceTd" colspan="1"><br /></td><td class="confluenceTd" colspan="1">at least once / exactly once</td><td class="confluenceTd" colspan="1">exactly once only for idempotent updates</td></tr><tr><td class="confluenceTd" colspan="1">Elasticsearch</td><td class="confluenceTd" colspan="1"><br /></td><td class="confluenceTd" colspan="1">at least once</td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Standard output</td><td class="confluenceTd" colspan="1"><br /></td><td class="confluenceTd" colspan="1">at least once</td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">Redis?</td><td class="confluenceTd" colspan="1"><br /></td><td class="confluenceTd" colspan="1">at least once</td><td class="confluenceTd" colspan="1"><br /></td></tr><tr><td class="confluenceTd" colspan="1">JDBC</td><td class="confluenceTd" colspan="1"><br /></td><td class="confluenceTd" colspan="1">at least once</td><td class="confluenceTd" colspan="1"><br /></td></tr></tbody></table>
綜上,sink端冪等性是實(shí)現(xiàn)一致性語(yǔ)義的重要難點(diǎn)。
冪等性
一個(gè)相同的操作, 無(wú)論重復(fù)多少次, 造成的效果都和只操作一次相等;比如更新一個(gè)key/Value, 無(wú)論你update多少次, 只要key和value不變,那么效果是一樣的;再比如更新計(jì)數(shù)器處理一次消息就計(jì)數(shù)器+1, 這個(gè)操作本身不冪等, 同一個(gè)消息被中間件重"發(fā)+收"兩次就會(huì)造成計(jì)數(shù)器統(tǒng)計(jì)兩次;而如果我們的消息有id, 那么更新計(jì)數(shù)器的邏輯修改為, 把處理過(guò)的消息的id全記錄起來(lái), 接到消息先查重, 然后才更新計(jì)數(shù)器, 那么這個(gè)"更新計(jì)數(shù)器的邏輯"就變成冪等操作了。
把本不冪等的操作轉(zhuǎn)化為冪等操作是end to end consistency的關(guān)鍵之一。
冪等性問(wèn)題延申
確定性計(jì)算和冪等有些類似, 不過(guò)是針對(duì)一個(gè)計(jì)算;相同的input必得到相同的output, 則是一個(gè)確定性計(jì)算;比如從一個(gè)msg里計(jì)算出一個(gè)key和一個(gè)value, 如果對(duì)同一個(gè)消息運(yùn)算無(wú)數(shù)次得到的key和value都相同, 那么這個(gè)計(jì)算就是確定性的, 而如果key里加上一個(gè)當(dāng)前的時(shí)鐘的字符串表示, 那么這個(gè)計(jì)算就不是確定性的, 因?yàn)槿绻匦掠?jì)算一次這個(gè)msg, 得到的是完全不同的key。
注意1: 非確定性計(jì)算一般會(huì)導(dǎo)致不冪等的操作, 比如我們?nèi)绻焉线吚永锏膋ey/value存在數(shù)據(jù)庫(kù)里, 重復(fù)處理多少次同一個(gè)msg, 我們就會(huì)重復(fù)的插入多少條數(shù)據(jù)(因?yàn)閗ey里的時(shí)間戳字符串不同。
注意2: 非確定性計(jì)算并非必然導(dǎo)致不冪等的操作,比如這個(gè)時(shí)間戳沒(méi)有添加在key里而是添加在value里, 且key總是相同的, 那么這個(gè)計(jì)算還是"非確定性"計(jì)算;但是當(dāng)我們存數(shù)據(jù)的時(shí)候先查重才存key/value, 那么無(wú)論我們重復(fù)處理多少次同一個(gè)msg, 我們也只會(huì)成功存入第一個(gè)key/Value, 之后的key/Value都會(huì)被過(guò)濾掉。
缺陷
- 僅在目標(biāo)端表有主鍵的情況下適用,適用于HBase、Redis和Cassandra這樣的KV數(shù)據(jù)庫(kù);也可以通過(guò)給消息設(shè)置 SequcenceId 實(shí)現(xiàn)去重。
- 無(wú)法處理非確定性計(jì)算。
- 需要注意的是,也會(huì)出現(xiàn)中間狀態(tài)短暫的不一致,最終結(jié)果一致的情景。
預(yù)寫日志
預(yù)寫日志(Write-ahead logging,以下簡(jiǎn)稱 WAL)是關(guān)系數(shù)據(jù)庫(kù)系統(tǒng)中用于提供原子性和持久性(ACID屬性中的兩個(gè))的一系列技術(shù)。在使用WAL的系統(tǒng)中,所有的修改在生效之前都要先寫入log文件中。
log文件中通常包括redo和undo信息。這樣做的目的可以通過(guò)一個(gè)例子來(lái)說(shuō)明。假設(shè)一個(gè)程序在執(zhí)行某些操作的過(guò)程中機(jī)器掉電了。在重新啟動(dòng)時(shí),程序可能需要知道當(dāng)時(shí)執(zhí)行的操作是成功了還是部分成功或者是失敗了。如果使用了WAL,程序就可以檢查log文件,并對(duì)突然掉電時(shí)計(jì)劃執(zhí)行的操作內(nèi)容跟實(shí)際上執(zhí)行的操作內(nèi)容進(jìn)行比較。在這個(gè)比較的基礎(chǔ)上,程序就可以決定是撤銷已做的操作還是繼續(xù)完成已做的操作,或者是保持原樣。
缺陷
- 微批處理,存在一定延遲。
- 不能保證一批數(shù)據(jù)全部成功,且按批寫入的時(shí)候若沒(méi)有做事務(wù)隔離,過(guò)程中發(fā)生故障恢復(fù)后就會(huì)導(dǎo)致重復(fù)寫入。
- 讀和寫可以并發(fā)執(zhí)行,不會(huì)互相阻塞(但是寫之間仍然不能并發(fā))。
兩階段提交
對(duì)于關(guān)系型數(shù)據(jù)庫(kù)來(lái)說(shuō),開(kāi)啟事務(wù)即可避免此問(wèn)題,但對(duì)于一個(gè)分布式處理系統(tǒng),如何開(kāi)啟一個(gè)分布式事務(wù),或者目標(biāo)端本身是否支持(分布式)事務(wù)成為關(guān)鍵。
一般意義的兩階段提交
兩階段提交(Two-phase Commit,以下簡(jiǎn)稱 2PC)是指在計(jì)算機(jī)網(wǎng)絡(luò)以及數(shù)據(jù)庫(kù)領(lǐng)域內(nèi),為了使基于分布式系統(tǒng)架構(gòu)下的所有節(jié)點(diǎn)在進(jìn)行事務(wù)提交時(shí)保持一致性而設(shè)計(jì)的一種算法。通常,2PC也被稱為是一種協(xié)議(Protocol)。在分布式系統(tǒng)中,每個(gè)節(jié)點(diǎn)雖然可以知曉自己的操作時(shí)成功或者失敗,卻無(wú)法知道其他節(jié)點(diǎn)的操作的成功或失敗。當(dāng)一個(gè)事務(wù)跨越多個(gè)節(jié)點(diǎn)時(shí),為了保持事務(wù)的ACID特性,需要引入一個(gè)作為協(xié)調(diào)者的組件來(lái)統(tǒng)一掌控所有節(jié)點(diǎn)(稱作參與者)的操作結(jié)果并最終指示這些節(jié)點(diǎn)是否要把操作結(jié)果進(jìn)行真正的提交(比如將更新后的數(shù)據(jù)寫入磁盤等等)。因此,2PC的算法思路可以概括為: 參與者將操作成敗通知協(xié)調(diào)者,再由協(xié)調(diào)者根據(jù)所有參與者的反饋信息決定各參與者是否要提交操作還是中止操作。
要求
外部 sink 系統(tǒng)必須提供事務(wù)支持,或者 sink 任務(wù)必須能夠模擬外部系統(tǒng)上的事務(wù)。
Flink中的兩階段提交
Flink 將2PC的邏輯放在 checkpoint 的過(guò)程之中,并給出了實(shí)現(xiàn)模板類TwoPhaseCommitsinkFunction,F(xiàn)link 的 JobManager 對(duì)應(yīng) 2PC 的協(xié)調(diào)者,Operator 實(shí)例對(duì)應(yīng) 2PC 的參與者。繼承TwoPhaseCommitsinkFunction需要三個(gè)類型參數(shù):IN用于指定輸入數(shù)據(jù)的類型;TXN定義了用于故障后事務(wù)識(shí)別與恢復(fù)的事務(wù)標(biāo)識(shí)符的類型;CONTEXT用于指定一個(gè)可選的自定義上下文對(duì)象的類型。繼承自TwoPhaseCommitsinkFunction的子類構(gòu)造函數(shù)需要傳入兩個(gè)TypeSerializer,一個(gè)用于TXN類型,一個(gè)用于CONTEXT類型,TwoPhaseCommitsinkFunction中定義了 5 個(gè)抽象方法:beginTransaction()用于開(kāi)啟事務(wù),可從連接池中獲取連接,并返回事務(wù)句柄;invoke()每來(lái)一條數(shù)據(jù)就會(huì)觸發(fā)一次,當(dāng)前數(shù)據(jù)為schema時(shí),可直接執(zhí)行對(duì)應(yīng)的query語(yǔ)句,當(dāng)前數(shù)據(jù)為data時(shí),按照元數(shù)據(jù)中的變更類型r(全量)、c(增量插入)、u(增量更新)、d(增量刪除)、before(變更前數(shù)據(jù))、after(數(shù)據(jù)變更后數(shù)據(jù))、db(庫(kù)名)、table(表名)等信息重組對(duì)應(yīng)的SQL語(yǔ)句,將數(shù)據(jù)寫入到已開(kāi)啟的事務(wù)中;preCommit()預(yù)提交后的事務(wù)將不會(huì)再接收數(shù)據(jù)的寫入;commit()提交指定事務(wù);abort()用于終止并回滾指定事務(wù)。開(kāi)發(fā)者可實(shí)現(xiàn)上述抽象方法自定義實(shí)現(xiàn)其對(duì)應(yīng)功能。
protected abstract TXN beginTransaction() throws Exception;
protected abstract void invoke(TXN transaction, IN value, Context context) throws Exception;
protected abstract void preCommit(TXN transaction) throws Exception;
protected abstract void commit(TXN transaction);
protected abstract void abort(TXN transaction);
TwoPhaseCommitsinkFunction中的上述抽象方法與兩階段提交的執(zhí)行流程如下:
上一個(gè)checkpoint完成時(shí),會(huì)開(kāi)啟一個(gè)新的事務(wù)beginTransaction,本次事務(wù)中每條數(shù)據(jù)到來(lái)時(shí)觸發(fā)一次 invoke;當(dāng)前checkpoint 到來(lái)時(shí),會(huì)對(duì)本次事務(wù)進(jìn)行預(yù)提交 preCommit。如果 invoke和 preCommit 全部成功了,才表示第一個(gè)階段成功了。如果在第一個(gè)階段中有機(jī)器故障,或者 invoke、 preCommit失敗,則會(huì)觸發(fā) abort 方法。在第一個(gè)階段結(jié)束時(shí),數(shù)據(jù)會(huì)被寫入到外部存儲(chǔ)。如果外部存儲(chǔ)的事務(wù)隔離級(jí)別為讀已提交時(shí)(Read Committed),并不能讀取到我們的寫入的數(shù)據(jù),因?yàn)闆](méi)有執(zhí)行 commit 操作。
當(dāng)所有的Operator 實(shí)例做完checkpoint,并且都執(zhí)行完 preCommit 時(shí),會(huì)把快照完成的消息發(fā)送給 JobManager,JobManager 收到后認(rèn)為本次 checkpoint 全部完成了,通知所有Operator 實(shí)例執(zhí)行 commit 方法正式提交,此時(shí)外部存儲(chǔ)就可以讀取到我們提交的數(shù)據(jù)了。
關(guān)注作者公眾號(hào),一起討論更多,私信“TwoPhaseCommit”可獲得實(shí)現(xiàn)案例demo文件
本文由博客一文多發(fā)平臺(tái) OpenWrite 發(fā)布!