1 ActiveMQ簡(jiǎn)介
1.1 ActiveMQ是什么
ActiveMQ是一個(gè)消息隊(duì)列應(yīng)用服務(wù)器。支持JMS****規(guī)范。
1.1.1 JMS概述
全稱:Java Message Service ,即為Java消息服務(wù),是一套java消息服務(wù)的API標(biāo)準(zhǔn)。(標(biāo)準(zhǔn)即接口)
實(shí)現(xiàn)了JMS標(biāo)準(zhǔn)的系統(tǒng),稱之為JMS Provider。
1.1.2 消息隊(duì)列
1.1.2.1 概念
消息隊(duì)列是在消息的傳輸過程中保存消息的容器,提供一種不同進(jìn)程或者同一進(jìn)程不同線程直接通訊的方式。
[圖片上傳失敗...(image-9b371a-1564373794322)]
Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到 Broker;
Broker:消息處理中心。負(fù)責(zé)消息存儲(chǔ)、確認(rèn)、重試等,一般其中會(huì)包含多個(gè) queue;
Consumer:消息消費(fèi)者,負(fù)責(zé)從 Broker 中獲取消息,并進(jìn)行相應(yīng)處理;
1.1.2.2 常見消息隊(duì)列應(yīng)用
(1)、ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。
(2)、RabbitMQ
RabbitMQ是一個(gè)在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。開發(fā)語言為Erlang。
(3)、RocketMQ
由阿里巴巴定義開發(fā)的一套消息隊(duì)列應(yīng)用服務(wù)。
1.2 ActiveMQ能做什么
(1)實(shí)現(xiàn)兩個(gè)不同應(yīng)用(程序)之間的消息通訊。
(2)實(shí)現(xiàn)同一個(gè)應(yīng)用,不同模塊之間的消息通訊。
1.3 ActiveMQ下載
ActiveMQ官網(wǎng)地址: http://activemq.apache.org
ActiveMQ下載地址:http://activemq.apache.org/download-archives.html
--可供下載的歷史版本
--說明:
ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。
ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。
|
[圖片上傳失敗...(image-8a6a26-1564373794300)]
|
--根據(jù)操作系統(tǒng),選擇下載版本。(本教程下載Linux版本)
|
[圖片上傳失敗...(image-2cabe4-1564373794300)]
|
1.4 ActiveMQ主要特點(diǎn)
(1)支持多語言、多協(xié)議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
(2)對(duì)Spring的支持,ActiveMQ可以很容易整合到Spring的系統(tǒng)里面去。
(3)支持高可用、高性能的集群模式。
2 入門示例
2.1 需求
使用ActiveMQ實(shí)現(xiàn)消息隊(duì)列模型。
2.2 配置步驟說明
(1)搭建ActiveMQ消息服務(wù)器。
(2)創(chuàng)建一個(gè)java項(xiàng)目。
(3)創(chuàng)建消息生產(chǎn)者,發(fā)送消息。
(4)創(chuàng)建消息消費(fèi)者,接收消息。
2.3 第一部分:搭建ActiveMQ消息服務(wù)器
2.3.1 第一步:下載、上傳至Linux
--說明:確保已經(jīng)安裝了jdk
|
[圖片上傳失敗...(image-c63245-1564373794300)]
|
2.3.2 第二步:安裝到/usr/local/activemq目錄
(1)解壓到/usr/local目錄下
|
[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local
|
(2)修改名稱為activemq
|
[root@node07192 ~]# cd /usr/local/
[root@node07192 local]# mv apache-activemq-5.9.0/ activemq
|
2.3.3 第三步:?jiǎn)?dòng)ActiveMQ服務(wù)器
--說明:ActiveMQ是免安裝軟件,解壓即可啟動(dòng)服務(wù)。
|
[root@node07192 local]# cd activemq/bin
[root@node07192 bin]# ./activemq start
|
--查看ActiveMQ啟動(dòng)狀態(tài)
|
[root@node07192 bin]# ./activemq status
[圖片上傳失敗...(image-f26a6a-1564373794300)]
|
2.3.4 第四步:瀏覽器訪問ActiveMQ管理界面
2.3.4.1 Step1:查看ActiveMQ管理界面的服務(wù)端口。在/conf/jetty.xml中
--訪問管理控制臺(tái)的服務(wù)端口,默認(rèn)為:8161
|
[root@node07192 bin]# cd ../conf
[root@node07192 conf]# vim jetty.xml
[圖片上傳失敗...(image-9e080e-1564373794300)]
|
2.3.4.2 Step2:查看ActiveMQ用戶、密碼。在/conf/users.properties中:
--默認(rèn)的用戶名、密碼均為amdin
|
[root@node07192 conf]# vim users.properties
[圖片上傳失敗...(image-633981-1564373794300)]
|
2.3.4.3 Step3:訪問ActiveMQ管理控制臺(tái)。地址:http://ip:8161/
--注意:防火墻是沒有配置該服務(wù)的端口的。
因此,要訪問該服務(wù),必須在防火墻中配置。
(1****)修改防火墻,開放8161****端口
|
[root@node07192 conf]# vim /etc/sysconfig/iptables
[圖片上傳失敗...(image-8f9245-1564373794300)]
|
(2****)重啟防火墻
|
[root@node07192 conf]# service iptables restart
|
(3)登錄管理控制臺(tái)
--登陸,用戶名、密碼均為admin
|
[圖片上傳失敗...(image-9c205-1564373794300)]
|
--控制臺(tái)主界面
|
[圖片上傳失敗...(image-48d268-1564373794300)]
|
--搭建ActiveMQ服務(wù)器成功!!!
2.4 第二部分:創(chuàng)建java項(xiàng)目,導(dǎo)入jar包
--導(dǎo)包說明:
ActiveMQ的解壓包中,提供了運(yùn)行ActiveMQ的所有jar。
|
[圖片上傳失敗...(image-a464ce-1564373794300)]
|
--創(chuàng)建項(xiàng)目
|
[圖片上傳失敗...(image-f2670f-1564373794300)]
|
2.5 第三部分:創(chuàng)建消息生成者,發(fā)送消息
--說明:ActiveMQ是實(shí)現(xiàn)了JMS規(guī)范的。在實(shí)現(xiàn)消息服務(wù)的時(shí)候,必須基于API接口規(guī)范。
2.5.1 JMS常用的API說明
下述API都是接口類型,定義在javax.jms包中,是JMS標(biāo)準(zhǔn)接口定義。ActiveMQ完全實(shí)現(xiàn)這一套api標(biāo)準(zhǔn)。
2.5.1.1 ConnectionFactory
鏈接工廠, 用于創(chuàng)建鏈接的工廠類型。
2.5.1.2 Connection
鏈接,用于建立訪問ActiveMQ連接的類型, 由鏈接工廠創(chuàng)建。
2.5.1.3 Session
會(huì)話, 一次持久有效、有狀態(tài)的訪問,由鏈接創(chuàng)建。
2.5.1.4 Destination & Queue & Topic
目的地, 即本次訪問ActiveMQ消息隊(duì)列的地址,由Session會(huì)話創(chuàng)建。
(1)interface Queue extends Destination
(2)Queue:隊(duì)列模型,只有一個(gè)消費(fèi)者。消息一旦被消費(fèi),默認(rèn)刪除。
(3)Topic:主題訂閱中的消息,會(huì)發(fā)送給所有的消費(fèi)者同時(shí)處理。
2.5.1.5 Message
消息,在消息傳遞過程中數(shù)據(jù)載體對(duì)象,是所有消息【文本消息TextMessage,對(duì)象消息ObjectMessage等】具體類型的頂級(jí)接口,可以通過會(huì)話創(chuàng)建或通過會(huì)話從ActiveMQ服務(wù)中獲取。
2.5.1.6 MessageProducer
消息生成者, 在一次有效會(huì)話中, 用于發(fā)送消息給ActiveMQ服務(wù)的工具,由Session會(huì)話創(chuàng)建。
2.5.1.7 MessageCustomer
消息消費(fèi)者【消息訂閱者,消息處理者】, 在一次有效會(huì)話中, 用于ActiveMQ服務(wù)中獲取消息的工具,由Session會(huì)話創(chuàng)建。
我們定義的消息生產(chǎn)者和消費(fèi)者,都是基于上面API實(shí)現(xiàn)的。
2.5.2 第一步:創(chuàng)建MyProducer類,定義sendMessage方法
|
package cn.gzsxt.mq.producer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MyProducer {
// 定義鏈接工廠
ConnectionFactory connectionFactory = null;
// 定義鏈接
Connection connection = null;
// 定義會(huì)話
Session session = null;
// 定義目的地
Destination destination = null;
// 定義消息生成者
MessageProducer producer = null;
// 定義消息
Message message = null;
public void sendToMQ(){
try{
/*
創(chuàng)建鏈接工廠
ActiveMQConnectionFactory - 由ActiveMQ實(shí)現(xiàn)的ConnectionFactory接口實(shí)現(xiàn)類.
構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
userName - 訪問ActiveMQ服務(wù)的用戶名, 用戶名可以通過<u>jetty</u>-realm.properties配置文件配置.
password - 訪問ActiveMQ服務(wù)的密碼, 密碼可以通過<u>jetty</u>-realm.properties配置文件配置.
brokerURL - 訪問ActiveMQ服務(wù)的路徑地址. 路徑結(jié)構(gòu)為 - 協(xié)議名://主機(jī)地址:端口號(hào)
此鏈接基于TCP/IP協(xié)議.
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創(chuàng)建鏈接對(duì)象
connection = connectionFactory.createConnection();
// 啟動(dòng)鏈接
connection.start();
/*
創(chuàng)建會(huì)話對(duì)象
方法 - connection.createSession(boolean transacted, <u>int</u> acknowledgeMode);
transacted - 是否使用事務(wù), 可選值為true|false
true - 使用事務(wù), 當(dāng)設(shè)置此變量值, 則acknowledgeMode參數(shù)無效, 建議傳遞的acknowledgeMode參數(shù)值為
Session.SESSION_TRANSACTED
false - 不使用事務(wù), 設(shè)置此變量值, 則acknowledgeMode參數(shù)必須設(shè)置.
acknowledgeMode - 消息確認(rèn)機(jī)制, 可選值為:
Session.AUTO_ACKNOWLEDGE - 自動(dòng)確認(rèn)消息機(jī)制
Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機(jī)制
Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機(jī)制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地, 目的地命名即隊(duì)列命名, 消息消費(fèi)者需要通過此命名訪問對(duì)應(yīng)的隊(duì)列
destination = session.createQueue("test-mq");
// 創(chuàng)建消息生成者, 創(chuàng)建的消息生成者與某目的地對(duì)應(yīng), 即方法參數(shù)目的地.
producer = session.createProducer(destination);
// 創(chuàng)建消息對(duì)象, 創(chuàng)建一個(gè)文本消息, 此消息對(duì)象中保存要傳遞的文本數(shù)據(jù).
message = session.createTextMessage("hello,activeme");
// 發(fā)送消息
producer.send(message);
System.out.println("消息發(fā)送成功!");
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯(cuò)誤!!");
}finally{
try {
// 回收消息發(fā)送者資源
if(null != producer)
producer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會(huì)話資源
if(null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收鏈接資源
if(null != connection)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
|
2.5.3 第二步:創(chuàng)建一個(gè)測(cè)試類MessageTest
--添加junit類庫,快捷鍵ctrl+1
|
package cn.gzsxt.mq.test;
import org.junit.Test;
import cn.gzsxt.mq.producer.MyProducer;
public class MessageTest {
@Test
public void sendToMQ(){
MyProducer producer = new MyProducer();
producer.sendToMQ();
}
}
|
2.5.4 第三步:測(cè)試
(1)設(shè)置防火墻,配置61616端口。注意修改之后重啟防火墻。
(2)測(cè)試結(jié)果:
--查看控制臺(tái)
|
[圖片上傳失敗...(image-6aa9d4-1564373794299)]
|
--查看ActiveMQ管理控制界面
|
[圖片上傳失敗...(image-4252b4-1564373794299)]
|
--消息發(fā)送成功!!!
2.6 第四部分:創(chuàng)建消息消費(fèi)者,消費(fèi)消息
2.6.1 第一步:創(chuàng)建MyConsumer類
|
package cn.gzsxt.mq.consumer;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
@ClassName:MyConsumer
@Description: 消息消費(fèi)者代碼
*/
public class MyConsumer {
// 定義鏈接工廠
ConnectionFactory connectionFactory = null;
// 定義鏈接
Connection connection = null;
// 定義會(huì)話
Session session = null;
// 定義目的地
Destination destination = null;
// 定義消息消費(fèi)者
MessageConsumer consumer = null;
// 定義消息
Message message = null;
public void recieveFromMQ(){
try{
/*
創(chuàng)建鏈接工廠
ActiveMQConnectionFactory - 由ActiveMQ實(shí)現(xiàn)的ConnectionFactory接口實(shí)現(xiàn)類.
構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)
userName - 訪問ActiveMQ服務(wù)的用戶名, 用戶名可以通過<u>jetty</u>-realm.properties配置文件配置.
password - 訪問ActiveMQ服務(wù)的密碼, 密碼可以通過<u>jetty</u>-realm.properties配置文件配置.
brokerURL - 訪問ActiveMQ服務(wù)的路徑地址. 路徑結(jié)構(gòu)為 - 協(xié)議名://主機(jī)地址:端口號(hào)
此鏈接基于TCP/IP協(xié)議.
*/
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");
// 創(chuàng)建鏈接對(duì)象
connection = connectionFactory.createConnection();
// 啟動(dòng)鏈接
connection.start();
/*
創(chuàng)建會(huì)話對(duì)象
方法 - connection.createSession(boolean transacted, <u>int</u> acknowledgeMode);
transacted - 是否使用事務(wù), 可選值為true|false
true - 使用事務(wù), 當(dāng)設(shè)置此變量值, 則acknowledgeMode參數(shù)無效, 建議傳遞的acknowledgeMode參數(shù)值為
Session.SESSION_TRANSACTED
false - 不使用事務(wù), 設(shè)置此變量值, 則acknowledgeMode參數(shù)必須設(shè)置.
acknowledgeMode - 消息確認(rèn)機(jī)制, 可選值為:
Session.AUTO_ACKNOWLEDGE - 自動(dòng)確認(rèn)消息機(jī)制
Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機(jī)制
Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機(jī)制
*/
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建目的地, 目的地命名即隊(duì)列命名, 消息消費(fèi)者需要通過此命名訪問對(duì)應(yīng)的隊(duì)列
destination = session.createQueue("test-mq");
// 創(chuàng)建消息消費(fèi)者, 創(chuàng)建的消息消費(fèi)者與某目的地對(duì)應(yīng), 即方法參數(shù)目的地.
consumer = session.createConsumer(destination);
// 從ActiveMQ服務(wù)中獲取消息
message = consumer.receive();
TextMessage tMsg = (TextMessage) message;
System.out.println("從MQ中獲取的消息是:"+tMsg.getText());
}catch(Exception e){
e.printStackTrace();
System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯(cuò)誤!!");
}finally{
try {
// 回收消息消費(fèi)者資源
if(null != consumer)
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收會(huì)話資源
if(null != session)
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
try {
// 回收鏈接資源
if(null != connection)
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
|
2.6.2 第二步:修改測(cè)試類MessageTest,新增測(cè)試方法
|
@Test
public void recieveFromMQ(){
MyConsumer consumer = new MyConsumer();
consumer.recieveFromMQ();
}
|
2.6.3 第三步:測(cè)試
--查看Eclipse控制臺(tái)
--查看ActiveMQ管理控制界面