ActiveMQ——基礎(chǔ)知識與模擬體驗

最近在學習一個Java的開源項目,git地址:https://gitee.com/shuzheng/zheng
該開源項目開發(fā)環(huán)境需具備ActiveMQ,所以特地學習一下ActiveMQ。

一、ActiveMQ是什么?

1)JMS概述
JMS即Java消息服務(Java Message Service的簡稱),是Java EE 的標準/規(guī)范之一。這種規(guī)范(標準)指出:消息的發(fā)送應該是異步的、非阻塞的。也就是說消息的發(fā)送者發(fā)送完消息后就直接返回了,不需要等待接收者返回后才能返回,發(fā)送者和接收者可以說是互不影響。所以這種規(guī)范(標準)能夠減輕或消除系統(tǒng)瓶頸,實現(xiàn)系統(tǒng)之間去除耦合,提高系統(tǒng)的整體可伸縮性和靈活性。JMS只是Java EE中定義的一組標準API,它自身并不是一個消息服務系統(tǒng),它是消息傳送服務的一個抽象,也就是說它定義了消息傳送的接口而并沒有具體實現(xiàn)。

2)ActiveMQ概述
ActiveMQ就是JMS規(guī)范的具體實現(xiàn);它是Apache下的一個項目,采用Java語言開發(fā),是一款非常流行的開源消息服務器(消息隊列中間件)

3)ActiveMQ與JMS關(guān)系


二者關(guān)系.png

JMS只是定義了一組有關(guān)消息傳送的規(guī)范和標準,并沒有真正實現(xiàn),也就說JMS只是定義了一組接口;其具體實現(xiàn)由不同的消息中間件廠商提供,比如Apache ActiveMQ就是JMS規(guī)范的具體實現(xiàn),Apache ActiveMQ才是一個消息服務系統(tǒng),而JMS不是。

消息隊列中間件是分布式系統(tǒng)中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現(xiàn)高性能,高可用,可伸縮和最終一致性架構(gòu)。是大型分布式系統(tǒng)不可缺少的中間件。目前在生產(chǎn)環(huán)境,使用較多的消息隊列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

二、應用場景——異步處理,應用解耦,流量削鋒和消息通訊

2.1異步處理
場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信。
傳統(tǒng)的做法:1.串行的方式; 2.并行方式。
1)串行方式:將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件,再發(fā)送注冊短信。以上三個任務全部完成后,返回給客戶端。

2)并行方式:將注冊信息寫入數(shù)據(jù)庫成功后,發(fā)送注冊郵件的同時,發(fā)送注冊短信。以上三個任務完成后,返回給客戶端。
與串行的差別是,并行的方式可以提高處理的時間。引入消息隊列,就可以將不是必須的業(yè)務邏輯改為異步處理。

2.2應用解耦
場景說明:用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng)。傳統(tǒng)的做法是,訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口。這種模式存在如下缺點:
1) 假如庫存系統(tǒng)無法訪問,則訂單減庫存將失敗,從而導致訂單失??;

2) 訂單系統(tǒng)與庫存系統(tǒng)耦合;

引入應用消息隊列后的方案:
1)訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶訂單下單成功。

2)庫存系統(tǒng):訂閱下單的消息,采用拉/推的方式,獲取下單信息,庫存系統(tǒng)根據(jù)下單信息,進行庫存操作。

假如:在下單時庫存系統(tǒng)不能正常使用。也不影響正常下單,因為下單后,訂單系統(tǒng)寫入消息隊列就不再關(guān)心其他的后續(xù)操作了。實現(xiàn)訂單系統(tǒng)與庫存系統(tǒng)的應用解耦。

2.3流量削鋒
流量削鋒也是消息隊列中的常用場景,一般在秒殺或團搶活動中使用廣泛。
應用場景:秒殺活動,一般會因為流量過大,導致流量暴增,應用掛掉。為解決這個問題,一般需要在應用前端加入消息隊列。
可以控制活動的人數(shù);
可以緩解短時間內(nèi)高流量壓垮應用;

用戶的請求,服務器接收后,首先寫入消息隊列。假如消息隊列長度超過最大數(shù)量,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面;
秒殺業(yè)務根據(jù)消息隊列中的請求信息,再做后續(xù)處理。

2.4日志處理
日志處理是指將消息隊列用在日志處理中,比如Kafka的應用,解決大量日志傳輸?shù)膯栴}。
日志采集客戶端,負責日志數(shù)據(jù)采集,定時寫受寫入Kafka隊列;
Kafka消息隊列,負責日志數(shù)據(jù)的接收,存儲和轉(zhuǎn)發(fā);
日志處理應用:訂閱并消費kafka隊列中的日志數(shù)據(jù);

二.ActiveMQ的使用

