消息發(fā)送一致性:是指產(chǎn)生消息的業(yè)務(wù)動(dòng)作與消息發(fā)送的一致。也就是說,如果業(yè)務(wù)操作成功,那么由這個(gè)業(yè)務(wù)操作所產(chǎn)生的消息一定要成功投遞出去(一般是發(fā)送到kafka、rocketmq、rabbitmq等消息中間件中),否則就丟消息。
? 柔性事務(wù)、可靠消息最終一致性、異步確保性
下面用偽代碼進(jìn)行演示消息發(fā)送和投遞的不可靠性:
1、先進(jìn)行數(shù)據(jù)庫操作,再發(fā)送消息
public void test1(){//1 數(shù)據(jù)庫操作//2 發(fā)送MQ消息}
這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因?yàn)榭赡軘?shù)據(jù)庫操作成功,發(fā)送消息失敗
2、先發(fā)送消息,再操作數(shù)據(jù)庫
public void test1(){//1 發(fā)送MQ消息//2 數(shù)據(jù)庫操作}
這種情況下無法保證數(shù)據(jù)庫操作與發(fā)送消息的一致性,因?yàn)榭赡馨l(fā)送消息成功,數(shù)據(jù)庫操作失敗
3、在數(shù)據(jù)庫事務(wù)中,先發(fā)送消息,后操作數(shù)據(jù)庫
@Transactionalpublic void test1(){//1 發(fā)送MQ消息//2 數(shù)據(jù)庫操作}
? 這里使用spring 的@Transactional注解,方法里面的操作都在一個(gè)事務(wù)中。同樣無法保證一致性,因?yàn)榘l(fā)送消息成功了,數(shù)據(jù)庫操作失敗的情況下,數(shù)據(jù)庫操作是回滾了,但是MQ消息沒法進(jìn)行回滾。
4、在數(shù)據(jù)庫事務(wù)中,先操作數(shù)據(jù)庫,后發(fā)送消息
@Transactionalpublic void test1(){//1 數(shù)據(jù)庫操作//2 發(fā)送MQ消息}
? 這種情況下,貌似沒有問題,如果發(fā)送MQ消息失敗,拋出異常,事務(wù)一定會(huì)回滾(加上了@Transactional注解后,spring方法拋出異常后,會(huì)自動(dòng)進(jìn)行回滾)。
? 這只是一個(gè)假象,因?yàn)榘l(fā)送MQ消息可能事實(shí)上已經(jīng)成功,如果是響應(yīng)超時(shí)導(dǎo)致的異常。這個(gè)時(shí)候,數(shù)據(jù)庫操作依然回滾,但是MQ消息實(shí)際上已經(jīng)發(fā)送成功,導(dǎo)致不一致。
5、使用JTA事務(wù)管理器
? 前面通過spring的@Transactional注解加在方法上,來開啟事務(wù)。其實(shí)有一個(gè)條件沒有明確的說出來,就是我們配置的事務(wù)管理器是DataSourceTransactionManager。
? 事實(shí)上,Spring還提供了另外一個(gè)分布式事務(wù)管理器JtaTransactionManager。這個(gè)是使用XA兩階段提交來保證事務(wù)的一致性。當(dāng)然前提是,你的消息中間件是實(shí)現(xiàn)了JMS規(guī)范中事務(wù)消息相關(guān)API(回顧前面我們介紹JTA規(guī)范時(shí),提到DB、MQ都只是資源管理器RM,對(duì)于事務(wù)管理器來說,二者是等價(jià)的)。
? 因此如果你滿足了2個(gè)條件:1、使用JtaTransactionManager 2、DB、MQ分別實(shí)現(xiàn)了JDBC、JMS規(guī)范中規(guī)定的RM應(yīng)該實(shí)現(xiàn)的兩階段提交的API,就可以保證消息發(fā)送的一致性。
? DB作為RM,一般都是支持兩階段提交的。不過,一些MQ中間件并不支持,所以你要找到支持兩階段提交的MQ中間件。另外,JtaTransactionManager只是一個(gè)代理,你需要提供一個(gè)真實(shí)的事務(wù)管理器(TM)實(shí)現(xiàn)。如前面提到了atomikos公司,就有這樣的產(chǎn)品。
? 但是筆者依然不建議,這樣玩。因?yàn)閄A兩階段提交性能低,我們使用消息中間件就是為了異步解耦,這種情況,雖然保證了一致性,但是響應(yīng)時(shí)間卻大大增加,系統(tǒng)可用性降低。
? 那么如何保證,數(shù)據(jù)庫操作和消息發(fā)送的一致性呢?
兩種方案:一種是基于MQ的事務(wù)消息,以下展示了RocketMQ的事務(wù)消息機(jī)制。

