ActiveMQ七之API簡介

個人專題目錄


七、API簡介

1 Producer API簡介

1.1 發(fā)送消息

MessageProducer.

send(Message message);發(fā)送消息到默認目的地,就是創(chuàng)建Producer時指定的目的地。

send(Destination destination, Message message); 發(fā)送消息到指定目的地,Producer不建議綁定目的地。也就是創(chuàng)建Producer的時候,不綁定目的地。session.createProducer(null)。

send(Message message, int deliveryMode, int priority, long timeToLive);發(fā)送消息到默認目的地,且設置相關參數。deliveryMode-持久化方式(DeliveryMode.PERSISTENT| DeliveryMode.NON_PERSISTENT)。priority-優(yōu)先級。timeToLive-消息有效期(單位毫秒)。

send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive); 發(fā)送消息到指定目的地,且設置相關參數。

1.2 消息有效期

消息過期后,默認會將失效消息保存到“死信隊列(ActiveMQ.DLQ)”。

不持久化的消息,在超時后直接丟棄,不會保存到死信隊列中。

死信隊列名稱可配置,死信隊列中的消息不能恢復。

死信隊列是在activemq.xml中配置的。

1.3 消息優(yōu)先級

不需特殊關注。

我們可以在發(fā)送消息時,指定消息的權重,broker可以建議權重較高的消息將會優(yōu)先發(fā)送給Consumer。在某些場景下,我們通常希望權重較高的消息優(yōu)先傳送;不過因為各種原因,priority并不能決定消息傳送的嚴格順序(order)。

JMS標準中約定priority可以為09的整數數值,值越大表示權重越高,默認值為4。不過activeMQ中各個存儲器對priority的支持并非完全一樣。比如JDBC存儲器可以支持09,因為JDBC存儲器可以基于priority對消息進行排序和索引化;但是對于kahadb/levelDB等這種基于日志文件的存儲器而言,priority支持相對較弱,只能識別三種優(yōu)先級(LOW: < 4,NORMAL: =4,HIGH: > 4)。

1.3.1 開啟

在broker端,默認是不存儲priority信息的,我們需要手動開啟,修改activemq.xml配置文件,在broker標簽的子標簽policyEntries中增加下述配置:

<policyEntry queue=">" prioritizedMessages="true"/>  

不過對于“非持久化”類型的消息(如果沒有被swap到臨時文件),它們被保存在內存中,它們不存在從文件Paged in到內存的過程,因為可以保證優(yōu)先級較高的消息,總是在prefetch的時候被優(yōu)先獲取,這也是“非持久化”消息可以擔保消息發(fā)送順序的優(yōu)點。

Broker在收到Producer的消息之后,將會把消息cache到內存,如果消息需要持久化,那么同時也會把消息寫入文件;如果通道中Consumer的消費速度足夠快(即積壓的消息很少,尚未超過內存限制,我們通過上文能夠知道,每個通道都可以有一定的內存用來cache消息),那么消息幾乎不需要從存儲文件中Paged In,直接就能從內存的cache中獲取即可,這種情況下,priority可以擔?!叭猪樞颉?;不過,如果消費者滯后太多,cache已滿,就會觸發(fā)新接收的消息直接保存在磁盤中,那么此時,priority就沒有那么有效了。

在Queue中,prefetch的消息列表默認將會采用“輪詢”的方式(roundRobin,注意并不是roundRobinDispatch)[備注:因為Queue不支持任何DispatchPolicy],依次添加到每個consumer的pending buffer中,比如有m1-m2-m3-m4四條消息,有C1-C2兩個消費者,那么: m1->C1,m2->C2,m3->C1,m4->C2。這種輪序方式,會對基于權重的消息發(fā)送有些額外的影響,假如四條消息的權重都不同,但是(m1,m3)->C1,事實上m2的權重>m3,對于C1而言,它似乎丟失了“順序性”。

1.3.2 強順序
<policyEntry queue=">" strictOrderDispatch="true"/>

strictOrderDispatch“嚴格順序轉發(fā)”,這是區(qū)別于“輪詢”的一種消息轉發(fā)手段;不過不要誤解它為“全局嚴格順序”,它只不過是將prefetch的消息依次填滿每個consumer的pending buffer。比如上述例子中,如果C1-C2兩個消費者的buffer尺寸為3,那么(m1,m2,m3)->C1,(m4)->C2;當C1填充完畢之后,才會填充C2。由此這種策略可以保證buffer中所有的消息都是“權重臨近的”、有序的。(需要注意:strictOrderDispatch并非是解決priority消息順序的問題而生,只是在使用priority時需要關注它)。

1.3.3 嚴格順序
policyEntry queue=">" prioritizedMessages="true" useCache="false" expireMessagesPeriod="0" queuePrefetch="1"/>

useCache=false來關閉內存,強制將所有的消息都立即寫入文件(索引化,但是會降低消息的轉發(fā)效率);queuePrefetch=1來約束每個consumer任何時刻只有一個消息正在處理,那些消息消費之后,將會從文件中重新獲取,這大大增加了消息文件操作的次數,不過每次讀取肯定都是priority最高的消息。

2 Consumer API簡介

2.1 消息的確認

Consumer拉取消息后,如果沒有做確認acknowledge,此消息不會從MQ中刪除。

消息的如果拉去到consumer后,未確認,那么消息被鎖定。如果consumer關閉的時候仍舊沒有確認消息,則釋放消息鎖定信息。消息將發(fā)送給其他的consumer處理。

消息一旦處理,應該必須確認。類似數據庫中的事務管理機制。

2.2 消息的過濾

對消息消費者處理的消息數據進行過濾。這種處理可以明確消費者的角色,細分消費者的功能。

設置過濾:

Session.createConsumer(Destination destination, String messageSelector);

過濾信息為字符串,語法類似SQL92中的where子句條件信息??梢允褂弥T如AND、OR、IN、NOT IN等關鍵字。詳細內容可以查看javax.jms.Message的幫助文檔。

注意:消息的生產者在發(fā)送消息的的時候,必須設置可過濾的屬性信息,所有的屬性信息設置方法格式為:setXxxxProperty(String name, T value)。 其中方法名中的Xxxx是類型,如setObjectProperty/setStringProperty等。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容