rabbitmq介紹

RabbitMq

amqp協(xié)議

AMQP,即Advanced Message Queuing Protocol,一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開(kāi)發(fā)語(yǔ)言等條件的限制。Erlang中的實(shí)現(xiàn)有 RabbitMQ等。

1.簡(jiǎn)介

RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開(kāi)源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語(yǔ)言編寫(xiě)的,而群集和故障轉(zhuǎn)移是構(gòu)建在開(kāi)放電信平臺(tái)框架上的。所有主要的編程語(yǔ)言均有與代理接口通訊的客戶端庫(kù)。

2.使用

2.1 核心概念

Message :消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、 priority(相對(duì)于其他消息的優(yōu)先權(quán))、 delivery-mode(指出 該消息可能需要持久性存儲(chǔ))等。

Publisher:消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序

Exchange:交換器,用來(lái)接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。Exchange有4種類型: direct(默認(rèn)), fanout, topic, 和headers,不同類型的Exchange轉(zhuǎn)發(fā)消息的策略有所區(qū)別

Queue:消息隊(duì)列,用來(lái)保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。

Binding:綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連 接起來(lái)的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。Exchange 和Queue的綁定可以是多對(duì)多的關(guān)系。

Connection:網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。

Channel:信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)的虛擬連接, AMQP 命令都是通過(guò)信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過(guò)信道完成。因?yàn)閷?duì)于操作系統(tǒng)來(lái)說(shuō)建立和銷毀 TCP 都是非常昂貴的開(kāi)銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。

Consumer:消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。

Virtual Host:虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。 vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。

2019-06-14_145924.png

2.2 簡(jiǎn)單模式

dubbo-service-governance.jpg

一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者

 * 獲取連接
 * @return Connection
 * @throws Exception
 */
 public static Connection getConnection() throws Exception {
 //定義連接工廠
 ConnectionFactory factory = new ConnectionFactory();
 factory.setHost("192.168.1.235");
 factory.setPort(5672);
 //設(shè)置vhost
 factory.setVirtualHost("/tzb");
 factory.setUsername("test");
 factory.setPassword("123456");
 //通過(guò)工廠獲取連接
 Connection connection = factory.newConnection();
 return connection;
 }
?
 //創(chuàng)建隊(duì)列,發(fā)送消息
 public static void main(String[] args) throws Exception {
 //獲取連接
 Connection connection = ConnectionUtil.getConnection();
 //創(chuàng)建通道
 Channel channel = connection.createChannel();
 //聲明創(chuàng)建隊(duì)列
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 //消息內(nèi)容
 String message = "Hello World!";
 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 System.out.println("發(fā)送消息:"+message);
 //關(guān)閉連接和通道
 channel.close();
 connection.close();
 }
? //消費(fèi)者消費(fèi)消息
 public static void main(String[] args) throws Exception {
 //獲取連接和通道
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 //聲明通道
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 //定義消費(fèi)者
 QueueingConsumer consumer = new QueueingConsumer(channel);
 //監(jiān)聽(tīng)隊(duì)列
 channel.basicConsume(QUEUE_NAME,true,consumer);
?
 while(true){
 //這個(gè)方法會(huì)阻塞住,直到獲取到消息
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 String message = new String(delivery.getBody());
 System.out.println("接收到消息:"+message);
 }
 }

2.3 work模式

2019-06-14_175154.png

一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,每個(gè)消費(fèi)者獲取到的消息唯一

public static void main(String[] args) throws Exception {
 //獲取連接和通道
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 //聲明隊(duì)列
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
 String message = "";
 for(int i = 0; i<100; i++){
 message = "" + i;
 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
 System.out.println("發(fā)送消息:"+message);
 Thread.sleep(i);
 }
?
 channel.close();
 connection.close();
 }
?
 //消費(fèi)者1
 public static void main(String[] args) throws Exception {
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
?
 //同一時(shí)刻服務(wù)器只發(fā)送一條消息給消費(fèi)端
 channel.basicQos(1);
?
 QueueingConsumer consumer = new QueueingConsumer(channel);
?
 channel.basicConsume(QUEUE_NAME,false,consumer);
?
 while(true){
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 String message = new String(delivery.getBody());
 System.out.println("recive1:"+message);
 Thread.sleep(100);
 //消息消費(fèi)完給服務(wù)器返回確認(rèn)狀態(tài),表示該消息已被消費(fèi)
 channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 }
 }
?
 //生產(chǎn)者2
 public static void main(String[] args) throws Exception {
 Connection connection = ConnectionUtil.getConnection();
 Channel channel = connection.createChannel();
 channel.queueDeclare(QUEUE_NAME,false,false,false,null);
?
 channel.basicQos(1);
?
 QueueingConsumer consumer = new QueueingConsumer(channel);
?
 channel.basicConsume(QUEUE_NAME,true,consumer);
?
 while(true){
 QueueingConsumer.Delivery delivery = consumer.nextDelivery();
 String message = new String(delivery.getBody());
 System.out.println("recive1:"+message);
 Thread.sleep(10);
 //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
 }
 }

消息消費(fèi)的兩種模式

1、 自動(dòng)模式

消費(fèi)者從消息隊(duì)列獲取消息后,服務(wù)端就認(rèn)為該消息已經(jīng)成功消費(fèi)。

2、 手動(dòng)模式

消費(fèi)者從消息隊(duì)列獲取消息后,服務(wù)端并沒(méi)有標(biāo)記為成功消費(fèi) ? 消費(fèi)者成功消費(fèi)后需要將狀態(tài)返回到服務(wù)端

2.4 訂閱模式

一個(gè)生產(chǎn)者發(fā)送的消息會(huì)被多個(gè)消費(fèi)者獲取。

生產(chǎn)者:可以將消息發(fā)送到隊(duì)列或者是交換機(jī)。

消費(fèi)者:只能從隊(duì)列中獲取消息。

如果消息發(fā)送到?jīng)]有隊(duì)列綁定的交換機(jī)上,那么消息將丟失。

2019-06-14_175701.png

2.5 路由模式

1、 發(fā)送消息到交換機(jī)并且要指定路由key

2、 消費(fèi)者將隊(duì)列綁定到交換機(jī)時(shí)需要指定路由key

是一種完全匹配,只有匹配到的消費(fèi)者才能消費(fèi)消息

2019-06-14_175852.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 關(guān)于消息隊(duì)列,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了,但一直沒(méi)騰出空,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    raysonfang閱讀 501評(píng)論 0 0
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評(píng)論 2 11
  • 1.關(guān)于RabbitMQ## RabbitMQ是一個(gè)開(kāi)源的消息代理和隊(duì)列服務(wù)器,用來(lái)通過(guò)普通協(xié)議在完全不同的應(yīng)用之...
    樸下柔閱讀 10,762評(píng)論 5 18
  • 消息服務(wù)擅長(zhǎng)于解決多系統(tǒng)、異構(gòu)系統(tǒng)間的數(shù)據(jù)交換(消息通知/通訊)問(wèn)題,你也可以把它用于系統(tǒng)間服務(wù)的相互調(diào)用(RPC...
    馮艷輝brook閱讀 669評(píng)論 0 51
  • 職場(chǎng)中,我們必須學(xué)會(huì)用碎片化時(shí)間完成系統(tǒng)性作業(yè)。 【大叔的碎片化寫(xiě)作心得】 1篇文章=1min靈感+5min構(gòu)思+...
    Krystal_6eef閱讀 162評(píng)論 0 0

友情鏈接更多精彩內(nèi)容