ActiveMQ
[toc]
簡(jiǎn)書不支持 toc 目錄模式,截圖一張。

什么是ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
主要特點(diǎn):
- 多種語(yǔ)言和協(xié)議編寫客戶端。語(yǔ)言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化,XA消息,事務(wù))
- 對(duì)Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
- 通過(guò)了常見J2EE服務(wù)器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測(cè)試,其中通過(guò)JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動(dòng)的部署到任何兼容J2EE 1.4 商業(yè)服務(wù)器上
- 支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支持通過(guò)JDBC和journal提供高速的消息持久化
- 從設(shè)計(jì)上保證了高性能的集群,客戶端-服務(wù)器,點(diǎn)對(duì)點(diǎn)
- 支持Ajax
- 支持與Axis的整合
- 可以很容易得調(diào)用內(nèi)嵌JMS provider,進(jìn)行測(cè)試
ActiveMQ的消息形式
- 對(duì)于消息的傳遞有兩種類型:
- 一種是點(diǎn)對(duì)點(diǎn)的,即一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者一一對(duì)應(yīng);
- 另一種是發(fā)布/訂閱模式,即一個(gè)生產(chǎn)者產(chǎn)生消息并進(jìn)行發(fā)送后,可以由多個(gè)消費(fèi)者進(jìn)行接收。
JMS定義了五種不同的消息正文格式,以及調(diào)用的消息類型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級(jí)別的兼容性。
- StreamMessage -- Java原始值的數(shù)據(jù)流
- MapMessage--一套名稱-值對(duì)
- TextMessage--一個(gè)字符串對(duì)象
- ObjectMessage--一個(gè)序列化的 Java對(duì)象
- BytesMessage--一個(gè)字節(jié)的數(shù)據(jù)流
ActiveMQ的安裝
進(jìn)入http://activemq.apache.org/ 下載ActiveMQ

最新版本: 5.14.5
安裝環(huán)境:
需要jdk
安裝Linux系統(tǒng)。生產(chǎn)環(huán)境都是Linux系統(tǒng)。
安裝步驟
第一步: 把ActiveMQ 的壓縮包上傳到Linux系統(tǒng)。
第二步:解壓縮。
第三步:?jiǎn)?dòng)。
使用bin目錄下的activemq命令啟動(dòng):
[root@localhost bin]# ./activemq start
關(guān)閉:
[root@localhost bin]# ./activemq stop
查看狀態(tài):
[root@localhost bin]# ./activemq status
注意:如果ActiveMQ整合spring使用,不要使用activemq-all-5.14.5.jar包(spring 可能少方法)。建議使用5.11.2
進(jìn)入管理后臺(tái):
http://192.168.25.168:8161/admin
用戶名:admin
密碼:admin

可能出現(xiàn)的問(wèn)題:
405的問(wèn)題:機(jī)器名和 ip 沒(méi)有對(duì)上,修改 host 文件。
查看 hostname,然后檢查 hosts 文件中是否相同。
/etc/sysconfig/network

vim /etc/h/hosts

ActiveMQ的使用方法

Queue
點(diǎn)對(duì)點(diǎn)(point-to-point,簡(jiǎn)稱PTP)Queue消息傳遞模型
Producer
生產(chǎn)者:生產(chǎn)消息,發(fā)送端。
把jar包添加到工程中。使用5.11.2版本的jar包。

第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Queue對(duì)象。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
第八步:使用Producer對(duì)象發(fā)送消息。
第九步:關(guān)閉資源。
@Test
public void testQueueProducer() throws Exception {
// 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
//brokerURL服務(wù)器的ip及端口號(hào)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
connection.start();
// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
//第一個(gè)參數(shù):是否開啟事務(wù)。true:開啟事務(wù),第二個(gè)參數(shù)忽略。
//第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Queue對(duì)象。
//參數(shù):隊(duì)列的名稱。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
MessageProducer producer = session.createProducer(queue);
// 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
/*TextMessage message = new ActiveMQTextMessage();
message.setText("hello activeMq,this is my first test.");*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
// 第八步:使用Producer對(duì)象發(fā)送消息。
producer.send(textMessage);
// 第九步:關(guān)閉資源。
producer.close();
session.close();
connection.close();
}
Consumer
消費(fèi)者:接收消息。
第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致queue,并且隊(duì)列的名稱一致。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
第七步:接收消息。
第八步:打印消息。
第九步:關(guān)閉資源
@Test
public void testQueueConsumer() throws Exception {
// 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
connection.start();
// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致queue,并且隊(duì)列的名稱一致。
Queue queue = session.createQueue("test-queue");
// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
MessageConsumer consumer = session.createConsumer(queue);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
//取消息的內(nèi)容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
//等待鍵盤輸入
System.in.read();
// 第九步:關(guān)閉資源
consumer.close();
session.close();
connection.close();
}
Topic
發(fā)布/訂閱(publish/subscribe,簡(jiǎn)稱pub/sub)Topic消息傳遞模型
Producer
使用步驟:
第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Topic對(duì)象。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
第八步:使用Producer對(duì)象發(fā)送消息。
第九步:關(guān)閉資源。
@Test
public void testTopicProducer() throws Exception {
// 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
// brokerURL服務(wù)器的ip及端口號(hào)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
connection.start();
// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
// 第一個(gè)參數(shù):是否開啟事務(wù)。true:開啟事務(wù),第二個(gè)參數(shù)忽略。
// 第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)topic對(duì)象。
// 參數(shù):話題的名稱。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
MessageProducer producer = session.createProducer(topic);
// 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
/*
* TextMessage message = new ActiveMQTextMessage(); message.setText(
* "hello activeMq,this is my first test.");
*/
TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
// 第八步:使用Producer對(duì)象發(fā)送消息。
producer.send(textMessage);
// 第九步:關(guān)閉資源。
producer.close();
session.close();
connection.close();
}
Consumer
消費(fèi)者:接收消息。
第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致topic,并且話題的名稱一致。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
第七步:接收消息。
第八步:打印消息。
第九步:關(guān)閉資源
@Test
public void testTopicConsumer() throws Exception {
// 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
// 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
Connection connection = connectionFactory.createConnection();
// 第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
connection.start();
// 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致topic,并且話題的名稱一致。
Topic topic = session.createTopic("test-topic");
// 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
MessageConsumer consumer = session.createConsumer(topic);
// 第七步:接收消息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
String text = null;
// 取消息的內(nèi)容
text = textMessage.getText();
// 第八步:打印消息。
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.out.println("topic的消費(fèi)端03。。。。。");
// 等待鍵盤輸入
System.in.read();
// 第九步:關(guān)閉資源
consumer.close();
session.close();
connection.close();
}