
Apache RocketMQ之JMS基本概念及使用
Apache RocketMQ 系列:
Apache RocketMQ之JMS基本概念及使用:http://www.itdecent.cn/p/d2e3fd77c4f4
Apache RocketMQ 基礎概念及架構解析:http://www.itdecent.cn/p/95ab928960b3
Apache RocketMQ 的基礎特性介紹:http://www.itdecent.cn/p/570680b32590
Apache RocketMQ 集群搭建(兩主兩從):http://www.itdecent.cn/p/b090138cf52c
Apache RocketMQ 刷盤策略與復制策略: http://www.itdecent.cn/p/d66b381428bb
優(yōu)秀博客:
https://blog.csdn.net/canot/article/details/53676350
https://blog.csdn.net/caidaoqq/article/details/45938919
https://blog.csdn.net/u013123635/article/details/78362360
介紹流程:

RocketMQ 是什么?
是一個隊列模型的消息中間件,具有高性能、高可靠、高實時、分布式特點。
RocketMQ是一個消息中間件,那什么是消息中間件?
關注于數(shù)據(jù)的發(fā)送與接收,利用高效可靠的異步消息傳遞機制集成分布式系統(tǒng)。
對于消息中間件,常見的角色大致也就有Producer(生產者)、Consumer(消費者)
消息+中間件
- 消息:消息即為數(shù)據(jù),數(shù)據(jù)就會有規(guī)劃,有長度,有大小。
- 中間件:為我們提供發(fā)送消息的程序或者服務.
消息:
JMS(Java Message Service)
Java消息服務應用程序接口,是一個Java平臺中關于面向消息中間件(Message Oriented Middleware)的API。
用于在兩個應用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。
Java消息服務是一個與具體平臺無關的API,絕大多數(shù)MOM提供商都對JMS提供支持。
JMS允許應用程序組件基于JavaEE平臺創(chuàng)建、發(fā)送、接收和讀取消息。它使分布式通信耦合度更低,消息服務更加可靠以及異步性。
在提到JMS時,我們通常會說到一些術語,解釋如下:
消息中間件(JMS Provider) : 指提供了對JMS協(xié)議的第三方組件,比如RocketMQ就是一個消息中間件,另外比較知名的還有KafKa、 Rabbit MQ、ActiveMQ等。
消息(Message): 通信內容的載體,其結構主要分為消息頭,屬性和消息體,并且根據(jù)存儲結構的不同分為好幾種,后面會詳細提到。
消息模式:分為點對點(Point to Point,即P2P)和發(fā)布/訂閱(Pub/Sub),對應的數(shù)據(jù)結構分別是隊列(Queue)和主題(Topic)
消息生產者:產生消息的一方,在P2P模式下,指消息發(fā)送者(Sender),在P/S模式下指消息發(fā)布者(Publisher)
消息消費者:接收消息的一方,對應于兩種模式分別是消息接收者(Receiver)和消息訂閱者(Subscriber)
中間件:
- 為我們提供發(fā)送消息的程序或者服務,
- 目前主流的有 rocketMq 、kafka、rabbitMq、activemq等。
JMS基本概念及原理詳解
基本概念:
- JMS的客戶端之間可以通過JMS服務進行異步的消息傳輸。
體系架構
JMS由以下元素組成。
| 元素 | 描述 |
|---|---|
| JMS提供者 | 連接面向消息中間件的,JMS接口的一個實現(xiàn)。提供者可以是Java平臺的JMS實現(xiàn),也可以是非Java平臺的面向消息中間件的適配器。 |
| JMS客戶 | 生產或消費消息的基于Java的應用程序或對象。 |
| JMS生產者 | 創(chuàng)建并發(fā)送消息的JMS客戶。 |
| JMS消費者 | 接收消息的JMS客戶。 |
| JMS消息 | 包括可以在JMS客戶之間傳遞的數(shù)據(jù)的對象。 |
| JMS隊列 | 一個容納那些被發(fā)送的等待閱讀的消息的區(qū)域。這些消息將按照順序發(fā)送,一旦一個消息被閱讀,該消息將被從隊列中移走。 |
| JMS主題 | 一種支持發(fā)送消息給多個訂閱者的機制。 |
JMS 消息(Message)
每個在JMS規(guī)范中概念都是圍繞處理一個JMS消息,因為它包含了業(yè)務數(shù)據(jù)和事件是怎么被傳輸?shù)摹?/p>
JMS消息允許任何內容作為其一部分發(fā)送消息,包括文本和二進制數(shù)據(jù)以及標題中的信息。
JMS消息包含三部分,包括消息頭、消息的屬性和消息載體(類似于我們常用的大部分協(xié)議,如http等)。
消息頭提供消息的接受端和發(fā)送端兩個客戶端和JMS規(guī)范的元數(shù)據(jù)。
消息載體是消息的實際內容,并且可以保存文本和承載了各種各樣數(shù)據(jù)的二進制數(shù)據(jù)(如圖片,流等等)。
JMS消息設計宗旨是在易于理解和可擴展,所有的復雜的內容包含在JMS消息頭中。

