RabbitMQ基本使用

本節(jié)詳細(xì)講述RabbitMQ的幾個(gè)基本API,圍繞Connection和channel兩個(gè)AMQP協(xié)議接口.詳解以下幾點(diǎn):連接、交換器、隊(duì)列的創(chuàng)建和綁定、消息的發(fā)送和消費(fèi)、消息確認(rèn)和關(guān)閉連接

一、連接RabbitMQ

不貼代碼了,參考上一節(jié)。這里要說一下,channel是非線程安全的,在有些情況下可能會(huì)導(dǎo)致網(wǎng)絡(luò)上出現(xiàn)錯(cuò)誤的通信幀交錯(cuò),同時(shí)也會(huì)影響發(fā)送方確認(rèn)機(jī)制的運(yùn)行。

二、使用Exchange和Queue

交換器和隊(duì)列是AMQP中high-level層面的構(gòu)建模塊,應(yīng)用程序應(yīng)確保使用前已經(jīng)存在了。使用前需要聲明。
生產(chǎn)者和消費(fèi)者都可以聲明一個(gè)交換器和隊(duì)列.如果嘗試聲明一個(gè)已存在的交換器或隊(duì)列,只要聲明的參數(shù)完全匹配,RabbitMQ就會(huì)什么都不做.否則會(huì)拋出異常.

  1. exchange的聲明和刪除
  • 聲明: exchangeDeclare方法詳解
    exchangeDeclare方法有多個(gè)重載方法,都是通過下面這個(gè)方法缺省某些參數(shù)構(gòu)成的
 /**
     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange
     * @param type the exchange type
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart)
     * @param autoDelete true if the server should delete the exchange when it is no longer in use
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client.
     * @param arguments other properties (construction arguments) for the exchange
     * @return a declaration-confirm method to indicate the exchange was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Exchange.DeclareOk exchangeDeclare(String exchange,
                                              String type,
                                              boolean durable,
                                              boolean autoDelete,
                                              boolean internal,
                                              Map<String, Object> arguments) throws IOException;

詳解一下幾個(gè)參數(shù):

  • exchange: 交換器名稱

  • type: 交換器類型,上節(jié)提到的direct fanout headers和topic

  • durable: 是否持久化 持久話將會(huì)將交換器存盤,服務(wù)器重啟時(shí)不會(huì)丟失

  • autoDelete: 是否自動(dòng)刪除. 自動(dòng)刪除的前提是至少有一個(gè)隊(duì)列或者交換器與這個(gè)交換器綁定.之后所有的交換器和隊(duì)列與這個(gè)交換器解綁后,該交換器會(huì)自動(dòng)刪除.不能錯(cuò)誤的理解為連接斷開后交換器會(huì)刪除

  • internal: 是否內(nèi)置的 這個(gè)參數(shù)為true時(shí)表示這個(gè)交換器只能由其他交換器發(fā)送消息.不能與客戶端直連

  • arguments: 其他一些結(jié)構(gòu)化參數(shù)

  • 刪除交換器: exchangeDelete

 /**
     * Delete an exchange
     * @see com.rabbitmq.client.AMQP.Exchange.Delete
     * @see com.rabbitmq.client.AMQP.Exchange.DeleteOk
     * @param exchange the name of the exchange
     * @param ifUnused true to indicate that the exchange is only to be deleted if it is unused
     * @return a deletion-confirm method to indicate the exchange was successfully deleted
     * @throws java.io.IOException if an error is encountered
     */
    Exchange.DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
  1. queue的聲明和刪除
    聲明隊(duì)列:
/**
     * Like {@link Channel#queueDeclare(String, boolean, boolean, boolean, java.util.Map)} but sets nowait
     * flag to true and returns no result (as there will be no response from the server).
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @throws java.io.IOException if an error is encountered
     */
    void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException;

