[原創(chuàng)]AMQP-RabbitMQ/2/工作隊(duì)列

2. 工作隊(duì)列 Work queues

Distributing tasks among workers

消息將發(fā)送給c1或者c2


image.png

# 個(gè)人理解

  • 生產(chǎn)者定義Queue,并向該隊(duì)列發(fā)送消息

  • 多個(gè)消費(fèi)者可以從指定的同一個(gè)Queue中讀取消息。每條消息只會(huì)發(fā)送給其中某一個(gè)消費(fèi)者。

  • 生產(chǎn)者

package com.futao.springmvcdemo.mq.rabbit.workqueue;

import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 簡(jiǎn)單發(fā)送者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class Send {
    @SneakyThrows
    public static void main(String[] args) {
        @Cleanup
        Connection connection = RabbitMqConnectionTools.getConnection();
        @Cleanup
        Channel channel = connection.createChannel();
        //開(kāi)啟持久化消息
        boolean durable = true;
        //定義一個(gè)隊(duì)列
        channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
        String msg = "Hello RabbitMq!";
        for (int i = 0; i < 20; i++) {
            channel.basicPublish("", RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), null, (msg + i).getBytes());
            log.info("Send msg:[{}] success", (msg + i));
        }
    }
}
  • 消費(fèi)者1 - 每1秒處理一條
package com.futao.springmvcdemo.mq.rabbit.workqueue;

import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 簡(jiǎn)單消費(fèi)者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class RecvOne {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //開(kāi)啟持久化消息
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment應(yīng)答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(1000);
            } catch (Exception e) {

            }
        });
        //關(guān)閉自動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 消費(fèi)者2 - 每2秒處理一條
package com.futao.springmvcdemo.mq.rabbit.workqueue;

import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;

/**
 * 簡(jiǎn)單消費(fèi)者
 *
 * @author futao
 * Created on 2019-04-22.
 */
@Slf4j
public class RecvTwo {
    @SneakyThrows
    public static void main(String[] args) {
        Channel channel = RabbitMqConnectionTools.getChannel();
        //開(kāi)啟持久化消息
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
        log.info("Waiting for message...");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
            log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
            //acknowledgment應(yīng)答
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            try {
                Thread.sleep(2000);
            } catch (Exception e) {

            }
        });
        //關(guān)閉自動(dòng)應(yīng)答
        boolean autoAck = false;
        channel.basicConsume(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
        });
    }
}
  • 結(jié)果
    • 生產(chǎn)者


      生產(chǎn)者日志
    • 消費(fèi)者1日志


      消費(fèi)者1日志
    • 消費(fèi)者2日志


      消費(fèi)者2日志
  • 特點(diǎn): 多個(gè)消費(fèi)者之間,不論消息的處理速度,都是平均分發(fā)(公平分發(fā))。你一個(gè),我一個(gè),他一個(gè)。此時(shí)是公平隊(duì)列
  • 注意:
    • 定義隊(duì)列的時(shí)候,設(shè)置是否開(kāi)啟消息的持久化(該設(shè)置需要同時(shí)在生產(chǎn)者和消費(fèi)者設(shè)置)
       //開(kāi)啟持久化消息
        boolean durable = true;
        channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
  • 如果消息隊(duì)列已經(jīng)存在,則不可以修改相應(yīng)的配置,必須刪除原有的隊(duì)列,或者新建一個(gè)新的隊(duì)列。
  • 關(guān)閉自動(dòng)應(yīng)答(開(kāi)啟手動(dòng)應(yīng)答),可以防止消息在未被正確消費(fèi)的情況下被Rabbitmq從隊(duì)列內(nèi)存中刪除。

# 實(shí)現(xiàn)工作隊(duì)列下的非公平隊(duì)列

消費(fèi)者設(shè)置一次只發(fā)送一條消息,并且在被正確消費(fèi)之前發(fā)繼續(xù)發(fā)送下一條消息。從而使得消費(fèi)快的消費(fèi)者比消費(fèi)慢的消費(fèi)者消費(fèi)更多的消息

        //告訴rabbitmq一次只發(fā)送一條消息,并且在前一個(gè)消息未被處理或者消費(fèi)之前,不繼續(xù)發(fā)送下一個(gè)消息
        channel.basicQos(1);
  • 測(cè)試結(jié)果
    image.png

    此時(shí)明顯打破了消息的公平分發(fā),消費(fèi)快的消費(fèi)者接收到的消息更多。
    如果有兩個(gè)消費(fèi)者,其中一個(gè)ConsumerA設(shè)置了Qos=1,另一個(gè)ConsumerB沒(méi)有設(shè)置。經(jīng)過(guò)我的測(cè)試,ConsumerA會(huì)獲得大量的消息,都積壓在ConsumerA,而ConsumerB獲得消息很少。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容