電商項(xiàng)目(五)

1 ActiveMQ簡(jiǎn)介

1.1 ActiveMQ是什么

ActiveMQ是一個(gè)消息隊(duì)列應(yīng)用服務(wù)器。支持JMS****規(guī)范。

1.1.1 JMS概述

全稱:Java Message Service ,即為Java消息服務(wù),是一套java消息服務(wù)的API標(biāo)準(zhǔn)。(標(biāo)準(zhǔn)即接口)

實(shí)現(xiàn)了JMS標(biāo)準(zhǔn)的系統(tǒng),稱之為JMS Provider。

1.1.2 消息隊(duì)列

1.1.2.1 概念

消息隊(duì)列是在消息的傳輸過程中保存消息的容器,提供一種不同進(jìn)程或者同一進(jìn)程不同線程直接通訊的方式。

[圖片上傳失敗...(image-9b371a-1564373794322)]

Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生和發(fā)送消息到 Broker;

Broker:消息處理中心。負(fù)責(zé)消息存儲(chǔ)、確認(rèn)、重試等,一般其中會(huì)包含多個(gè) queue;

Consumer:消息消費(fèi)者,負(fù)責(zé)從 Broker 中獲取消息,并進(jìn)行相應(yīng)處理;

1.1.2.2 常見消息隊(duì)列應(yīng)用

(1)、ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。

(2)、RabbitMQ

RabbitMQ是一個(gè)在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。開發(fā)語言為Erlang。

(3)、RocketMQ

由阿里巴巴定義開發(fā)的一套消息隊(duì)列應(yīng)用服務(wù)。

1.2 ActiveMQ能做什么

(1)實(shí)現(xiàn)兩個(gè)不同應(yīng)用(程序)之間的消息通訊。

(2)實(shí)現(xiàn)同一個(gè)應(yīng)用,不同模塊之間的消息通訊。

1.3 ActiveMQ下載

ActiveMQ官網(wǎng)地址: http://activemq.apache.org

ActiveMQ下載地址:http://activemq.apache.org/download-archives.html

--可供下載的歷史版本

--說明:

ActiveMQ 5.10.x以上版本必須使用JDK1.8才能正常使用。

ActiveMQ 5.9.x及以下版本使用JDK1.7即可正常使用。

|

[圖片上傳失敗...(image-8a6a26-1564373794300)]

|

--根據(jù)操作系統(tǒng),選擇下載版本。(本教程下載Linux版本)

|

[圖片上傳失敗...(image-2cabe4-1564373794300)]

|

1.4 ActiveMQ主要特點(diǎn)

(1)支持多語言、多協(xié)議客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應(yīng)用協(xié)議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

(2)對(duì)Spring的支持,ActiveMQ可以很容易整合到Spring的系統(tǒng)里面去。

(3)支持高可用、高性能的集群模式。

2 入門示例

2.1 需求

使用ActiveMQ實(shí)現(xiàn)消息隊(duì)列模型。

2.2 配置步驟說明

(1)搭建ActiveMQ消息服務(wù)器。

(2)創(chuàng)建一個(gè)java項(xiàng)目。

(3)創(chuàng)建消息生產(chǎn)者,發(fā)送消息。

(4)創(chuàng)建消息消費(fèi)者,接收消息。

2.3 第一部分:搭建ActiveMQ消息服務(wù)器

2.3.1 第一步:下載、上傳至Linux

--說明:確保已經(jīng)安裝了jdk

|

[圖片上傳失敗...(image-c63245-1564373794300)]

|

2.3.2 第二步:安裝到/usr/local/activemq目錄

(1)解壓到/usr/local目錄下

|

[root@node07192 ~]# tar -zxvf apache-activemq-5.9.0-bin.tar.gz -C /usr/local

|

(2)修改名稱為activemq

|

[root@node07192 ~]# cd /usr/local/

[root@node07192 local]# mv apache-activemq-5.9.0/ activemq

|

2.3.3 第三步:?jiǎn)?dòng)ActiveMQ服務(wù)器

--說明:ActiveMQ是免安裝軟件,解壓即可啟動(dòng)服務(wù)。

|

[root@node07192 local]# cd activemq/bin

[root@node07192 bin]# ./activemq start

|

--查看ActiveMQ啟動(dòng)狀態(tài)

|

[root@node07192 bin]# ./activemq status

