ActiveMQ - 初步認(rèn)識
消息中間件應(yīng)用場景
- 異步處理
場景說明: 用戶注冊,需要執(zhí)行三個業(yè)務(wù)邏輯,分別為寫入用戶表,發(fā)注冊郵件以及注冊短信
- 應(yīng)用解耦
場景說明: 用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng)。
- 流量削峰
場景說明: 秒殺活動,一般會因為流量過大,導(dǎo)致流量暴增,應(yīng)用掛掉。(在用戶請求與秒殺業(yè)務(wù)處理中間加入消息隊列)
ActiveMQ介紹與JMS協(xié)議
簡介
- 什么是ActiveMQ?
ActiveMQ是遵守Apache開源規(guī)則的最流行,能力強勁的消息中間件。ActiveMQ是一個完全支持JMS1.1和J2EE1.4規(guī)范的JMS Provider實現(xiàn)。
- 什么是JMS?
JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件(MOM)的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。
JMS消息模式
消息中間件一般有兩種傳遞模式:點對點模式(P2P)和發(fā)布-訂閱模式(Pub/Sub)
點對點模型(Queue隊列模型)
P2P: 即生產(chǎn)者和消費者之間的消息往來

每個消息都被發(fā)送到特定的消息隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時
P2P特點:
- 每個消息只有一個消費者(Consumer),即一旦被消費,消息就不再在消息隊列中
- 發(fā)送者和接收者之間在時間上沒有依賴性,也就是說當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發(fā)送到隊列
- 接收者在成功接收消息之后需向隊列應(yīng)答成功
發(fā)布/訂閱模型(Publish-Subscribe)
發(fā)布/訂閱
包含三個角色: 主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber),多個發(fā)布者將消息發(fā)送到topic,系統(tǒng)將這些消息投遞到訂閱此topic的訂閱者。

發(fā)布者發(fā)送到topic的消息,只有訂閱了topic的訂閱者才會收到消息。topic實現(xiàn)了發(fā)布和訂閱,當(dāng)你發(fā)布一個消息,所有訂閱這個topic的服務(wù)都能得到這個消息,所以從1到N個訂閱者都能得到這個消息的拷貝。
發(fā)布/訂閱模型的特點:
- 每個消息可以有多個消費者
- 發(fā)布者和訂閱者之間有時間上的依賴性(先訂閱再發(fā)布)
- 訂閱者必須保持運行的狀態(tài),才能接收發(fā)布者發(fā)布的消息
JMS編程API
| 要素 | 作用 |
|---|---|
| Destination | 表示消息所走通道的目標(biāo)定義,用來定義消息從發(fā)送端發(fā)出后要走的通道,而不是接收方。Destination屬于類對象 |
| ConnectionFactory | 用于創(chuàng)建連接對象,ConnectionFactory屬于管理類的對象 |
| Connection | 連接接口,所負(fù)責(zé)的重要工作時創(chuàng)建Session |
| Session | 會話接口,這是一個非常重要的對象,消息發(fā)送者、消息接收者以及消息對象本省,都是通過這個會話對象創(chuàng)建的 |
| MessageConsume | 消息消費者,也就是訂閱消息并處理消息的對象 |
| MessageProducer | 消息的生產(chǎn)者,也就是用來發(fā)送消息的對象 |
ConnectionFactory
創(chuàng)建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。Destination
Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費者的消息來源。對于消息生產(chǎn)者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、TopicConnection
Connection表示在客戶端和JMS系統(tǒng)之間建立的連接(對TCP/IP socket的包裝)。Connection可以產(chǎn)生一個或多個Session。Session
Session是我們對消息進(jìn)行操作的接口,可以通過Session創(chuàng)建生產(chǎn)者、消費者、消息等。Session提供了事務(wù)的功能,如果需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務(wù)中。Producer
消息生產(chǎn)者由Session創(chuàng)建,并用于將消息發(fā)送到Destination。同樣,消息生產(chǎn)者分兩種類型: QueueSender和TopicPublisher??梢哉{(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息Consumer
消息消費者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。兩張類型: QueueReceiver和TopicSubscriber??煞謩e通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建。當(dāng)然,也可以session的createDurableSubscriber方法來創(chuàng)建持久化的訂閱者。MessageListener
消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,一旦消息到達(dá),將自動調(diào)用監(jiān)聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

ActiveMQ安裝
安裝(Linux)
第一步: 安裝JDK(略)
第二步: 下載activemq的壓縮包(apache-activemq-5.15.12-bin.tar.gz)到Linux系統(tǒng)
第三步: 解壓文件
tar -zxvf apache-activemq-5.15.12-bin.tar.gz
第四步: 進(jìn)入apache-activemq-5.15.12的bin目錄
cd apache-activemq-5.15.12/bin
第五步: 啟動activemq
./activemq start (執(zhí)行2次:第一次生產(chǎn)配置文件;第二次啟動)
第六步: 停止activemq
./activemq stop
其它命令:
./activemq status -- 查看activemq的狀態(tài)
./activemq restart -- 重啟activemq
./activemq purge FOO.BAR -- 刪除隊列中的所有消息,隊列名稱是FOO.BAR
./activemq dstat -- 顯示默認(rèn)broker的所有主題和隊列統(tǒng)計信息
./activemq dstat topics -- 顯示主題的統(tǒng)計信息
./activemq dstat queue -- 顯示隊列的統(tǒng)計信息
...
訪問
頁面控制臺: http://ip:8161 (監(jiān)控)
請求地址: tcp://ip:61616 ?。╦ava代碼訪問消息中間件)
初始用戶名和密碼: admin/admin
原生JMS開發(fā)
點對點模式
生產(chǎn)者
- Maven引入依賴
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
</dependencies>
- 編寫生產(chǎn)消息的類(PTP_Producer.class)
public class PTP_Producer {
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.打開連接
connection.start();
//4.創(chuàng)建session
/**
* 參數(shù)一:是否開啟事務(wù)
* 參數(shù)二:消息確認(rèn)機制
*/
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建目標(biāo)地址(Queue:點對點消息,Topic:發(fā)布訂閱消息)
Queue queue = session.createQueue("queue01");
//6.創(chuàng)建消息生產(chǎn)者
MessageProducer producer=session.createProducer(queue);
//7.創(chuàng)建消息
TextMessage message=session.createTextMessage("hello,this is PTP message");
//8.發(fā)送消息
producer.send(message);
System.out.println("生產(chǎn)者發(fā)送完畢...");
//9.釋放資源
session.close();
connection.close();
}
}
-
運行效果
ActiveMQ
ActiveMQ
消費者
Maven引入依賴
如上,略編寫接收消息的類(PTP_Consumer.class) -- receive方法
public class PTP_Consumer {
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.打開連接
connection.start();
//4.創(chuàng)建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.指定目標(biāo)地址
Queue queue = session.createQueue("queue01");
//6.創(chuàng)建消息消費者
MessageConsumer consumer = session.createConsumer(queue);
//7.接受消息
while (true){
Message message = consumer.receive(); // 不斷的接收,還有一個方法receive(long l),這個是隔多少毫秒接收一次
if(message == null){ // 表示沒有信息了,退出循環(huán)
break;
}
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
System.out.println("接受到的消息: "+textMessage.getText());
}
}
}
}
- 編寫接收消息的類(PTP_Consumer2.class) -- 監(jiān)聽器方法(常用)
public class PTP_Consumer2 {
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.打開連接
connection.start();
//4.創(chuàng)建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.指定目標(biāo)地址
Queue queue = session.createQueue("queue01");
//6.創(chuàng)建消息消費者
MessageConsumer consumer = session.createConsumer(queue);
//7.設(shè)置消息監(jiān)聽器來接收消息
consumer.setMessageListener(new MessageListener() {
// 處理消息
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收的消息(2):"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 注意: 在監(jiān)聽器模式下千萬不要關(guān)閉連接,一旦關(guān)閉,消息無法接收
}
}
-
運行效果
ActiveMQ
ActiveMQ
發(fā)布訂閱模式
生產(chǎn)者
Maven引入依賴
如上,略編寫生產(chǎn)類(PS_Producer.class)
/**
* 發(fā)布訂閱模式-消息生產(chǎn)者
*/
public class PS_Producer {
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.打開連接
connection.start();
//4.創(chuàng)建session
/**
* 參數(shù)一:是否開啟事務(wù)
* 參數(shù)二:消息確認(rèn)機制
*/
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.創(chuàng)建目標(biāo)地址(Queue:點對點消息,Topic:發(fā)布訂閱消息)
Topic topic = session.createTopic("topic01");
//6.創(chuàng)建消息生產(chǎn)者
MessageProducer producer=session.createProducer(topic);
//7.創(chuàng)建消息
TextMessage message=session.createTextMessage("hello,this is PS message");
//8.發(fā)送消息
producer.send(message);
System.out.println("生產(chǎn)者發(fā)送完畢...");
//9.釋放資源
session.close();
connection.close();
}
}
-
運行效果
ActiveMQ
ActiveMQ
消費者
Maven引入依賴
如上,略編寫生產(chǎn)類(PS_Consumer.class)
/*
* 發(fā)布訂閱模式消費者
*/
public class PS_Consumer {
public static void main(String[] args) throws JMSException {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
//2.創(chuàng)建連接
Connection connection = factory.createConnection();
//3.打開連接
connection.start();
//4.創(chuàng)建session
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
//5.指定目標(biāo)地址
Topic topic = session.createTopic("topic01");
//6.創(chuàng)建消息消費者
MessageConsumer consumer = session.createConsumer(topic);
//7.設(shè)置消息監(jiān)聽器來接收消息
consumer.setMessageListener(new MessageListener() {
// 處理消息
@Override
public void onMessage(Message message) {
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收的消息---topic:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 注意: 在監(jiān)聽器模式下千萬不要關(guān)閉連接,一旦關(guān)閉,消息無法接收
}
}
-
運行效果)
ActiveMQ
這時我們看到訂閱到的topic消息是沒有被消費的。上面有說到,在發(fā)布訂閱模式下,一定要先啟動消費者,然后才能消費到發(fā)布者推送的訂閱的信息。讓我們重新啟動下PS_Producer類,再看看效果

這時已經(jīng)成功獲取到消息了,再看看頁面控制臺

消息入列2條,成功出列1條,1個消費者






