本文對(duì)比 二階段事務(wù)、最大努力交付以及消息最終一致性,并給出部分解決方案,最終一致性方案參考阿里RockMQ事務(wù)消息:http://blog.csdn.net/chunlongyu/article/details/53844393)


一 2階段事務(wù)
分布式系統(tǒng)最終一致性有N種方案,比如2PC(2階段事務(wù)) ,以及三段提交等等,但開(kāi)銷較大,實(shí)現(xiàn)起來(lái)復(fù)雜,比如2階段事務(wù)為例,需要引入一個(gè)協(xié)調(diào)者(Coordinator)來(lái)統(tǒng)一掌控所有參與者(Participant)的操作結(jié)果
以開(kāi)會(huì)為例:
甲乙丙丁四人要組織一個(gè)會(huì)議,需要確定會(huì)議時(shí)間,不妨設(shè)甲是協(xié)調(diào)者,乙丙丁是參與者。
投票階段:
(1)甲發(fā)郵件給乙丙丁,周二十點(diǎn)開(kāi)會(huì)是否有時(shí)間;
(2)甲回復(fù)有時(shí)間;
(3)乙回復(fù)有時(shí)間;
(4)丙遲遲不回復(fù),此時(shí)對(duì)于這個(gè)活動(dòng),甲乙丙均處于阻塞狀態(tài),算法無(wú)法繼續(xù)進(jìn)行;
(5)丙回復(fù)有時(shí)間(或者沒(méi)有時(shí)間);
提交階段:
(1)協(xié)調(diào)者甲將收集到的結(jié)果反饋給乙丙?。ㄊ裁磿r(shí)候反饋,以及反饋結(jié)果如何,在此例中取決與丙的時(shí)間與決定);
(2)乙收到;
(3)丙收到;
(4)丁收到;
不僅要鎖住參與者的所有資源,而且要鎖住協(xié)調(diào)者資源,開(kāi)銷大。一句話總結(jié)就是:2PC效率很低,分布式事務(wù)很難做。
在對(duì)事實(shí)性要求沒(méi)有那么高的情況下,可以用基于最大努力交付 && 消息隊(duì)列以及消息存儲(chǔ)來(lái)解決最終一致性。
二 消息最大努力交付
所謂最大努力交付,就是俺反正用最大努力做,能不能成功,不做完全保證
會(huì)涉及到三個(gè)模塊
- 上游應(yīng)用,發(fā)消息到 MQ 隊(duì)列。
- 下游應(yīng)用(例如短信服務(wù)、郵件服務(wù)),接受請(qǐng)求,并返回通知結(jié)果。
- 最大努力通知服務(wù),監(jiān)聽(tīng)消息隊(duì)列,將消息存儲(chǔ)到數(shù)據(jù)庫(kù)中,并按照通知規(guī)則調(diào)用下游應(yīng)用的發(fā)送通知接口。
具體流程如下

