ActiveMQ(windows)簡(jiǎn)單實(shí)例

一、消息隊(duì)列(MQ)概述

消息隊(duì)列(Message Queue),是分布式系統(tǒng)中重要的組件,其通用的使用場(chǎng)景可以簡(jiǎn)單地描述為:

當(dāng)不需要立即獲得結(jié)果,但是并發(fā)量又需要進(jìn)行控制的時(shí)候,差不多就是需要使用消息隊(duì)列的時(shí)候。

消息隊(duì)列主要解決了應(yīng)用耦合、異步處理、流量削鋒等問(wèn)題。

當(dāng)前使用較多的消息隊(duì)列有RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,而部分?jǐn)?shù)據(jù)庫(kù)如Redis、Mysql以及phxsql也可實(shí)現(xiàn)消息隊(duì)列的功能。

二 、ActiveMQ

有那么多得消息隊(duì)列,各有各的有優(yōu)缺點(diǎn),今天主要來(lái)聊聊ActiveMQ

ActiveMQ是由Apache出品,ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。它非??焖伲С侄喾N語(yǔ)言的客戶端和協(xié)議,而且可以非常容易的嵌入到企業(yè)的應(yīng)用環(huán)境中,并有許多高級(jí)功能。

主要特性:

服從 JMS 規(guī)范:JMS 規(guī)范提供了良好的標(biāo)準(zhǔn)和保證,包括:同步或異步的消息分發(fā),一次和僅一次的消息分發(fā),消息接收和訂閱等等。遵從 JMS 規(guī)范的好處在于,不論使用什么 JMS 實(shí)現(xiàn)提供者,這些基礎(chǔ)特性都是可用的;

連接性:ActiveMQ 提供了廣泛的連接選項(xiàng),支持的協(xié)議有:HTTP/S,IP 多播,SSL,STOMP,TCP,UDP,XMPP等等。對(duì)眾多協(xié)議的支持讓 ActiveMQ 擁有了很好的靈活性。

支持的協(xié)議種類(lèi)多:OpenWire、STOMP、REST、XMPP、AMQP ;

持久化插件和安全插件:ActiveMQ 提供了多種持久化選擇。而且,ActiveMQ 的安全性也可以完全依據(jù)用戶需求進(jìn)行自定義鑒權(quán)和授權(quán);

支持的客戶端語(yǔ)言種類(lèi)多:除了 Java 之外,還有:C/C++,.NET,Perl,PHP,Python,Ruby;

代理集群:多個(gè) ActiveMQ 代理可以組成一個(gè)集群來(lái)提供服務(wù);

異常簡(jiǎn)單的管理:ActiveMQ 是以開(kāi)發(fā)者思維被設(shè)計(jì)的。所以,它并不需要專(zhuān)門(mén)的管理員,因?yàn)樗峁┝撕?jiǎn)單又使用的管理特性。有很多中方法可以監(jiān)控 ActiveMQ 不同層面的數(shù)據(jù),包括使用在 JConsole 或者 ActiveMQ 的Web Console 中使用 JMX,通過(guò)處理 JMX 的告警消息,通過(guò)使用命令行腳本,甚至可以通過(guò)監(jiān)控各種類(lèi)型的日志。

使用ActiveMQ需要:

Java JDK

ActiveMQ安裝包

ActiveMQ可以運(yùn)行在Java語(yǔ)言所支持的平臺(tái)之上。

優(yōu)點(diǎn):

跨平臺(tái)(JAVA編寫(xiě)與平臺(tái)無(wú)關(guān)有,ActiveMQ幾乎可以運(yùn)行在任何的JVM上)

可以用JDBC:可以將數(shù)據(jù)持久化到數(shù)據(jù)庫(kù)。雖然使用JDBC會(huì)降低ActiveMQ的性能,但是數(shù)據(jù)庫(kù)一直都是開(kāi)發(fā)人員最熟悉的存儲(chǔ)介質(zhì)。將消息存到數(shù)據(jù)庫(kù),看得見(jiàn)摸得著。而且公司有專(zhuān)門(mén)的DBA去對(duì)數(shù)據(jù)庫(kù)進(jìn)行調(diào)優(yōu),主從分離;

支持JMS :支持JMS的統(tǒng)一接口;

支持自動(dòng)重連;

有安全機(jī)制:支持基于shiro,jaas等多種安全配置機(jī)制,可以對(duì)Queue/Topic進(jìn)行認(rèn)證和授權(quán)。

監(jiān)控完善:擁有完善的監(jiān)控,包括Web Console,JMX,Shell命令行,Jolokia的REST API;

界面友善:提供的Web Console可以滿足大部分情況,還有很多第三方的組件可以使用,如hawtio;

缺點(diǎn):

社區(qū)活躍度不及RabbitMQ高;

根據(jù)其他用戶反饋,會(huì)出莫名其妙的問(wèn)題,會(huì)丟失消息;

目前重心放到activemq6.0產(chǎn)品-apollo,對(duì)5.x的維護(hù)較少;

不適合用于上千個(gè)隊(duì)列的應(yīng)用場(chǎng)景;

三、具體實(shí)例

首先我們?nèi)ス倬W(wǎng)下載MQ,官網(wǎng)地址:http://activemq.apache.org/,我目前下載的是5.8.0版本的,下載好后直接雙擊activemq.bat啟動(dòng)


啟動(dòng)成功后,輸入默認(rèn)的訪問(wèn)地址:http://localhost:8161/admin/index.jsp