1、JMS兩種消息傳送模式
1)點對點( Point-to-Point):專門用于使用隊列Queue傳送消息;基于隊列Queue的點對點消息只能被一個消費者消費,如多個消費者都注冊到同一個消息隊列上,當生產(chǎn)者發(fā)送一條消息后,而只有其中一個消費者會接收到該消息,而不是所有消費者都能接收到該消息。

2)發(fā)布/訂閱(Publish/Subscribe):專門用于使用主題Topic傳送消息?;谥黝}的發(fā)布與訂閱消息能被多個消費者消費,生產(chǎn)者發(fā)送的消息,所有訂閱了該topic的消費者都能接收到。

2、 JMS API可以分為3個主要部分:
1)公共API:可用于向一個隊列或主題發(fā)送消息或從其中接收消息;
2)點對點API:專門用于使用隊列Queue傳送消息;
3)發(fā)布/訂閱API:專門用于使用主題Topic傳送消息。

JMS公共API:
在JMS公共API內(nèi)部,和發(fā)送與接收消息有關(guān)的JMS API接口主要是:ConnectionFactory/Connection/Session/Message/Destination/MessageProducer/MessageConsumer 。它們的關(guān)系是:一旦有了ConnectionFactory,就可以創(chuàng)建Connection,一旦有了Connection,就可以創(chuàng)建Session,而一旦有了Session,就可以創(chuàng)建Message、MessageProducer和MessageConsumer。

JMS點對點API:
點對點(p2p)消息傳送模型API是指JMS API之內(nèi)基于隊列(Queue)的接口:QueueConnectionFactory/QueueConnection/QueueSession/Message/Queue/QueueSender/QueueReceiver. 從接口的命名可以看出,大多數(shù)接口名稱僅僅是在公共API接口名稱之前添加Queue一詞。一般來說,使用點對點消息傳送模型的應用程序?qū)⑹褂没陉犃械腁PI,而不使用公共API 。

JMS發(fā)布/訂閱API:
發(fā)布/訂閱消息傳送模型API是指JMS API之內(nèi)基于主題(Topic)的接口:TopicConnectionFactory/TopicConnection/TopicSession/Message/Topic/TopicPublisher/TopicSubscriber. 由于基于主題(Topic)的JMS API類似于基于隊列(Queue)的API,因此在大多數(shù)情況下,Queue這個詞會由Topic取代。

ActiveMQ點對點發(fā)送與接收消息示例.png

3、ActiveMQ的下載與安裝
1)直接去官網(wǎng)(http://activemq.apache.org/)下載最新版本即可,由于這是免安裝的,只需要解壓就行了。安裝完之后進入bin目錄,雙擊 activemq.bat文件(linux下在bin目錄下執(zhí)行 activemq start

2)訪問控制臺(檢驗是否成功)
在瀏覽器輸入:http://ip:8161/admin/ 會彈出登陸框,賬號和密碼都是默認admin。

3)修改端口號
61616為對外服務端口號
8161為控制器端口號
當端口號沖突時,可以修改這兩個端口號。cd conf ,修改activemq.xml 修改里面的61616端口。修改jetty.xml,修改里面的8161端口。

4)刪除不活動隊列
一般情況下,ActiveMQ的queue或者topic在不使用之后,可以通過web控制臺來刪除掉。當然,也可以通過配置,使得broker可以自動探測到無用的隊列(一定時間內(nèi)為空的隊列)并刪除掉,回收響應資源。
activemq.xml

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.base}/data" destroyApplicationContextOnStop="true" schedulePeriodForDestinationPurge="10000">
    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
              <pendingSubscriberPolicy>
                <vmCursor />
              </pendingSubscriberPolicy>
            </policyEntry>
            <policyEntry queue=">" gcInactiveDestinations="true" inactiveTimoutBeforeGC="100000" memoryLimit="1mb">
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>
</broker>

schedulePeriodForDestinationPurge:10000 每十秒檢查一次,默認為0,此功能關(guān)閉
gcInactiveDestinations: true 刪除掉不活動隊列,默認為false
inactiveTimoutBeforeGC:30000 不活動30秒后刪除,默認為60秒
PS:對于topic的不活動隊列只是,10秒中之類沒有消費者進行注冊監(jiān)聽,如果一個用戶事先注冊了這個監(jiān)聽,但是他一直沒有登錄,那么這算活動隊列。而queue只要有消息沒有出隊列就表示活動隊列。

附:消息中間件的用途和優(yōu)點
1)將數(shù)據(jù)從一個應用程序傳送到另一個應用程序,或者從軟件的一個模塊傳送到另外一個模塊;
2)負責建立網(wǎng)絡通信的通道,進行數(shù)據(jù)的可靠傳送;
3)保證數(shù)據(jù)不重發(fā),不丟失;
4)能夠?qū)崿F(xiàn)跨平臺操作,能夠為不同操作系統(tǒng)上的軟件集成技工數(shù)據(jù)傳送服務

