RabbitMQ簡(jiǎn)介以及應(yīng)用

一、簡(jiǎn)要介紹

  • 開(kāi)源AMQP實(shí)現(xiàn),Erlang語(yǔ)言編寫(xiě),支持多種客戶端
  • 分布式、高可用、持久化、可靠、安全
  • 支持多種協(xié)議:AMQP、STOMP、MQTT、HTTP
  • 適用于多系統(tǒng)之間的業(yè)務(wù)解耦的消息中間件

二、基本概念

1、exchange:交換器,負(fù)責(zé)接收消息,轉(zhuǎn)發(fā)消息至綁定的隊(duì)列,有四種類(lèi)型:

  • direct:完全匹配的路由
  • topic:模式匹配的路由
  • fanout:廣播模式
  • headers:鍵值對(duì)匹配路由

Exchange屬性:

  • 持久化:如果啟用,那么rabbit服務(wù)重啟之后仍然存在
  • 自動(dòng)刪除:如果啟用,那么交換器將會(huì)在其綁定的隊(duì)列都被刪除掉之后自動(dòng)刪除掉自身

2、Queue:隊(duì)列,rabbitmq的內(nèi)部對(duì)象,用于存儲(chǔ)消息,其屬性類(lèi)似于Exchange,同樣可以設(shè)置是否持久化、自動(dòng)刪除等。
消費(fèi)者重Queue中獲取消息并消費(fèi)。多個(gè)消費(fèi)者可以訂閱同一個(gè)Queue,這時(shí)Queue中的消息會(huì)被平均分?jǐn)偨o多個(gè)消費(fèi)者進(jìn)行處理,而不是每個(gè)消費(fèi)者都收到所有的消息并處理。

3、Binding:綁定,根據(jù)路由規(guī)則綁定交換器與隊(duì)列

4、Routing:路由鍵,路由的關(guān)鍵字

三、消息的可靠性

  • Message acknowledgment:消息確認(rèn),在消息確認(rèn)機(jī)制下,收到回執(zhí)才會(huì)刪除消息,未收到回執(zhí)而斷開(kāi)了連接,消息會(huì)轉(zhuǎn)發(fā)給其他消費(fèi)者,如果忘記回執(zhí),會(huì)導(dǎo)致消息堆積,消費(fèi)者重啟后會(huì)重復(fù)消費(fèi)這些消息并重復(fù)執(zhí)行業(yè)務(wù)邏輯。

  • Message durability:消息持久化,設(shè)置消息持久化可以避免絕大部分消息丟失,比如rabbitmq服務(wù)重啟,但是采用非持久化可以提升隊(duì)列的處理效率。如果要確保消息的持久化,那么消息對(duì)應(yīng)的Exchange和Queue同樣要設(shè)置為持久化。

  • Prefetch count,每次發(fā)送給消費(fèi)者消息的數(shù)量,默認(rèn)為1

另外,如果需要可靠性業(yè)務(wù),需要設(shè)置持久化和ack機(jī)制,如果系統(tǒng)高吞吐,可以設(shè)置為非持久化、noack、自動(dòng)刪除機(jī)制

四、簡(jiǎn)單應(yīng)用

模擬這樣一個(gè)業(yè)務(wù)場(chǎng)景,用戶下單成功后,需要給用戶增加積分,同時(shí)還需要給用戶發(fā)送下單成功的消息,這是在電商業(yè)務(wù)中很常見(jiàn)的一個(gè)業(yè)務(wù)場(chǎng)景。

如果系統(tǒng)是微服務(wù)架構(gòu),可能用戶下單功能在訂單服務(wù),給用戶增加積分的功能在積分服務(wù),給用戶發(fā)送通知消息的功能在通知服務(wù),各個(gè)服務(wù)之間解耦,互不影響。那么要實(shí)現(xiàn)上述的業(yè)務(wù)場(chǎng)景,消息中間件rabbitmq是一個(gè)很好的選擇。
原因如下:

  • 高性能,它的實(shí)現(xiàn)語(yǔ)言是天生具備高并發(fā)高可用的erlang 語(yǔ)言
  • 支持消息的持久化,即使服務(wù)器掛了,也不會(huì)丟失消息
  • 消息應(yīng)答(ack)機(jī)制,消費(fèi)者消費(fèi)完消息后發(fā)送一個(gè)消息應(yīng)答,rabbitmq才會(huì)刪除消息,確保消息的可靠性
  • 支持高可用集群
  • 靈活的路由

