1、JMS(Java Message Service,Java消息服務(wù))
1.1 定義
Java消息服務(wù)(Java Message Service,即JMS)應(yīng)用程序接口是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM) 的API, 用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無(wú)關(guān)的APl。
1.2 JMS的對(duì)象模型
| 名稱 | 描述 |
|---|---|
| ConnectionFactory | 連接工廠 |
| Connection | 連接 |
| Session | 會(huì)話 |
| Destination | 目的 |
| MessageProducer | 生產(chǎn)者 |
| MessageConsumer | 消費(fèi)者 |
| Message | 消息 |
| Broker | 消息中間件的實(shí)例(ActiveMQ) |
1.3 JMS消息模型
-
Point-to-Point (P2P) /點(diǎn)對(duì)點(diǎn)
image-20220220224711788 -
Publish/Subscribe (Pub/Sub) /主題(發(fā)布訂閱)
image-20220220230555126
1.4 JMS的消息結(jié)構(gòu)
消息頭、消息屬性、消息體
-
消息頭
image-20220220230826606 -
消息屬性:可以理解為消息的附加消息頭,屬性名可以自定義
image-20220220230935372 -
消息體
image-20220220231010881
2、ActiveMQ概念
2.1 定義
ActiveMQ是Apache出品,最流行的,能力強(qiáng)勁的開(kāi)源消息總線。
ActiveMQ是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的JMS Provider實(shí)現(xiàn),盡管JMS規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是JMS在當(dāng)今的J2EE應(yīng)用中間仍然扮演著特殊的地位。
2.2 特性
- 支持多種編程語(yǔ)言
- 支持多種傳輸協(xié)議
- 有多種持久化方式
2.3 ActiveMQ支持哪些協(xié)議
- ActiveMQ支持多種協(xié)議傳輸和傳輸方式,允許客戶端使用多種協(xié)議連接。
- ActiveMQ支持的協(xié)議: AUTO、 OpenWire、 AMQP、Stomp、 MQTT等。
- ActiveMQ支持的基礎(chǔ)傳輸方式: VM、 TCP、SSL、 UDP 、Peer、 Multicast、 HTTP(S)等,以及更高級(jí)的Failover、Fanout、 Discovery、 ZeroConf方式。
2.4 OpenWire協(xié)議
2.4.1 OpenWire協(xié)議是什么
OpenWire是Apache的一種跨語(yǔ)言的協(xié)議,允許從不同的語(yǔ)言和平臺(tái)訪問(wèn)ActiveMQ,是ActiveMQ 4.x以后的版本默認(rèn)的傳輸協(xié)議。
2.4.2 OpenWire協(xié)議如何使用
OpenWire 支持TCP、SSL、 NIO、UDP、VM等傳輸方式,直接配置這些連接,就是使用的OpenWire協(xié)議,OpenWire有 自己的配置參數(shù),客戶端和服務(wù)器端配置的參數(shù)名都通過(guò)綴“wireFormat.” 表示。
示例

OpenWire的配置參數(shù)說(shuō)明
| 屬性 | 默認(rèn)值 | 描述 |
|---|---|---|
| stackTraceEnabled | true | 是否應(yīng)該把已經(jīng)發(fā)生并且跟蹤到的堆棧異常,通過(guò)Broker發(fā)送到客戶端 |
| tcpNoDelayEnabled | true | socket的NoDelay參數(shù) |
| cacheEnabled | true | 如果不斷重復(fù)的值進(jìn)行緩存,以便少編組(馬上要進(jìn)行的發(fā)送)發(fā)生 |
| tightEncodingEnabled | true | 根據(jù)CPU使用情況,自動(dòng)調(diào)整傳輸內(nèi)容大小(壓縮比例) |
| prefixPacketSize | true | 在每個(gè)包被編組前(馬上要被發(fā)送),每個(gè)包的大小是否應(yīng)該作為前綴連接的最大空閑時(shí)間,以毫秒為單位。 |
| maxInactivityDuration | 30000 | broker服務(wù)會(huì)根據(jù)配置關(guān)閉超時(shí)的連接。同時(shí)也可以通過(guò)心跳機(jī)制來(lái)保持連接。值<=0則禁用活動(dòng)連接的監(jiān)測(cè)。 |
| maxlnactivityDurationInitalDelay | 10000 | 連接建立之后,多久開(kāi)始進(jìn)行超時(shí)檢測(cè) |
| cacheSize | 1024 | 如果能被緩存,那么這個(gè)規(guī)定了緩存的最大數(shù)量。此屬性中在ActiveMQ的4.1中開(kāi)始添加使用可發(fā)送最大幀大小 |
| maxFrameSize | MAX_ LONG | 可以幫助防止OOM DOS攻擊 |
2.5 為什么使用MQTT協(xié)議
MQTT的結(jié)構(gòu)簡(jiǎn)單,相對(duì)于其它消息協(xié)議,它更加輕量級(jí)。適合在計(jì)算能力有限、低帶寬、不可靠的網(wǎng)絡(luò)環(huán)境使用。
2.5.1 MQTT的發(fā)布訂閱模型

