RabbitMQ

RabbitMQ

存在的問題:比如客戶端調用商品服務添加一條數(shù)據(jù)到數(shù)據(jù)庫,數(shù)據(jù)庫添加成功后商品服務還必須要等待搜索服務、緩存服務都完成后才能給客戶端響應結果。RabbitMQ就是為了解決這種問題。商品服務完成添加數(shù)據(jù)到數(shù)據(jù)庫之后可以將其余的消息交給RabbitMQ,然后商品服務就可以先向客戶端響應結果,RabbitMQ再通知搜索服務和緩存服務添加數(shù)據(jù)。

一、RabbitMQ安裝

用docker-compose安裝即可

version: "3.1"

services: 
  rabbitmq:
    image: daocloud.io/library/rabbitmq:management
    restart: always
    container_name: rabbitmq
    ports:
      - 5672:5672
      - 15672:15672  # 這個端口是RabbitMQ自帶的圖形監(jiān)控界面
    volumes:
      - ./mydata/rabbitmq:/var/lib/rabbitmq
      

二、RabbitMQ架構

2.1 架構圖

由生產(chǎn)者發(fā)送消息到一個指定Exchange中,然后指定Exchange和某個Queue的路由關系,Exchange就會將消息發(fā)送到Queue中,消費者和Virtual Host建立連接,和Queue建立Channel后,就能拿到最新的消息。

注意:一個Queue中的消息只會被一個消費者消費一次(即消費完之后消息直接出隊),另一個消費者消費不到同一個Queue中的同一條消息

2.2 查看圖形化界面

直接訪問http://ip:15672,默認用戶/密碼都是guest,是一個管理員用戶。

在界面的Connections可以查看客戶端與Virtual Host建立的連接;在Channels可以查看建立連接后建立的管道;在Exchange可以查看已有的Exchange,如果發(fā)送消息的時候沒有指定Exchange,會默認用 “/”D的Exchange;在Queues可以查看隊列,隊列需要我們手動創(chuàng)建;在Admin,可以添加其他用戶,可以添加Virtual Host,可以指定哪個用戶可以管理哪幾個Virtual Host。

三、RabbitMQ的使用

3.1 RabbitMQ的路由策略

  1. HelloWorld
  2. WorkQueues
  3. Publish/Subscribe
  4. Routing
  5. Topics
  6. Publisher confirms

3.2 Java連接RabbitMQ

Step1:創(chuàng)建Maven工程,導入RabbitMQ依賴

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

Step2:創(chuàng)建一個用于建立Connection的工具類

public class RabbitMQUtils {
    public static Connection getConnection {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.199.109");  // 指定ip
        factory.setPort(5672);  // 指定端口
        factory.setUsername("test");  // 指定用戶名密碼
        factory.setPassword("test");
        factory.setVirtualHost("/test");  // 指定當前用戶管理的Virtual Host
        
        Connection connection = null;
        connection = factory.newConnection();
        
        return connection;
    }
}

Step3:測試

@Test
public void testConnection() {
    Connection connection = RabbitMQUtils.getConnection();
    connection.close();  // 在這里打個斷點, 就可以在圖形化界面查看到建立的連接
}

3.3 HelloWorld路由

一個生產(chǎn)者,創(chuàng)建一個Channel,一個默認的Exchange(不用手動指定),一個Queue,一個消費者。

Step1:創(chuàng)建生產(chǎn)者,創(chuàng)建一個Channel,發(fā)布消息到Exchange,指定路由規(guī)則

public class Productor {
    @Test
    public void publish() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 發(fā)布消息到Exchange, 同時指定路由規(guī)則
        String msg = "一條消息";
        /**
         * 四個參數(shù): 
            (1) 指定Exchange, 如果要使用默認的就給一個空串
            (2) 指定路由規(guī)則, 使用具體的隊列名稱(當使用默認Exchange的時候, 這個參數(shù)同時也是隊列名)
                                          (當使用自定義Exchange的時候, 這個參數(shù)就寫Exchange名)
            (3) 指定傳遞消息所攜帶的屬性, 沒有給就null
            (4) 指定發(fā)布的具體消息, byte[]類型
         */
        channel.basicPublish("", "HelloWorld", null, msg.getBytes());
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step2:創(chuàng)建消費者,創(chuàng)建一個Channel,創(chuàng)建一個Queue,并且去消費當前隊列

public class Costumer {
    @Test
    public void consume() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 聲明隊列
        
