關(guān)于RabbitMQ的Channel默認線程(一個Connection控制線程數(shù)量)

前言

最近做了一個小功能,是通過一個客戶端消費者監(jiān)聽隊列消息, 代碼如下:

Connection conn = getConnection();
Channel channel = conn.createChannel();
MessageConsumer consumer = ...
channel.basicConsume(realQueue, true, consumer);

通過jvm工具觀察rabbitmq的線程使用情況,發(fā)現(xiàn)生產(chǎn)者每發(fā)一條消息,消費者這邊就會創(chuàng)建一條線程,
言下之意,一個channel當消息來到時就會異步處理這些消息.

定位

通過斷點查找發(fā)現(xiàn)原來是 ConsumerWorkService這個類控制的。
這個類顧名思義,就是消費者工作 ExecutorService, 這里的Service表示的是ExecutorService

這個類構(gòu)造函數(shù)里有一個executor參數(shù),當這個參數(shù)為空時,就會創(chuàng)建一個Executors.newFixedThreadPool,代碼如下:

final public class ConsumerWorkService {
    private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
    private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    private final ExecutorService executor;
    private final boolean privateExecutor;
    private final WorkPool<Channel, Runnable> workPool;
    private final int shutdownTimeout;

    public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
        this.privateExecutor = (executor == null);
        this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
                                           : executor;
        this.workPool = new WorkPool<>(queueingTimeout);
        this.shutdownTimeout = shutdownTimeout;
    }
  ...

默認的executor 會使用 CPU核數(shù)的2倍 作為線程池里線程的數(shù)量。

所以到底是要用多個channel,還是單個channel,這個就是其中一個參考依據(jù)。

這個executor又是怎么傳進來的呢?

答案:

ConnectionFactory -> AMQConnection -> ChannelManager -> ConsumerWorkService

ConnectionFactory有一個屬性是 shareExecutorService ,這個屬性表示內(nèi)部使用共享的唯一一個ExecutorService
設(shè)置這個屬性就可以一直傳到ConsumerWorkService中。

除了ConnectionFactory.setShareExecutorService方法以外,
還可以在Connection被創(chuàng)建時,設(shè)置executorService
ConnectionFactory的newConnection方法:

public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;

總結(jié)

通過設(shè)置shareExecutorService,無論多少個channel,都可以統(tǒng)一控制線程數(shù)量、隊列數(shù)量,
根據(jù)實際情況進行配置。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容