[用官方文檔學(xué)習(xí)RabbitMQ]——1.RabbitMQ的簡(jiǎn)單模式

最近在研究MQ,考慮用RabbitMQ性價(jià)比會(huì)高一些。這次學(xué)習(xí)的途徑是看RabbitMQ官方網(wǎng)站的英文文檔(好歹CET 4 Boy)。基本上就是把文檔的東西翻譯過來,加上點(diǎn)自己的理解。代碼copy來,盡量把自己的注釋寫好。有什么錯(cuò)誤遺漏還請(qǐng)指點(diǎn)~~~

簡(jiǎn)介

RabbitMQ官方解釋是一種消息代理。它可以接受或轉(zhuǎn)發(fā)消息??梢杂每爝f來理解,我們把郵件包裝好送到快遞公司,快遞公司將我們發(fā)的貨物送到收件人的手中。
這里描述三個(gè)角色:Producing、Queue、Consumer
Producing:生產(chǎn)僅僅意味著發(fā)送,發(fā)送消息的程序是生產(chǎn)者。

producer

Queue:隊(duì)列可以理解為RabbitMQ里的郵箱,消息雖然會(huì)貫穿我們的應(yīng)用程序和RabbitMQ,但是它們只能被存儲(chǔ)在隊(duì)列中。隊(duì)列只與主機(jī)的內(nèi)存和磁盤容量綁定,它本質(zhì)上是一個(gè)大型緩沖區(qū)。多個(gè)生產(chǎn)者可以將大量信息傳送到同一個(gè)隊(duì)列中,多個(gè)生產(chǎn)者也可以從同一個(gè)隊(duì)列獲取數(shù)據(jù)。

queue

Consumer:同樣顧名思義,獲取消息的程序是消費(fèi)者
Consumer

這里我們要注意,生產(chǎn)者、消費(fèi)者、代理不必部署在同一臺(tái)主機(jī)上,實(shí)際上,大多數(shù)應(yīng)用程序里,他們都是被部署在不同主機(jī)上的。
簡(jiǎn)單模式
簡(jiǎn)單模式,就像它的名字一樣很簡(jiǎn)單。我們只需要兩個(gè)程序:一個(gè)代表Producer,它發(fā)送單個(gè)消息。另一個(gè)代表Consumer,它接受這個(gè)消息并且把它打印到控制臺(tái)。官方使用字符串“Hello World”進(jìn)行傳遞。

簡(jiǎn)單模式

這里會(huì)用官方提供的Java代碼給出示例,在官方的基礎(chǔ)上我會(huì)加一點(diǎn)注釋,屬于我個(gè)人的一些理解,如有錯(cuò)誤還請(qǐng)大家指出來:

POM.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>cn.itcast.rabbitmq</groupId>
  <artifactId>itcast-rabbitmq</artifactId>
  <version>0.0.1-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
    </dependency>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.3.2</version>
    </dependency>

    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>1.4.0.RELEASE</version>
    </dependency>

  </dependencies>
</project>

Sending程序:

Sending程序
public class Send {
  //先給隊(duì)列起名
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)throws IOException {
    //創(chuàng)建一個(gè)連接工廠
    ConnectionFactory factory = new ConnectionFactory();
    //設(shè)置服務(wù)地址,這里我的服務(wù)是在本地
    factory.setHost("localhost");
    //也可以設(shè)置賬號(hào)信息,比如用戶名、密碼、virtualHost 這些都可以在management上自己添加、修改。
    //如果不設(shè)置用戶名、密碼和vshot,則自動(dòng)使用默認(rèn)的guest。這里和官方相同用guest的
    //factory.setVirtualHost("/{這里填vhost}");
    //factory.setUsername("{這里是賬戶}");
    //factory.setPassword("{這里是密碼}");
    //用連接工廠創(chuàng)建一個(gè)連接
    Connection connection = factory.newConnection();
    //利用這個(gè)連接,創(chuàng)建一個(gè)通道
    Channel channel = connection.createChannel();
    //用這個(gè)通道聲明(創(chuàng)建)隊(duì)列
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    //我們要傳的消息
    String message = "Hello World!";
    //傳送
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    //關(guān)閉連接和通道
    channel.close();
    connection.close();
  }
}    

