ActiveMQ系列-01 入門

ActiveMQ - 初步認(rèn)識

消息中間件應(yīng)用場景

  • 異步處理

場景說明: 用戶注冊,需要執(zhí)行三個業(yè)務(wù)邏輯,分別為寫入用戶表,發(fā)注冊郵件以及注冊短信

  • 應(yīng)用解耦

場景說明: 用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng)。

  • 流量削峰

場景說明: 秒殺活動,一般會因為流量過大,導(dǎo)致流量暴增,應(yīng)用掛掉。(在用戶請求與秒殺業(yè)務(wù)處理中間加入消息隊列)

ActiveMQ介紹與JMS協(xié)議

簡介

  • 什么是ActiveMQ?

ActiveMQ是遵守Apache開源規(guī)則的最流行,能力強勁的消息中間件。ActiveMQ是一個完全支持JMS1.1和J2EE1.4規(guī)范的JMS Provider實現(xiàn)。

  • 什么是JMS?

JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個Java平臺中關(guān)于面向消息中間件(MOM)的API,用于在兩個應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個與具體平臺無關(guān)的API,絕大多數(shù)MOM提供商都對JMS提供支持。

JMS消息模式

消息中間件一般有兩種傳遞模式:點對點模式(P2P)和發(fā)布-訂閱模式(Pub/Sub)

點對點模型(Queue隊列模型)

P2P: 即生產(chǎn)者和消費者之間的消息往來

ActiveMQ

每個消息都被發(fā)送到特定的消息隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時

P2P特點:

  • 每個消息只有一個消費者(Consumer),即一旦被消費,消息就不再在消息隊列中
  • 發(fā)送者和接收者之間在時間上沒有依賴性,也就是說當(dāng)發(fā)送者發(fā)送了消息之后,不管接收者有沒有正在運行,它不會影響到消息被發(fā)送到隊列
  • 接收者在成功接收消息之后需向隊列應(yīng)答成功

發(fā)布/訂閱模型(Publish-Subscribe)

發(fā)布/訂閱

包含三個角色: 主題(Topic),發(fā)布者(Publisher),訂閱者(Subscriber),多個發(fā)布者將消息發(fā)送到topic,系統(tǒng)將這些消息投遞到訂閱此topic的訂閱者。

ActiveMQ

發(fā)布者發(fā)送到topic的消息,只有訂閱了topic的訂閱者才會收到消息。topic實現(xiàn)了發(fā)布和訂閱,當(dāng)你發(fā)布一個消息,所有訂閱這個topic的服務(wù)都能得到這個消息,所以從1到N個訂閱者都能得到這個消息的拷貝。

發(fā)布/訂閱模型的特點:

  • 每個消息可以有多個消費者
  • 發(fā)布者和訂閱者之間有時間上的依賴性(先訂閱再發(fā)布)
  • 訂閱者必須保持運行的狀態(tài),才能接收發(fā)布者發(fā)布的消息

JMS編程API

