前言
最近做了一個小功能,是通過一個客戶端消費者監(jiān)聽隊列消息, 代碼如下:
Connection conn = getConnection();
Channel channel = conn.createChannel();
MessageConsumer consumer = ...
channel.basicConsume(realQueue, true, consumer);
通過jvm工具觀察rabbitmq的線程使用情況,發(fā)現(xiàn)生產(chǎn)者每發(fā)一條消息,消費者這邊就會創(chuàng)建一條線程,
言下之意,一個channel當消息來到時就會異步處理這些消息.
定位
通過斷點查找發(fā)現(xiàn)原來是 ConsumerWorkService這個類控制的。
這個類顧名思義,就是消費者工作 ExecutorService, 這里的Service表示的是ExecutorService
這個類構(gòu)造函數(shù)里有一個executor參數(shù),當這個參數(shù)為空時,就會創(chuàng)建一個Executors.newFixedThreadPool,代碼如下:
final public class ConsumerWorkService {
private static final int MAX_RUNNABLE_BLOCK_SIZE = 16;
private static final int DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2;
private final ExecutorService executor;
private final boolean privateExecutor;
private final WorkPool<Channel, Runnable> workPool;
private final int shutdownTimeout;
public ConsumerWorkService(ExecutorService executor, ThreadFactory threadFactory, int queueingTimeout, int shutdownTimeout) {
this.privateExecutor = (executor == null);
this.executor = (executor == null) ? Executors.newFixedThreadPool(DEFAULT_NUM_THREADS, threadFactory)
: executor;
this.workPool = new WorkPool<>(queueingTimeout);
this.shutdownTimeout = shutdownTimeout;
}
...
默認的executor 會使用 CPU核數(shù)的2倍 作為線程池里線程的數(shù)量。
所以到底是要用多個channel,還是單個channel,這個就是其中一個參考依據(jù)。
這個executor又是怎么傳進來的呢?
答案:
ConnectionFactory -> AMQConnection -> ChannelManager -> ConsumerWorkService
ConnectionFactory有一個屬性是 shareExecutorService ,這個屬性表示內(nèi)部使用共享的唯一一個ExecutorService
設(shè)置這個屬性就可以一直傳到ConsumerWorkService中。
除了ConnectionFactory.setShareExecutorService方法以外,
還可以在Connection被創(chuàng)建時,設(shè)置executorService
ConnectionFactory的newConnection方法:
public Connection newConnection(ExecutorService executor) throws IOException, TimeoutException;
總結(jié)
通過設(shè)置shareExecutorService,無論多少個channel,都可以統(tǒng)一控制線程數(shù)量、隊列數(shù)量,
根據(jù)實際情況進行配置。