如之前所說的,JMS消息的復雜性在消息頭中,消息頭有兩種類型的報文頭,它們具有相同的邏輯概念,但是語義上不同的。
由JMS規(guī)范提供的,客戶端調用send()方法時,自動設置的消息頭。
開發(fā)者分配的消息頭。
標準的JMS消息頭與JMS API提供的方法一起工作。大多數(shù)頭信息是被自動分配的。
接下來描述每個頭的含義,以及如何被分配到消息中的。
客戶端調用send()方法時,自動設置的消息頭。
JMSDestination
JMS發(fā)送消息的目的地。這對于使用來自多個目的地的消息的客戶端很有價值。
使用一個Topic或Queue對象來標識目的地,二者都是Destination類型
相關方法:public abstract Destination getJMSDestination()
JMSDeliveryMode
JMS傳送模式。支持兩種模式:持久模式和非持久模式。默認的傳遞模式是持久。
一條持久性消息應該被傳送“一次而且僅僅一次”,這就意味著如果JMS提供者出現(xiàn)故障,該消息并不會丟失;它會在服務器恢復正常之后再次傳送。一條非持久性消息最多只會傳送一次,這意味著如果JMS提供者出現(xiàn)故障,該消息可能會永久丟失。在持久性和非持久性者兩種傳送模式中,消息服務器都不會講一條消息向同一消費者發(fā)送一次以上,不過,這在技術上最有可能實現(xiàn)的。
Persistent:持久模式,通知消息提供者去持久化消息。即使消息提供者掛掉,消息也不會丟失。在這個模式下,JMS提供者必須對消息進行持久化并且只進行一次。如果JMS提供者掛了,此時該JMS提供者的消息并不會丟失,但消息只能被消費者使用一次。
由于持久化消息提供了額外的可靠性保護,因此也需要更多的空間和性能消耗。Nonpersistent:非持久模式, 使得JMS提供者不需求持久化消息。JMS提供者必須最多傳遞一次非持久消息。如果JMS提供者掛了,此時該JMS提供者的消息會丟失,但不會出現(xiàn)第二次。非持久消息會提供更高的性能和較低的可靠性。
發(fā)送模式在消息發(fā)送者上設置,并應用于從發(fā)送的所有消息。 但是也可以針對單個消息覆蓋發(fā)送模式。
相關方法:public abstract int getJMSDeliveryMode()
JMSMessageID
JMS消息ID。它是一個String類型的值,唯一標識了一條消息,并且必須以ID開頭。
JMSMessageID對于JMS中消費者應用程序的歷史倉庫來說非常有用,它是倉庫中的消息需要的唯一索引。
因為消息ID可能導致JMS提供程序產生一些開銷,消息提供者可以建議JMS提供程序,JMS應用程序不依賴于這個消息頭的值。
通過 MessageProducer.setDisableMessageID()方法設置。
如果JMS提供程序同意該建議,則消息標識必須設置為null。但JMS提供程序可以忽略此調用并始終分配消息ID。
相關方法:public abstract String getJMSMessageID()
JMSTimestamp
JMS時間戳。它包含的是JMS提供者接受消息的時間,而不是該消息實際傳送的時間。這條消息頭用于確認發(fā)送消息和它被消費者實際接受的時間間隔。
此標頭的值使用標準Java millis時間值。
與JMSMessageID頭類似,JMS提供者建議JMS生產者不設置JMSTimestamp頭
通過MessageProducer.setDisableMessageTimestamp()方法設置,如果JMS生產者接受此建議,則它則將JMSTimestamp設置為零。
相關方法:public abstract long getJMSTimestamp()
JMSExpiration
JMS消息的超時時間。這個頭信息被用來阻止過期消息的傳遞。對于那些數(shù)據(jù)僅在某一個時間段內有效的消息來說,非常有用的。
消息的超時值可以使用MessageProducer.setTimeToLive()方法設置該生產者發(fā)送的所有消息的生存時間,或使用一個MessageProducer.send()方法來設置單個消息的超時值。調用這兩個方法都會設置時間,以毫秒為單位。
通過將生存時間添加到JMSExpiration消息頭中來計算超時時間。默認情況下,超時時間為零,意味著消息不會過期。
如果未指定超時時間,則使用默認值并且消息不會過期。如果超時時間明確指定為零,那么同樣的消息不會過期。
此消息頭對于時間敏感的消息很有用。但要注意,JMS提供者程序不應傳遞已過期的消息和JMS客戶端應該被寫入以便不處理已經過期的消息。
相關方法:public abstract long getJMSExpiration()
JMSRedelivered
JMS重發(fā)。表示該消息將被重新傳送給消費者。如果該消息被重新傳送,JMSRedelivered消息頭就為true,否則為false。
相關方法:public abstract boolean getJMSRedelivered()
JMSPriority
- JMS優(yōu)先級。在傳送一條消息時,消息生產者能夠為該消息分配一個優(yōu)先級,這個頭是也設置在消息提供者者。
一旦在生產者上設置了優(yōu)先級,適用于從該生產者發(fā)送的所有消息,也可以對單個消息進行單獨設置。
JMS定義了10個級別的消息優(yōu)先級,范圍從 0(最低)到9(最高):
優(yōu)先級0-4 - 這些優(yōu)先級是普通優(yōu)先級。
優(yōu)先級5-9 - 這些優(yōu)先級是加急優(yōu)先級。
JMS提供者不需要實現(xiàn)消息排序,盡管大部分情況是需要排序。
他們應該簡單地嘗試提供更高優(yōu)先級的消息在低優(yōu)先級消息之前。
相關方法:public abstract int getJMSPriority()
開發(fā)者分配的消息頭:
JMSReplyTo
JMS響應。一個JMS消息生產者可能會要求消費者對一條消息作出應答,JMSReplyTo消息頭包含了一個javax.jms.Destination,表明JMS消費者應該應答的地址。
這個信息頭通常用于消息的請求/回復模式。
已發(fā)送消息使用此標題通常希望得到接受者的響應,它是可選的頭信息。
客戶端必須做出是否對發(fā)送者的響應。
相關方法:public abstract Destination getJMSReplyTo()。
JMSCorrelationID
- JMS關聯(lián)ID。提供了一個消息頭,用于將當前的消息和先前的某些消息或應用程序特定的ID關聯(lián)起來,這個消息頭通常用于將響應消息與請求消息。
JMSCorrelationID的值可以是以下之一:
特定于提供者的消息ID
應用程序特定的字符串
提供程序自身的byte[]值
提供者的消息,以 ID: 為前綴開頭,而應用程序特定的消息不能以 ID: 前綴開頭。
如果是JMS提供者支持 native correlation ID的概念,JMS客戶端可能需要分配一個特定的JMSCorrelationID值,以匹配非JMS客戶端所期望的值,但是這不是必須的。
相關方法:public abstract String getJMSCorrelationID()。
JMSType
JMS類型。用于語義標識消息類型,是由JMS客戶端設置的一個可選消息頭。它的主要作用是標示消息結構和有效負載的類型。
這個消息頭并未指明正被發(fā)送的消息類型,而是JMS提供者使用的內部消息倉庫的一個條目。。
在使用請求/應答場景時,通過這條消息頭屬性可以進一步實現(xiàn)消息生產者和消息消費者之間的去耦。
這個頭信息只有很少廠商使用并且和消息承載的Java類型無關。
相關方法:public abstract String getJMSType()。
消息屬性
消息屬性就像可以分配一條消息的附加消息頭一樣。它們允許開發(fā)者添加有關消息的不透明附加消息。它們還用于暴露消息選擇器在消息過濾時使用的數(shù)據(jù)。
message接口為讀取和寫入屬性提供了若干個取值函數(shù)和賦值函數(shù)方法。
包括布爾值,字節(jié),短整型,長整型,浮點型,雙精度型,以及String對象類型的方法。
下面方法取自Message接口。
public interface Message {
...
boolean getBooleanProperty(String name) throws JMSException;
byte getByteProperty(String name) throws JMSException;
short getShortProperty(String name) throws JMSException;
int getIntProperty(String name) throws JMSException;
long getLongProperty(String name) throws JMSException;
float getFloatProperty(String name) throws JMSException;
double getDoubleProperty(String name) throws JMSException;
String getStringProperty(String name) throws JMSException;
Object getObjectProperty(String name) throws JMSException;
...
Enumeration getPropertyNames() throws JMSException;
boolean propertyExists(String name) throws JMSException;
...
void setBooleanProperty(String name, boolean value) throws JMSException;
void setByteProperty(String name, byte value) throws JMSException;
void setShortProperty(String name, short value) throws JMSException;
void setIntProperty(String name, int value) throws JMSException;
void setLongProperty(String name, long value) throws JMSException;
void setFloatProperty(String name, float value) throws JMSException;
void setDoubleProperty(String name, double value) throws JMSException;
void setStringProperty(String name, String value) throws JMSException;
void setObjectProperty(String name, Object value) throws JMSException;
...
}
- 還要注意用于處理消息上的泛型屬性的兩個方便方法:
- getPropertyNames() 方法,返回給定消息上所有屬性的枚舉,以便輕松地遍歷所有屬性。
- propertyExists() 方法用于測試消息上是否存在給定屬性。
- 注意,特定于JMS的頭部不被認為是通用屬性,也不包含在getPropertyNames()方法返回的枚舉中。
有三種類型的屬性:自定義屬性、JMS定義的屬性和特定于提供者的屬性。
自定義屬性:
- 自定義屬性是任意的由JMS應用程序定義。JMS應用程序的開發(fā)人員可以通過使用前一部分
- getBooleanProperty()/setBooleanProperty()、
- getStringProperty()/ setStringProperty()
- 所示的泛型方法,自由定義任何必需的Java屬性。
例如:
TextMessage message = pubSession .createTextMessage();
message.setText(userName +":" +text );
message.setStringProperty("username" , userName );
publisher.publish(message );
JMS定義的屬性
- JMS定義的屬性具有和應用程序屬性相同的特性,除了前者大多數(shù)在消息發(fā)送時由JMS提供者來設置之外JMS定義的屬性可以作為可選的JMS消息頭;對于某些另有聲明的例外。各廠商可以分別選擇不支持、部分支持或全部支持。下面是JMS定義的9個屬性清單:
| 屬性 | 描述 |
|---|---|
| JMSXAppID | 標識發(fā)送消息的應用程序 |
| JMSXConsumerTXID | 使用此消息的事務的事務標識符 |
| JMSXDeliveryCount | 消息傳遞嘗試的數(shù)量 |
| JMSXGroupID | 此消息是其一部分的消息組 |
| JMSXGroupSeq | 組中此消息的序列號 |
| JMSXProducerTXID | 生成此消息的事務的事務標識符 |
| JMSXRcvTimestamp | JMS提供者向消費者傳遞消息的時間 |
| JMSXState | 用于定義特定于提供程序的狀態(tài) |
| JMSXUserID | 標識發(fā)送消息的用戶 |
- 規(guī)范為使用這些屬性提供的唯一建議 是JMSXGroupID 和 JMSXGroupSeq 屬性,并且當按特定順序對消息或消息分組時,客戶端應該使用這些屬性。
特定于提供者的屬性
- 每個JMS提供者都可以定義一組私有屬性,這些屬性可以由客戶端或提供者自動設置。
- 提供者特定的屬性必須以前綴JMS開頭,后面緊接著是屬性名稱。
- 提供者特定的屬性,其作用就是支持廠商的私有特性。
既然已經討論了消息的JMS頭和屬性,那么它們究竟用于什么呢?
- 在篩選訂閱目的地的客戶端接收的消息時,報頭和屬性非常重要。
消息體(Message Body)
JMS為消息體定義了六種類型載體,通過這個類型,你可以發(fā)送各種各樣的數(shù)據(jù)。
| 類型 | 描述 |
|---|---|
| Message | 最基礎的消息體,沒有數(shù)據(jù)載體。僅僅包含了消息體和屬性,一般用做簡單的時間通知。 |
| TextMessage | 文本消息 字符串數(shù)據(jù)載體。一般用來發(fā)送簡單的文本,XML數(shù)據(jù)。 |
| MapMessage | key-value鍵值隊作為數(shù)據(jù)載體。key一般使用字符串,value可以為Java原始類型。 |
| BytesMessage | 使用一個二進制數(shù)據(jù)來做數(shù)據(jù)載體。 |
| StreamMessage | Java原始類型的流數(shù)據(jù) |
| ObjectMessage | 序列化后的完整Java類。通過使用與復雜Java類型。也支持集合 |
JMS消息模型 (即點對點和發(fā)布訂閱模型)
- Point-to-Point(P2P)