        /**
         * 五個參數(shù): 
            (1) 指定隊列名稱
            (2) 當前隊列是否持久化(只有在Queue中才能持久化, Exchange中不行)
            (3) 是否排外, 即當前隊列是否只能被一個消費者消費
            (4) 如果這個隊列沒有消費者在消費, 則隊列自動刪除
            (5) 指定當前隊列的其他信息
         */
        channel.queueDeclare("HelloWorld", true, false, false, null);
        
        // 4. 開啟監(jiān)聽Queue
        // 4.1 消息回調方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                System.out.println("接受到消息" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
            }
        }
        // 4.2 開始監(jiān)聽
        /**
         * 三個參數(shù): 
            (1) 指定消費哪個隊列
            (2) 指定是否自動ACK(即接收到消息以后會立即告訴RabbitMQ該消息已經(jīng)被消費掉)
            (3) 指定消息回調犯法(即消費者具體要做的事情)
         */
        channel.basicConsume("HelloWorld", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step3:測試。注意:先啟動消費者再啟動生產(chǎn)者,不然無法發(fā)送消息到Exchange,因為發(fā)送消息到Exchange需要Queue

3.4 WorkQueues路由

一個生產(chǎn)者,一個默認的Exchange,一個Queue,多個消費者

Step1:創(chuàng)建生產(chǎn)者,同上

Step2:創(chuàng)建消費者,同上。不同的是要創(chuàng)建兩個消費者

/**
 * 二號消費者
 */
public class Costumer2 {
    @Test
    public void consume() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 聲明隊列
        channel.queueDeclare("WorkQueues", true, false, false, null);
        // 3.1 可以指定當前消費者一次能消費多少條消息, 如果不指定就是兩個消費者均攤
        channel.basicQos(2);
        // 4. 開啟監(jiān)聽Queue
        // 4.1 消息回調方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                System.out.println("消費者二號接受到消息" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
            }
        }
        // 4.2 開始監(jiān)聽
        channel.basicConsume("Work", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

3.5 Publish/Subscribe路由

一個生產(chǎn)者,一個自定義的Exchange,多個Queue,多個消費者。此時兩個Queue中的消息是一模一樣的。

Step1:創(chuàng)建生產(chǎn)者,需要手動創(chuàng)建一個Exchange

public class Productor {
    @Test
    public void publish() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 創(chuàng)建Exchange
        /**
         * 參數(shù)1: 指定Exchange的名稱; 參數(shù)2: 指定Exchange的類型
           FANOUT - 對應Publish/Subscribe路由
           DIRECT - 對應Routing路由
           TOPIC - 對應Topics路由
         */
        channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT);
        
        // 4. 將自定義的Exchange和Queue綁定起來(也可以在消費者中綁定)
        //  參數(shù)3是RouteKey, Publish/Subscribe路由的RouteKey給空串即可
        channel.queueBind("pubsub_queue1", "pubsub_exchange", "");
        channel.queueBind("pubsub_queue2", "pubsub_exchange", "");
        
        // 5. 發(fā)布消息到Exchange, 同時指定路由規(guī)則
        for (int i = 0; i < 10; i++) {
            String msg = "一條消息" + i;
            channel.basicPublish("pubsub_exchange", "Work", null, msg.getBytes());
        }
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step2:創(chuàng)建消費者,同時指定當前消費者消費的是哪個隊列

/**
 * 二號消費者
 */
public class Costumer2 {
    @Test
    public void consume() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 聲明隊列
        channel.queueDeclare("pubsub_queue2", true, false, false, null);

        // 4. 開啟監(jiān)聽Queue
        // 4.1 消息回調方法
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                System.out.println("消費者二號接受到消息" + new String(body, "UTF-8"));
                channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
            }
        }
        // 4.2 開始監(jiān)聽
        channel.basicConsume("pubsub_queue2", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

3.6 Routing路由

一個生產(chǎn)者,一自定義的Exchange,多個Queue,多個消費者,Exchange和Queue之間不是直接綁定,而是通過RouteKey綁定,就是指定哪些類型的消息發(fā)送到哪個Queue中,此時兩個Queue中的消息不一定相同。

Step1:創(chuàng)建生產(chǎn)者,綁定時指定Queue對應的RouteKey

public class Productor {
    @Test
    public void publish() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 創(chuàng)建Exchange
        /**
         * 參數(shù)1: 指定Exchange的名稱; 參數(shù)2: 指定Exchange的類型
           FANOUT - 對應Publish/Subscribe路由
           DIRECT - 對應Routing路由
           TOPIC - 對應Topics路由
         */
        channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
        
        // 4. 將自定義的Exchange和Queue綁定起來(也可以在消費者中綁定)
        //  參數(shù)3指定每個隊列對應的RouteKey
        channel.queueBind("routing_queue1", "routing_exchange", "ERROR");
        channel.queueBind("routing_queue2", "routing_exchange", "INFO");
        
        // 5. 發(fā)布消息到Exchange, 同時指定RouteKey, Exchange會根據(jù)RouteKey把消息發(fā)送到對應的Queue
        channel.basicPublish("pubsub_exchange", "ERROR", null, "error msg1".getBytes());
        channel.basicPublish("pubsub_exchange", "INFO", null, "info msg1".getBytes());
        channel.basicPublish("pubsub_exchange", "INFO", null, "info msg2".getBytes());
        channel.basicPublish("pubsub_exchange", "INFO", null, "info msg3".getBytes());
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step2:創(chuàng)建消費者,同3.5

3.7 Topics路由

同Routing路由方式,只是RouteKey的形式不同

Step1:創(chuàng)建生產(chǎn)者

public class Productor {
    @Test
    public void publish() {
        // 1. 獲取Connection
        Connection connection = RabbitMQUtils.getConnection();
        // 2. 創(chuàng)建Channel
        Channel channel = connection.createChannel();
        // 3. 創(chuàng)建Exchange
        /**
         * 參數(shù)1: 指定Exchange的名稱; 參數(shù)2: 指定Exchange的類型
           FANOUT - 對應Publish/Subscribe路由
           DIRECT - 對應Routing路由
           TOPIC - 對應Topics路由
         */
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        // 4. 將自定義的Exchange和Queue綁定起來(也可以在消費者中綁定)
        //  參數(shù)3指定每個隊列對應的RouteKey
        /**
         * 關于此時的RouteKey:
         *  "*.red.*": 表示有三個條件, 只要中間的條件符合就可以將消息發(fā)送到該隊列, *是占位符
         *  "fast.#": #是通配符, 效果等同于"fast.*.*"
         */
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        channel.queueBind("topics_queue2", "topics_exchange", "fast.#");
        
        // 5. 發(fā)布消息到Exchange, 同時指定RouteKey, Exchange會根據(jù)RouteKey把消息發(fā)送到對應的Queue
        channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快紅猴".getBytes());
        channel.basicPublish("topics_exchange", "fast.white.dog", null, "快白狗".getBytes());
        channel.basicPublish("topics_exchange", "slow.red.cat", null, "快紅猴".getBytes());
        // 此時第1條、第3條消息會發(fā)送給topics_queue1
        // 第1條、第2條消息會發(fā)送給topics_queue2
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

四、SpringBoot整合RabbitMQ

4.1 快速入門

Step1:創(chuàng)建SpringBoot工程,導入SpringBoot整合RabbitMQ的依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step2:編寫配置

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test

Step3:編寫配置類,聲明Exchange和Queue,并且綁定

@Configuration
public class RabbitMQConfig {
    // 創(chuàng)建Topic路由的Exchange
    @Bean("TOPIC_EXCHANGE")
    public TopicExchange getTopicExchange() {
        /**
         * 參數(shù)1: 指定Exchange的名稱
         * 參數(shù)2: 是否持久化
         * 參數(shù)3: 是否自動刪除
         */
        return new TopicExchange("boot_topic_exchange", true, false);
    }
    
    // 創(chuàng)建Queue1
    @Bean("TOPIC_QUEUE1")
    public Queue getQueue1() {
        /**
         * 參數(shù)1: 指定Queue名稱
         * 參數(shù)2: 是否持久化
         * 參數(shù)3: 是否排外
         * 參數(shù)4: 是否自動刪除
         * 參數(shù)5: 攜帶的參數(shù)
         */
        return new Queue("boot_topic_queue1", true, false, false, null);
    }
    
    // 創(chuàng)建Queue2
    @Bean("TOPIC_QUEUE2")
    public Queue getQueue2() {
        return new Queue("boot_topic_queue2", true, false, false, null);
    }
    
    // 綁定
    @Bean
    public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
    }
    
    // 綁定
    @Bean
    public Binding getBindingSlow(@Qualifier("TOPIC_QUEUE2") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("slow.*.*");
    }
}

Step4:準備生產(chǎn)者,發(fā)送消息

public class Productor {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void publish() {
        /**
         * 參數(shù)1: 指定發(fā)送到的Exchange名稱
         * 參數(shù)2: 指定此次發(fā)送的RouteKey
         * 參數(shù)3: 具體消息
         */
        rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快紅狗");
        rabbitTemplate.convertAndSend("boot_topic_exchange", "slow.white.pig", "慢白豬");
        System.in.read();
    }
}

Step5:準備消費者

@Component
public class Costumer {
    // 指定當前消費者監(jiān)聽的隊列, SpringBoot工程啟動后就會一直監(jiān)聽
    @RabbitListener(queues = "boot_topic_queue1")
    public void consumer1(Object msg) {
        // 該方法內(nèi)是消費者接收到消息之后的具體動作
        System.out.println("消費者1號接收到消息" + msg);
    }
    
    @RabbitListener(queues = "boot_topic_queue2")
    public void consumer2(Object msg) {
        System.out.println("消費者2號接收到消息" + msg);
    }
}

Step6:測試。啟動SpringBoot工程,可以在打印中查看消費者打印的消息,也可以在監(jiān)控界面中查看消息的發(fā)送和被消費的情況

4.2 在SpringBoot中手動ACK

Step1:修改配置

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手動ACK配置

Step2:修改消費者的接收行為

@RabbitListener(queues = "boot_topic_queue1")
public void consumer1(String msg, Channel channel, Message message) throws IOException{
    System.out.println("消費者1號接收到消息" + msg);
    // 手動ACK
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

五、RabbitMQ的其他操作

5.1 消息的可靠性

  1. Q:如果消息已經(jīng)到達了RabbitMQ,但是RabbitMQ宕機了,消息會丟失嗎?

    A:不會,RabbitMQ的Queue有持久化機制

  2. Q:消費者在消費消息時,如果執(zhí)行到一半,消費者宕機了怎么辦?

    A:消費者中最好使用手動ACK來避免消息沒消息完卻宕機的情況,這樣消息就還會存在隊列中

  3. Q:生產(chǎn)者發(fā)送消息時,由于網(wǎng)絡問題,導致消息沒發(fā)送到RabbitMQ怎么辦?

    A:RabbitMQ提供了事務操作和Confirm機制,可以保證生產(chǎn)者把消息發(fā)送到Exchange(但是事務操作效率太低,主要用Confirm機制)

5.1.1 RabbitMQ的Confirm機制

可以保證生產(chǎn)者把消息發(fā)送到Exchange

5.1.1.1 普通Confirm方式

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        // Step1:在消息發(fā)送之前, 開啟Confirm
        channel.confirmSelect();
        
        channel.basicPublish("topics_exchange", "fast.red.monkey", null, "快紅猴".getBytes());
        
        // Step2:在消息發(fā)送之后確認消息是否發(fā)送成功
        if (channel.waitForConfirms()) {
            /**
             * 消息發(fā)送成功時的處理
             */
        } else {
            /**
             * 消息發(fā)送失敗時的處理
             */
        }
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.1.2 批量Confirm方式

主要用于發(fā)送多條消息時,只要有一條發(fā)送失敗,則全部發(fā)送失敗,拋出異常。

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        // Step1:在消息發(fā)送之前, 開啟Confirm
        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            String msg = "快紅猴" + i;
            channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
        }
        
        // Step2:在消息發(fā)送之后確認消息是否發(fā)送成功
        channel.waitForConfirmsOrDie();  // 該方法不會返回布爾值, 如果有一條發(fā)送失敗就會直接拋異常
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.1.3 異步Confirm方式

因為是異步的,所以不管生產(chǎn)者的消息是否發(fā)送成功,都會接著執(zhí)行生產(chǎn)者后面的代碼,單獨有會有一個線程出來進行Confirm判斷。效率最高,最常用。

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        // Step1:在消息發(fā)送之前, 開啟Confirm
        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            String msg = "快紅猴" + i;
            channel.basicPublish("topics_exchange", "fast.red.monkey", null, msg.getBytes());
        }
        
        // Step2:在消息發(fā)送之后確認消息是否發(fā)送成功
        channel.addConfirmListener(new ConfirmListener {
            // 消息發(fā)送成功時的回調方法
            @Override
            public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息發(fā)送成功。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
            }
            
            // 消息發(fā)送失敗時的回調方法
            @Override
            public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息發(fā)送失敗。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
            }
        });
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.2 RabbitMQ的Return機制

