MQ之ActiveMQ

ActiveMQ

[toc]
簡(jiǎn)書不支持 toc 目錄模式,截圖一張。


image.png

什么是ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。

主要特點(diǎn):

  1. 多種語(yǔ)言和協(xié)議編寫客戶端。語(yǔ)言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
  2. 完全支持JMS1.1和J2EE 1.4規(guī)范 (持久化,XA消息,事務(wù))
  3. 對(duì)Spring的支持,ActiveMQ可以很容易內(nèi)嵌到使用Spring的系統(tǒng)里面去,而且也支持Spring2.0的特性
  4. 通過(guò)了常見J2EE服務(wù)器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測(cè)試,其中通過(guò)JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動(dòng)的部署到任何兼容J2EE 1.4 商業(yè)服務(wù)器上
  5. 支持多種傳送協(xié)議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
  6. 支持通過(guò)JDBC和journal提供高速的消息持久化
  7. 從設(shè)計(jì)上保證了高性能的集群,客戶端-服務(wù)器,點(diǎn)對(duì)點(diǎn)
  8. 支持Ajax
  9. 支持與Axis的整合
  10. 可以很容易得調(diào)用內(nèi)嵌JMS provider,進(jìn)行測(cè)試

ActiveMQ的消息形式

  • 對(duì)于消息的傳遞有兩種類型:
    • 一種是點(diǎn)對(duì)點(diǎn)的,即一個(gè)生產(chǎn)者和一個(gè)消費(fèi)者一一對(duì)應(yīng);
    • 另一種是發(fā)布/訂閱模式,即一個(gè)生產(chǎn)者產(chǎn)生消息并進(jìn)行發(fā)送后,可以由多個(gè)消費(fèi)者進(jìn)行接收。

JMS定義了五種不同的消息正文格式,以及調(diào)用的消息類型,允許你發(fā)送并接收以一些不同形式的數(shù)據(jù),提供現(xiàn)有消息格式的一些級(jí)別的兼容性。

  • StreamMessage -- Java原始值的數(shù)據(jù)流
  • MapMessage--一套名稱-值對(duì)
  • TextMessage--一個(gè)字符串對(duì)象
  • ObjectMessage--一個(gè)序列化的 Java對(duì)象
  • BytesMessage--一個(gè)字節(jié)的數(shù)據(jù)流

ActiveMQ的安裝

進(jìn)入http://activemq.apache.org/ 下載ActiveMQ


最新版本: 5.14.5

安裝環(huán)境:

需要jdk
安裝Linux系統(tǒng)。生產(chǎn)環(huán)境都是Linux系統(tǒng)。

安裝步驟

第一步: 把ActiveMQ 的壓縮包上傳到Linux系統(tǒng)。
第二步:解壓縮。
第三步:?jiǎn)?dòng)。
使用bin目錄下的activemq命令啟動(dòng):
[root@localhost bin]# ./activemq start
關(guān)閉:
[root@localhost bin]# ./activemq stop
查看狀態(tài):
[root@localhost bin]# ./activemq status

注意:如果ActiveMQ整合spring使用,不要使用activemq-all-5.14.5.jar包(spring 可能少方法)。建議使用5.11.2

進(jìn)入管理后臺(tái):

http://192.168.25.168:8161/admin
用戶名:admin
密碼:admin

可能出現(xiàn)的問(wèn)題:

405的問(wèn)題:機(jī)器名和 ip 沒(méi)有對(duì)上,修改 host 文件。
查看 hostname,然后檢查 hosts 文件中是否相同。
/etc/sysconfig/network

vim /etc/h/hosts

ActiveMQ的使用方法

Queue

點(diǎn)對(duì)點(diǎn)(point-to-point,簡(jiǎn)稱PTP)Queue消息傳遞模型

Producer

生產(chǎn)者:生產(chǎn)消息,發(fā)送端。
把jar包添加到工程中。使用5.11.2版本的jar包。


第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Queue對(duì)象。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
第八步:使用Producer對(duì)象發(fā)送消息。
第九步:關(guān)閉資源。

@Test
public void testQueueProducer() throws Exception {
    // 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
    //brokerURL服務(wù)器的ip及端口號(hào)
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    // 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
    connection.start();
    // 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
    //第一個(gè)參數(shù):是否開啟事務(wù)。true:開啟事務(wù),第二個(gè)參數(shù)忽略。
    //第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Queue對(duì)象。
    //參數(shù):隊(duì)列的名稱。
    Queue queue = session.createQueue("test-queue");
    // 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
    MessageProducer producer = session.createProducer(queue);
    // 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
    /*TextMessage message = new ActiveMQTextMessage();
    message.setText("hello activeMq,this is my first test.");*/
    TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
    // 第八步:使用Producer對(duì)象發(fā)送消息。
    producer.send(textMessage);
    // 第九步:關(guān)閉資源。
    producer.close();
    session.close();
    connection.close();
}