事務(wù)消息的邏輯,由發(fā)送端 Producer進(jìn)行保證(消費(fèi)端無需考慮)
? 首先,發(fā)送一個(gè)事務(wù)消息,這個(gè)時(shí)候,RocketMQ將消息狀態(tài)標(biāo)記為Prepared,注意此時(shí)這條消息消費(fèi)者是無法消費(fèi)到的。
? 接著,執(zhí)行業(yè)務(wù)代碼邏輯,可能是一個(gè)本地?cái)?shù)據(jù)庫事務(wù)操作
? 最后,確認(rèn)發(fā)送消息,這個(gè)時(shí)候,RocketMQ將消息狀態(tài)標(biāo)記為可消費(fèi),這個(gè)時(shí)候消費(fèi)者,才能真正的保證消費(fèi)到這條數(shù)據(jù)。
? 如果確認(rèn)消息發(fā)送失敗了怎么辦?RocketMQ會(huì)定期掃描消息集群中的事務(wù)消息,如果發(fā)現(xiàn)了Prepared消息,它會(huì)向消息發(fā)送端(生產(chǎn)者)確認(rèn)。RocketMQ會(huì)根據(jù)發(fā)送端設(shè)置的策略來決定是回滾還是繼續(xù)發(fā)送確認(rèn)消息。這樣就保證了消息發(fā)送與本地事務(wù)同時(shí)成功或同時(shí)失敗。
? 如果消費(fèi)失敗怎么辦?阿里提供給我們的解決方法是:人工解決。
?
? 另外一種實(shí)現(xiàn),并不是所有的mq都支持事務(wù)消息。也就是消息一旦發(fā)送到消息隊(duì)列中,消費(fèi)者立馬就可以消費(fèi)到。此時(shí)可以使用獨(dú)立消息服務(wù)、或者本地事務(wù)表。

? 可以看到,其實(shí)就是將消息先發(fā)送到一個(gè)我們自己編寫的一個(gè)"獨(dú)立消息服務(wù)"應(yīng)用中,剛開始處于prepare狀態(tài),業(yè)務(wù)邏輯處理成功后,確認(rèn)發(fā)送消息,這個(gè)時(shí)候"獨(dú)立消息服務(wù)"才會(huì)真正的把消息發(fā)送給消息隊(duì)列。消費(fèi)者消費(fèi)成功后,ack時(shí),除了對(duì)消息隊(duì)列進(jìn)行ack(圖中沒有畫出),對(duì)于獨(dú)立消息服務(wù)也要進(jìn)行ack,"獨(dú)立消息服務(wù)"一般是把這條消息刪除。而定時(shí)掃描prepare狀態(tài)的消息,向消息發(fā)送端(生產(chǎn)者)確認(rèn)的工作也由獨(dú)立消息服務(wù)來完成。
? 對(duì)于"本地事務(wù)表",其實(shí)和"獨(dú)立消息服務(wù)"的作用類似,只不過"獨(dú)立消息服務(wù)"是需要獨(dú)立部署的,而"本地事務(wù)表"是將"獨(dú)立消息服務(wù)"的功能內(nèi)嵌到應(yīng)用中。