Confirm機制只能保證消息到達Exchange,無法保證消息可以被Exchange分發(fā)到Queue。而Exchange是不能持久化消息的,Queue才可以持久化消息。所以可以使用Return機制保證Exchange把消息送達到指定的Queue

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        
        // Step1:在開啟Confirm之前先開啟Return
        channel.addReturnListener(new ReturnListener() {
            // 消息沒有到達Queue時執(zhí)行的回調方法
            @Override
            public void handleReturn(int replyCode, String exchange, 
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] msg) throws Exception {
                System.out.println(new String(msg, "UTF-8") + "沒有送到到Queue中");
            }
        })
        
        // Step2:在消息發(fā)送之前, 開啟Confirm
        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            String msg = "快紅猴" + i;
            // 在發(fā)送消息時, 在RouteKey參數(shù)后面追加一個參數(shù), 置為true表示開啟Return
            channel.basicPublish("topics_exchange", "fast.red.monkey", true, null, msg.getBytes());
        }
        
        // Step3:在消息發(fā)送之后確認消息是否發(fā)送成功
        channel.addConfirmListener(new ConfirmListener {
            @Override
            public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息發(fā)送成功。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息發(fā)送失敗。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
            }
        });
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

5.1.3 在SpringBoot中實現(xiàn)Confirm和Return

