title: 【MQ】可靠消息
date: 2017-12-08 21:55:53
tags: MQ
categories: MQ
初始【MQ】最后說到默認(rèn)情況下,消息發(fā)送后 MQ 不會向發(fā)送方確認(rèn)消息到達(dá),也不會進(jìn)行持久化處理。即在發(fā)送方眼里消息只要發(fā)出去,就不再關(guān)心消息消息了。這確實做到了生產(chǎn)者與 MQ 的解耦,并且效率很高。但缺點也非常明顯,無法確定消息投遞是可靠的:
- 正在運行的 MQ 宕機(jī)后,無法恢復(fù)已發(fā)送的消息(持久化問題)
- 沒有匹配的 queue,那么消息將被 exchange 直接丟棄,而發(fā)送方對此毫不知情(確認(rèn)問題)
- 消息發(fā)送過程中在網(wǎng)絡(luò)中丟失,發(fā)送方毫不知情(確認(rèn)問題)
Rabbit MQ 是被設(shè)計為金融行業(yè)服務(wù)的,在這些方面當(dāng)然有考慮。本文將從持久化和消息確認(rèn)兩方面來了解 Rabbit MQ 的可靠消息實踐。
持久化
為了確保消息在 MQ 各個環(huán)節(jié)的不丟失,需要將 exchange, queue, 投遞方式都進(jìn)行持久化聲明。具體持久化的方式很簡單,調(diào)用 API 就可以了。
exchange 持久化
exchange 聲明時,將 durable 設(shè)置為 true 就可以了。這順便看一下 exchange 創(chuàng)建方法
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable)
throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
boolean autoDelete,Map<String, Object> arguments)
throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type)
throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, // 交換器名稱
String type, // 交換器類型
boolean durable, // 是否持久化
boolean autoDelete, // 是否自動刪除
boolean internal, // 內(nèi)部
Map<String, Object> arguments // 其他構(gòu)造參數(shù)
) throws IOException;
// 等價于 exchangeDeclare 方法設(shè)置 nowait 參數(shù)
void exchangeDeclareNoWait(String exchange, String type, boolean durable, boolean autoDelete,
boolean internal, Map<String, Object> arguments)
throws IOException;
// 被動聲明隊列,聲明前先檢查
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
exchange 聲明持久化后只能確保重啟后 exchange 重新創(chuàng)建。否則 exchange 將丟失,生產(chǎn)者就無法正常發(fā)送消息了。
queue 持久化
queue 持久化也是一樣的套路,將 durable 設(shè)置為 true 就可以了。queue 創(chuàng)建的 AIP:
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, // queue 名稱
boolean durable, // 持久化
boolean exclusive, // 排他隊列
boolean autoDelete, // 自動刪除
Map<String, Object> arguments // 其他構(gòu)造參數(shù)
) throws IOException;
void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
Queue.DeclareOk queueDeclarePassive(String queue) throws IOException;
對 durable 沒什么好說的,確保重啟后 queue 重新創(chuàng)建,但消息無法恢復(fù),消息的持久化依賴于投遞方式的持久化。
注意一下 exclusive 參數(shù):一個隊列被聲明為排他隊列,該隊列僅對首次申明它的連接可見,并在連接斷開時自動刪除:
- 排他隊列是基于連接可見的,同一連接的不同信道是可以同時訪問同一連接創(chuàng)建的排他隊列;
- “首次”,如果一個連接已經(jīng)聲明了一個排他隊列,其他連接是不允許建立同名的排他隊列的,這個與普通隊列不同;
- 即使該隊列是持久化的,一旦連接關(guān)閉或者客戶端退出,該排他隊列都會被自動刪除的,這種隊列適用于一個客戶端發(fā)送讀取消息的應(yīng)用場景。
投遞方式持久化聲明
套路基本一致,還是看 API:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)throws IOException;
void basicPublish(String exchange, // 交換器
String routingKey, // routing key
boolean mandatory, // 消息確認(rèn)
boolean immediate, // 廢棄
BasicProperties props, // 參數(shù)
byte[] body // 消息有效負(fù)載
) throws IOException;
持久化的參數(shù)包含在 BasicProperties 定義中:
public static class BasicProperties extends AMQBasicProperties {
private String contentType; // 消息類型
private String contentEncoding; // 編碼
private Map<String, Object> headers;
private Integer deliveryMode; // 持久化。1:非持久化;2:持久化
private Integer priority; // 優(yōu)先級
private String correlationId;
private String replyTo; // 反饋隊列
private String expiration; // expiration到期時間
private String messageId;
private Date timestamp;
private String type;
private String userId;
private String appId;
private String clusterId;
// 省略方法
}
BasicProperties 的構(gòu)造除了提供默認(rèn)的方法外,對常用的參數(shù)可以直接獲得,還支持使用 builder 模式構(gòu)造。
如果單獨持久化投遞方式,重啟后因為交換器、隊列已不存在所以毫無意義
持久化的影響
-
性能
《Rabbit MQ 實戰(zhàn)》 一書在說明持久化對性能影響時,舉例:“使用持久化機(jī)制而導(dǎo)致消息吞吐量降低至少 10 倍的情況并不少見”。這個說法還是很讓我震驚的,很好奇 Rabbit MQ 的持久化策略是怎么做的影響這么大,還是說非持久化策略太優(yōu)秀了,以至于磁盤性能極大影響了整體吞吐量。這里挖個坑,爭取以后看看內(nèi)部實現(xiàn)吧,畢竟 erlang 對我是個大問題。
-
集群模式下工作的不好
暫時不清楚集群模式下的影響,先 mark 一下
-
依舊無法 100% 數(shù)據(jù)不丟失
即使 exchange,queue,投遞方式都進(jìn)行持久化聲明依舊不能做到 100% 數(shù)據(jù)不丟失,原因有二:
-
Rabbit MQ 不是為每條消息進(jìn)行 fsync(同步 IO) 處理
依舊可能出現(xiàn)掛掉時有消息沒有持久化的情況,解決有兩種方式:鏡像隊列和消息確認(rèn)
看到網(wǎng)上有提到 erlang 寫文件的實時問題,不懂,先 mark,待求證
-
消息確認(rèn)
消息確認(rèn)可以分為生產(chǎn)者確認(rèn)消息正確投遞和消費者確認(rèn)消息正確接收,對 Rabbit MQ 有三種更具體的情況:
- confire/事務(wù):確認(rèn)消息到達(dá) broker,避免消息在生產(chǎn)者發(fā)出后丟失
- 客戶端 ACK:確認(rèn)消費者接收消息,避免消息在消息隊列發(fā)出后丟失
- mandatory/immediate:確認(rèn)消息到達(dá)隊列,避免到達(dá)交換器后找不到隊列而丟棄
事務(wù)/confire
事務(wù)
確認(rèn)消息成功被 exchange 接收。事務(wù)是 AMQP 協(xié)議內(nèi)定義的, Rabbit MQ 也做了相應(yīng)的實現(xiàn)。與事務(wù)相關(guān)有三個方法,具體使用的模板:
try {
channel.txSelect();
channel.basicPublish(...);
channel.txCommit();
} catch (Exception e) {
e.printStackTrace();
channel.txRollback();
}
事務(wù)缺點:最大的問題是執(zhí)行前后需要開啟事務(wù),提交/回滾事務(wù),而這幾個過程又必須是同步的因此會造成很大的性能問題
confire
confire 是 Rabbit MQ 為解決事務(wù)性能問題設(shè)計的確認(rèn)機(jī)制,主要的做法是為每條消息都設(shè)置唯一 ID 且 ID 以 1 為步長生序,MQ 通過發(fā)送 ACK, NACK 異步確認(rèn)消息是否到達(dá)交換器。
網(wǎng)上普遍對 confire 的描述都集中在異步性上。除了異步,可以設(shè)置 basic.ack 的 multiple 域進(jìn)行累計確認(rèn),這有點 TCP 的確認(rèn)方式。
confire 最大的問題是無法回滾,導(dǎo)致生產(chǎn)者本身也不確定消息是否放成功。如果程序需要實現(xiàn)類似回滾功能,則維護(hù)一個 unconfire 消息的集合,每次收到 ACK/NACK 時更新集合(還需要考慮是否是累計確認(rèn))
我使用了三種方式實現(xiàn) confire 并進(jìn)行對比:
- 對每條消息要求接收對應(yīng)的 confire 消息
- 對一組消息要求接收一條 confire 消息
- 使用監(jiān)聽器完全異步的接收 confire 消息
不出意外的第三種方式的性能是最好的。
客戶端 ACK
聲明隊列時指定 noAck 參數(shù):
- noAck=false:Rabbit MQ 向消費者發(fā)出消息后等待消費者顯式發(fā)出 ack 信號后才移除消息
- noAck=true:Rabbit MQ 向消費者發(fā)出消息后立即移除消息
當(dāng)設(shè)置隊列 noAck 為 false 時,客戶端必須根據(jù)消息的處理情況向 MQ 反饋,默認(rèn)情況下 會自動確認(rèn)。如果希望手動確認(rèn)需要關(guān)閉自動確認(rèn)。
客戶端除了 ACK 為還可以向 MQ 反饋其他信息,反饋的 API 分別有:
- channel.basicAck:向 MQ 確認(rèn)消息正確接收
- channel.basicRecover:向 MQ 確認(rèn)消息需要重發(fā),可以根據(jù)參數(shù)重發(fā)給當(dāng)前消費者或重新入隊
- channel.basicReject:向 MQ 確認(rèn)消息退回
- channel.basicNack:向 MQ 確認(rèn)批量退回消息,可以根據(jù)參數(shù)選擇是否批量
mandatory/immediate
mandatory
mandatory 設(shè)置為 true 時:MQ 至少將該消息路由到至少一個隊列中,否則將消息返還給生產(chǎn)者
mandatory 實現(xiàn)時只需要:
-
投遞消息時設(shè)置 mandatory 參數(shù)為true
void basicPublish(String exchange, // 交換器 String routingKey, // routing key boolean mandatory, // 消息確認(rèn) boolean immediate, // 廢棄 BasicProperties props, // 參數(shù) byte[] body // 消息有效負(fù)載 ) throws IOException; -
設(shè)置監(jiān)聽器
channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { // TODO } });
當(dāng)消息沒有被正確路由到至少一個隊列時,AMQP協(xié)議會返回對應(yīng)消息,監(jiān)聽器內(nèi)的代碼將被調(diào)用;
當(dāng)消息正確投遞,什么也不發(fā)生
immediate
Rabbit MQ 3.0 之后已移除。設(shè)置為 true 時:消息路由到 queue 前,如果 queue 有消費者,則馬上將消息投遞給 queue,否則直接把消息返還給生產(chǎn)者,消息不再入隊。
參考:
《Rabbit MQ 實戰(zhàn)》
RabbitMQ(二):mandatory標(biāo)志的作用
RabbitMQ:Publisher的消息確認(rèn)機(jī)制
RabbitMQ之消息確認(rèn)機(jī)制(事務(wù)+Confirm)
rabbitMq生產(chǎn)者角度:消息持久化、事務(wù)機(jī)制、PublisherConfirm、mandatory