[圖片上傳失敗...(image-f26a6a-1564373794300)]

|

2.3.4 第四步:瀏覽器訪問ActiveMQ管理界面

2.3.4.1 Step1:查看ActiveMQ管理界面的服務(wù)端口。在/conf/jetty.xml中

--訪問管理控制臺(tái)的服務(wù)端口,默認(rèn)為:8161

|

[root@node07192 bin]# cd ../conf

[root@node07192 conf]# vim jetty.xml

[圖片上傳失敗...(image-9e080e-1564373794300)]

|

2.3.4.2 Step2:查看ActiveMQ用戶、密碼。在/conf/users.properties中:

--默認(rèn)的用戶名、密碼均為amdin

|

[root@node07192 conf]# vim users.properties

[圖片上傳失敗...(image-633981-1564373794300)]

|

2.3.4.3 Step3:訪問ActiveMQ管理控制臺(tái)。地址:http://ip:8161/

--注意:防火墻是沒有配置該服務(wù)的端口的。

因此,要訪問該服務(wù),必須在防火墻中配置。

(1****)修改防火墻,開放8161****端口

|

[root@node07192 conf]# vim /etc/sysconfig/iptables

[圖片上傳失敗...(image-8f9245-1564373794300)]

|

(2****)重啟防火墻

|

[root@node07192 conf]# service iptables restart

|

(3)登錄管理控制臺(tái)

--登陸,用戶名、密碼均為admin

|

[圖片上傳失敗...(image-9c205-1564373794300)]

|

--控制臺(tái)主界面

|

[圖片上傳失敗...(image-48d268-1564373794300)]

|

--搭建ActiveMQ服務(wù)器成功!!!

2.4 第二部分:創(chuàng)建java項(xiàng)目,導(dǎo)入jar包

--導(dǎo)包說明:

ActiveMQ的解壓包中,提供了運(yùn)行ActiveMQ的所有jar。

|

[圖片上傳失敗...(image-a464ce-1564373794300)]

|

--創(chuàng)建項(xiàng)目

|

[圖片上傳失敗...(image-f2670f-1564373794300)]

|

2.5 第三部分:創(chuàng)建消息生成者,發(fā)送消息

--說明:ActiveMQ是實(shí)現(xiàn)了JMS規(guī)范的。在實(shí)現(xiàn)消息服務(wù)的時(shí)候,必須基于API接口規(guī)范。

2.5.1 JMS常用的API說明

下述API都是接口類型,定義在javax.jms包中,是JMS標(biāo)準(zhǔn)接口定義。ActiveMQ完全實(shí)現(xiàn)這一套api標(biāo)準(zhǔn)。

2.5.1.1 ConnectionFactory

鏈接工廠, 用于創(chuàng)建鏈接的工廠類型。

2.5.1.2 Connection

鏈接,用于建立訪問ActiveMQ連接的類型, 由鏈接工廠創(chuàng)建。

2.5.1.3 Session

會(huì)話, 一次持久有效、有狀態(tài)的訪問,由鏈接創(chuàng)建。

2.5.1.4 Destination & Queue & Topic

目的地, 即本次訪問ActiveMQ消息隊(duì)列的地址,由Session會(huì)話創(chuàng)建。

(1)interface Queue extends Destination

(2)Queue:隊(duì)列模型,只有一個(gè)消費(fèi)者。消息一旦被消費(fèi),默認(rèn)刪除。

(3)Topic:主題訂閱中的消息,會(huì)發(fā)送給所有的消費(fèi)者同時(shí)處理。

2.5.1.5 Message

消息,在消息傳遞過程中數(shù)據(jù)載體對(duì)象,是所有消息【文本消息TextMessage,對(duì)象消息ObjectMessage等】具體類型的頂級(jí)接口,可以通過會(huì)話創(chuàng)建或通過會(huì)話從ActiveMQ服務(wù)中獲取。

2.5.1.6 MessageProducer

消息生成者, 在一次有效會(huì)話中, 用于發(fā)送消息給ActiveMQ服務(wù)的工具,由Session會(huì)話創(chuàng)建。

2.5.1.7 MessageCustomer

消息消費(fèi)者【消息訂閱者,消息處理者】, 在一次有效會(huì)話中, 用于ActiveMQ服務(wù)中獲取消息的工具,由Session會(huì)話創(chuàng)建。