Step1:修改配置文件

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手動ACK配置
    publisher-confirm-type: simple
    publisher-returns: true

Step2:編寫配置類,指定RabbitTemplate對象,開啟Confirm和Return,并編寫回調方法

@Component  // 這里不能用Configuration, 因為要實現(xiàn)接口
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTempalte;
    
    // 定義初始化方法, 這個方法在構建對象時會執(zhí)行
    @PostConstruct
    public void initMethod() {
        // 給RabbitTemplate指定Confirm和Return的回調方法
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            /**
             * 消息發(fā)送成功時的處理
             */
        } else {
            /**
             * 消息發(fā)送失敗時的處理
             */
        }
    }
    
    @Override
    public void returnMessage(Message message, int replyCode, String replyText,
                              String exchange, String routingKey) {
        /**
         * Exchange中的消息沒有送達到Queue時的處理
         */
    }
    
}

Step3:生產(chǎn)者和消費者不變

5.2 消息重復消費

重復消費同一個Queue的消息,會對非冪等行操作(增刪)造成問題;重復消費消息的原因是,消費者沒有給RabbitMQ一個Ack。

為了解決重復消費消息的問題,可以采用Redis,在消費者消費消息之前,先將消息的id作為key放到Redis中(用setnx方法,key不存在就創(chuàng)建key,key存在則獲取key的value),并把對應的value置0,表示正在執(zhí)行任務,等任務執(zhí)行完畢之后可以把value置1。如果消費者一號ACK失敗,在RabbitMQ將消息交給消費者二號時,會先執(zhí)行setnx方法判斷key是否存在,如果key存在則獲取key的value,如果value是0,則消費者二號就什么都不做,如果value是1,則表示消費者一號已經(jīng)執(zhí)行完了任務,但是ACK失敗,消費者二號直接執(zhí)行ACK幫消費者一號ACK即可。