三、模擬HelloWord體驗ActiveMQ

下載MQ以后,要將---bin.zip解壓縮后里面的activemq-all-5.11.1.jar包加入到classpath下面,這個包包含了所有jms接口api的實現(xiàn),項目結(jié)構(gòu)圖:

MQ

啟動MQ,啟動成功,打開該鏈接:http://localhost:8161/admin/
賬號和密碼都是admin,啟動成功后,圖如下:
image.png

生產(chǎn)者和消費者代碼如下:

package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 生產(chǎn)者(消息發(fā)送者)
 * @author admin
 *
 */
public class JMSProducer {

    // 賬號(默認連接用戶名)
    private static final String user = ActiveMQConnection.DEFAULT_USER;
    // 密碼
    private static final String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
    // 地址
    private static final String url = ActiveMQConnection.DEFAULT_BROKER_URL;

    private static final int sendnum = 10;

    public static void main(String[] args) {
        // 連接工廠
        ConnectionFactory connectionFactory;
        // 連接
        Connection connection = null;
        // 會話
        Session session;
        // 消息目的
        Destination destination;
        // 消息生產(chǎn)者
        MessageProducer messageProducer;
        // 實例化工廠
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.user, JMSProducer.pwd, JMSProducer.url);
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("HelloWord");
            messageProducer = session.createProducer(destination);
            sendMessage(session, messageProducer);
            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 發(fā)送消息
     * @param session
     * @param messageProducer  消息生產(chǎn)者
     * @throws Exception
     */
    public static void sendMessage(Session session, MessageProducer messageProducer) throws Exception {
        for (int i = 0; i < 10; i++) {
            TextMessage message = session.createTextMessage("ActiveMQ" + i);
            System.out.println("發(fā)送消息" + i);
            messageProducer.send(message);
        }
    }
}
package com.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 消費者(消息接收者)
 * @author admin
 *
 */
public class JMSConsumer {
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;// 默認連接用戶名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;// 默認連接密碼
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;// 默認連接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;// 連接工廠
        Connection connection = null;// 連接

        Session session;// 會話 接受或者發(fā)送消息的線程
        Destination destination;// 消息的目的地

        MessageConsumer messageConsumer;// 消息的消費者

        // 實例化連接工廠
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD,
                JMSConsumer.BROKEURL);

        try {
            // 通過連接工廠獲取連接
            connection = connectionFactory.createConnection();
            // 啟動連接
            connection.start();
            // 創(chuàng)建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 創(chuàng)建一個連接HelloWorld的消息隊列
            destination = session.createQueue("HelloWord");
            // 創(chuàng)建消息消費者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if (textMessage != null) {
                    System.out.println("收到的消息:" + textMessage.getText());
                } else {
                    break;
                }
            }

        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

啟動生產(chǎn)者,輸出如下:


image.png

然后看一下ActiveMQ服務器,Queues內(nèi)容如下:


MQ服務器說明

我們可以看到創(chuàng)建了一個名稱為HelloWorld的消息隊列,隊列中有10條消息未被消費,我們也可以通過Browse查看是哪些消息:
image.png

這些隊列中的消息,被刪除,消費者則無法消費,接下來,運行一下消費者:


消費者輸出

現(xiàn)在我們再看一下ActiveMQ服務器,Queues內(nèi)容如下:
1536301523(1).jpg

我們可以看到HelloWorld的消息隊列發(fā)生變化,多一個消息者,隊列中的10條消息被消費了,點擊Browse查看,已經(jīng)為空了。
點擊Active Consumers,我們可以看到這個消費者的詳細信息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 中間件 非底層操作系統(tǒng)軟件,非業(yè)務應用軟件,不能直接給最終用戶使用和帶來價值的軟件。 消息中間件 關(guān)注于數(shù)據(jù)的發(fā)送...
    wch853閱讀 1,079評論 0 0
  • ActiveMQ 即時通訊服務 淺析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk閱讀 1,588評論 0 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,632評論 19 139
  • 大唐武德九年六月初一丁巳日。 此時正是大上午的時候,卻除了太陽,還有一顆星星,居然在晝?nèi)罩幸擦恋每梢???蛇h方的烏云...
    劉珞閱讀 933評論 5 10
  • 我好蠢啊 小自習室今晚多來了幾個沒見過的人 一開門 被驚住了 在思考自己是不是走錯了自習室 站在門口 一動不動 反...
    翮笙閱讀 203評論 0 0

友情鏈接更多精彩內(nèi)容