【RabbitMQ-2】RabbitMQ的并發(fā)參數(shù)(concurrency和prefetch)

版本信息:
JDK:8
SpringBoot 2.1.3.RELEASE

RabbitMQ消費端配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    listener:
      simple:
#        acknowledge-mode: manual  # 手動確定(默認自動確認)
        concurrency: 1 # 消費端的監(jiān)聽個數(shù)(即@RabbitListener開啟幾個線程去處理數(shù)據(jù)。)
        max-concurrency: 10 # 消費端的監(jiān)聽最大個數(shù)
        prefetch: 10
    connection-timeout: 15000   # 超時時間

在消費端,配置prefetchconcurrency參數(shù)便可以實現(xiàn)消費端MQ并發(fā)處理消息,那么這兩個參數(shù)到底有什么含義??

1. prefetch

每個customer會在MQ預取一些消息放入內(nèi)存的LinkedBlockingQueue中進行消費,這個值越高,消息傳遞的越快,但非順序處理消息的風險更高。如果ack模式為none,則忽略。如有必要,將增加此值以匹配txSize或messagePerAck。從2.0開始默認為250;設(shè)置為1將還原為以前的行為。

prefetch默認值以前是1,這可能會導致高效使用者的利用率不足。從spring-amqp 2.0版開始,默認的prefetch值是250,這將使消費者在大多數(shù)常見場景中保持忙碌,從而提高吞吐量。

不過在有些情況下,尤其是處理速度比較慢的大消息,消息可能在內(nèi)存中大量堆積,消耗大量內(nèi)存;以及對于一些嚴格要求順序的消息,prefetch的值應當設(shè)置為1。

對于低容量消息和多個消費者的情況(也包括單listener容器的concurrency配置)希望在多個使用者之間實現(xiàn)更均勻的消息分布,建議在手動ack下并設(shè)置prefetch=1

模擬:
生產(chǎn)者每次生產(chǎn)10條消息:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirms: true
    publisher-returns: true
@RestController
public class RabbitMQController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //直接向隊列中發(fā)送數(shù)據(jù)
    @GetMapping("send")
    public String send() {
        for (int i = 0; i < 10; i++) {
            String content = "Date:" + System.currentTimeMillis();
            content = content + ":::" + i;
            rabbitTemplate.convertAndSend("kinson", content);
        }
        return "success";
    }
}

2. concurrency

上面配置中,concurrency =1,即每個Listener容器將開啟一個線程去處理消息。

在2.0版本后,可以在注解中配置該參數(shù):

@Component
@Slf4j
public class CustomerRev {
  //會覆蓋配置文件中的參數(shù)。
    @RabbitListener(queues = {"kinson"},concurrency =   "2")
    public void receiver(Message msg, Channel channel) throws InterruptedException {

//        Thread.sleep(10000);
        byte[] messageBytes = msg.getBody();

        if (messageBytes != null && messageBytes.length > 0) {
            //打印數(shù)據(jù)
            String message = new String(msg.getBody(), StandardCharsets.UTF_8);
            log.info("【消3】:{}", message);
        }
    }

}

啟動服務:

可以看到MQ有兩個消費者.png

即該Listener容器產(chǎn)生了兩個線程去消費queue。如果在Listener配置了exclusive參數(shù),即確定此容器中的單個customer是否具有對隊列的獨占訪問權(quán)限。如果為true,則容器的并發(fā)性必須為1。

image.png

3. prefetch和concurrency

若一個消費者配置prefetch=10,concurrency=2,即會開啟2個線程去消費消息,每個線程都會抓取10個線程到內(nèi)存中(注意不是兩個線程去共享內(nèi)存中抓取的消息)。

下圖所示,當10個消息進入MQ后,兩個線程都抓取了5個線程放入了自己的LinkedBlockingQueue進行消費。

image.png

推薦閱讀

RabbitMQ消費者的幾個參數(shù)

Spring-amqp2.1.2中文文檔

SpringBoot2.1.3配置的官方文檔

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容