要素 作用
Destination 表示消息所走通道的目標(biāo)定義,用來定義消息從發(fā)送端發(fā)出后要走的通道,而不是接收方。Destination屬于類對象
ConnectionFactory 用于創(chuàng)建連接對象,ConnectionFactory屬于管理類的對象
Connection 連接接口,所負(fù)責(zé)的重要工作時創(chuàng)建Session
Session 會話接口,這是一個非常重要的對象,消息發(fā)送者、消息接收者以及消息對象本省,都是通過這個會話對象創(chuàng)建的
MessageConsume 消息消費者,也就是訂閱消息并處理消息的對象
MessageProducer 消息的生產(chǎn)者,也就是用來發(fā)送消息的對象
  1. ConnectionFactory
    創(chuàng)建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。

  2. Destination
    Destination的意思是消息生產(chǎn)者的消息發(fā)送目標(biāo)或者說消息消費者的消息來源。對于消息生產(chǎn)者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic

  3. Connection
    Connection表示在客戶端和JMS系統(tǒng)之間建立的連接(對TCP/IP socket的包裝)。Connection可以產(chǎn)生一個或多個Session。

  4. Session
    Session是我們對消息進(jìn)行操作的接口,可以通過Session創(chuàng)建生產(chǎn)者、消費者、消息等。Session提供了事務(wù)的功能,如果需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務(wù)中。

  5. Producer
    消息生產(chǎn)者由Session創(chuàng)建,并用于將消息發(fā)送到Destination。同樣,消息生產(chǎn)者分兩種類型: QueueSender和TopicPublisher??梢哉{(diào)用消息生產(chǎn)者的方法(send或publish方法)發(fā)送消息

  6. Consumer
    消息消費者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。兩張類型: QueueReceiver和TopicSubscriber??煞謩e通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建。當(dāng)然,也可以session的createDurableSubscriber方法來創(chuàng)建持久化的訂閱者。

  7. MessageListener
    消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,一旦消息到達(dá),將自動調(diào)用監(jiān)聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。

ActiveMQ

ActiveMQ安裝

安裝(Linux)

第一步: 安裝JDK(略)

第二步: 下載activemq的壓縮包(apache-activemq-5.15.12-bin.tar.gz)到Linux系統(tǒng)

第三步: 解壓文件
tar -zxvf apache-activemq-5.15.12-bin.tar.gz

第四步: 進(jìn)入apache-activemq-5.15.12的bin目錄

cd apache-activemq-5.15.12/bin

第五步: 啟動activemq
./activemq start   (執(zhí)行2次:第一次生產(chǎn)配置文件;第二次啟動)

第六步: 停止activemq
./activemq stop

其它命令:
./activemq status -- 查看activemq的狀態(tài)
./activemq restart -- 重啟activemq
./activemq purge FOO.BAR -- 刪除隊列中的所有消息,隊列名稱是FOO.BAR
./activemq dstat -- 顯示默認(rèn)broker的所有主題和隊列統(tǒng)計信息
./activemq dstat topics -- 顯示主題的統(tǒng)計信息
./activemq dstat queue -- 顯示隊列的統(tǒng)計信息
...

訪問

http://127.0.0.1:8161

頁面控制臺: http://ip:8161 (監(jiān)控)
請求地址: tcp://ip:61616 ?。╦ava代碼訪問消息中間件)

初始用戶名和密碼: admin/admin

原生JMS開發(fā)

點對點模式

生產(chǎn)者
  1. Maven引入依賴
<dependencies>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.11</version>
        </dependency>
    </dependencies>
  1. 編寫生產(chǎn)消息的類(PTP_Producer.class)
public class PTP_Producer {
    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創(chuàng)建連接
        Connection connection = factory.createConnection();
        //3.打開連接
        connection.start();
        //4.創(chuàng)建session
        /**
         * 參數(shù)一:是否開啟事務(wù)
         * 參數(shù)二:消息確認(rèn)機制
         */
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //5.創(chuàng)建目標(biāo)地址(Queue:點對點消息,Topic:發(fā)布訂閱消息)
        Queue queue = session.createQueue("queue01");
        //6.創(chuàng)建消息生產(chǎn)者
        MessageProducer producer=session.createProducer(queue);
        //7.創(chuàng)建消息
        TextMessage message=session.createTextMessage("hello,this is PTP message");
        //8.發(fā)送消息
        producer.send(message);
        System.out.println("生產(chǎn)者發(fā)送完畢...");
        //9.釋放資源
        session.close();
        connection.close();
    }
}
  1. 運行效果


    ActiveMQ

    ActiveMQ
消費者
  1. Maven引入依賴
    如上,略

  2. 編寫接收消息的類(PTP_Consumer.class) -- receive方法

public class PTP_Consumer {
    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創(chuàng)建連接
        Connection connection = factory.createConnection();
        //3.打開連接
        connection.start();
        //4.創(chuàng)建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //5.指定目標(biāo)地址
        Queue queue = session.createQueue("queue01");
        //6.創(chuàng)建消息消費者
        MessageConsumer consumer = session.createConsumer(queue);
        //7.接受消息
        while (true){
            Message message = consumer.receive(); // 不斷的接收,還有一個方法receive(long l),這個是隔多少毫秒接收一次
            if(message == null){ // 表示沒有信息了,退出循環(huán)
                break;
            }

            if(message instanceof TextMessage){
                TextMessage textMessage = (TextMessage) message;
                System.out.println("接受到的消息: "+textMessage.getText());
            }
        }
    }
}
  1. 編寫接收消息的類(PTP_Consumer2.class) -- 監(jiān)聽器方法(常用)
