RabbitMQ是一款使用Erlang開發(fā)的開源消息隊(duì)列。本文假設(shè)讀者對RabbitMQ是什么已經(jīng)有了基本的了解,如果你還不知道它是什么以及可以用來做什么,建議先從官網(wǎng)的 RabbitMQ Tutorials 入門教程開始學(xué)習(xí)。
本文將會講解如何使用RabbitMQ實(shí)現(xiàn)延時重試和失敗消息隊(duì)列,實(shí)現(xiàn)可靠的消息消費(fèi),消費(fèi)失敗后,自動延時將消息重新投遞,當(dāng)達(dá)到一定的重試次數(shù)后,將消息投遞到失敗消息隊(duì)列,等待人工介入處理。在這里我會帶領(lǐng)大家一步一步的實(shí)現(xiàn)一個帶有失敗重試功能的發(fā)布訂閱組件,使用該組件后可以非常簡單的實(shí)現(xiàn)消息的發(fā)布訂閱,在進(jìn)行業(yè)務(wù)開發(fā)的時候,業(yè)務(wù)開發(fā)人員可以將主要精力放在業(yè)務(wù)邏輯實(shí)現(xiàn)上,而不需要花費(fèi)時間去理解RabbitMQ的一些復(fù)雜概念。
本文將會持續(xù)修正和更新,最新內(nèi)容請參考我的 GITHUB 上的 程序猿成長計(jì)劃 項(xiàng)目,歡迎 Star,更多精彩內(nèi)容請 follow me。
概要
我們將會實(shí)現(xiàn)如下功能
- 結(jié)合RabbitMQ的Topic模式和Work Queue模式實(shí)現(xiàn)生產(chǎn)方產(chǎn)生消息,消費(fèi)方按需訂閱,消息投遞到消費(fèi)方的隊(duì)列之后,多個worker同時對消息進(jìn)行消費(fèi)
- 結(jié)合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實(shí)現(xiàn)消息的延時重試功能
- 消息達(dá)到最大重試次數(shù)之后,將其投遞到失敗隊(duì)列,等待人工介入處理bug后,重新將其加入隊(duì)列消費(fèi)
具體流程見下圖

- 生產(chǎn)者發(fā)布消息到主Exchange
- 主Exchange根據(jù)Routing Key將消息分發(fā)到對應(yīng)的消息隊(duì)列
- 多個消費(fèi)者的worker進(jìn)程同時對隊(duì)列中的消息進(jìn)行消費(fèi),因此它們之間采用“競爭”的方式來爭取消息的消費(fèi)
- 消息消費(fèi)后,不管成功失敗,都要返回ACK消費(fèi)確認(rèn)消息給隊(duì)列,避免消息消費(fèi)確認(rèn)機(jī)制導(dǎo)致重復(fù)投遞,同時,如果消息處理成功,則結(jié)束流程,否則進(jìn)入重試階段
- 如果重試次數(shù)小于設(shè)定的最大重試次數(shù)(3次),則將消息重新投遞到Retry Exchange的重試隊(duì)列
- 重試隊(duì)列不需要消費(fèi)者直接訂閱,它會等待消息的有效時間過期之后,重新將消息投遞給Dead Letter Exchange,我們在這里將其設(shè)置為主Exchange,實(shí)現(xiàn)延時后重新投遞消息,這樣消費(fèi)者就可以重新消費(fèi)消息
- 如果三次以上都是消費(fèi)失敗,則認(rèn)為消息無法被處理,直接將消息投遞給Failed Exchange的Failed Queue,這時候應(yīng)用可以觸發(fā)報警機(jī)制,以通知相關(guān)責(zé)任人處理
- 等待人工介入處理(解決bug)之后,重新將消息投遞到主Exchange,這樣就可以重新消費(fèi)了
技術(shù)實(shí)現(xiàn)
Linus Torvalds 曾經(jīng)說過
Talk is cheap. Show me the code
我分別用Java和PHP實(shí)現(xiàn)了本文所講述的方案,讀者可以通過參考代碼以及本文中的基本步驟來更好的理解
創(chuàng)建Exchange
為了實(shí)現(xiàn)消息的延時重試和失敗存儲,我們需要創(chuàng)建三個Exchange來處理消息。
- master 主Exchange,發(fā)布消息時發(fā)布到該Exchange
- master.retry 重試Exchange,消息處理失敗時(3次以內(nèi)),將消息重新投遞給該Exchange
- master.failed 失敗Exchange,超過三次重試失敗后,消息投遞到該Exchange
所有的Exchange聲明(declare)必須使用以下參數(shù)
| 參數(shù) | 值 | 說明 |
|---|---|---|
| exchange | - | Exchange名稱 |
| type | topic | Exchange 類型 |
| passive | false | 如果Exchange已經(jīng)存在,則返回成功,不存在則創(chuàng)建 |
| durable | true | 持久化存儲Exchange,這里僅僅是Exchange本身持久化,消息和隊(duì)列需要單獨(dú)指定其持久化 |
| no-wait | false | 該方法需要應(yīng)答確認(rèn) |
Java代碼
// 聲明Exchange:主體,失敗,重試
channel.exchangeDeclare("master", "topic", true);
channel.exchangeDeclare("master.retry", "topic", true);
channel.exchangeDeclare("master.failed", "topic", true);
PHP代碼
// 普通交換機(jī)
$this->channel->exchange_declare('master', 'topic', false, true, false);
// 重試交換機(jī)
$this->channel->exchange_declare('master.retry', 'topic', false, true, false);
// 失敗交換機(jī)
$this->channel->exchange_declare('master.failed', 'topic', false, true, false);
在RabbitMQ的管理界面中,我們可以看到創(chuàng)建的三個Exchange