極端情況:消費者一號出現(xiàn)了死鎖,則會一直卡在key存在且value=0的情況。解決方法是:在setnx設置key的時候,給key指定上一個生存時間即可。

5.2.1 實現(xiàn)避免消息重復消費

Step1:在Docker中啟動Redis,然后在項目中導入Redis的依賴

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>3.1.0</version>
</dependency>

Step2:生產(chǎn)者在發(fā)送消息前指定上消息的ID

public class Productor {
    @Test
    public void publish() throws Exception{
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics_exchange", BuiltinExchangeType.TOPIC);
        
        channel.queueBind("topics_queue1", "topics_exchange", "*.red.*");
        
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String exchange, 
                                     String routingKey, AMQP.BasicProperties properties,
                                     byte[] msg) throws Exception {
                System.out.println(new String(msg, "UTF-8") + "沒有送到到Queue中");
            }
        })

        channel.confirmSelect();
        for (int i = 0 ; i < 100; i++) {
            // 創(chuàng)建一個properties屬性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                                            .deliveryMode(1)  // 指定消息是否需要持久化, 1表示需要持久化, 2不需要
                                            .messageId(UUID.randomUUID().toString()) // 給每一條消息隨機分配了一個UUID
                                            .build();
            String msg = "快紅猴" + i;
            // 將properties屬性放到第4個參數(shù)就可以隨著消息一起發(fā)出
            channel.basicPublish("topics_exchange", "fast.red.monkey", true, properties, msg.getBytes());
        }

        channel.addConfirmListener(new ConfirmListener {
            @Override
            public void handleAck(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息發(fā)送成功。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
            }

            @Override
            public void handleNack(long deliveryTag, boolean mutiple) throws IOException {
                System.out.println("消息發(fā)送失敗。 標識為:" + deliveryTag + "是否是批量:" + mutiple);
            }
        });
            
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step3:消費者調用Redis進行操作