我們定義的消息生產(chǎn)者和消費(fèi)者,都是基于上面API實(shí)現(xiàn)的。

2.5.2 第一步:創(chuàng)建MyProducer類,定義sendMessage方法

|

package cn.gzsxt.mq.producer;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageProducer;

import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MyProducer {

// 定義鏈接工廠

ConnectionFactory connectionFactory = null;

// 定義鏈接

Connection connection = null;

// 定義會(huì)話

Session session = null;

// 定義目的地

Destination destination = null;

// 定義消息生成者

MessageProducer producer = null;

// 定義消息

Message message = null;

public void sendToMQ(){

try{

/*

  • 創(chuàng)建鏈接工廠

  • ActiveMQConnectionFactory - 由ActiveMQ實(shí)現(xiàn)的ConnectionFactory接口實(shí)現(xiàn)類.

  • 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

  • userName - 訪問ActiveMQ服務(wù)的用戶名, 用戶名可以通過<u>jetty</u>-realm.properties配置文件配置.

  • password - 訪問ActiveMQ服務(wù)的密碼, 密碼可以通過<u>jetty</u>-realm.properties配置文件配置.

  • brokerURL - 訪問ActiveMQ服務(wù)的路徑地址. 路徑結(jié)構(gòu)為 - 協(xié)議名://主機(jī)地址:端口號(hào)

  • 此鏈接基于TCP/IP協(xié)議.

*/

connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 創(chuàng)建鏈接對(duì)象

connection = connectionFactory.createConnection();

// 啟動(dòng)鏈接

connection.start();

/*

  • 創(chuàng)建會(huì)話對(duì)象

  • 方法 - connection.createSession(boolean transacted, <u>int</u> acknowledgeMode);

  • transacted - 是否使用事務(wù), 可選值為true|false

  • true - 使用事務(wù), 當(dāng)設(shè)置此變量值, 則acknowledgeMode參數(shù)無效, 建議傳遞的acknowledgeMode參數(shù)值為

  • Session.SESSION_TRANSACTED

  • false - 不使用事務(wù), 設(shè)置此變量值, 則acknowledgeMode參數(shù)必須設(shè)置.

  • acknowledgeMode - 消息確認(rèn)機(jī)制, 可選值為:

  • Session.AUTO_ACKNOWLEDGE - 自動(dòng)確認(rèn)消息機(jī)制

  • Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機(jī)制

  • Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機(jī)制

*/

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建目的地, 目的地命名即隊(duì)列命名, 消息消費(fèi)者需要通過此命名訪問對(duì)應(yīng)的隊(duì)列

destination = session.createQueue("test-mq");

// 創(chuàng)建消息生成者, 創(chuàng)建的消息生成者與某目的地對(duì)應(yīng), 即方法參數(shù)目的地.

producer = session.createProducer(destination);

// 創(chuàng)建消息對(duì)象, 創(chuàng)建一個(gè)文本消息, 此消息對(duì)象中保存要傳遞的文本數(shù)據(jù).

message = session.createTextMessage("hello,activeme");

// 發(fā)送消息

producer.send(message);

System.out.println("消息發(fā)送成功!");

}catch(Exception e){

e.printStackTrace();

System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯(cuò)誤!!");

}finally{

try {

// 回收消息發(fā)送者資源

if(null != producer)

producer.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收會(huì)話資源

if(null != session)

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收鏈接資源

if(null != connection)

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

|

2.5.3 第二步:創(chuàng)建一個(gè)測(cè)試類MessageTest

--添加junit類庫,快捷鍵ctrl+1

|

package cn.gzsxt.mq.test;

import org.junit.Test;

import cn.gzsxt.mq.producer.MyProducer;

public class MessageTest {

@Test

public void sendToMQ(){

MyProducer producer = new MyProducer();

producer.sendToMQ();

}

}

|

2.5.4 第三步:測(cè)試

(1)設(shè)置防火墻,配置61616端口。注意修改之后重啟防火墻。

(2)測(cè)試結(jié)果:

--查看控制臺(tái)

|

[圖片上傳失敗...(image-6aa9d4-1564373794299)]

|

--查看ActiveMQ管理控制界面

|

[圖片上傳失敗...(image-4252b4-1564373794299)]

|

--消息發(fā)送成功!!!

2.6 第四部分:創(chuàng)建消息消費(fèi)者,消費(fèi)消息

2.6.1 第一步:創(chuàng)建MyConsumer類

|

package cn.gzsxt.mq.consumer;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.Message;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

  • @ClassName:MyConsumer

  • @Description: 消息消費(fèi)者代碼

*/

public class MyConsumer {

// 定義鏈接工廠

ConnectionFactory connectionFactory = null;

// 定義鏈接

Connection connection = null;

// 定義會(huì)話

Session session = null;

// 定義目的地

Destination destination = null;

// 定義消息消費(fèi)者

MessageConsumer consumer = null;

// 定義消息

Message message = null;

public void recieveFromMQ(){

try{

/*

  • 創(chuàng)建鏈接工廠

  • ActiveMQConnectionFactory - 由ActiveMQ實(shí)現(xiàn)的ConnectionFactory接口實(shí)現(xiàn)類.

  • 構(gòu)造方法: public ActiveMQConnectionFactory(String userName, String password, String brokerURL)

  • userName - 訪問ActiveMQ服務(wù)的用戶名, 用戶名可以通過<u>jetty</u>-realm.properties配置文件配置.

  • password - 訪問ActiveMQ服務(wù)的密碼, 密碼可以通過<u>jetty</u>-realm.properties配置文件配置.

  • brokerURL - 訪問ActiveMQ服務(wù)的路徑地址. 路徑結(jié)構(gòu)為 - 協(xié)議名://主機(jī)地址:端口號(hào)

  • 此鏈接基于TCP/IP協(xié)議.

*/

connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.23.13:61616");

// 創(chuàng)建鏈接對(duì)象

connection = connectionFactory.createConnection();

// 啟動(dòng)鏈接

connection.start();

/*

  • 創(chuàng)建會(huì)話對(duì)象

  • 方法 - connection.createSession(boolean transacted, <u>int</u> acknowledgeMode);

  • transacted - 是否使用事務(wù), 可選值為true|false

  • true - 使用事務(wù), 當(dāng)設(shè)置此變量值, 則acknowledgeMode參數(shù)無效, 建議傳遞的acknowledgeMode參數(shù)值為

  • Session.SESSION_TRANSACTED

  • false - 不使用事務(wù), 設(shè)置此變量值, 則acknowledgeMode參數(shù)必須設(shè)置.

  • acknowledgeMode - 消息確認(rèn)機(jī)制, 可選值為:

  • Session.AUTO_ACKNOWLEDGE - 自動(dòng)確認(rèn)消息機(jī)制

  • Session.CLIENT_ACKNOWLEDGE - 客戶端確認(rèn)消息機(jī)制

  • Session.DUPS_OK_ACKNOWLEDGE - 有副本的客戶端確認(rèn)消息機(jī)制

*/

session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 創(chuàng)建目的地, 目的地命名即隊(duì)列命名, 消息消費(fèi)者需要通過此命名訪問對(duì)應(yīng)的隊(duì)列

destination = session.createQueue("test-mq");

// 創(chuàng)建消息消費(fèi)者, 創(chuàng)建的消息消費(fèi)者與某目的地對(duì)應(yīng), 即方法參數(shù)目的地.

consumer = session.createConsumer(destination);

// 從ActiveMQ服務(wù)中獲取消息

message = consumer.receive();

TextMessage tMsg = (TextMessage) message;

System.out.println("從MQ中獲取的消息是:"+tMsg.getText());

}catch(Exception e){

e.printStackTrace();

System.out.println("訪問ActiveMQ服務(wù)發(fā)生錯(cuò)誤!!");

}finally{

try {

// 回收消息消費(fèi)者資源

if(null != consumer)

consumer.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收會(huì)話資源

if(null != session)

session.close();

} catch (JMSException e) {

e.printStackTrace();

}

try {

// 回收鏈接資源

if(null != connection)

connection.close();

} catch (JMSException e) {

e.printStackTrace();

}

}

}

}

|

2.6.2 第二步:修改測(cè)試類MessageTest,新增測(cè)試方法

|

@Test

public void recieveFromMQ(){

MyConsumer consumer = new MyConsumer();

consumer.recieveFromMQ();

}

|

2.6.3 第三步:測(cè)試

--查看Eclipse控制臺(tái)

--查看ActiveMQ管理控制界面

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容