最近在研究MQ,考慮用RabbitMQ性價(jià)比會(huì)高一些。這次學(xué)習(xí)的途徑是看RabbitMQ官方網(wǎng)站的英文文檔(好歹CET 4 Boy)。基本上就是把文檔的東西翻譯過來,加上點(diǎn)自己的理解。代碼copy來,盡量把自己的注釋寫好。有什么錯(cuò)誤遺漏還請(qǐng)指點(diǎn)~~~
簡(jiǎn)介
RabbitMQ官方解釋是一種消息代理。它可以接受或轉(zhuǎn)發(fā)消息??梢杂每爝f來理解,我們把郵件包裝好送到快遞公司,快遞公司將我們發(fā)的貨物送到收件人的手中。
這里描述三個(gè)角色:Producing、Queue、Consumer
Producing:生產(chǎn)僅僅意味著發(fā)送,發(fā)送消息的程序是生產(chǎn)者。

Queue:隊(duì)列可以理解為RabbitMQ里的郵箱,消息雖然會(huì)貫穿我們的應(yīng)用程序和RabbitMQ,但是它們只能被存儲(chǔ)在隊(duì)列中。隊(duì)列只與主機(jī)的內(nèi)存和磁盤容量綁定,它本質(zhì)上是一個(gè)大型緩沖區(qū)。多個(gè)生產(chǎn)者可以將大量信息傳送到同一個(gè)隊(duì)列中,多個(gè)生產(chǎn)者也可以從同一個(gè)隊(duì)列獲取數(shù)據(jù)。

Consumer:同樣顧名思義,獲取消息的程序是消費(fèi)者

這里我們要注意,生產(chǎn)者、消費(fèi)者、代理不必部署在同一臺(tái)主機(jī)上,實(shí)際上,大多數(shù)應(yīng)用程序里,他們都是被部署在不同主機(jī)上的。
簡(jiǎn)單模式
簡(jiǎn)單模式,就像它的名字一樣很簡(jiǎn)單。我們只需要兩個(gè)程序:一個(gè)代表Producer,它發(fā)送單個(gè)消息。另一個(gè)代表Consumer,它接受這個(gè)消息并且把它打印到控制臺(tái)。官方使用字符串“Hello World”進(jìn)行傳遞。

這里會(huì)用官方提供的Java代碼給出示例,在官方的基礎(chǔ)上我會(huì)加一點(diǎn)注釋,屬于我個(gè)人的一些理解,如有錯(cuò)誤還請(qǐng)大家指出來:
POM.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.rabbitmq</groupId>
<artifactId>itcast-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.4.0.RELEASE</version>
</dependency>
</dependencies>
</project>
Sending程序:

public class Send {
//先給隊(duì)列起名
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)throws IOException {
//創(chuàng)建一個(gè)連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置服務(wù)地址,這里我的服務(wù)是在本地
factory.setHost("localhost");
//也可以設(shè)置賬號(hào)信息,比如用戶名、密碼、virtualHost 這些都可以在management上自己添加、修改。
//如果不設(shè)置用戶名、密碼和vshot,則自動(dòng)使用默認(rèn)的guest。這里和官方相同用guest的
//factory.setVirtualHost("/{這里填vhost}");
//factory.setUsername("{這里是賬戶}");
//factory.setPassword("{這里是密碼}");
//用連接工廠創(chuàng)建一個(gè)連接
Connection connection = factory.newConnection();
//利用這個(gè)連接,創(chuàng)建一個(gè)通道
Channel channel = connection.createChannel();
//用這個(gè)通道聲明(創(chuàng)建)隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//我們要傳的消息
String message = "Hello World!";
//傳送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//關(guān)閉連接和通道
channel.close();
connection.close();
}
}
Receiving程序:
public class Recv {
//隊(duì)列的名字
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws IOException,InterruptedException {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置服務(wù)地址
factory.setHost("localhost");
//...
//用連接工廠獲取連接
Connection connection = factory.newConnection();
//利用這個(gè)連接,獲取通道
Channel channel = connection.createChannel();
//聲明(創(chuàng)建)隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//打印,證明我們的接受程序打開了
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//!!!注意!!!從這里接收消息對(duì)于Consumer。寫的和官方不一樣了,官方的在下面單寫一下。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body)throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//接收
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
注意,我們?cè)赗eceiving程序中也聲明了隊(duì)列,原因就是我們可能會(huì)在生產(chǎn)者工作之前啟動(dòng)消費(fèi)者,所以我們希望能在使用生產(chǎn)者之前確保隊(duì)列是確實(shí)存在的!
官方使用的是DefaultConsumer,我們也可以使用QueueingConsumer:
//定義隊(duì)列的消費(fèi)者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監(jiān)聽隊(duì)列
channel.basicConsume(QUEUE_NAME,true,consumer);
//獲取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Receive'"+message+"'");
}
這里翻譯下官方對(duì)于DefaultConsumer的解釋:
DefaultConsumer類實(shí)現(xiàn)了Consumer接口,它的作用是將服務(wù)器向我們推送的消息進(jìn)行緩沖。當(dāng)我們向服務(wù)器要求將消息從隊(duì)列傳遞給我們,由于推送消息的方式是異步的,所以我們提供一個(gè)對(duì)象進(jìn)行回調(diào),它的工作就是對(duì)消息進(jìn)行緩沖,直到我們準(zhǔn)備好去使用這些消息的時(shí)候?yàn)橹?。我們保持消費(fèi)者開啟的狀態(tài),每當(dāng)生產(chǎn)者發(fā)送消息,handleDelivery方法就會(huì)回調(diào)。
再說一下QueueingConsumer。QueueingConsumer繼承了DefaultConsumer。相比較于DefaultConsumer,它使用起來很方便,它能省去重寫handleDelivery的步驟,但官方使用DeafultConsumer就代表比較支持DefaultConsumer這種方式。
QueueingConsumer有他的弊端:
QueueingConsumer內(nèi)部其實(shí)是一個(gè)LinkBlockingQueue,它將從broker端接受到的信息先暫存到這個(gè)LinkBlockingQueue中,然后消費(fèi)端程序在從這個(gè)LinkBlockingQueue中take出消息。試下一下,如果我們不take消息或者說take的非常慢,那么LinkBlockingQueue中的消息就會(huì)越來越多,最終會(huì)造成內(nèi)存溢出。
QueueingConsumer在RabbitMQ流行于3.x版本,但是4.x版本中就Deprecated了,簡(jiǎn)單說就是不支持使用。
StackOverflow上有篇文章詳細(xì)的將兩者進(jìn)行了對(duì)比
[DefaultConsumer vs QueueingConsumer]https://stackoverflow.com/questions/22840247/rabbitmq-java-client-using-defaultconsumer-vs-queueingconsumer/22859778
所以可以根據(jù)具體的情景選擇如何取舍~~~