Receiving程序:

public class Recv {
  //隊(duì)列的名字
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv) throws IOException,InterruptedException {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //設(shè)置服務(wù)地址
        factory.setHost("localhost");
        //...
        //用連接工廠獲取連接
        Connection connection = factory.newConnection();
        //利用這個(gè)連接,獲取通道
        Channel channel = connection.createChannel();
        //聲明(創(chuàng)建)隊(duì)列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //打印,證明我們的接受程序打開了
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //!!!注意!!!從這里接收消息對(duì)于Consumer。寫的和官方不一樣了,官方的在下面單寫一下。
        Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,BasicProperties properties, byte[] body)throws IOException {
          String message = new String(body, "UTF-8");
          System.out.println(" [x] Received '" + message + "'");
        }
      };
      //接收
      channel.basicConsume(QUEUE_NAME, true, consumer);
   }
}

注意,我們?cè)赗eceiving程序中也聲明了隊(duì)列,原因就是我們可能會(huì)在生產(chǎn)者工作之前啟動(dòng)消費(fèi)者,所以我們希望能在使用生產(chǎn)者之前確保隊(duì)列是確實(shí)存在的!

官方使用的是DefaultConsumer,我們也可以使用QueueingConsumer:

        //定義隊(duì)列的消費(fèi)者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //監(jiān)聽隊(duì)列
        channel.basicConsume(QUEUE_NAME,true,consumer);
        //獲取消息
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Receive'"+message+"'");
        }

這里翻譯下官方對(duì)于DefaultConsumer的解釋:

DefaultConsumer類實(shí)現(xiàn)了Consumer接口,它的作用是將服務(wù)器向我們推送的消息進(jìn)行緩沖。當(dāng)我們向服務(wù)器要求將消息從隊(duì)列傳遞給我們,由于推送消息的方式是異步的,所以我們提供一個(gè)對(duì)象進(jìn)行回調(diào),它的工作就是對(duì)消息進(jìn)行緩沖,直到我們準(zhǔn)備好去使用這些消息的時(shí)候?yàn)橹?。我們保持消費(fèi)者開啟的狀態(tài),每當(dāng)生產(chǎn)者發(fā)送消息,handleDelivery方法就會(huì)回調(diào)。

再說一下QueueingConsumer。QueueingConsumer繼承了DefaultConsumer。相比較于DefaultConsumer,它使用起來很方便,它能省去重寫handleDelivery的步驟,但官方使用DeafultConsumer就代表比較支持DefaultConsumer這種方式。

QueueingConsumer有他的弊端:

QueueingConsumer內(nèi)部其實(shí)是一個(gè)LinkBlockingQueue,它將從broker端接受到的信息先暫存到這個(gè)LinkBlockingQueue中,然后消費(fèi)端程序在從這個(gè)LinkBlockingQueue中take出消息。試下一下,如果我們不take消息或者說take的非常慢,那么LinkBlockingQueue中的消息就會(huì)越來越多,最終會(huì)造成內(nèi)存溢出。

QueueingConsumer在RabbitMQ流行于3.x版本,但是4.x版本中就Deprecated了,簡(jiǎn)單說就是不支持使用。
StackOverflow上有篇文章詳細(xì)的將兩者進(jìn)行了對(duì)比
[DefaultConsumer vs QueueingConsumer]https://stackoverflow.com/questions/22840247/rabbitmq-java-client-using-defaultconsumer-vs-queueingconsumer/22859778
所以可以根據(jù)具體的情景選擇如何取舍~~~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,544評(píng)論 19 139
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,512評(píng)論 2 34
  • 背景介紹 Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O...
    高廣超閱讀 13,051評(píng)論 8 167
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,583評(píng)論 0 34
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,597評(píng)論 51 787

友情鏈接更多精彩內(nèi)容