我們知道activemq的使用方式非常簡單有如下幾個步驟:
1.創(chuàng)建連接工廠
2.創(chuàng)建連接
3.創(chuàng)建會話
4.創(chuàng)建目的地
5.創(chuàng)建生產(chǎn)者或消費者
6.生產(chǎn)或消費消息
7.關(guān)閉生產(chǎn)或消費者、關(guān)閉會話、關(guān)閉連接
這一節(jié)我們針對他的消息傳播機制和持久化方式做一個簡單的學習。在會用的同時我們也需要理解一些基本的概念,這樣才不至于在出錯后無從下手。
1.activemq服務器工作模型
我們先看一下消息發(fā)送的時序圖:

ConnectionFactory 對象創(chuàng)建一個連接工廠,消息的發(fā)送和接受服務均由此進行;
ConnectionFactory 創(chuàng)建一個活動Connection作為當前使用的連接;
Session 是一個用于生成和使用消息的單線程上下文,它用于創(chuàng)建發(fā)送的生產(chǎn)者和接收消息的消費者,并為所發(fā)送的消息定義發(fā)送順序。會話通過大量確認選項或通過事務來支持可靠傳送。
戶端使用 MessageProducer 向指定的物理目標發(fā)送消息,生產(chǎn)者可指定一個默認傳送模式(持久性消息與非持久性消息)、優(yōu)先級和有效期值,以控制生產(chǎn)者向物理目標發(fā)送的所有消息;
消費者可以支持同步或異步消息接收。異步使用可通過向消費者注冊 MessageListener 來實現(xiàn)。當會話線程調(diào)用 MessageListener 對象的 onMessage 方法時,客戶端將使用消息。
2.ActiveMQ消息傳送模型
ActiveMQ 支持兩種消息傳送模型:PTP(即點對點模型)和Pub/Sub(即發(fā)布 /訂閱模型),前面我們已經(jīng)講過,在此就不贅述。
3.消息選擇器
ActiveMQ提供了一種機制,使用它,消息服務可根據(jù)消息選擇器中的標準來執(zhí)行消息過濾。生產(chǎn)者可在消息中放入應用程序特有的屬性,而消費者可使用基于這些屬性的選擇標準來表明對消息是否感興趣。
消息選擇器是根據(jù) header 和 properties 允許客戶端選擇性的制定需要接收的消息,消息選擇器是無法利用 消息主體(Body)進行過濾的。無論你的消息主題是什么類型, 文本、或者對象、或者鍵值對。下面我們講一下消息選擇器的語法以及使用規(guī)范:
可接收的類型包括:byte,int,double,boolean,String;
屬性標識符定義:
1)變量名與java定義一樣;
2)要么在heads中定義 要么在 properties中定義,如果在sender中是在heads中定義而receiver中卻從properties中尋找的話,找不到的情況下他是不會自動去heads中尋找的,而是會返回null;
3)根據(jù)不同類型的變量選擇不同的方法:
message.setIntProperty("test",14);
4)那么在接收端可以對該變量進行攔截:
session.createConsumer(destination,"test > 14");
屬性標志符是區(qū)分大小寫的;
攔截器中的部分表示方式:
1)可以是條件表達式
2)可以是算術(shù)表達式
3)可以是比較運算和邏輯運算組成的表達式
支持 () 左右括號;
支持邏輯運算的優(yōu)先順序表達式 例如: NOT , AND , OR;
比較運算符有: = , > , >= , < , <= , <> (not equal);
eg:
標識符是null
"prop_name IS NULL"
標識符非空 not null
"prop_name IS NOT NULL"
"age BETWEEN 15 AND 19" is equivalent to "age >= 15 AND age <= 19"
"Country NOT IN (' UK', 'US', 'France') "
代碼很簡單,只需要在Sender端做如下改寫:
TextMessage message = session.createTextMessage();
message.setIntProperty("test",14);
message.setText("test");
Receiver端:
consumer = session.createConsumer(destination,"test > 14");
對發(fā)送端的特定字符做一個判斷符合條件即被攔截
4.消息確認機制
jms消息只有在被確認之后才認為成功消費了這條消息。消息的成功消費通常包括三個步驟:
(1)client接收消息
(2)client處理消息
(3)消息被確認(也就是client給一個確認消息)
在事務性會話中當一個事務被提交的時候,確認自動發(fā)生,和應答模式?jīng)]關(guān)系,這個值可以隨便寫。(這里多提一句異步消息接收中不能使用事務性會話)。
在非事務性會話中消息何時被確認取決于創(chuàng)建的session中設置的消息應答模式(acknowledge model)該參數(shù)有三個值:
1)Session.AUTO_ACKNOWLEDGE:當client端成功的從receive方法或從onMessage(Message message) 方法返回的時候,會話自動確認client收到消息。
2)Session.CLIENT_ACKNOWLEDGE: 客戶單通過調(diào)用acknowledge方法來確認客戶端收到消息。但需要注意在這種應答模式下,確認是在會話層上進行的,確認一個被消費的消息將自動確認所有已消費的其他消息。比如一個消費者已經(jīng)消費了10條消息,然后確認了第5條消息被消費,則這10條都被確認消費了。、
acknowledge()通知方法是在Message對象上,同步接收,調(diào)用acknowledge()方法進行確認如下所示:
consumer = session.createConsumer(queue);
Message message = consumer.receive();
message.acknowledge();
異步接受,調(diào)用acknowledge()方法進行確認:
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
String value = textMessage.getText();
System.out.println("value: " + value);
message.acknowledge(); //消息消費確認通知
} catch (JMSException e) {
e.printStackTrace();
}
}
});
3)Session.DUPS_ACKNOWLEDGE:不是必須簽收,消息可能會重復發(fā)送。在第二次重新傳送消息的時候,消息頭的JmsDelivered會被置為true標示當前消息已經(jīng)傳送過一次,客戶端需要進行消息的重復處理控制。
持久化消息
JMS 支持以下兩種消息提交模式:
5.1 ERSISTENT 持久消息
是activemq默認的傳送方式,此方式下的消息在配合activemq.xml中配置的消息存儲方式,會被存儲在特定的地方,直到有消費者將消息消費或者消息過期進入DLQ隊列,消息生命周期才會結(jié)束。此模式下可以保證消息只會被成功傳送一次和成功使用一次,消息具有可靠性。在消息傳遞到目標消費者,在消費者沒有成功應答前,消息不會丟失。所以很自然的,需要一個地方來持久性存儲。如果消息消費者在進行消費過程發(fā)生失敗,則消息會被再次投遞。
DeliveryMode.PERSISTENT 指示JMS provider持久保存消息,以保證消息不會因為JMS provider的失敗而丟失。 消息持久化在硬盤中,ActiveMQ持久化有三種方式:AMQ、KahaDB、JDBC。
AMQ
AMQ是一種文件存儲形式,它具有寫入速度快和容易恢復的特點。消息存儲在一個個文件中,文件的默認大小為32M,如果一條消息的大小超過了32M,那么這個值必須設置大一點。當一個存儲文件中的消息已經(jīng)全部被消費,那么這個文件將被標識為可刪除,在下一個清除階段,這個文件被刪除。AMQ適用于ActiveMQ5.3之前的版本。
KahaDB
KahaDB是基于文件的本地數(shù)據(jù)庫儲存形式,雖然沒有AMQ的速度快,但是它具有強擴展性,恢復的時間比AMQ短,從5.4版本之后KahaDB做為默認的持久化方式。
JDBC
可以將消息存儲到數(shù)據(jù)庫中,例如:Mysql、SQL Server、Oracle、DB2。
具體使用方式大家下去查一下,限于篇幅在此就不做太詳細的介紹。
5.2 NON_PERSISTENT 非持久消息
非持久的消息適用于不重要的,可以接受消息丟失的哪一類消息,這種消息只會被投遞一次,消息不會在持久性存儲中存儲,也不會保證消息丟失后的重新投遞。
DeliveryMode.NON_PERSISTENT 不要求JMS provider持久保存消息,消息存放在內(nèi)存中,讀寫速度快,在JMS服務停止后消息會消失,沒有持久化到硬盤。ActiveMQ消息過期設置
允許消息過期 。默認情況下,消息永不會過期。如果消息在特定周期內(nèi)失去意義,那么可以設置過期時間。
有兩種方法設置消息的過期時間,時間單位為毫秒:
1)使用 setTimeToLive 方法為所有的消息設置過期時間;
2)使用 send 方法為每一條消息設置過期時間。
消息過期時間,send 方法中的 timeToLive 值加上發(fā)送時刻的 GMT 時間值。如果 timeToLive 值等于零,則 JMSExpiration 被設為零,表示該消息永不過期。如果發(fā)送后,在消息過期時間之后消息還沒有被發(fā)送到目的地,則該消息被清除。
文章來源于網(wǎng)絡。
感謝大家閱讀,歡迎大家私信討論。給大家推薦一個Java技術(shù)交流群:473984645里面會分享一些資深架構(gòu)師錄制的視頻資料:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。還能領(lǐng)取免費的學習資源,目前受益良多!
推薦大家閱讀:
Java高級架構(gòu)學習資料分享+架構(gòu)師成長之路?
個人整理了更多資料以PDF文件的形式分享給大家,需要查閱的程序員朋友可以來免費領(lǐng)取。還有我的學習筆記PDF文件也免費分享給有需要朋友!