持久化消息和非持久化消息的發(fā)送策略:消息同步發(fā)送和異步發(fā)送
ActiveMQ支持同步、異步兩種發(fā)送模式將消息發(fā)送到broker上。同步發(fā)送過(guò)程中,發(fā)送者發(fā)送一條消息會(huì)阻塞直到broker反饋一個(gè)確認(rèn)消息,表示消息已經(jīng)被broker處理。這個(gè)機(jī)制提供了消息的安全性保障,但是由于是阻塞的操作,會(huì)影響到客戶端消息發(fā)送的性能。異步發(fā)送的過(guò)程中,發(fā)送者不需要等待broker提供反饋,所以性能相對(duì)較高。但是可能會(huì)出現(xiàn)消息丟失的情況。所以使用異步發(fā)送的前提是在某些情況下允許出現(xiàn)數(shù)據(jù)丟失的情況。
默認(rèn)情況下,非持久化消息是異步發(fā)送的,持久化消息并且是在非事務(wù)模式下是同步發(fā)送的。但是在開啟事務(wù)的情況下,消息都是異步發(fā)送。由于異步發(fā)送的效率會(huì)比同步發(fā)送性能更高。所以在發(fā)送持久化消息的時(shí)候,盡量去開啟事務(wù)會(huì)話。除了持久化消息和非持久化消息的同步和異步特性以外,我們還可以通過(guò)以下幾種方式來(lái)設(shè)置異步發(fā)送:
1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?jms.useAsyncSend=true");
2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
3.((ActiveMQConnection)connection).setUseAsyncSend(true);
消息發(fā)送的源碼:
以producer.send為入口 進(jìn)入的是ActiveMQSession 實(shí)現(xiàn):

我們從上面的代碼可以看到,在執(zhí)行發(fā)送操作之前需要把消息做一個(gè)轉(zhuǎn)化,并且將我們?cè)O(shè)置的一些屬性注入導(dǎo)指定的屬性中,我們先來(lái)看看異步發(fā)送,會(huì)發(fā)現(xiàn)異步發(fā)送的時(shí)候涉及到producerWindowSize的大?。?/p>
ProducerWindowSize的含義
producer每發(fā)送一個(gè)消息,統(tǒng)計(jì)一下發(fā)送的字節(jié)數(shù),當(dāng)字節(jié)數(shù)達(dá)到ProducerWindowSize值時(shí),需要等待broker的確認(rèn),才能繼續(xù)發(fā)送。
主要用來(lái)約束在異步發(fā)送時(shí)producer端允許積壓的(尚未ACK)的消息的大小,且只對(duì)異步發(fā)送有意義。每次發(fā)送消息之后,都將會(huì)導(dǎo)致memoryUsage大小增加(+message.size),當(dāng)broker返回producerAck時(shí),memoryUsage尺寸減少(producerAck.size,此size表示先前發(fā)送消息的大小)。
可以通過(guò)如下2種方式設(shè)置:
- 在brokerUrl中設(shè)置: "tcp://localhost:61616?jms.producerWindowSize=1048576",這種設(shè)置將會(huì)對(duì)所有的producer生效。
- 在destinationUri中設(shè)置: "myQueue?producer.windowSize=1048576",此參數(shù)只會(huì)對(duì)使用此Destination實(shí)例的producer生效,將會(huì)覆蓋brokerUrl中的producerWindowSize值。
注意:此值越大,意味著消耗Client端的內(nèi)存就越大。
接下去我們進(jìn)入異步發(fā)送流程,看看消息是怎么異步發(fā)送的this.connection.asyncSendPacket(msg):
doAsyncSendPacket