實(shí)現(xiàn)思路:

用戶下單成功后,rabbitmq發(fā)送一條消息至EXCHANGE.ORDER_CREATE交換器,該交換器綁定了兩個(gè)隊(duì)列,QUEUE.ORDER_INCREASESCORE、QUEUE.ORDER_NOTIFY,消費(fèi)者訂閱這兩個(gè)隊(duì)列分別用來(lái)處理增加積分、發(fā)送用戶通知。如果后續(xù)日志系統(tǒng)還需要記錄下單的相關(guān)日志,那么我們只需要再定義一個(gè)隊(duì)列并將其綁定到EXCHANGE.ORDER_CREATE即可。

下單發(fā)rabbitmq消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會(huì)跳舞的機(jī)器人
 * @date: 2017/10/13 10:46
 * @description: 模擬用戶下單之后發(fā)送rabbitmq消息
 */
public class OrderCreator {
    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 消息內(nèi)容
    private static String msg = "create order success";

    /**
     * 模擬創(chuàng)建訂單后發(fā)送mq消息
     */
    public void createOrder() {
        System.out.println("下單成功,開(kāi)始發(fā)送rabbitmq消息");

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");

        Connection connection;
        Channel channel;
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 持久化
            boolean durable = true;
            // topic類(lèi)型
            String type = "topic";
            // 聲明交換器,如果交換器不存在則創(chuàng)建之
            channel.exchangeDeclare(EXCHANGE, type, durable);

            String messgeId = UUID.randomUUID().toString();
            // deliveryMode>=2表示設(shè)置消息持久化
            AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2).messageId(messgeId).build();
            // 發(fā)布消息
            String routingKey = "order_create";
            channel.basicPublish(EXCHANGE, routingKey, props, msg.getBytes("utf-8"));
            connection.close();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

積分系統(tǒng)訂閱消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會(huì)跳舞的機(jī)器人
 * @date: 2017/10/13 16:02
 * @description: rabbitmq消費(fèi)者,模擬下單成功后給用戶增加積分
 */
public class IncreaseScoreConsumer implements Consumer {
    private Connection connection;
    private Channel channel;
    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 增加積分隊(duì)列名稱
    private static final String QUEUENAME = "QUEUE.ORDER_INCREASESCORE";


    public void consume() {
        // 初始化rabbitmq連接信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 聲明交換器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 聲明隊(duì)列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交換器與隊(duì)列綁定并設(shè)置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消費(fèi)消息,callback是該類(lèi),關(guān)閉自動(dòng)確認(rèn)消息,在完成業(yè)務(wù)邏輯后手動(dòng)確認(rèn)確認(rèn)
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《積分系統(tǒng)》收到訂單消息:" + msg + ",給用戶增加積分......");
        // 手動(dòng)確認(rèn)消息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);該方法會(huì)丟棄掉隊(duì)列中的這條消息
         * channel.basicReject(envelope.getDeliveryTag(), true);該方法會(huì)把消息重新放回隊(duì)列
         * 一般系統(tǒng)會(huì)設(shè)定一個(gè)重試次數(shù),如果超過(guò)重試次數(shù),則會(huì)丟棄消息,反之則會(huì)把消息再放入隊(duì)列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }

}


通知系統(tǒng)訂閱消息

package com.robot.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author: 會(huì)跳舞的機(jī)器人
 * @date: 2017/10/13 16:20
 * @description: rabbitmq消費(fèi)者,模擬下單成功后給用戶發(fā)送通知
 */
public class NotifyConsumer implements Consumer {
    private Connection connection;
    private Channel channel;

    // 交換器名稱
    private static final String EXCHANGE = "EXCHANGE.ORDER_CREATE";
    // 通知用戶下單成功通知隊(duì)列名稱
    private static final String QUEUENAME = "QUEUE.ORDER_NOTIFY";


