RabbitMQ (三)Hello World

RabbitMQ官網(wǎng)中文版教程:

http://rabbitmq.mr-ping.com/tutorials_with_python/[1]Hello_World.html

上述教程示例為pathon版,Java版及相應(yīng)解釋如下:

生產(chǎn)者代碼

package com.xc.rabbitmq.helloworld;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 消息生產(chǎn)者
 *
 * Created by xc.
 */
public class Producer {

    private final static String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("xc");
        factory.setPassword("xc");
        // 創(chuàng)建一個新的連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個新的頻道
        Channel channel = connection.createChannel();
        // 聲明一個隊列 -- 在 RabbitMQ 中, 隊列的聲明是冪等性的
        // 一個冪等操作的特點是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同
        // 也就是說, 如果不存在, 就創(chuàng)建, 如果存在, 不會對已經(jīng)存在的隊列產(chǎn)生任何影響
        // 如果并不知道是生產(chǎn)者還是消費者程序中的哪個先運行,在程序中重復(fù)將隊列重復(fù)聲明一下是種值得推薦的做法
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello world";
        // 發(fā)送到消息隊列中
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("P [x] Sent '" + message + "'");
        // 關(guān)閉頻道和連接
        channel.close();
        connection.close();
    }

}

程序執(zhí)行結(jié)果如下圖(rabbitmq Web管理界面):

RabbitMQ中收到了一條消息。

消費者代碼

package com.xc.rabbitmq.helloworld;

import com.rabbitmq.client.*;

import java.io.IOException;


/**
 * 消費者
 *
 * Created by xc.
 */
public class Consumer {

    private static final String QUEUE_NAME = "Hello";

    public static void main(String[] args) throws Exception {
        // 創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置 RabbitMQ 地址
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("rabbit");
        factory.setPassword("carrot");
        // 創(chuàng)建一個新的連接
        Connection connection = factory.newConnection();
        // 創(chuàng)建一個頻道
        Channel channel = connection.createChannel();
        // 聲明要關(guān)注的隊列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        System.out.println("C [*] Waiting for messages. To exit press CTRL + C");

        // DefaultConsumer類 實現(xiàn)了 Consumer 接口, 通過傳入一個頻道, 告訴服務(wù)器我們需要哪個頻道的消息
        // 如果頻道中有消息, 就會執(zhí)行回調(diào)函數(shù) handleDelivery
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("C [x] Received '" + message + "'");
            }
        };

        // 自動回復(fù)隊列應(yīng)答 -- RabbitMQ 中的消息確認(rèn)機(jī)制, 后面章節(jié)會詳細(xì)介紹
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}

程序執(zhí)行結(jié)果如下圖(rabbitmq Web管理界面):

如圖,Hello隊列中的消息被消費掉了。

程序的maven依賴:

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>

API參數(shù)解釋:

1) Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;

@param queue : the name of the queue 隊列名

@param durable :true if we are declaring a durable queue (the queue will survive a server restart) 隊列是否持久化

@param exclusive : true if we are declaring an exclusive queue (restricted to this connection) 是否為當(dāng)前連接的專用隊列,在連接斷開后,會自動刪除該隊列

@param autoDelete : true if we are declaring an autodelete queue (server will delete it when no longer in use) 當(dāng)沒有任何消費者使用時,自動刪除該隊列

@param arguments : other properties (construction arguments) for the queue 其他隊列配置

2)void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;

@param exchange the exchange to publish the message to 消息發(fā)送到哪個交換器(之后章節(jié)會講到)。填空字符串表示指定默認(rèn)交換器,MQ會根據(jù)routingKey生成一個默認(rèn)的direct類型的交換器。

@param routingKey :the routing key 路由鍵,#匹配0個或多個單詞,*匹配一個單詞,在topic exchange做消息轉(zhuǎn)發(fā)用

@param mandatory :true if the 'mandatory' flag is to be set 如果exchange根據(jù)自身類型和消息routeKey無法找到一個符合條件的queue,那么會調(diào)用basic.return方法將消息返還給生產(chǎn)者。false:出現(xiàn)上述情形broker會直接將消息扔掉

@param immediate :true if the 'immediate' flag is to be set. Note that the RabbitMQ server does not support this flag. 如果exchange在將消息route到queue(s)時發(fā)現(xiàn)對應(yīng)的queue上沒有消費者,那么這條消息不會放入隊列中。當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產(chǎn)者。

@param props other properties for the message - routing headers etc 需要注意的是BasicProperties.deliveryMode,0:不持久化 1:持久化 這里指的是消息的持久化,配合exchange(durable=true),queue(durable)可以實現(xiàn),即使服務(wù)器宕機(jī),消息仍然保留。

@param body the message body 具體消息

簡單來說:mandatory標(biāo)志告訴服務(wù)器至少將該消息route到一個隊列中,否則將消息返還給生產(chǎn)者;immediate標(biāo)志告訴服務(wù)器如果該消息關(guān)聯(lián)的queue上有消費者,則馬上將消息投遞給它,如果所有queue都沒有消費者,直接把消息返還給生產(chǎn)者,不用將消息入隊列等待消費者了。

  1. String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

@param queue the name of the queue 隊列名

@param autoAck true if the server should consider messages acknowledged once delivered; false if the server should expect explicit acknowledgements 是否自動ACK。如果不自動ack,需要使用channel.ack、channel.nack、channel.basicReject 進(jìn)行消息應(yīng)答

@param callback an interface to the consumer object 消息消費的回調(diào)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,512評論 2 34
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,537評論 19 139
  • 什么叫消息隊列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,415評論 0 1
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒有及時更新。 術(shù)語對...
    joyenlee閱讀 7,804評論 0 3
  • AMQP大致內(nèi)容就是,將消息和隊列綁定起來,規(guī)定讓進(jìn)入到交換機(jī)中的具有某個路由鍵的消息進(jìn)入到指定隊列中去。 Rab...
    StevenMD閱讀 1,988評論 0 3

友情鏈接更多精彩內(nèi)容