消息發(fā)布
消息發(fā)布時,使用basic_publish方法,參數(shù)如下
| 參數(shù) | 值 | 說明 |
|---|---|---|
| message | - | 發(fā)布的消息對象 |
| exchange | master | 消息發(fā)布到的Exchange |
| routing-key | - | 路由KEY,用于標(biāo)識消息類型 |
| mandatory | false | 是否強(qiáng)制路由,指定了該選項(xiàng)后,如果沒有訂閱該消息,則會返回路由不可達(dá)錯誤 |
| immediate | false | 指定了當(dāng)消息無法直接路由給消費(fèi)者時如何處理 |
發(fā)布消息時,對于message對象,其內(nèi)容建議使用json編碼后的字符串,同時消息需要標(biāo)識以下屬性
'delivery_mode'=> 2 // 1為非持久化,2為持久化
Java代碼
channel.basicPublish(
"master",
routingKey,
MessageProperties.PERSISTENT_BASIC, // delivery_mode
message.getBytes()
);
PHP代碼
$msg = new AMQPMessage($message->serialize(), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
$this->channel->basic_publish($msg, 'master', $routingKey);
消息訂閱
消息訂閱的實(shí)現(xiàn)相對復(fù)雜一些,需要完成隊(duì)列的聲明以及隊(duì)列和Exchange的綁定。
Declare Queue
對于每一個訂閱消息的服務(wù),都必須創(chuàng)建一個該服務(wù)對應(yīng)的隊(duì)列,將該隊(duì)列綁定到關(guān)注的路由規(guī)則,這樣之后,消息生產(chǎn)者將消息投遞給Exchange之后,就會按照路由規(guī)則將消息分發(fā)到對應(yīng)的隊(duì)列供消費(fèi)者消費(fèi)了。
消費(fèi)服務(wù)需要declare三個隊(duì)列
-
[queue_name]隊(duì)列名稱,格式符合[服務(wù)名稱]@訂閱服務(wù)標(biāo)識 -
[queue_name]@retry重試隊(duì)列 -
[queue_name]@failed失敗隊(duì)列
訂閱服務(wù)標(biāo)識是客戶端自己對訂閱的分類標(biāo)識符,比如用戶中心服務(wù)(服務(wù)名稱ucenter),包含兩個訂閱:user和enterprise,這里兩個訂閱的隊(duì)列名稱就為ucenter@user和ucenter@enterprise,其對應(yīng)的重試隊(duì)列為ucenter@user@retry和ucenter@enterprise@retry。
Declare隊(duì)列時,參數(shù)規(guī)定規(guī)則如下
| 參數(shù) | 值 | 說明 |
|---|---|---|
| queue | - | 隊(duì)列名稱 |
| passive | false | 隊(duì)列不存在則創(chuàng)建,存在則直接成功 |
| durable | true | 隊(duì)列持久化 |
| exclusive | false | 排他,指定該選項(xiàng)為true則隊(duì)列只對當(dāng)前連接有效,連接斷開后自動刪除 |
| no-wait | false | 該方法需要應(yīng)答確認(rèn) |
| auto-delete | false | 當(dāng)不再使用時,是否自動刪除 |
對于@retry重試隊(duì)列,需要指定額外參數(shù)
'x-dead-letter-exchange' => 'master'
'x-message-ttl' => 30 * 1000 // 重試時間設(shè)置為30s
這里的兩個header字段的含義是,在隊(duì)列中延遲30s后,將該消息重新投遞到
x-dead-letter-exchange對應(yīng)的Exchange中
Java代碼
// 聲明監(jiān)聽隊(duì)列
channel.queueDeclare(
queueName, // 隊(duì)列名稱
true, // durable
false, // exclusive
false, // autoDelete
null // arguments
);
channel.queueDeclare(queueName + "@failed", true, false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", exchangeName());
arguments.put("x-message-ttl", 30 * 1000);
channel.queueDeclare(queueName + "@retry", true, false, false, arguments);
PHP代碼
$this->channel->queue_declare($queueName, false, true, false, false, false);
$this->channel->queue_declare($failedQueueName, false, true, false, false, false);
$this->channel->queue_declare(
$retryQueueName, // 隊(duì)列名稱
false, // passive
true, // durable
false, // exclusive
false, // auto_delete
false, // nowait
new AMQPTable([
'x-dead-letter-exchange' => 'master',
'x-message-ttl' => 30 * 1000,
])
);
在RabbitMQ的管理界面中,Queues部分可以看到我們創(chuàng)建的三個隊(duì)列

查看隊(duì)列的詳細(xì)信息,我們可以看到 queueName@retry 隊(duì)列與其它兩個隊(duì)列的不同

Bind Exchange & Queue
創(chuàng)建完隊(duì)列之后,需要將隊(duì)列與Exchange綁定(bind),不同隊(duì)列需要綁定到之前創(chuàng)建的對應(yīng)的Exchange上面
| Queue | Exchange |
|---|---|
| [queue_name] | master |
| [queue_name]@retry | master.retry |
| [queue_name]@failed | master.failed |
綁定時,需要提供訂閱的路由KEY,該路由KEY與消息發(fā)布時的路由KEY對應(yīng),區(qū)別是這里可以使用通配符同時訂閱多種類型的消息。
| 參數(shù) | 值 | 說明 |
|---|---|---|
| queue | - | 綁定的隊(duì)列 |
| exchange | - | 綁定的Exchange |
| routing-key | - | 訂閱的消息路由規(guī)則 |
| no-wait | false | 該方法需要應(yīng)答確認(rèn) |
Java代碼
// 綁定監(jiān)聽隊(duì)列到Exchange
channel.queueBind(queueName, "master", routingKey);
channel.queueBind(queueName + "@failed", "master.failed", routingKey);
channel.queueBind(queueName + "@retry", "master.retry", routingKey);
PHP代碼
$this->channel->queue_bind($queueName, 'master', $routingKey);
$this->channel->queue_bind($retryQueueName, 'master.retry', $routingKey);
$this->channel->queue_bind($failedQueueName, 'master.failed', $routingKey);
在RabbitMQ的管理界面中,我們可以看到該隊(duì)列與Exchange和routing-key的綁定關(guān)系



消息消費(fèi)實(shí)現(xiàn)
使用 basic_consume 對消息進(jìn)行消費(fèi)的時候,需要注意下面參數(shù)
| 參數(shù) | 值 | 說明 |
|---|---|---|
| queue | - | 消費(fèi)的隊(duì)列名稱 |
| consumer-tag | - | 消費(fèi)者標(biāo)識,留空即可 |
| no_local | false | 如果設(shè)置了該字段,服務(wù)器將不會發(fā)布消息到 發(fā)布它的客戶端 |
| no_ack | false | 需要消費(fèi)確認(rèn)應(yīng)答 |
| exclusive | false | 排他訪問,設(shè)置后只允許當(dāng)前消費(fèi)者訪問該隊(duì)列 |
| nowait | false | 該方法需要應(yīng)答確認(rèn) |
消費(fèi)端在消費(fèi)消息時,需要從消息中獲取消息被消費(fèi)的次數(shù),以此判斷該消息處理失敗時重試還是發(fā)送到失敗隊(duì)列。
Java代碼
protected Long getRetryCount(AMQP.BasicProperties properties) {
Long retryCount = 0L;
try {
Map<String, Object> headers = properties.getHeaders();
if (headers != null) {
if (headers.containsKey("x-death")) {
List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death");
if (deaths.size() > 0) {
Map<String, Object> death = deaths.get(0);
retryCount = (Long) death.get("count");
}
}
}
} catch (Exception e) {}
return retryCount;
}
PHP代碼
protected function getRetryCount(AMQPMessage $msg): int
{
$retry = 0;
if ($msg->has('application_headers')) {
$headers = $msg->get('application_headers')->getNativeData();
if (isset($headers['x-death'][0]['count'])) {
$retry = $headers['x-death'][0]['count'];
}
}
return (int)$retry;
}
消息消費(fèi)完成后,需要發(fā)送消費(fèi)確認(rèn)消息給服務(wù)端,使用basic_ack方法
ack(delivery-tag=消息的delivery-tag標(biāo)識)
Java代碼
// 消息消費(fèi)處理
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
...
// 注意,由于使用了basicConsume的autoAck特性,因此這里就不需要手動執(zhí)行
// channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 執(zhí)行消息消費(fèi)處理
channel.basicConsume(
queueName,
true, // autoAck
consumer
);
PHP代碼
$this->channel->basic_consume(
$queueName,
'', // customer_tag
false, // no_local
false, // no_ack
false, // exclusive
false, // nowait
function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) {
...
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
);
如果消息處理中出現(xiàn)異常,應(yīng)該將該消息重新投遞到重試Exchange,等待下次重試
basic_publish(msg, 'master.retry', routing-key)
ack(delivery-tag) // 不要忘記了應(yīng)答消費(fèi)成功消息
如果判斷重試次數(shù)大于3次,仍然處理失敗,則應(yīng)該講消息投遞到失敗Exchange,等待人工處理
basic_publish(msg, 'master.failed', routing-key)
ack(delivery-tag) // 不要忘記了應(yīng)答消費(fèi)成功消息
一定不要忘記ack消息,因?yàn)橹卦?、失敗都是通過將消息重新投遞到重試、失敗Exchange來實(shí)現(xiàn)的,如果忘記ack,則該消息在超時或者連接斷開后,會重新被重新投遞給消費(fèi)者,如果消費(fèi)者依舊無法處理,則會造成死循環(huán)。
Java代碼
try {
String message = new String(body, "UTF-8");
// 消息處理函數(shù)
handler.handle(message, envelope.getRoutingKey());
} catch (Exception e) {
long retryCount = getRetryCount(properties);
if (retryCount > 3) {
// 重試次數(shù)大于3次,則自動加入到失敗隊(duì)列
channel.basicPublish("master.failed", envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body);
} else {
// 重試次數(shù)小于3,則加入到重試隊(duì)列,30s后再重試
channel.basicPublish("master.retry", envelope.getRoutingKey(), properties, body);
}
}
失敗任務(wù)重試
如果任務(wù)重試三次仍未成功,則會被投遞到失敗隊(duì)列,這時候需要人工處理程序異常,處理完畢后,需要將消息重新投遞到隊(duì)列進(jìn)行處理,這里唯一需要做的就是從失敗隊(duì)列訂閱消息,然后獲取到消息后,清空其application_headers頭信息,然后重新投遞到master這個Exchange即可。
Java代碼
channel.basicPublish(
'master',
envelope.getRoutingKey(),
MessageProperties.PERSISTENT_BASIC,
body
);
PHP代碼
$msg->set('application_headers', new AMQPTable([]));
$this->channel->basic_publish(
$msg,
'master',
$msg->get('routing_key')
);
怎么使用
隊(duì)列和Exchange以及發(fā)布訂閱的關(guān)系我們就說完了,那么使用起來是什么效果呢?這里我們以Java代碼為例
// 發(fā)布消息
Publisher publisher = new Publisher(factory.newConnection(), 'master');
publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create");
// 訂閱消息
new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME)
.init("user-monitor", "user.*")
.subscribe((message, routingKey) -> {
// TODO 業(yè)務(wù)邏輯
System.out.printf(" <%s> message consumed: %s\n", routingKey, message);
}
);
總結(jié)
使用RabbitMQ時,實(shí)現(xiàn)延時重試和失敗隊(duì)列的方式并不僅僅局限于本文中描述的方法,如果讀者有更好的實(shí)現(xiàn)方案,歡迎拍磚,在這里我也只是拋磚引玉了。本文中講述的方法還有很多優(yōu)化空間,讀者也可以試著去改進(jìn)其實(shí)現(xiàn)方案,比如本文中使用了三個Exchagne,是否只使用一個Exchange也能實(shí)現(xiàn)本文中所講述的功能。
本文將會持續(xù)修正和更新,最新內(nèi)容請參考我的 GITHUB 上的 程序猿成長計(jì)劃 項(xiàng)目,歡迎 Star,更多精彩內(nèi)容請 follow me。