版本信息:
JDK:8
SpringBoot 2.1.3.RELEASE
RabbitMQ消費端配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
listener:
simple:
# acknowledge-mode: manual # 手動確定(默認自動確認)
concurrency: 1 # 消費端的監(jiān)聽個數(shù)(即@RabbitListener開啟幾個線程去處理數(shù)據(jù)。)
max-concurrency: 10 # 消費端的監(jiān)聽最大個數(shù)
prefetch: 10
connection-timeout: 15000 # 超時時間
在消費端,配置prefetch和concurrency參數(shù)便可以實現(xiàn)消費端MQ并發(fā)處理消息,那么這兩個參數(shù)到底有什么含義??
1. prefetch
每個customer會在MQ預取一些消息放入內(nèi)存的LinkedBlockingQueue中進行消費,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。從2.0開始默認為250;設(shè)置為1將還原為以前的行為。
prefetch默認值以前是1,這可能會導致高效使用者的利用率不足。從spring-amqp 2.0版開始,默認的prefetch值是250,這將使消費者在大多數(shù)常見場景中保持忙碌,從而提高吞吐量。
不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內(nèi)存中大量堆積,消耗大量內(nèi)存;以及對于一些嚴格要求順序的消息,prefetch的值應當設(shè)置為1。
對于低容量消息和多個消費者的情況(也包括單listener容器的concurrency配置)希望在多個使用者之間實現(xiàn)更均勻的消息分布,建議在手動ack下并設(shè)置prefetch=1。
模擬:
生產(chǎn)者每次生產(chǎn)10條消息:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirms: true
publisher-returns: true
@RestController
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
//直接向隊列中發(fā)送數(shù)據(jù)
@GetMapping("send")
public String send() {
for (int i = 0; i < 10; i++) {
String content = "Date:" + System.currentTimeMillis();
content = content + ":::" + i;
rabbitTemplate.convertAndSend("kinson", content);
}
return "success";
}
}
2. concurrency
上面配置中,concurrency =1,即每個Listener容器將開啟一個線程去處理消息。
在2.0版本后,可以在注解中配置該參數(shù):
@Component
@Slf4j
public class CustomerRev {
//會覆蓋配置文件中的參數(shù)。
@RabbitListener(queues = {"kinson"},concurrency = "2")
public void receiver(Message msg, Channel channel) throws InterruptedException {
// Thread.sleep(10000);
byte[] messageBytes = msg.getBody();
if (messageBytes != null && messageBytes.length > 0) {
//打印數(shù)據(jù)
String message = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("【消3】:{}", message);
}
}
}
啟動服務:

即該Listener容器產(chǎn)生了兩個線程去消費queue。如果在Listener配置了exclusive參數(shù),即確定此容器中的單個customer是否具有對隊列的獨占訪問權(quán)限。如果為true,則容器的并發(fā)性必須為1。

3. prefetch和concurrency
若一個消費者配置prefetch=10,concurrency=2,即會開啟2個線程去消費消息,每個線程都會抓取10個線程到內(nèi)存中(注意不是兩個線程去共享內(nèi)存中抓取的消息)。
下圖所示,當10個消息進入MQ后,兩個線程都抓取了5個線程放入了自己的LinkedBlockingQueue進行消費。
