【RabbitMQ-9】自定義配置線程池(線程池資源不足-MQ初始化隊列&&MQ動態(tài)擴容影響)

1. 源碼注意點

源碼一:啟動消費者

@Override 
protected void doStart() {
    checkListenerContainerAware();
    super.doStart();
    synchronized(this.consumersMonitor) {
        if (this.consumers != null) {
            throw new IllegalStateException("A stopped container should not have consumers");
        }
        int newConsumers = initializeConsumers();
        if (this.consumers == null) {
            logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)");
            return;
        }
        if (newConsumers <= 0) {
            if (logger.isInfoEnabled()) {
                logger.info("Consumers are already running");
            }
            return;
        }
        Set < AsyncMessageProcessingConsumer > processors = new HashSet < AsyncMessageProcessingConsumer > ();
        //每一個消費者,創(chuàng)建ConcurrentConsumers個線程
        for (BlockingQueueConsumer consumer: this.consumers) {
            AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
            processors.add(processor);
            //使用配置的線程池去開啟線程
            //(便會執(zhí)行run方法,run方法中啟動成功會使得processor的CountDownLatch-1)
            getTaskExecutor().execute(processor);
            if (getApplicationEventPublisher() != null) {
                getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
            }
        }
        //判斷所有的線程是否執(zhí)行run()方法啟動消費者成功?沒有成功的話,阻塞,直至所有消費者成功。
        waitForConsumersToStart(processors);
    }
}

若核心線程數(shù)滿了,但是依舊有消費者等待啟動,那么會在waitForConsumersToStart阻塞。

源碼二:串行阻塞

private void waitForConsumersToStart(Set < AsyncMessageProcessingConsumer > processors) {
    for (AsyncMessageProcessingConsumer processor: processors) {
        FatalListenerStartupException startupException = null;
        try {
            startupException = processor.getStartupException();
        } catch(TimeoutException e) {
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        } catch(InterruptedException e) {
            Thread.currentThread().interrupt();
            throw RabbitExceptionTranslator.convertRabbitAccessException(e);
        }
        if (startupException != null) {
            throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
        }
    }
}

源碼三:使用CountDownLatch阻塞

private FatalListenerStartupException getStartupException() throws TimeoutException,
InterruptedException {
    if (!this.start.await(SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
        logger.error("Consumer failed to start in " + SimpleMessageListenerContainer.this.consumerStartTimeout + " milliseconds; does the task executor have enough threads to support the container " + "concurrency?");
    }
    return this.startupException;
}

啟動的線程是串行的阻塞。

例如:線程池只存在1個線程,但某個隊列消費者需要10個線程。

  1. 創(chuàng)建消費者線程;
  2. 使用配置的線程池啟動消費者;
  3. 發(fā)布創(chuàng)建消費者的消息;
  4. 串行阻塞判斷所有消費者是否創(chuàng)建完畢(默認60s);
  5. 理論是等待9*60s的時間,唯一的消費者才會開始執(zhí)行;

注意點:

  1. 隊列搶占線程池線程順序是按隊列初始化順序決定的,即先初始化的隊列先占用線程池資源。若線程不足,MQ打印Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?信息。

  2. 配置的線程池資源被消費者占用后,是不會被釋放的,while循環(huán)會一直監(jiān)聽MQ消息。

配置MQ的線程池不應該配置阻塞隊列,因為getTaskExecutor().execute(processor);使用線程池啟動線程,若核心線程滿了之后,會使用阻塞隊列。而使用阻塞隊列,會導致消費者不能被啟動。

2. 實現(xiàn)方式

配置線程池模式:

@Slf4j
@Configuration
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

        @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        /* setConnectionFactory:設置spring-amqp的ConnectionFactory。 */
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(2);
        factory.setPrefetchCount(1);
        factory.setDefaultRequeueRejected(true);
        //使用自定義線程池來啟動消費者。
        factory.setTaskExecutor(taskExecutor());
        factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
        return factory;
    }


    @Bean("correctTaskExecutor")
    @Primary
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
        // 設置核心線程數(shù)
        executor.setCorePoolSize(100);
        // 設置最大線程數(shù)
        executor.setMaxPoolSize(100);
        // 設置隊列容量
        executor.setQueueCapacity(0);
        // 設置線程活躍時間(秒)
        executor.setKeepAliveSeconds(300);
        // 設置默認線程名稱
        executor.setThreadNamePrefix("thread-xx-");
        // 設置拒絕策略rejection-policy:當pool已經(jīng)達到max size的時候,丟棄
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
        // 等待所有任務結(jié)束后再關(guān)閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }

}