- 上游應(yīng)用發(fā)送 MQ 消息到 MQ 組件內(nèi),消息內(nèi)包含通知規(guī)則和通知地址
- 最大努力通知服務(wù)監(jiān)聽(tīng)到 MQ 內(nèi)的消息,解析通知規(guī)則并放入延時(shí)隊(duì)列等待觸發(fā)通知
- 最大努力通知服務(wù)調(diào)用下游的通知地址,如果調(diào)用成功,則該消息標(biāo)記為通知成功,如果失敗則在滿足通知規(guī)則(例如 5 分鐘發(fā)一次,共發(fā)送 10 次)的情況下重新放入延時(shí)隊(duì)列等待下次觸發(fā)。
最大努力通知服務(wù)表示在不影響主業(yè)務(wù)的情況下,盡可能地確保數(shù)據(jù)的一致性。它需要開(kāi)發(fā)人員根據(jù)業(yè)務(wù)來(lái)指定通知規(guī)則,在滿足通知規(guī)則的前提下,盡可能的確保數(shù)據(jù)的一致,以達(dá)到最大努力的目的。
實(shí)現(xiàn)上也比較簡(jiǎn)單,目前主流消息隊(duì)列都有ack機(jī)制,當(dāng)沒(méi)收到ack的時(shí)候用規(guī)則做定時(shí)重發(fā)即可。
優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單
缺點(diǎn):無(wú)補(bǔ)償機(jī)制,不保證能夠送達(dá)
實(shí)現(xiàn)要點(diǎn): 保證消息發(fā)送失敗之后能夠和業(yè)務(wù)一起回滾;消息接受方保證冥等性;定時(shí)重發(fā)機(jī)制,采用一定的重發(fā)策略,例如說(shuō)指數(shù)增長(zhǎng),據(jù)說(shuō)阿里采用redis的zset來(lái)完成,參考https://zhuanlan.zhihu.com/p/26114119
消息進(jìn)到zset后,DelayQ會(huì)通過(guò)timer觸發(fā)(比如秒級(jí)),fork相應(yīng)的消費(fèi)線程去處理zset里ExecuteTime大于當(dāng)前時(shí)間的消息。DelayQ拿到一條消息后,解析其中的callbackurl,并組裝參數(shù),push業(yè)務(wù)消息給Consumer.
Consumer返回處理成功,那么zrem Codis里的消息。如果處理失敗,則計(jì)算其下次嘗試時(shí)間,并更新其ExecuteTime.
三 可靠消息最終一致性方案
此方案涉及 3 個(gè)模塊:
- 上游應(yīng)用,執(zhí)行業(yè)務(wù)并發(fā)送 MQ 消息。
- 可靠消息服務(wù)和 MQ 消息組件,協(xié)調(diào)上下游消息的傳遞,并確保上下游數(shù)據(jù)的一致性。
- 下游應(yīng)用,監(jiān)聽(tīng) MQ 的消息并執(zhí)行自身業(yè)務(wù)。

第一階段:上游應(yīng)用執(zhí)行業(yè)務(wù)并發(fā)送 MQ 消息
上游應(yīng)用將本地業(yè)務(wù)執(zhí)行和消息發(fā)送綁定在同一個(gè)本地事務(wù)中,保證要么本地操作成功并發(fā)送 MQ 消息,要么兩步操作都失敗并回滾。
上游應(yīng)用和可靠消息之間的業(yè)務(wù)交互圖如下:

- 上游應(yīng)用發(fā)送待確認(rèn)消息到可靠消息系統(tǒng)
- 可靠消息系統(tǒng)保存待確認(rèn)消息并返回
- 上游應(yīng)用執(zhí)行本地業(yè)務(wù)
- 上游應(yīng)用通知可靠消息系統(tǒng)確認(rèn)業(yè)務(wù)已執(zhí)行并發(fā)送消息。
- 可靠消息系統(tǒng)修改消息狀態(tài)為發(fā)送狀態(tài)并將消息投遞到 MQ 中間件。
以上每一步都可能出現(xiàn)失敗情況,分析一下這 5 步出現(xiàn)異常后上游業(yè)務(wù)和消息發(fā)送是否一致:

上游應(yīng)用執(zhí)行完成,下游應(yīng)用尚未執(zhí)行或執(zhí)行失敗時(shí),此事務(wù)即處于 BASE 理論的 Soft State 狀態(tài)。
第二階段:下游應(yīng)用監(jiān)聽(tīng) MQ 消息并執(zhí)行業(yè)務(wù)
下游應(yīng)用監(jiān)聽(tīng) MQ 消息并執(zhí)行業(yè)務(wù),并且將消息的消費(fèi)結(jié)果通知可靠消息服務(wù)。
可靠消息的狀態(tài)需要和下游應(yīng)用的業(yè)務(wù)執(zhí)行保持一致,可靠消息狀態(tài)不是已完成時(shí),確保下游應(yīng)用未執(zhí)行,可靠消息狀態(tài)是已完成時(shí),確保下游應(yīng)用已執(zhí)行。
下游應(yīng)用和可靠消息服務(wù)之間的交互圖如下:

- 下游應(yīng)用監(jiān)聽(tīng) MQ 消息組件并獲取消息
- 下游應(yīng)用根據(jù) MQ 消息體信息處理本地業(yè)務(wù)
- 下游應(yīng)用向 MQ 組件自動(dòng)發(fā)送 ACK 確認(rèn)消息被消費(fèi)
- 下游應(yīng)用通知可靠消息系統(tǒng)消息被成功消費(fèi),可靠消息將該消息狀態(tài)更改為已完成。
以上每一步都可能出現(xiàn)失敗情況,分析一下這 4 步出現(xiàn)異常后下游業(yè)務(wù)和消息狀態(tài)是否一致:
通過(guò)分析以上兩個(gè)階段可能失敗的情況,為了確保上下游數(shù)據(jù)的最終一致性,在可靠消息系統(tǒng)中,需要開(kāi)發(fā) 消息狀態(tài)確認(rèn) 和 消息重發(fā) 兩個(gè)功能以實(shí)現(xiàn) BASE 理論的 Eventually Consistent 特性。
異常處理一:消息狀態(tài)確認(rèn)
可靠消息服務(wù)定時(shí)監(jiān)聽(tīng)消息的狀態(tài),如果存在狀態(tài)為待確認(rèn)并且超時(shí)的消息,則表示上游應(yīng)用和可靠消息交互中的步驟 4 或者 5 出現(xiàn)異常。
可靠消息則攜帶消息體內(nèi)的信息向上游應(yīng)用發(fā)起請(qǐng)求查詢?cè)摌I(yè)務(wù)是否已執(zhí)行。上游應(yīng)用提供一個(gè)可查詢接口供可靠消息追溯業(yè)務(wù)執(zhí)行狀態(tài),如果業(yè)務(wù)執(zhí)行成功則更改消息狀態(tài)為已發(fā)送,否則刪除此消息確保數(shù)據(jù)一致。具體流程如下:

- 可靠消息查詢超時(shí)的待確認(rèn)狀態(tài)的消息
- 向上游應(yīng)用查詢業(yè)務(wù)執(zhí)行的情況
- 業(yè)務(wù)未執(zhí)行,則刪除該消息,保證業(yè)務(wù)和可靠消息服務(wù)的一致性。業(yè)務(wù)已執(zhí)行,則修改消息狀態(tài)為已發(fā)送,并發(fā)送消息到 MQ 組件。
異常處理二:消息重發(fā)
消息已發(fā)送則表示上游應(yīng)用已經(jīng)執(zhí)行,接下來(lái)則確保下游應(yīng)用也能正常執(zhí)行。
可靠消息服務(wù)發(fā)現(xiàn)可靠消息服務(wù)中存在消息狀態(tài)為已發(fā)送并且超時(shí)的消息,則表示可靠消息服務(wù)和下游應(yīng)用中存在異常的步驟,無(wú)論哪個(gè)步驟出現(xiàn)異常,可靠消息服務(wù)都將此消息重新投遞到 MQ 組件中供下游應(yīng)用監(jiān)聽(tīng)。
下游應(yīng)用監(jiān)聽(tīng)到此消息后,在保證冪等性的情況下重新執(zhí)行業(yè)務(wù)并通知可靠消息服務(wù)此消息已經(jīng)成功消費(fèi),最終確保上游應(yīng)用、下游應(yīng)用的數(shù)據(jù)最終一致性。具體流程如下:

- 可靠消息服務(wù)定時(shí)查詢狀態(tài)為已發(fā)送并超時(shí)的消息
- 可靠消息將消息重新投遞到 MQ 組件中
- 下游應(yīng)用監(jiān)聽(tīng)消息,在滿足冪等性的條件下,重新執(zhí)行業(yè)務(wù)。
- 下游應(yīng)用通知可靠消息服務(wù)該消息已經(jīng)成功消費(fèi)。
通過(guò)消息狀態(tài)確認(rèn)和消息重發(fā)兩個(gè)功能,可以確保上游應(yīng)用、可靠消息服務(wù)和下游應(yīng)用數(shù)據(jù)的最終一致性。
四 肉身實(shí)戰(zhàn)Rabbitmq
我們?cè)趓abbitmq上肉身實(shí)戰(zhàn)了一下可靠消息,rabbitmq的發(fā)送過(guò)程如下
- 發(fā)送消息到消息服務(wù)
- 消息隊(duì)列將消息發(fā)送給監(jiān)聽(tīng)
- 消息監(jiān)聽(tīng)接受并處理消息
我們來(lái)看看可能發(fā)送異常的四種
1 直接無(wú)法到達(dá)消息服務(wù)
網(wǎng)絡(luò)斷了,拋出異常,業(yè)務(wù)直接回滾即可。如果出現(xiàn)connenction error,直接增加 connection數(shù)即可
connectionFactory.setChannelCacheSize(100);
2 消息已經(jīng)到達(dá)服務(wù)器,但返回的時(shí)候出現(xiàn)異常
rabbitmq提供了確認(rèn)ack機(jī)制,可以用來(lái)確認(rèn)消息是否有返回。因此我們可以在發(fā)送前在db中(內(nèi)存或關(guān)系型數(shù)據(jù)庫(kù))先存一下消息,如果ack異常則進(jìn)行重發(fā)
/**confirmcallback用來(lái)確認(rèn)消息是否有送達(dá)消息隊(duì)列*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
//try to resend msg
} else {
//delete msg in db
}
});
/**若消息找不到對(duì)應(yīng)的Exchange會(huì)先觸發(fā)returncallback */
rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> {
try {
Thread.sleep(Constants.ONE_SECOND);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("send message failed: " + replyCode + " " + replyText);
rabbitTemplate.send(message);
});
如果消息沒(méi)有到exchange,則confirm回調(diào),ack=false
如果消息到達(dá)exchange,則confirm回調(diào),ack=true
但如果是找不到exchange,則會(huì)先觸發(fā)returncallback
3 消息送達(dá)后,消息服務(wù)自己掛了
如果設(shè)置了消息持久化,那么ack= true是在消息持久化完成后,就是存到硬盤上之后再發(fā)送的,確保消息已經(jīng)存在硬盤上,萬(wàn)一消息服務(wù)掛了,消息服務(wù)恢復(fù)是能夠再重發(fā)消息
4 未送達(dá)消費(fèi)者
消息服務(wù)收到消息后,消息會(huì)處于"UNACK"的狀態(tài),直到客戶端確認(rèn)消息
channel.basicQos(1); // accept only one unack-ed message at a time (see below)
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
//確認(rèn)收到消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
5 確認(rèn)消息丟失
消息返回時(shí)假設(shè)確認(rèn)消息丟失了,那么消息服務(wù)會(huì)重發(fā)消息。注意,如果你設(shè)置了autoAck= false,但又沒(méi)應(yīng)答 channel.baskAck也沒(méi)有應(yīng)答 channel.baskNack,那么會(huì)導(dǎo)致非常嚴(yán)重的錯(cuò)誤:消息隊(duì)列會(huì)被堵塞住,可參考http://blog.sina.com.cn/s/blog_48d4cf2d0102w53t.html,所以,無(wú)論如何都必須應(yīng)答
6 消費(fèi)者業(yè)務(wù)處理異常
消息監(jiān)聽(tīng)接受消息并處理,假設(shè)拋異常了,第一階段事物已經(jīng)完成,如果要配置回滾則過(guò)于麻煩,即使做事務(wù)補(bǔ)償也可能事務(wù)補(bǔ)償失效的情況,所以這里可以做一個(gè)重復(fù)執(zhí)行,比如guava的retry,設(shè)置一個(gè)指數(shù)時(shí)間來(lái)循環(huán)執(zhí)行,如果n次后依然失敗,發(fā)郵件、短信,用人肉來(lái)兜底。
參考:http://blog.csdn.net/revivedsun/article/details/53055250