    public void consume() {
        // 初始化rabbitmq連接信息
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.12.44");
        connectionFactory.setPort(56720);
        connectionFactory.setUsername("baibei");
        connectionFactory.setPassword("baibei");
        try {
            connection = connectionFactory.newConnection();
            channel = connection.createChannel();
            // 聲明交換器
            channel.exchangeDeclare(EXCHANGE, "topic", true);
            // 聲明隊(duì)列
            channel.queueDeclare(QUEUENAME, true, false, false, null);
            // 交換器與隊(duì)列綁定并設(shè)置routingKey
            channel.queueBind(QUEUENAME, EXCHANGE, "order_create");
            // 消費(fèi)消息,callback是該類(lèi),關(guān)閉自動(dòng)確認(rèn)消息,在完成業(yè)務(wù)邏輯后手動(dòng)確認(rèn)確認(rèn)
            channel.basicConsume(QUEUENAME, false, this);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }

    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String msg = new String(body, "UTF-8");
        System.out.println("《通知系統(tǒng)》收到訂單消息:" + msg + ",開(kāi)始給用戶發(fā)送通知......");
        // 手動(dòng)確認(rèn)消息
        channel.basicAck(envelope.getDeliveryTag(), false);

        /**
         * channel.basicReject(envelope.getDeliveryTag(), false);該方法會(huì)丟棄掉隊(duì)列中的這條消息
         * channel.basicReject(envelope.getDeliveryTag(), true);該方法會(huì)把消息重新放回隊(duì)列
         * 一般系統(tǒng)會(huì)設(shè)定一個(gè)重試次數(shù),如果超過(guò)重試次數(shù),則會(huì)丟棄消息,反之則會(huì)把消息再放入隊(duì)列
         */
    }

    public void handleConsumeOk(String consumerTag) {

    }

    public void handleCancelOk(String consumerTag) {

    }

    public void handleCancel(String consumerTag) throws IOException {

    }

    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

    }

    public void handleRecoverOk(String consumerTag) {

    }
}

測(cè)試

package com.robot.rabbitmq;

/**
 * @author: 會(huì)跳舞的機(jī)器人
 * @date: 2017/10/13 16:27
 * @description:
 */
public class Test {
    public static void main(String[] args) {

        IncreaseScoreConsumer increaseScoreConsumer = new IncreaseScoreConsumer();
        increaseScoreConsumer.consume();

        NotifyConsumer notifyConsumer = new NotifyConsumer();
        notifyConsumer.consume();

        OrderCreator orderCreator = new OrderCreator();
        for (int i = 0; i < 3; i++) {
            orderCreator.createOrder();
        }
    }
}


輸出:

下單成功,開(kāi)始發(fā)送rabbitmq消息
《積分系統(tǒng)》收到訂單消息:create order success,給用戶增加積分......
《通知系統(tǒng)》收到訂單消息:create order success,開(kāi)始給用戶發(fā)送通知......
下單成功,開(kāi)始發(fā)送rabbitmq消息
《積分系統(tǒng)》收到訂單消息:create order success,給用戶增加積分......
《通知系統(tǒng)》收到訂單消息:create order success,開(kāi)始給用戶發(fā)送通知......
下單成功,開(kāi)始發(fā)送rabbitmq消息
《積分系統(tǒng)》收到訂單消息:create order success,給用戶增加積分......
《通知系統(tǒng)》收到訂單消息:create order success,開(kāi)始給用戶發(fā)送通知......


最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 來(lái)源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評(píng)論 2 34
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,612評(píng)論 19 139
  • 1. 歷史 RabbitMQ是一個(gè)由erlang開(kāi)發(fā)的AMQP(Advanced Message Queue )的...
    高廣超閱讀 6,236評(píng)論 3 51
  • 1 RabbitMQ安裝部署 這里是ErLang環(huán)境的下載地址http://www.erlang.org/down...
    Bobby0322閱讀 2,381評(píng)論 0 11
  • 什么叫消息隊(duì)列 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)雜...
    lijun_m閱讀 1,417評(píng)論 0 1

友情鏈接更多精彩內(nèi)容