3. MQ自動擴容的影響

上面說到,mq在啟動時創(chuàng)建消費者時由于線程池資源不足,會導致阻塞(影響該queue的消費消息)。

那么若是代碼中配置了factory.setMaxConcurrentConsumers(2);,擴容時發(fā)現(xiàn)線程池資源不足,有什么影響呢?

3.1 源碼分析

  1. 消費者線程循環(huán)的消費消息

源碼位置org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run

@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
    if (!isActive()) {
        return;
    }
    ...
    try {
        initialize();
        //每個消費者線程循環(huán)的去獲取消息
        while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
            mainLoop();
        }
    } ...
}
  1. 循環(huán)體的操作

注意receiveAndExecute()方法的返回值是checkAdjust()方法的請求參數(shù),那么理解MQ動態(tài)擴容,就必須先明白receiveAndExecute()的邏輯以及返回值的含義。

private void mainLoop() throws Exception { // NOSONAR Exception
    try {
        //該方法是獲取消息,并執(zhí)行業(yè)務操作(并發(fā)送ACK或NACK到MQ)。返回值true表示已經(jīng)消費消息;false表示未獲取到消息
        boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
        //判斷是否配置了maxConcurrentConsumers,是否進行動態(tài)擴容
        if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
            checkAdjust(receivedOk);
        }
        ...
    }
}

2.1 receiveAndExecute—業(yè)務邏輯

該方法會執(zhí)行業(yè)務邏輯,并發(fā)送ACK或NACK到MQ中。完成一個消息的消費。
但是即使發(fā)送ACK后,依舊在mainLoop()循環(huán)中,需要完成后續(xù)邏輯才能消費下一個消息。(注:不是向MQ發(fā)送ACK或NACK后立即去消費后續(xù)消息?。。。?/strong>

private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
    PlatformTransactionManager transactionManager = getTransactionManager();
    if (transactionManager != null) {...事務操作,不關(guān)注
    }
    //接受消息并進行處理
    return doReceiveAndExecute(consumer);
}

若是執(zhí)行nextMessage()沒有獲取到消息,那么執(zhí)行break操作,最終會導致上面的receiveAndExecute()方法返回false。而receiveAndExecute()的值可以決定是否動態(tài)擴容

private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
    Channel channel = consumer.getChannel();
    //默認txSize=1
    for (int i = 0; i < this.txSize; i++) {

        //在內(nèi)存隊列中獲取消息
        Message message = consumer.nextMessage(this.receiveTimeout);
        //未獲取到消息,開始下次循環(huán)
        if (message == null) {
            break;
        }
        try {
            //執(zhí)行業(yè)務邏輯  
            executeListener(channel, message);
        } 
       ...catch操作,不關(guān)注
    }
    //沒有獲取到消息,這個方法會返回false
    return consumer.commitIfNecessary(isChannelLocallyTransacted());
}
public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
    //此處直接返回false
    if (this.deliveryTags.isEmpty()) {
        return false;
    }
    ...
    return true;
}

在內(nèi)存中獲取消息

由于配置了setPrefetchCount參數(shù),所以內(nèi)存會去MQ中預取配置的消息數(shù),放到本地的BlockingQueue中。
配置詳見:
【RabbitMQ-2】RabbitMQ的并發(fā)參數(shù)(concurrency和prefetch)

未獲取到消息

public Message nextMessage(long timeout) throws InterruptedException,
ShutdownSignalException {
    if (logger.isTraceEnabled()) {
        logger.trace("Retrieving delivery for " + this);
    }
    checkShutdown();
    if (this.missingQueues.size() > 0) {
        checkMissingQueues();
    }
    //poll的API描述:檢索并刪除此隊列的頭,等待指定的等待時間(如有必要)使元素變?yōu)榭捎谩?    Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
    //cancelled默認false不會執(zhí)行改邏輯
    if (message == null && this.cancelled.get()) {
        throw new ConsumerCancelledException();
    }
    //未獲取到消息返回null。
    return message;
}

2.2 checkAdjust()—動態(tài)擴容業(yè)務

由上面源碼可以若是沒有獲取到消息,receivedOk返回false(注:若是獲取到消息,但是NACK,receivedOk返回值依舊是true)。

如何保證是連續(xù)獲取或者連續(xù)空轉(zhuǎn)的?
答案:因為mainloop()一直循環(huán),每次均在本地queue獲取消息(最長阻塞1s)。若連續(xù)9次均未獲取到消息,第10次獲取到消息,那么會重置consecutiveIdles=0

private void checkAdjust(boolean receivedOk) {
    //成功獲取到消息
    if (receivedOk) {
        if (isActive(this.consumer)) {
            //連續(xù)空轉(zhuǎn)標識設置為0
            this.consecutiveIdles = 0;
            //consecutiveActiveTrigger默認為10
            if (this.consecutiveMessages++>SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
               //開啟一個消費者線程
                considerAddingAConsumer();
                //練習消費的標識設置為0
                this.consecutiveMessages = 0;
            }
        }
    } else {
        this.consecutiveMessages = 0;
        if (this.consecutiveIdles++>SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
            considerStoppingAConsumer(this.consumer);
            this.consecutiveIdles = 0;
        }
    }
}

開啟一個消費者線程:

private void considerAddingAConsumer() {
    //加鎖
    synchronized(this.consumersMonitor) {
      //若是當前consumers數(shù)量小于配置maxConcurrentConsumers
        if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
            long now = System.currentTimeMillis();
            //開啟消費者有間隔時間
            if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
                //增加消費者。
                this.addAndStartConsumers(1);
                this.lastConsumerStarted = now;
            }
        }
    }
}

開啟消費者的操作

protected void addAndStartConsumers(int delta) {
    synchronized(this.consumersMonitor) {
        if (this.consumers != null) {
            //每一次循環(huán)均是創(chuàng)建一個消費者
            for (int i = 0; i < delta; i++) {
                 //判斷是否創(chuàng)建消費者
                if (this.maxConcurrentConsumers != null && this.consumers.size() >= this.maxConcurrentConsumers) {
                    break;
                }
                //創(chuàng)建消費者
                BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                //(核心)屬性的consumers+1
                this.consumers.add(consumer);
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Starting a new consumer: " + consumer);
                }  
                //使用內(nèi)部的線程池執(zhí)行
                getTaskExecutor().execute(processor);
                //發(fā)布創(chuàng)建消費者事件
                if (this.getApplicationEventPublisher() != null) {
                    this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
                try {
                    //線程執(zhí)行完run方法后,線程中的CountDownLatch-1。
                    //若線程池沒有資源,那么會在此處阻塞(默認60s)
                    //阻塞完畢,startupException返回null。即成功創(chuàng)建
                    FatalListenerStartupException startupException = processor.getStartupException();
                    //若是線程池資源不足,只是返回null,不會執(zhí)行下面分支。
                    if (startupException != null) {
                        this.consumers.remove(consumer);
                        throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                    }
                } catch(InterruptedException ie) {
                    Thread.currentThread().interrupt();
                } catch(Exception e) {
                    consumer.stop();
                    logger.error("Error starting new consumer", e);
                    this.cancellationLock.release(consumer);
                    this.consumers.remove(consumer);
                }
            }
        }
    }
}
  1. 上述代碼中,創(chuàng)建消費者線程是同步的流程,即某個消費者線程加鎖去創(chuàng)建。某創(chuàng)建時線程池沒有資源,會阻塞消費者線程。
  2. 若線程池沒有資源,阻塞完畢后,只是打印異常日志,并拋出異常,此時內(nèi)存中消費者個數(shù)為n+1個,但是只有n個線程可以消費消息
  3. 當連續(xù)10次空轉(zhuǎn)時consecutiveIdles =10,且消費者線程n+1,會回收臨時擴展的消費者線程。
private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
    synchronized(this.consumersMonitor) {
        if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
            long now = System.currentTimeMillis();
            if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
                //回收消費者的核心方式
                consumer.basicCancel(true);
                //本地消費者集合移除消費者
                this.consumers.remove(consumer);
                if (logger.isDebugEnabled()) {
                    logger.debug("Idle consumer terminating: " + consumer);
                }
                this.lastConsumerStopped = now;
            }
        }
    }
}

上面說到,內(nèi)存中的消費者數(shù)量n+1,但是有效的消費者n個。當回收消費者時會回收有效的消費者使得內(nèi)存消費者數(shù)量n個,有效消費者數(shù)量n-1個。

若是線程池資源不足,且配置了消費者動態(tài)擴展參數(shù)后,最終會導致有效的消費者數(shù)量為0,導致消息的大量積壓!??!

注:RabbitMQ使用默認的new SimpleAsyncTaskExecutor()開啟消費者線程,即每當使用線程是,均是new出來的。

總結(jié):不推薦使用自定義配置的線程池,若使用,每次增加隊列時均需要注意配置好線程數(shù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容