Consumer

消費(fèi)者:接收消息。
第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致queue,并且隊(duì)列的名稱一致。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
第七步:接收消息。
第八步:打印消息。
第九步:關(guān)閉資源

@Test
public void testQueueConsumer() throws Exception {
    // 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    // 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
    connection.start();
    // 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致queue,并且隊(duì)列的名稱一致。
    Queue queue = session.createQueue("test-queue");
    // 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
    MessageConsumer consumer = session.createConsumer(queue);
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {
        
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text = null;
                //取消息的內(nèi)容
                text = textMessage.getText();
                // 第八步:打印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    //等待鍵盤輸入
    System.in.read();
    // 第九步:關(guān)閉資源
    consumer.close();
    session.close();
    connection.close();
}



Topic

發(fā)布/訂閱(publish/subscribe,簡(jiǎn)稱pub/sub)Topic消息傳遞模型

Producer

使用步驟:
第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)Topic對(duì)象。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
第八步:使用Producer對(duì)象發(fā)送消息。
第九步:關(guān)閉資源。

@Test
public void testTopicProducer() throws Exception {
    // 第一步:創(chuàng)建ConnectionFactory對(duì)象,需要指定服務(wù)端ip及端口號(hào)。
    // brokerURL服務(wù)器的ip及端口號(hào)
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    // 第二步:使用ConnectionFactory對(duì)象創(chuàng)建一個(gè)Connection對(duì)象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:開啟連接,調(diào)用Connection對(duì)象的start方法。
    connection.start();
    // 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
    // 第一個(gè)參數(shù):是否開啟事務(wù)。true:開啟事務(wù),第二個(gè)參數(shù)忽略。
    // 第二個(gè)參數(shù):當(dāng)?shù)谝粋€(gè)參數(shù)為false時(shí),才有意義。消息的應(yīng)答模式。1、自動(dòng)應(yīng)答2、手動(dòng)應(yīng)答。一般是自動(dòng)應(yīng)答。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象(topic、queue),此處創(chuàng)建一個(gè)topic對(duì)象。
    // 參數(shù):話題的名稱。
    Topic topic = session.createTopic("test-topic");
    // 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Producer對(duì)象。
    MessageProducer producer = session.createProducer(topic);
    // 第七步:創(chuàng)建一個(gè)Message對(duì)象,創(chuàng)建一個(gè)TextMessage對(duì)象。
    /*
     * TextMessage message = new ActiveMQTextMessage(); message.setText(
     * "hello activeMq,this is my first test.");
     */
    TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
    // 第八步:使用Producer對(duì)象發(fā)送消息。
    producer.send(textMessage);
    // 第九步:關(guān)閉資源。
    producer.close();
    session.close();
    connection.close();
}

Consumer

消費(fèi)者:接收消息。
第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致topic,并且話題的名稱一致。
第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
第七步:接收消息。
第八步:打印消息。
第九步:關(guān)閉資源

@Test
public void testTopicConsumer() throws Exception {
    // 第一步:創(chuàng)建一個(gè)ConnectionFactory對(duì)象。
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
    // 第二步:從ConnectionFactory對(duì)象中獲得一個(gè)Connection對(duì)象。
    Connection connection = connectionFactory.createConnection();
    // 第三步:開啟連接。調(diào)用Connection對(duì)象的start方法。
    connection.start();
    // 第四步:使用Connection對(duì)象創(chuàng)建一個(gè)Session對(duì)象。
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 第五步:使用Session對(duì)象創(chuàng)建一個(gè)Destination對(duì)象。和發(fā)送端保持一致topic,并且話題的名稱一致。
    Topic topic = session.createTopic("test-topic");
    // 第六步:使用Session對(duì)象創(chuàng)建一個(gè)Consumer對(duì)象。
    MessageConsumer consumer = session.createConsumer(topic);
    // 第七步:接收消息。
    consumer.setMessageListener(new MessageListener() {

        @Override
        public void onMessage(Message message) {
            try {
                TextMessage textMessage = (TextMessage) message;
                String text = null;
                // 取消息的內(nèi)容
                text = textMessage.getText();
                // 第八步:打印消息。
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    });
    System.out.println("topic的消費(fèi)端03。。。。。");
    // 等待鍵盤輸入
    System.in.read();
    // 第九步:關(guān)閉資源
    consumer.close();
    session.close();
    connection.close();
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容