分布式消息中間件
RabbitMQ是用Erlang語言編寫的分布式消息中間件,常常用在大型網(wǎng)站中作為消息隊(duì)列來使用,主要目的是各個(gè)子系統(tǒng)之間的解耦和異步處理。消息中間件的基本模型是典型的生產(chǎn)者-消費(fèi)者模型,生產(chǎn)者發(fā)送消息到消息隊(duì)列,消費(fèi)者監(jiān)聽消息隊(duì)列,收到消息后消費(fèi)處理。
在使用RabbitMQ做消息分發(fā)時(shí),主要有三個(gè)概念要注意:Exchange,RoutingKey,Queue。
Exchange可以理解為交換器,RoutingKey可以理解為路由,Queue作為真實(shí)存儲消息的隊(duì)列和某個(gè)Exchange綁定,具體如何路由到感興趣的Queue則由Exchange的三種模式?jīng)Q定:
- fanout
- topic
- direct
Exchange為fanout時(shí),生產(chǎn)者往此Exchange發(fā)送的消息會(huì)發(fā)給每個(gè)和其綁定的Queue,此時(shí)RoutingKey并不起作用;Exchange為topic時(shí),生產(chǎn)者可以指定一個(gè)支持通配符的RoutingKey(如demo.*)發(fā)向此Exchange,凡是Exchange上RoutingKey滿足此通配符的Queue就會(huì)收到消息;direct類型的Exchange是最直接最簡單的,生產(chǎn)者指定Exchange和RoutingKey,然后往其發(fā)送消息,消息只能被綁定的滿足RoutingKey的Queue接受消息。(通常如果不指定RoutingKey的具體名字,那么默認(rèn)的名字其實(shí)是Queue的名字)
Concurrency與Prefetch
在通常的使用中(Java項(xiàng)目),我們一般會(huì)結(jié)合spring-amqp框架來使用RabbitMQ,spring-amqp底層調(diào)用RabbitMQ的java client來和Broker交互,比如我們會(huì)用如下配置來建立RabbitMQ的連接池、聲明Queue以及指明監(jiān)聽者的監(jiān)聽行為:
<rabbit:connection-factory id="connectionFactory" />
<!-- template非必須,主要用于生產(chǎn)者發(fā)送消息-->
<rabbit:template id="template" connection-factory="connectionFactory" />
<rabbit:queue name="remoting.queue" />
<rabbit:listener-container connection-factory="connectionFactory" concurrency="2" prefetch="3">
<rabbit:listener ref="listener" queue-names="remoting.queue" />
</rabbit:listener-container>
listener-container可以設(shè)置消費(fèi)者在監(jiān)聽Queue的時(shí)候的各種參數(shù),其中concurrency和prefetch是本篇文章比較關(guān)心的兩個(gè)參數(shù),以下是spring-amqp文檔的解釋:
prefetchCount(prefetch)
The number of messages to accept from the broker in one socket frame. The higher this is the faster the messages can be delivered, but the higher the risk of non-sequential processing. Ignored if the acknowledgeMode
is NONE. This will be increased, if necessary, to match the txSize
concurrentConsumers(concurrency)
The number of concurrent consumers to initially start for each listener.
簡單解釋下就是concurrency設(shè)置的是對每個(gè)listener在初始化的時(shí)候設(shè)置的并發(fā)消費(fèi)者的個(gè)數(shù),prefetch是每次從一次性從broker里面取的待消費(fèi)的消息的個(gè)數(shù),上面的配置在監(jiān)控后臺看到的效果如下:

