RabbitMq
AMQP,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開(kāi)發(fā)語(yǔ)言等條件的限制。Erlang中的實(shí)現(xiàn)有 RabbitMQ等。
1.簡(jiǎn)介
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開(kāi)源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語(yǔ)言編寫(xiě)的,而群集和故障轉(zhuǎn)移是構(gòu)建在開(kāi)放電信平臺(tái)框架上的。所有主要的編程語(yǔ)言均有與代理接口通訊的客戶端庫(kù)。
2.使用
2.1 核心概念
Message :消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、 priority(相對(duì)于其他消息的優(yōu)先權(quán))、 delivery-mode(指出 該消息可能需要持久性存儲(chǔ))等。
Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序
Exchange:交換器,用來(lái)接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。Exchange有4種類型: direct(默認(rèn)), fanout, topic, 和headers,不同類型的Exchange轉(zhuǎn)發(fā)消息的策略有所區(qū)別
Queue:消息隊(duì)列,用來(lái)保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。
Binding:綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連 接起來(lái)的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。Exchange 和Queue的綁定可以是多對(duì)多的關(guān)系。
Connection:網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。
Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)的虛擬連接, AMQP 命令都是通過(guò)信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過(guò)信道完成。因?yàn)閷?duì)于操作系統(tǒng)來(lái)說(shuō)建立和銷毀 TCP 都是非常昂貴的開(kāi)銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
Consumer:消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。 vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。

2.2 簡(jiǎn)單模式

一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者
* 獲取連接
* @return Connection
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.1.235");
factory.setPort(5672);
//設(shè)置vhost
factory.setVirtualHost("/tzb");
factory.setUsername("test");
factory.setPassword("123456");
//通過(guò)工廠獲取連接
Connection connection = factory.newConnection();
return connection;
}
?
//創(chuàng)建隊(duì)列,發(fā)送消息
public static void main(String[] args) throws Exception {
//獲取連接
Connection connection = ConnectionUtil.getConnection();
//創(chuàng)建通道
Channel channel = connection.createChannel();
//聲明創(chuàng)建隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//消息內(nèi)容
String message = "Hello World!";
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發(fā)送消息:"+message);
//關(guān)閉連接和通道
channel.close();
connection.close();
}
? //消費(fèi)者消費(fèi)消息
public static void main(String[] args) throws Exception {
//獲取連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明通道
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//定義消費(fèi)者
QueueingConsumer consumer = new QueueingConsumer(channel);
//監(jiān)聽(tīng)隊(duì)列
channel.basicConsume(QUEUE_NAME,true,consumer);
?
while(true){
//這個(gè)方法會(huì)阻塞住,直到獲取到消息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("接收到消息:"+message);
}
}
2.3 work模式

一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,每個(gè)消費(fèi)者獲取到的消息唯一
public static void main(String[] args) throws Exception {
//獲取連接和通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
//聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message = "";
for(int i = 0; i<100; i++){
message = "" + i;
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("發(fā)送消息:"+message);
Thread.sleep(i);
}
?
channel.close();
connection.close();
}
?
//消費(fèi)者1
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
?
//同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)端
channel.basicQos(1);
?
QueueingConsumer consumer = new QueueingConsumer(channel);
?
channel.basicConsume(QUEUE_NAME,false,consumer);
?
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(100);
//消息消費(fèi)完給服務(wù)器返回確認(rèn)狀態(tài),表示該消息已被消費(fèi)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
?
//生產(chǎn)者2
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
?
channel.basicQos(1);
?
QueueingConsumer consumer = new QueueingConsumer(channel);
?
channel.basicConsume(QUEUE_NAME,true,consumer);
?
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("recive1:"+message);
Thread.sleep(10);
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
消息消費(fèi)的兩種模式
1、 自動(dòng)模式
消費(fèi)者從消息隊(duì)列獲取消息后,服務(wù)端就認(rèn)為該消息已經(jīng)成功消費(fèi)。
2、 手動(dòng)模式
消費(fèi)者從消息隊(duì)列獲取消息后,服務(wù)端并沒(méi)有標(biāo)記為成功消費(fèi) ? 消費(fèi)者成功消費(fèi)后需要將狀態(tài)返回到服務(wù)端
2.4 訂閱模式
一個(gè)生產(chǎn)者發(fā)送的消息會(huì)被多個(gè)消費(fèi)者獲取。
生產(chǎn)者:可以將消息發(fā)送到隊(duì)列或者是交換機(jī)。
消費(fèi)者:只能從隊(duì)列中獲取消息。
如果消息發(fā)送到?jīng)]有隊(duì)列綁定的交換機(jī)上,那么消息將丟失。

2.5 路由模式
1、 發(fā)送消息到交換機(jī)并且要指定路由key
2、 消費(fèi)者將隊(duì)列綁定到交換機(jī)時(shí)需要指定路由key
是一種完全匹配,只有匹配到的消費(fèi)者才能消費(fèi)消息