其他幾個(gè)參數(shù)含義跟declareExchange相同,說一下exclusive:設(shè)置是否排他.為true隊(duì)列是排他的.如果一個(gè)隊(duì)列是排他的,該隊(duì)列僅對首次聲明他的連接可見.并在連接斷開時(shí)斷連.同一個(gè)連接的不同channel可以訪問該連接創(chuàng)建的排他隊(duì)列.
刪除隊(duì)列:

/**
     * Delete a queue
     * @see com.rabbitmq.client.AMQP.Queue.Delete
     * @see com.rabbitmq.client.AMQP.Queue.DeleteOk
     * @param queue the name of the queue
     * @param ifUnused true if the queue should be deleted only if not in use
     * @param ifEmpty true if the queue should be deleted only if empty
     * @return a deletion-confirm method to indicate the queue was successfully deleted
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;

刪除隊(duì)列中的消息:

/**
     * Purges the contents of the given queue.
     * @see com.rabbitmq.client.AMQP.Queue.Purge
     * @see com.rabbitmq.client.AMQP.Queue.PurgeOk
     * @param queue the name of the queue
     * @return a purge-confirm method if the purge was executed successfully
     * @throws java.io.IOException if an error is encountered
     */
    Queue.PurgeOk queuePurge(String queue) throws IOException;
  1. queueBind exchangeBind queueUnbind exchangeUnbind
  • queueBind方法詳解