public class Costumer1 {
    @Test
    public void consume() {
        Connection connection = RabbitMQUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("WorkQueues", true, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelopo, 
                          AMQP.BasicProperties properties, byte[] body) throws IOException{
                Jedis jedis = new Jedis("192.168.199.109", 6379);  // 連接Redis
                String messageId = properties.getMessageId();  // 取出properties中的messageId
                // 1. setnx到Redis中, 默認指定Value為0, 指定生存時間為10秒
                String result = jedis.set(messageId, "0", "NX", "EX", 10);
                if (result != null && result.equalsIgnoreCase("OK")) {
                    System.out.println("消費者一號接受到消息" + new String(body, "UTF-8"));
                    // 2. 消費成功, set messageId的value為1
                    jedis.set(messageId, "1");
                    channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
                } else {
                    // 3. 如果1中的setnx失敗, 獲取key對應的value
                    //    如果是0, 則直接return, 如果是1則先ACK再return
                    String value = jedis.get(messageId);
                    if ("1".equalsIgnoreCase(s)) {
                        channel.basicAck(envelope.getDeliveryTag(), false); // 手動ACK
                    }
                }
            }
        }

        channel.basicConsume("topics_queue1", false, consumer);
        
        System.in.read();
        channel.close();
        connection.close();
    }
}

Step4:測試,啟動生產(chǎn)者和消費者,然后到Redis中查看是否有新的key

5.2.2 在SpringBoot實現(xiàn)避免消息重復消費

Step1:導入SpringBoot整合Redis的依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

Step2:在配置文件中連接Redis

spring:
  redis:
    host: 192.168.199.109
    port: 6379

Step3:生產(chǎn)者發(fā)送消息之前指定消息的ID

public class Productor {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Test
    public void publish() {
        // 給消息的屬性分配一個隨機的UUID
        CorrelationData messageId = new CorrelationData(UUID.randomUUID.toString());
        // 把messageId放入第4個參數(shù)
        rabbitTemplate.convertAndSend("boot_topic_exchange", "fast.red.dog", "快紅狗", messageId);
        System.in.read();
    }
}

Step4:消費者調用Redis進行操作

@Component
public class Costumer {
    // 用StringRedisTemplate可以直接用String存入Redis(可以不用轉為字節(jié)數(shù)組)
    @Autowired
    private StringRedisTemplate redisTemplate;  
    
    @RabbitListener(queues = "boot_topic_queue1")
    public void consumer1(String msg, Channel channel, Message message) throws IOException{
        // 先取出messageId
        String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
        // 1. 設置key到Redis
        if (redisTemplate.opsForValue().setIfAbsent(messageId, "0", 10, TimeUnit.SECONDS)) {
            System.out.println("消費者1號接收到消息" + msg);
            // 2. 消費成功, set messageId的value為1
            redisTemplate.opsForValue().set(messageId, "1");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手動ACK
        } else {
            // 3. 如果1中的setnx失敗, 獲取key對應的value
            //    如果是0, 則直接return, 如果是1則先ACK再return
            if ("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);  // 手動ACK
            }
        }
    }
}

