消息派發(fā)
上篇《RabbitMQ入門-消息派發(fā)那些事兒》發(fā)布之后,收了不少反饋,其中問的最多的還是有關(guān)消息確認(rèn)以及超時(shí)等場景的處理。
樓主,有遇到消費(fèi)者后臺進(jìn)程不在,但consumer連接還在,當(dāng)前消息是unacked狀態(tài),導(dǎo)致這個(gè)消息一直不被消費(fèi)
隊(duì)列在等待回復(fù)的時(shí)候,這個(gè)消息是怎么存放的?如果一直沒有返回有超時(shí)么?
...
這里再對消息確認(rèn)做以下補(bǔ)充
有關(guān)超時(shí)
RabbitMQ是沒有超時(shí)概念的,如果一個(gè)消費(fèi)者消費(fèi)一條消息要花費(fèi)很長時(shí)間,比如10分鐘,那么這個(gè)過程會一直進(jìn)行下去。除非你采用其他策略來中斷它或者重試。
消費(fèi)者掛了怎么辦
如果我們不打開自動確認(rèn)的標(biāo)識autoAck,那么消費(fèi)者在消費(fèi)完成消息之后會發(fā)送一個(gè)確認(rèn)標(biāo)識給RabbitMQ。RabbitMQ接收到這個(gè)標(biāo)識之后,就會將這條消息從內(nèi)存中刪除。
但是正如上面的網(wǎng)友提到的那樣,如果消費(fèi)者后臺進(jìn)程不在即消費(fèi)者掛了,這時(shí)候RabbiMQ會一直傻等著么?當(dāng)然不會,RabbitMQ發(fā)現(xiàn)消費(fèi)者掛了之后,它會很快將這條消息轉(zhuǎn)而批發(fā)給下一個(gè)消費(fèi)者消費(fèi),這樣做也能夠避免消息丟失的情況。
下面我們啟動了兩個(gè)消費(fèi)者,一個(gè)發(fā)送端,在第二個(gè)消費(fèi)者接收到消息的時(shí)候,手動讓其進(jìn)程結(jié)束,這時(shí)候我們會發(fā)現(xiàn)最終生產(chǎn)的4條消息都被第一個(gè)消費(fèi)者消費(fèi)了,并沒有出現(xiàn)消息丟失的情況。

消息持久化
上面的情況是在RabbitMQ正常提供服務(wù)時(shí)避免了消息丟失的情況,但是如果遇到RabbitMQ服務(wù)掛了,該如何保證消息不丟失呢?這時(shí)候就需要做持久化,細(xì)心的同學(xué)應(yīng)該已經(jīng)發(fā)現(xiàn),在Work模式的發(fā)送端和接收端都做了持久化。****
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));
這里第三個(gè)參數(shù)指明了要持久化
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
這里第二個(gè)參數(shù)知名了要持久化
訂閱者模式

模型組成
一個(gè)消費(fèi)者Producer,一個(gè)交換機(jī)Exchange,多個(gè)消息隊(duì)列Queue,多個(gè)消費(fèi)者Consumer
Exchange
相比較于前兩種模型Hello World和Work,這里多一個(gè)一個(gè)Exchange。其實(shí)Exchange是RabbitMQ的標(biāo)配組成部件之一,前兩種沒有提到Exchange是為了簡化模型,即使模型中沒有看到Exchange的聲明,其實(shí)還是聲明了一個(gè)默認(rèn)的Exchange。
RabbitMQ中實(shí)際發(fā)送消息并不是直接將消息發(fā)送給消息隊(duì)列,消息隊(duì)列也沒那么聰明知道這條消息從哪來要到哪去。RabbitMQ會先將消息發(fā)送個(gè)Exchange,Exchange會根據(jù)這條消息打上的標(biāo)記知道該條消息從哪來到哪去。
Exchange憑什么知道消息的何去何從,因?yàn)镋xchange有幾種類型:direct,fanout,topic和headers。這里說的訂閱者模式就可以認(rèn)為是fanout模式了。
訂閱者模式有何不同
訂閱者模式相對前面的Work模式有和不同?Work也有多個(gè)消費(fèi)者,但是只有一個(gè)消息隊(duì)列,并且一個(gè)消息只會被某一個(gè)消費(fèi)者消費(fèi)。但是訂閱者模式不一樣,它有多個(gè)消息隊(duì)列,也有多個(gè)消費(fèi)者,而且一條消息可以被多個(gè)消費(fèi)者消費(fèi),類似廣播模式。下面通過實(shí)例代碼看看這種模式是如何收發(fā)消息的。
發(fā)送端
/**
* Created by jackie on 17/8/6.
*/
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
private static String getMessage(String[] strings){
if (strings.length < 1)
return "info: Hello World!";
return joinStrings(strings, " ");
}
private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);添加了Exchange的聲明,并且采用的是fanout類型
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));聲明了Exchange的名稱,而不是像之前那樣給了個(gè)空值
接收端
/**
* Created by jackie on 17/8/6.
*/
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.3.161");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
這里通過String queueName = channel.queueDeclare().getQueue();來聲明隊(duì)列,采取該種方式會生成一個(gè)隨機(jī)名字的消息隊(duì)列,并且在斷開連接時(shí)隊(duì)列會自動刪除,但是這并不會影響訂閱者模式,因?yàn)樵搱鼍跋滤薪壎ǖ膓ueue都會收到消息
通過channel.queueBind(queueName, EXCHANGE_NAME, "");將新建的Queue和Exchange綁定,因?yàn)槭莊anout模式,所以不需要指定routing key的值

啟動了兩個(gè)隨機(jī)名稱的消費(fèi)者,它們Queue的名稱不同
啟動生產(chǎn)者,發(fā)送一條消息,這時(shí)候可以發(fā)現(xiàn)兩個(gè)接受端都收到了消息,這就是訂閱者模式