2. 工作隊(duì)列 Work queues
Distributing tasks among workers
消息將發(fā)送給c1或者c2

image.png
# 個(gè)人理解
生產(chǎn)者定義Queue,并向該隊(duì)列發(fā)送消息
多個(gè)消費(fèi)者可以從指定的同一個(gè)Queue中讀取消息。每條消息只會(huì)發(fā)送給其中某一個(gè)消費(fèi)者。
生產(chǎn)者
package com.futao.springmvcdemo.mq.rabbit.workqueue;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 簡(jiǎn)單發(fā)送者
*
* @author futao
* Created on 2019-04-22.
*/
@Slf4j
public class Send {
@SneakyThrows
public static void main(String[] args) {
@Cleanup
Connection connection = RabbitMqConnectionTools.getConnection();
@Cleanup
Channel channel = connection.createChannel();
//開(kāi)啟持久化消息
boolean durable = true;
//定義一個(gè)隊(duì)列
channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
String msg = "Hello RabbitMq!";
for (int i = 0; i < 20; i++) {
channel.basicPublish("", RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), null, (msg + i).getBytes());
log.info("Send msg:[{}] success", (msg + i));
}
}
}
- 消費(fèi)者1 - 每1秒處理一條
package com.futao.springmvcdemo.mq.rabbit.workqueue;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 簡(jiǎn)單消費(fèi)者
*
* @author futao
* Created on 2019-04-22.
*/
@Slf4j
public class RecvOne {
@SneakyThrows
public static void main(String[] args) {
Channel channel = RabbitMqConnectionTools.getChannel();
//開(kāi)啟持久化消息
boolean durable = true;
channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
log.info("Waiting for message...");
DeliverCallback deliverCallback = ((consumerTag, message) -> {
log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
//acknowledgment應(yīng)答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
try {
Thread.sleep(1000);
} catch (Exception e) {
}
});
//關(guān)閉自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
});
}
}
- 消費(fèi)者2 - 每2秒處理一條
package com.futao.springmvcdemo.mq.rabbit.workqueue;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqConnectionTools;
import com.futao.springmvcdemo.mq.rabbit.RabbitMqQueueEnum;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
/**
* 簡(jiǎn)單消費(fèi)者
*
* @author futao
* Created on 2019-04-22.
*/
@Slf4j
public class RecvTwo {
@SneakyThrows
public static void main(String[] args) {
Channel channel = RabbitMqConnectionTools.getChannel();
//開(kāi)啟持久化消息
boolean durable = true;
channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
log.info("Waiting for message...");
DeliverCallback deliverCallback = ((consumerTag, message) -> {
log.info("收到消息:[{}],tag:[{}]", new String(message.getBody()), consumerTag);
//acknowledgment應(yīng)答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
try {
Thread.sleep(2000);
} catch (Exception e) {
}
});
//關(guān)閉自動(dòng)應(yīng)答
boolean autoAck = false;
channel.basicConsume(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), autoAck, deliverCallback, consumerTag -> {
});
}
}
- 結(jié)果
-
生產(chǎn)者
生產(chǎn)者日志 -
消費(fèi)者1日志
消費(fèi)者1日志 -
消費(fèi)者2日志
消費(fèi)者2日志
-
- 特點(diǎn): 多個(gè)消費(fèi)者之間,不論消息的處理速度,都是平均分發(fā)(公平分發(fā))。你一個(gè),我一個(gè),他一個(gè)。此時(shí)是公平隊(duì)列
- 注意:
- 定義隊(duì)列的時(shí)候,設(shè)置是否開(kāi)啟消息的持久化(該設(shè)置需要同時(shí)在生產(chǎn)者和消費(fèi)者設(shè)置)
//開(kāi)啟持久化消息
boolean durable = true;
channel.queueDeclare(RabbitMqQueueEnum.WORK_QUEUE.getQueueName(), durable, false, false, null);
- 如果消息隊(duì)列已經(jīng)存在,則不可以修改相應(yīng)的配置,必須刪除原有的隊(duì)列,或者新建一個(gè)新的隊(duì)列。
- 關(guān)閉自動(dòng)應(yīng)答(開(kāi)啟手動(dòng)應(yīng)答),可以防止消息在未被正確消費(fèi)的情況下被Rabbitmq從隊(duì)列內(nèi)存中刪除。
# 實(shí)現(xiàn)工作隊(duì)列下的非公平隊(duì)列
消費(fèi)者設(shè)置一次只發(fā)送一條消息,并且在被正確消費(fèi)之前發(fā)繼續(xù)發(fā)送下一條消息。從而使得消費(fèi)快的消費(fèi)者比消費(fèi)慢的消費(fèi)者消費(fèi)更多的消息
//告訴rabbitmq一次只發(fā)送一條消息,并且在前一個(gè)消息未被處理或者消費(fèi)之前,不繼續(xù)發(fā)送下一個(gè)消息
channel.basicQos(1);
- 測(cè)試結(jié)果
image.png
此時(shí)明顯打破了消息的公平分發(fā),消費(fèi)快的消費(fèi)者接收到的消息更多。
如果有兩個(gè)消費(fèi)者,其中一個(gè)ConsumerA設(shè)置了Qos=1,另一個(gè)ConsumerB沒(méi)有設(shè)置。經(jīng)過(guò)我的測(cè)試,ConsumerA會(huì)獲得大量的消息,都積壓在ConsumerA,而ConsumerB獲得消息很少。