Step5:測試,啟動生產(chǎn)者和消費者,然后到Redis中查看是否有新的key

六、RabbitMQ的簡單應用

例子:添加一條數(shù)據(jù)時,客戶模塊在MySQL添后會調用搜索模塊在ES中也添加

客戶模塊:

Step1:在客戶模塊的SpringBoot工程中導入SpringBoot整合RabbitMQ的依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step2:編寫配置文件連接RabbitMQ

spring:
  rabbitmq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手動ACK配置

Step3:編寫配置類

@Configuration
public class RabbitMQConfig {
    // 創(chuàng)建Topic路由的Exchange
    @Bean("TOPIC_EXCHANGE")
    public TopicExchange getTopicExchange() {
        return new TopicExchange("openapi_customer_exchange", true, false);
    }
    
    // 創(chuàng)建Queue1
    @Bean("TOPIC_QUEUE1")
    public Queue getQueue1() {
        return new Queue("openapi_customer_queue", true, false, false, null);
    }
    
    // 綁定
    @Bean
    public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
    }
}

Step4:在service中直接把消息發(fā)送給Exchange(生產(chǎn)者)(要先注入RabbitTemplate)

// 原來的寫法: 直接把消息發(fā)送到
/**
String json = JSON.toJSON(customer);
HttpHeaders headers = new HttpHeaders();
headers.setContenType(MediaType.parseMediaType("application/json;charset=utf-8"));
HttpEntity<String> entity = new HttpEntity<>(json.headers);
restTemplate.postForObject("http://localhost:8080/search/customer/add", entity, String.class);
*/

// 新的寫法: 將消息發(fā)送到Exchange
rabbitTemplate.converAndSend("openapi_customer_exchange", "openapi.customer.add", JSON.toJSON(customer));

搜索模塊:

Step1:在客戶模塊的SpringBoot工程中導入SpringBoot整合RabbitMQ的依賴

<!-- RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Step2:編寫配置文件,連接RabbitMQ和Redis

spring:
  rabbtimq:
    host: 192.168.199.109
    port: 5672
    username: test
    password: test
    virtual-host: /test
    listener:
      simple:
        ackonwledge-mode: manual  # 手動ACK配置

Step3:編寫配置類

@Configuration
public class RabbitMQConfig {
    // 創(chuàng)建Topic路由的Exchange
    @Bean("TOPIC_EXCHANGE")
    public TopicExchange getTopicExchange() {
        return new TopicExchange("openapi_customer_exchange", true, false);
    }
    
    // 創(chuàng)建Queue1
    @Bean("TOPIC_QUEUE1")
    public Queue getQueue1() {
        return new Queue("openapi_customer_queue", true, false, false, null);
    }
    
    // 綁定
    @Bean
    public Binding getBindingRed(@Qualifier("TOPIC_QUEUE1") Queue queue,
                              @Qualifier("TOPIC_EXCHANGE") TopicExchange topicExchange) {
        // 指定Queue, 指定Exchange, 指定RouteKey
        return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
    }
}

Step4:準備消費者

@Component
public class CostumerListener {
    @Autowired
    private CustomerService customerService;
    // 指定當前消費者監(jiān)聽的隊列, SpringBoot工程啟動后就會一直監(jiān)聽
    @RabbitListener(queues = "openapi_customer_queue")
    public void consumer1(String msg, Channel channel, Message message) throws IOException{
        // 1. 獲取RoutingKey
        String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
        // 2. 根據(jù)RoutingKey來選擇調用的方法
        switch (receivedRoutingKey) {
            case "openapi.customer.add":
                // 3. 調用Service完成添加數(shù)據(jù)到ES(先把json字符串轉為Customer對象再存)
                customerService.saveCustomer(JSON.parseJSON(msg, Customer.class));
                // 4. 手動ACK
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                break;
            /**
             * ......
             * 其他RoutingKey對應的處理
             */
        }
    }
}
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容