2.5.2 MQTT服務(wù)質(zhì)量
服務(wù)質(zhì)量(QoS) 級(jí)別是一種關(guān)于發(fā)送者和接收者之間信息投遞的保證協(xié)議。MQTT中有三種QoS級(jí)別:
- 至多一次(0)
- 至少一次(1)
- 只有一次(2)
QoS是MQTT的一個(gè)主要功能,它使得在不可靠的網(wǎng)絡(luò)下進(jìn)行通信變得更為簡(jiǎn)單,因?yàn)榧幢闶窃诜浅2豢煽康木W(wǎng)絡(luò)下,協(xié)議也可以掌控是否需要重發(fā)消息并保證消息到達(dá)。它也能幫助客戶端根據(jù)網(wǎng)絡(luò)環(huán)境和程序邏輯來(lái)自由選擇QoS.
2.5.3 ActiveMQ中如何使用MQTT協(xié)議
ActiveMQ 服務(wù)器端配置
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize= 104857600"/>
</transportConnectors>
MQTT配置參數(shù)說(shuō)明
| 屬性 | 默認(rèn)值 | 描述 |
|---|---|---|
| maxFrameSize | 268435456 | (v5.12.0)可以發(fā)送的最大幀大小。協(xié)議限制為256MB,其值不能設(shè)置得更高??梢詭椭乐筄OM DOS攻擊 |
配置示例
<transportConnector name="mqtt" uri="mtt://localhost:1883?wireFormat.maxFrameSize=100000"/>
MQTT使用NIO
<transportConnector name="mgtt+nio" uri="mtt+nio://localhost:1883"/>
MQTT使用NIO + SSL
<transportConnector name="mqtt+nio" uri="mqtt+niossl://localhost:1883"/>
2.5.4 Spring使用MQTT
Spring Integration提供了MQTT協(xié)議的支持,通過(guò)Maven添加依賴即可使用。
<dependency>
<groupld>org.springframework.integration</groupld>
<artifactld>spring-integration-mqtt</artifactld>
<version>5.1.1.RELEASE</version>
</dependency>
2.6 AUTO協(xié)議是什么
AUTO自動(dòng)檢測(cè)協(xié)議從ActiveMQ 5.13.0開(kāi)始,ActiveMQ 開(kāi)始支持協(xié)議格式檢測(cè),可以自動(dòng)檢測(cè)OpenWire、STOMP、 AMQP和MQTT。允許這4種類型的客戶端共享一個(gè)傳輸
AUTO使用TCP
<transportConnector name="auto" uri="auto://localhost:5671"/>
AUTO使用SSL
<transportConnector name="auto+ssl" uri="autssl://localhost:5671"/>
AUTO使用NIO
<transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/>
AUTO使用NIO+SSL
<transportConnector name="auto+nio+ssl" uri="auto+nioss://localhost:5671"/>
3、ActiveMQ高可用集群方案
3.1 ActiveMQ有哪些集群部署方式
| Master-Slave部署方式 | Broker-Cluster部署方式 | Master-Slave與Broker-Cluster相結(jié)合的部署方式 |
|---|
3.2 Master-Slave部署方式
共享同一個(gè)文件系統(tǒng)

共享同一個(gè)數(shù)據(jù)庫(kù)

3.3 Broker-Cluster部署方式

3.4 Master-Slave與Broker-Cluster相結(jié)合的部署方式

4、ActiveMQ持久化機(jī)制
4.1 Queue類型的持久化機(jī)制

4.2 Topic類型的持久化機(jī)制

4.3 存儲(chǔ)方式
4.3.1 JDBC方式
將消息存儲(chǔ)到數(shù)據(jù)庫(kù)中,例如: Mysql、 SQL Server、Oracle、 DB2等
| 優(yōu)點(diǎn) | 缺點(diǎn) |
|---|---|
| 方便管理 | 性能低 |
| 可以支持強(qiáng)一致性 | / |
4.3.2 AMQ方式
基于文件的存儲(chǔ)方式,它具有寫(xiě)入速度快和容易恢復(fù)的特點(diǎn),但是由于其重建索引時(shí)間過(guò)長(zhǎng),而且索引文件占用磁盤(pán)空間過(guò)大,所以已經(jīng)不推薦使用。
| 優(yōu)點(diǎn) | 缺點(diǎn) |
|---|---|
| 性能高于JDBC | 索引占用磁盤(pán)空間量大 |
| / | 重建索引速度非常慢 |
4.3.3 LevelDB方式
LevelDB是Google開(kāi)發(fā)的一套用于持久化數(shù)據(jù)的高性能類庫(kù).LevelDB并不是一-種服務(wù),用戶需要自行實(shí)現(xiàn)Server。是單進(jìn)程的服務(wù),能夠處理十億級(jí)別規(guī)模Key-Value型數(shù)據(jù),占用內(nèi)存小。
特點(diǎn)
- 基于K-V存儲(chǔ)
- Key值有序存儲(chǔ)
- 操作接口簡(jiǎn)單
- 支持?jǐn)?shù)據(jù)快照
- 支持?jǐn)?shù)據(jù)壓縮
L evelDB的結(jié)構(gòu)

ActiveMQ配置LevelDB:修改配置文件${ACTIVEMQ_ HOME}/conf/activemq.xml
<persistenceAdapter>
<levelDB directory="${activemq.data}/activemq-data"/>
</persistenceAdapter>
5、ActiveMQ事務(wù)實(shí)現(xiàn)機(jī)制
ActiveMQ事務(wù)實(shí)現(xiàn)的是最終一致性
生產(chǎn)者端實(shí)現(xiàn)機(jī)制如下:

消費(fèi)者端實(shí)現(xiàn)機(jī)制如下:





