最近在學習一個Java的開源項目,git地址:https://gitee.com/shuzheng/zheng
該開源項目開發(fā)環(huán)境需具備ActiveMQ,所以特地學習一下ActiveMQ。
一、ActiveMQ是什么?
1)JMS概述
JMS即Java消息服務(Java Message Service的簡稱),是Java EE 的標準/規(guī)范之一。這種規(guī)范(標準)指出:消息的發(fā)送應該是異步的、非阻塞的。也就是說消息的發(fā)送者發(fā)送完消息后就直接返回了,不需要等待接收者返回后才能返回,發(fā)送者和接收者可以說是互不影響。所以這種規(guī)范(標準)能夠減輕或消除系統(tǒng)瓶頸,實現(xiàn)系統(tǒng)之間去除耦合,提高系統(tǒng)的整體可伸縮性和靈活性。JMS只是Java EE中定義的一組標準API,它自身并不是一個消息服務系統(tǒng),它是消息傳送服務的一個抽象,也就是說它定義了消息傳送的接口而并沒有具體實現(xiàn)。
2)ActiveMQ概述
ActiveMQ就是JMS規(guī)范的具體實現(xiàn);它是Apache下的一個項目,采用Java語言開發(fā),是一款非常流行的開源消息服務器(消息隊列中間件)
3)ActiveMQ與JMS關(guān)系

JMS只是定義了一組有關(guān)消息傳送的規(guī)范和標準,并沒有真正實現(xiàn),也就說JMS只是定義了一組接口;其具體實現(xiàn)由不同的消息中間件廠商提供,比如Apache ActiveMQ就是JMS規(guī)范的具體實現(xiàn),Apache ActiveMQ才是一個消息服務系統(tǒng),而JMS不是。
消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。目前在生產(chǎn)環(huán)境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。
二、應用場景——異步處理,應用解耦,流量削鋒和消息通訊
2.1異步處理
場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信。
傳統(tǒng)的做法:1.串行的方式; 2.并行方式。
1)串行方式:將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件,再發(fā)送注冊短信。以上三個任務全部完成后,返回給客戶端。
2)并行方式:將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件的同時,發(fā)送注冊短信。以上三個任務完成后,返回給客戶端。
與串行的差別是,并行的方式可以提高處理的時間。引入消息隊列,就可以將不是必須的業(yè)務邏輯改為異步處理。
2.2應用解耦
場景說明:用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng)。傳統(tǒng)的做法是,訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口。這種模式存在如下缺點:
1) 假如庫存系統(tǒng)無法訪問,則訂單減庫存將失敗,從而導致訂單失??;
2) 訂單系統(tǒng)與庫存系統(tǒng)耦合;
引入應用消息隊列后的方案:
1)訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。
2)庫存系統(tǒng):訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統(tǒng)根據(jù)下單信息,進行庫存操作。
假如:在下單時庫存系統(tǒng)不能正常使用。也不影響正常下單,因為下單后,訂單系統(tǒng)寫入消息隊列就不再關(guān)心其他的后續(xù)操作了。實現(xiàn)訂單系統(tǒng)與庫存系統(tǒng)的應用解耦。
2.3流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
可以控制活動的人數(shù);
可以緩解短時間內(nèi)高流量壓垮應用;
用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數(shù)量,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面;
秒殺業(yè)務根據(jù)消息隊列中的請求信息,再做后續(xù)處理。
2.4日志處理
日志處理是指將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸?shù)膯栴}。
日志采集客戶端,負責日志數(shù)據(jù)采集,定時寫受寫入Kafka隊列;
Kafka消息隊列,負責日志數(shù)據(jù)的接收,存儲和轉(zhuǎn)發(fā);
日志處理應用:訂閱并消費kafka隊列中的日志數(shù)據(jù);
二.ActiveMQ的使用
1、JMS兩種消息傳送模式
1)點對點( Point-to-Point):專門用于使用隊列Queue傳送消息;基于隊列Queue的點對點消息只能被一個消費者消費,如多個消費者都注冊到同一個消息隊列上,當生產(chǎn)者發(fā)送一條消息后,而只有其中一個消費者會接收到該消息,而不是所有消費者都能接收到該消息。
2)發(fā)布/訂閱(Publish/Subscribe):專門用于使用主題Topic傳送消息?;谥黝}的發(fā)布與訂閱消息能被多個消費者消費,生產(chǎn)者發(fā)送的消息,所有訂閱了該topic的消費者都能接收到。
2、 JMS API可以分為3個主要部分:
1)公共API:可用于向一個隊列或主題發(fā)送消息或從其中接收消息;
2)點對點API:專門用于使用隊列Queue傳送消息;
3)發(fā)布/訂閱API:專門用于使用主題Topic傳送消息。
JMS公共API:
在JMS公共API內(nèi)部,和發(fā)送與接收消息有關(guān)的JMS API接口主要是:ConnectionFactory/Connection/Session/Message/Destination/MessageProducer/MessageConsumer 。它們的關(guān)系是:一旦有了ConnectionFactory,就可以創(chuàng)建Connection,一旦有了Connection,就可以創(chuàng)建Session,而一旦有了Session,就可以創(chuàng)建Message、MessageProducer和MessageConsumer。
JMS點對點API:
點對點(p2p)消息傳送模型API是指JMS API之內(nèi)基于隊列(Queue)的接口:QueueConnectionFactory/QueueConnection/QueueSession/Message/Queue/QueueSender/QueueReceiver. 從接口的命名可以看出,大多數(shù)接口名稱僅僅是在公共API接口名稱之前添加Queue一詞。一般來說,使用點對點消息傳送模型的應用程序?qū)⑹褂没陉犃械腁PI,而不使用公共API 。
JMS發(fā)布/訂閱API:
發(fā)布/訂閱消息傳送模型API是指JMS API之內(nèi)基于主題(Topic)的接口:TopicConnectionFactory/TopicConnection/TopicSession/Message/Topic/TopicPublisher/TopicSubscriber. 由于基于主題(Topic)的JMS API類似于基于隊列(Queue)的API,因此在大多數(shù)情況下,Queue這個詞會由Topic取代。

3、ActiveMQ的下載與安裝
1)直接去官網(wǎng)(http://activemq.apache.org/)下載最新版本即可,由于這是免安裝的,只需要解壓就行了。安裝完之后進入bin目錄,雙擊 activemq.bat文件(linux下在bin目錄下執(zhí)行 activemq start)
2)訪問控制臺(檢驗是否成功)
在瀏覽器輸入:http://ip:8161/admin/ 會彈出登陸框,賬號和密碼都是默認admin。
3)修改端口號
61616為對外服務端口號
8161為控制器端口號
當端口號沖突時,可以修改這兩個端口號。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。
4)刪除不活動隊列
一般情況下,ActiveMQ的queue或者topic在不使用之后,可以通過web控制臺來刪除掉。當然,也可以通過配置,使得broker可以自動探測到無用的隊列(一定時間內(nèi)為空的隊列)并刪除掉,回收響應資源。
activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true" schedulePeriodForDestinationPurge="10000">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
schedulePeriodForDestinationPurge:10000 每十秒檢查一次,默認為0,此功能關(guān)閉
gcInactiveDestinations: true 刪除掉不活動隊列,默認為false
inactiveTimoutBeforeGC:30000 不活動30秒后刪除,默認為60秒
PS:對于topic的不活動隊列只是,10秒中之類沒有消費者進行注冊監(jiān)聽,如果一個用戶事先注冊了這個監(jiān)聽,但是他一直沒有登錄,那么這算活動隊列。而queue只要有消息沒有出隊列就表示活動隊列。
附:消息中間件的用途和優(yōu)點
1)將數(shù)據(jù)從一個應用程序傳送到另一個應用程序,或者從軟件的一個模塊傳送到另外一個模塊;
2)負責建立網(wǎng)絡通信的通道,進行數(shù)據(jù)的可靠傳送;
3)保證數(shù)據(jù)不重發(fā),不丟失;
4)能夠?qū)崿F(xiàn)跨平臺操作,能夠為不同操作系統(tǒng)上的軟件集成技工數(shù)據(jù)傳送服務
三、模擬HelloWord體驗ActiveMQ
下載MQ以后,要將---bin.zip解壓縮后里面的activemq-all-5.11.1.jar包加入到classpath下面,這個包包含了所有jms接口api的實現(xiàn),項目結(jié)構(gòu)圖:

啟動MQ,啟動成功,打開該鏈接:http://localhost:8161/admin/
賬號和密碼都是admin,啟動成功后,圖如下:

生產(chǎn)者和消費者代碼如下:
package com.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 生產(chǎn)者(消息發(fā)送者)
* @author admin
*
*/
public class JMSProducer {
// 賬號(默認連接用戶名)
private static final String user = ActiveMQConnection.DEFAULT_USER;
// 密碼
private static final String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
// 地址
private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static final int sendnum = 10;
public static void main(String[] args) {
// 連接工廠
ConnectionFactory connectionFactory;
// 連接
Connection connection = null;
// 會話
Session session;
// 消息目的
Destination destination;
// 消息生產(chǎn)者
MessageProducer messageProducer;
// 實例化工廠
connectionFactory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.pwd, JMSProducer.url);
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("HelloWord");
messageProducer = session.createProducer(destination);
sendMessage(session, messageProducer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 發(fā)送消息
* @param session
* @param messageProducer 消息生產(chǎn)者
* @throws Exception
*/
public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
for (int i = 0; i < 10; i++) {
TextMessage message = session.createTextMessage("ActiveMQ" + i);
System.out.println("發(fā)送消息" + i);
messageProducer.send(message);
}
}
}
package com.activemq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消費者(消息接收者)
* @author admin
*
*/
public class JMSConsumer {
private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默認連接用戶名
private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默認連接密碼
private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默認連接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;// 連接工廠
Connection connection = null;// 連接
Session session;// 會話 接受或者發(fā)送消息的線程
Destination destination;// 消息的目的地
MessageConsumer messageConsumer;// 消息的消費者
// 實例化連接工廠
connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
JMSConsumer.BROKEURL);
try {
// 通過連接工廠獲取連接
connection = connectionFactory.createConnection();
// 啟動連接
connection.start();
// 創(chuàng)建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 創(chuàng)建一個連接HelloWorld的消息隊列
destination = session.createQueue("HelloWord");
// 創(chuàng)建消息消費者
messageConsumer = session.createConsumer(destination);
while (true) {
TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
if (textMessage != null) {
System.out.println("收到的消息:" + textMessage.getText());
} else {
break;
}
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
啟動生產(chǎn)者,輸出如下:

然后看一下ActiveMQ服務器,Queues內(nèi)容如下:

我們可以看到創(chuàng)建了一個名稱為HelloWorld的消息隊列,隊列中有10條消息未被消費,我們也可以通過Browse查看是哪些消息:

這些隊列中的消息,被刪除,消費者則無法消費,接下來,運行一下消費者:

現(xiàn)在我們再看一下ActiveMQ服務器,Queues內(nèi)容如下:

我們可以看到HelloWorld的消息隊列發(fā)生變化,多一個消息者,隊列中的10條消息被消費了,點擊Browse查看,已經(jīng)為空了。
點擊Active Consumers,我們可以看到這個消費者的詳細信息。