- Publish/Subscribe(Pub/Sub)

JMS應用程序接口
| 接口 | 描述 |
|---|---|
| ConnectionFactory 接口(連接工廠) | 創(chuàng)建Connection對象的工廠,根據(jù)消息類型的不同,用戶將使用隊列連接工廠,或者主題連接工廠分,別有QueueConnectionFactory和TopicConnectionFactory兩種??梢酝ㄟ^JNDI來查找ConnectionFactory對象。 |
| Destination 接口(目標) | Destination是一個包裝了消息目標標識符的被管對象,消息目標是指消息發(fā)布和接收的地點,或者是隊列,或者是主題。是消息生產者的消息發(fā)送目標或者說消息消費者的消息來源。對于消息生產者來說,它的Destination是某個隊列(Queue)或某個主題(Topic);對于消息消費者來說,它的Destination也是某個隊列或主題(即消息來源)。所以,Destination實際上就是兩種類型的對象:Queue、Topic可以通過JNDI來查找Destination。 |
| Connection 接口(連接) | Connection表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。 |
| Session 接口(會話) | Session是我們操作消息的接口。表示一個單線程的上下文,用于發(fā)送和接收消息。由于會話是單線程的,所以消息是連續(xù)的,就是說消息是按照發(fā)送的順序一個一個接收的。可以通過session創(chuàng)建生產者、消費者、消息等。Session提供了事務的功能。當我們需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。 |
| MessageProducer 接口(消息的生產者) | 消息生產者由Session創(chuàng)建,并用于將消息發(fā)送到Destination。消費者可以同步地(阻塞模式),或異步(非阻塞)接收隊列和主題類型的消息。同樣,消息生產者分兩種類型:QueueSender和TopicPublisher。可以調用消息生產者的方法(send或publish方法)發(fā)送消息。 |
| MessageConsumer 接口(消息消費者) | 消息消費者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。兩種類型:QueueReceiver和TopicSubscriber??煞謩e通過session的createReceiver(Queue)或createSubscriber(Topic)來創(chuàng)建。當然,也可以session的creatDurableSubscriber方法來創(chuàng)建持久化的訂閱者。 |
| Message 接口(消息) | 是在消費者和生產者之間傳送的對象,也就是說從一個應用程序創(chuàng)送到另一個應用程序。一個消息有三個主要部分:1、消息頭(必須):包含用于識別和為消息尋找路由的操作設置。2、一組消息屬性(可選):包含額外的屬性,支持其他提供者和用戶的兼容??梢詣?chuàng)建定制的字段和過濾器(消息選擇器)。3、一個消息體(可選):允許用戶創(chuàng)建五種類型的消息(文本消息,映射消息,字節(jié)消息,流消息和對象消息)。消息接口非常靈活,并提供了許多方式來定制消息的內容。 |
| MessageListener | 消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,一旦消息到達,將自動調用監(jiān)聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。 |
如下圖:

其他專題:
Redis:http://www.itdecent.cn/nb/32287093