圖中可以看出有兩個(gè)消費(fèi)者同時(shí)監(jiān)聽Queue,但是注意這里的消息只有被一個(gè)消費(fèi)者消費(fèi)掉就會(huì)自動(dòng)ack,另外一個(gè)消費(fèi)者就不會(huì)再獲取到此消息,Prefetch Count為配置設(shè)置的值3,意味著每個(gè)消費(fèi)者每次會(huì)預(yù)取3個(gè)消息準(zhǔn)備消費(fèi)。每個(gè)消費(fèi)者對應(yīng)的listener有個(gè)Exclusive參數(shù),默認(rèn)為false, 如果設(shè)置為true,concurrency就必須設(shè)置為1,即只能單個(gè)消費(fèi)者消費(fèi)隊(duì)列里的消息,適用于必須嚴(yán)格執(zhí)行消息隊(duì)列的消費(fèi)順序(先進(jìn)先出)。
源碼剖析
這里concurrency的實(shí)現(xiàn)方式不看源碼也能猜到,肯定是用多線程的方式來實(shí)現(xiàn)的,此時(shí)同一進(jìn)程下打開的本地端口都是56278.下面看看listener-contaner對應(yīng)的org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer的源碼:
protected int initializeConsumers() {
int count = 0;
synchronized (this.consumersMonitor) {
if (this.consumers == null) {
this.cancellationLock.reset();
this.consumers = new HashMap<BlockingQueueConsumer, Boolean>(this.concurrentConsumers);
for (int i = 0; i < this.concurrentConsumers; i++) {
BlockingQueueConsumer consumer = createBlockingQueueConsumer();
this.consumers.put(consumer, true);
count++;
}
}
}
return count;
}
container啟動(dòng)的時(shí)候會(huì)根據(jù)設(shè)置的concurrency的值(同時(shí)不超過最大值)創(chuàng)建n個(gè)BlockingQueueConsumer。
protected void doStart() throws Exception {
//some code
synchronized (this.consumersMonitor) {
int newConsumers = initializeConsumers();
//some code
Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
for (BlockingQueueConsumer consumer : this.consumers.keySet()) {
AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
processors.add(processor);
this.taskExecutor.execute(processor);
}
//some code
}
}
在doStart()方法中調(diào)用initializeConsumers來初始化所有的消費(fèi)者,AsyncMessageProcessingConsumer作為真實(shí)的處理器包裝了BlockingQueueConsumer,而AsyncMessageProcessingConsumer其實(shí)實(shí)現(xiàn)了Runnable接口,由this.taskExecutor.execute(processor)來啟動(dòng)消費(fèi)者線程。
private final class AsyncMessageProcessingConsumer implements Runnable {
private final BlockingQueueConsumer consumer;
private final CountDownLatch start;
private volatile FatalListenerStartupException startupException;
private AsyncMessageProcessingConsumer(BlockingQueueConsumer consumer) {
this.consumer = consumer;
this.start = new CountDownLatch(1);
}
//some code
@Override
public void run() {
//some code
}
}
那么prefetch的值意味著什么呢?其實(shí)從名字上大致能看出,BlockingQueueConsumer內(nèi)部應(yīng)該維護(hù)了一個(gè)阻塞隊(duì)列BlockingQueue,prefetch應(yīng)該是這個(gè)阻塞隊(duì)列的長度,看下BlockingQueueConsumer內(nèi)部有個(gè)queue,這個(gè)queue不是對應(yīng)RabbitMQ的隊(duì)列,而是Consumer自己維護(hù)的內(nèi)存級別的隊(duì)列,用來暫時(shí)存儲從RabbitMQ中取出來的消息:
private final BlockingQueue<Delivery> queue;
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
Map<String, Object> consumerArgs, boolean exclusive, String... queues) {
//some code
this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
}
BlockingQueueConsumer的構(gòu)造函數(shù)清楚說明了每個(gè)消費(fèi)者內(nèi)部的隊(duì)列大小就是prefetch的大小。
業(yè)務(wù)問題
前面說過,設(shè)置并發(fā)的時(shí)候,要考慮具體的業(yè)務(wù)場景,對那種對消息的順序有苛刻要求的場景不適合并發(fā)消費(fèi),而對于其他場景,比如用戶注冊后給用戶發(fā)個(gè)提示短信,是不太在意哪個(gè)消息先被消費(fèi),哪個(gè)消息后被消費(fèi),因?yàn)槊總€(gè)消息是相對獨(dú)立的,后注冊的用戶先收到短信也并沒有太大影響。
設(shè)置并發(fā)消費(fèi)除了能提高消費(fèi)的速度,還有另外一個(gè)好處:當(dāng)某個(gè)消費(fèi)者長期阻塞,此時(shí)在當(dāng)前消費(fèi)者內(nèi)部的BlockingQueue的消息也會(huì)被一直阻塞,但是新來的消息仍然可以投遞給其他消費(fèi)者消費(fèi),這種情況頂多會(huì)導(dǎo)致prefetch個(gè)數(shù)目的消息消費(fèi)有問題,而不至于單消費(fèi)者情況下整個(gè)RabbitMQ的隊(duì)列會(huì)因?yàn)橐粋€(gè)消息有問題而全部堵死。所有在合適的業(yè)務(wù)場景下,需要合理設(shè)置concurrency和prefetch值。