public class PTP_Consumer2 {
    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創(chuàng)建連接
        Connection connection = factory.createConnection();
        //3.打開連接
        connection.start();
        //4.創(chuàng)建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //5.指定目標(biāo)地址
        Queue queue = session.createQueue("queue01");
        //6.創(chuàng)建消息消費者
        MessageConsumer consumer = session.createConsumer(queue);
        //7.設(shè)置消息監(jiān)聽器來接收消息
        consumer.setMessageListener(new MessageListener() {
            // 處理消息
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收的消息(2):"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 注意: 在監(jiān)聽器模式下千萬不要關(guān)閉連接,一旦關(guān)閉,消息無法接收
    }
}
  1. 運行效果


    ActiveMQ

    ActiveMQ

發(fā)布訂閱模式

生產(chǎn)者
  1. Maven引入依賴
    如上,略

  2. 編寫生產(chǎn)類(PS_Producer.class)

/**
 * 發(fā)布訂閱模式-消息生產(chǎn)者
 */
public class PS_Producer {
    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創(chuàng)建連接
        Connection connection = factory.createConnection();
        //3.打開連接
        connection.start();
        //4.創(chuàng)建session
        /**
         * 參數(shù)一:是否開啟事務(wù)
         * 參數(shù)二:消息確認(rèn)機制
         */
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //5.創(chuàng)建目標(biāo)地址(Queue:點對點消息,Topic:發(fā)布訂閱消息)
        Topic topic = session.createTopic("topic01");
        //6.創(chuàng)建消息生產(chǎn)者
        MessageProducer producer=session.createProducer(topic);
        //7.創(chuàng)建消息
        TextMessage message=session.createTextMessage("hello,this is PS message");
        //8.發(fā)送消息
        producer.send(message);
        System.out.println("生產(chǎn)者發(fā)送完畢...");
        //9.釋放資源
        session.close();
        connection.close();
    }
}
  1. 運行效果


    ActiveMQ

    ActiveMQ
消費者
  1. Maven引入依賴
    如上,略

  2. 編寫生產(chǎn)類(PS_Consumer.class)

/*
 * 發(fā)布訂閱模式消費者
 */
public class PS_Consumer {
    public static void main(String[] args) throws JMSException {
        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //2.創(chuàng)建連接
        Connection connection = factory.createConnection();
        //3.打開連接
        connection.start();
        //4.創(chuàng)建session
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        //5.指定目標(biāo)地址
        Topic topic = session.createTopic("topic01");
        //6.創(chuàng)建消息消費者
        MessageConsumer consumer = session.createConsumer(topic);
        //7.設(shè)置消息監(jiān)聽器來接收消息
        consumer.setMessageListener(new MessageListener() {
            // 處理消息
            @Override
            public void onMessage(Message message) {
                if(message instanceof TextMessage){
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("接收的消息---topic:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        // 注意: 在監(jiān)聽器模式下千萬不要關(guān)閉連接,一旦關(guān)閉,消息無法接收
    }
}
  1. 運行效果)


    ActiveMQ

這時我們看到訂閱到的topic消息是沒有被消費的。上面有說到,在發(fā)布訂閱模式下,一定要先啟動消費者,然后才能消費到發(fā)布者推送的訂閱的信息。讓我們重新啟動下PS_Producer類,再看看效果


ActiveMQ

這時已經(jīng)成功獲取到消息了,再看看頁面控制臺


ActiveMQ

消息入列2條,成功出列1條,1個消費者

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ActiveMQ 即時通訊服務(wù) 淺析http://www.cnblogs.com/hoojo/p/active_m...
    bboymonk閱讀 1,578評論 0 11
  • 什么是activeMQ activeMQ是一種開源的,實現(xiàn)了JMS1.1規(guī)范的,面向消息(MOM)的中間件,為應(yīng)用...
    趙鐵柱啊閱讀 2,105評論 1 6
  • 簡介 ActiveMQ 特點 ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在為應(yīng)用程序提供高...
    預(yù)流閱讀 6,005評論 4 21
  • ActiveMQ 簡介:ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ ...
    CoderZS閱讀 2,762評論 0 23
  • 那個叛逆的我,瘋狂旅行的我,沉迷小說的我,一直被我隱藏著,只有身邊的人才知道那個我,也是我,我想這是每個人的獨特,...
    曉曉E_L閱讀 443評論 0 0

友情鏈接更多精彩內(nèi)容