用戶名:admin? 密碼:admin

用戶名密碼設(shè)置具體配置文件在conf下jetty-realm.properties里面,jetty.xml文件修改端口號(hào)。



?然后點(diǎn)擊隊(duì)列(Queues),輸入隊(duì)列名稱(chēng)(Queue Name)FirstQueue,然后點(diǎn)創(chuàng)建(Create)

下一步我們創(chuàng)建一個(gè)maven 項(xiàng)目,在pom.xml引入jar

<dependency>?

??<groupId>org.apache.activemq</groupId>

<artifactId>activemq-all</artifactId>

? <version>5.8.0</version>

</dependency>

新建文件 Procuder

package Mq.ActivityMq;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.DeliveryMode;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

*

* @author 生產(chǎn)者

*

*/

public class Procuder {

? ? //默認(rèn)連接用戶名

? ? private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

? ? //默認(rèn)連接密碼

? ? private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

? ? //默認(rèn)連接地址

? ? private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

? ? public static void main(String[] args) {

? ? ? ? //連接工廠

? ? ? ? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

? ? ? ? try {

? ? ? ? ? ? //連接

? ? ? ? ? ? Connection connection = connectionFactory.createConnection();

? ? ? ? ? ? //啟動(dòng)連接

? ? ? ? ? ? connection.start();

? ? ? ? ? ? //創(chuàng)建session

? ? ? ? ? ? Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

? ? ? ? ? ? //消息目的地

? ? ? ? ? ? Destination destination = session.createQueue("FirstQueue");

? ? ? ? ? ? //消息生產(chǎn)者

? ? ? ? ? ? MessageProducer producer = session.createProducer(destination);

? ? ? ? ? ? //設(shè)置不持久化,此處學(xué)習(xí),實(shí)際根據(jù)項(xiàng)目決定

? ? ? ? ? ? producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

? ? ? ? ? ? //發(fā)送消息

? ? ? ? ? ? for (int i = 0; i < 5; i++) {

? ? ? ? ? ? ? ? //創(chuàng)建一條文本消息

? ? ? ? ? ? ? ? TextMessage message = session.createTextMessage("你好ActiveMQ: 這是第 " + i + " 條消息");

? ? ? ? ? ? ? ? //生產(chǎn)者發(fā)送消息

? ? ? ? ? ? ? ? producer.send(message);

? ? ? ? ? ? }

? ? ? ? ? ? session.commit();

? ? ? ? ? ? session.close();

? ? ? ? ? ? connection.close();

? ? ? ? } catch (JMSException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

}


新建文件Consumer

package Mq.ActivityMq;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.Destination;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

*

* @author 消費(fèi)者

*

*/

public class Consumer {

? ? //默認(rèn)連接用戶名

? ? private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

? ? //默認(rèn)連接密碼

? ? private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

? ? //默認(rèn)連接地址

? ? private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

? ? public static void main(String[] args) {

? ? ? ? //連接工廠

? ? ? ? ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);

? ? ? ? try {

? ? ? ? ? ? //連接

? ? ? ? ? ? Connection connection = connectionFactory.createConnection();

? ? ? ? ? ? //啟動(dòng)連接

? ? ? ? ? ? connection.start();

? ? ? ? ? ? //創(chuàng)建session

? ? ? ? ? ? Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

? ? ? ? ? ? //消息目的地

? ? ? ? ? ? Destination destination = session.createQueue("FirstQueue");

? ? ? ? ? ? //消息消費(fèi)者

? ? ? ? ? ? MessageConsumer consumer = session.createConsumer(destination);

? ? ? ? ? ? while (true) {

? ? ? ? ? ? ? ? TextMessage message = (TextMessage) consumer.receive();

? ? ? ? ? ? ? ? if (message != null) {

? ? ? ? ? ? ? ? ? ? System.out.println("接收到消息: " + message.getText());

? ? ? ? ? ? ? ? } else {

? ? ? ? ? ? ? ? ? ? break;

? ? ? ? ? ? ? ? }

? ? ? ? ? ? }

? ? ? ? ? ? session.close();

? ? ? ? ? ? connection.close();

? ? ? ? } catch (JMSException e) {

? ? ? ? ? ? e.printStackTrace();

? ? ? ? }

? ? }

}

測(cè)試結(jié)果

先運(yùn)行Producer(生產(chǎn)者),再運(yùn)行Consumer(消費(fèi)者)


這是我們查看MQ視圖


我們看到有0條等待消費(fèi)的消息,有1條消費(fèi)者,生產(chǎn)者發(fā)布了5條消息,消費(fèi)者消費(fèi)了5條消息。名詞解釋如下:


Messages Enqueued:表示生產(chǎn)了多少條消息,記做P

Messages Dequeued:表示消費(fèi)了多少條消息,記做C

Number Of Consumers:表示在該隊(duì)列上還有多少消費(fèi)者在等待接受消息

Number Of Pending Messages:表示還有多少條消息沒(méi)有被消費(fèi),實(shí)際上是表示消息的積壓程度,就是P-C


四、總結(jié)

如上所示只是ActiveMQ簡(jiǎn)單實(shí)例,實(shí)際的業(yè)務(wù)場(chǎng)景需要配置更多的參數(shù),比如消費(fèi)者消費(fèi)完畢后要通知生成者對(duì)消息進(jìn)行銷(xiāo)毀.....等等,掌握了基本的原理后,其他都是結(jié)合具體業(yè)務(wù)場(chǎng)景水到渠成的事情。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容