/**
     * Bind a queue to an exchange.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue 要綁定的隊(duì)列名
     * @param exchange the name of the exchange 要綁定的交換器名
     * @param routingKey the routing key to use for the binding 綁定時(shí)的routingBind
     * @param arguments other properties (binding parameters) 綁定參數(shù)
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
  • queueUnbind 可以解綁已經(jīng)綁定的隊(duì)列和交換器
  1. 何時(shí)創(chuàng)建
  • 由生產(chǎn)者和消費(fèi)者聲明
    RabbitMQ 官方建議,生產(chǎn)者和消費(fèi)者都應(yīng)該嘗試創(chuàng)建隊(duì)列.
  • 程序上線前在服務(wù)器上創(chuàng)建好,比如通過頁面管理、RabbitMQ命令或更好的配置中心下發(fā)
    預(yù)先創(chuàng)建的好處:
    免去聲明過程;
    確保交換器和隊(duì)列正確綁定;
    配合mandatory參數(shù)或者備份交換器提高程序的健壯性

三、發(fā)送消息


    /**
     * Publish a message.
     *
     * Publishing to a non-existent exchange will result in a channel-level
     * protocol exception, which closes the channel.
     *
     * Invocations of <code>Channel#basicPublish</code> will eventually block if a
     * <a >resource-driven alarm</a> is in effect.
     *
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @see <a >Resource-driven alarms</a>
     * @param exchange the exchange to publish the message to
     * @param routingKey the routing key
     * @param mandatory true if the 'mandatory' flag is to be set
     * @param immediate true if the 'immediate' flag is to be
     * set. Note that the RabbitMQ server does not support this flag.
     * @param props other properties for the message - routing headers etc
     * @param body the message body
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;

  • exchange: 交換器名稱
  • RoutingKey: 路由鍵
  • props: 消息的基本屬性集 包括14個(gè)屬性
        private String contentType;
        private String contentEncoding;
        private Map<String,Object> headers;
        private Integer deliveryMode;
        private Integer priority;
        private String correlationId;
        private String replyTo;
        private String expiration;
        private String messageId;
        private Date timestamp;
        private String type;
        private String userId;
        private String appId;
        private String clusterId;
  • byte[] 消息體
  • mandatory和immediate下節(jié)講述

四、消費(fèi)消息

 /**
     * Start a non-nolocal, non-exclusive consumer.
     * @param queue the name of the queue 隊(duì)列名稱
     * @param autoAck true if the server should consider messages 是否自動(dòng)確認(rèn).建議設(shè)置成false
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param consumerTag a client-generated consumer tag to establish context 消費(fèi)者標(biāo)簽,用來區(qū)分不同消費(fèi)者
     * @param callback an interface to the consumer object 回調(diào)函數(shù),用來處理推送來的消息
     * @return the consumerTag associated with the new consumer
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

五、消費(fèi)端的確認(rèn)和拒絕

為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ提供了消息確認(rèn)機(jī)制.當(dāng)自動(dòng)確認(rèn)autoack參數(shù)為false時(shí),消費(fèi)者客戶端需要顯示的調(diào)用basicAck確認(rèn)消費(fèi).RabbitMQ不會(huì)為未確認(rèn)的消息設(shè)置過期時(shí)間.它判斷此消息是否需要重新投遞的唯一依據(jù)是該消費(fèi)者的連接是否已斷開.
RabbitMQ提供了basicReject方法來告訴RabbitMQ拒絕一個(gè)消息.定義如下:

/**
     * Reject a message. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
     * containing the received message being rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Reject
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param requeue true if the rejected message should be requeued rather than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicReject(long deliveryTag, boolean requeue) throws IOException;

當(dāng)requeue為true時(shí),拒絕的消息將重新存入隊(duì)列,發(fā)送給下一個(gè)訂閱的消費(fèi)者.
這里提供一個(gè)demo:

public void consumeMessage() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        boolean autoAck = false;
        channel.basicConsume("test_queue", autoAck, "testConsumerTag", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(envelope);
                System.out.println(new String(body));
                long tag = envelope.getDeliveryTag();
                if (tag / 2 == 0) {
                    channel.basicReject(tag, true);
                } else {
                    channel.basicAck(envelope.getDeliveryTag(), true);
                }
            }
        });
         RabbitMQUtils.close(connection, channel);
    }

rabbitMQ還提供了basicNack方法.可以批量拒絕消息
注意:
basicReject或者basicNack中的requeue設(shè)置為false時(shí),可以啟用"死信隊(duì)列"功能.死信隊(duì)列可以通過檢測被拒絕或者未送達(dá)的消息來追蹤問題.
channel.basicReject

六、關(guān)閉連接

RabbitMQ為channel和connection提供了addShutdownListener方法對連接的關(guān)閉進(jìn)行回調(diào).如下:

public void closeChannel() throws IOException {
        Connection connection = ConnectionCreator.getConnection();
        Channel channel = connection.createChannel();
        channel.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException cause) {
                System.out.println(cause);
            }
        });
        RabbitMQUtils.close(channel);
    }

附:
兩個(gè)測試時(shí)省代碼的封裝類

  • 創(chuàng)建連接:
package com.pctf.basic;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionCreator {
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 5672;
    private static final String USER_NAME = "root";
    private static final String PASSWORD = "123456";

    private static final ConnectionFactory connectionFactory;

    static {
        connectionFactory = new ConnectionFactory();
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setUsername(USER_NAME);
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
    }

    public static Connection getConnection() {
        try {
            return connectionFactory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }
}
  • 關(guān)閉連接:
package com.pctf.utils;

import java.io.Closeable;
import java.io.IOException;

public class RabbitMQUtils {

    public static void close(AutoCloseable... closeables) {
        if (closeables != null) {
            for (AutoCloseable c : closeables) {
                if (c != null) {
                    try {
                        c.close();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評論 2 11
  • 本文章翻譯自http://www.rabbitmq.com/api-guide.html,并沒有及時(shí)更新。 術(shù)語對...
    joyenlee閱讀 7,806評論 0 3
  • 什么叫消息隊(duì)列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,438評論 0 24
  • 我一直在鼓勵(lì)家長擁抱自己的孩子,因?yàn)槲沂亲黾彝ソ逃模宜芤蟮闹挥屑议L,相信很多讀我文字的人都會(huì)明白。 可是在...
    王書朋閱讀 800評論 1 1
  • 我會(huì)為你唱情歌到聲沙,靜靜看著你鬧的笑話,給你說一輩子情話,酸,甜,苦和辣,你不會(huì)害怕,不會(huì)害怕,讓我做孩子的...
    怒濤閱讀 589評論 0 2

友情鏈接更多精彩內(nèi)容