1、消息隊列是什么
分布式系統(tǒng)中常用通訊模型主要是“請求-應(yīng)答”模型和“發(fā)布-訂閱”模型。前者常見如RPC通訊,常用HTTP REST或Dubbo等協(xié)議;后者多指消息隊列通訊。
RPC大多屬于請求-應(yīng)答模式,也包括越來越多響應(yīng)式范式,對于需要點對點交互、強事務(wù)保證和延遲敏感的服務(wù)/應(yīng)用之間的通信,RPC是優(yōu)于消息隊列的。
消息隊列(Message Queue,MQ)可以看做是一種異步RPC,把一次RPC變?yōu)閮纱位蚨啻?,進(jìn)行內(nèi)容轉(zhuǎn)存,再在合適的時機投遞出去。消息的發(fā)送者和接收者不需要在同一時間與消息隊列進(jìn)行交互,消息在被處理或被刪除之前一直存儲在隊列上。

消息隊列提供一個臨時存儲消息的輕量級緩存區(qū),以及允許軟件組件連接到隊列以發(fā)送接受消息的的終端節(jié)點。這些消息通常較小,可以是請求、恢復(fù)、錯誤消息或明文消息等。要發(fā)送消息時,一個名為“生產(chǎn)者”的組件將消息添加到隊列。消息將存儲在隊列中,直至名為“消費者”的另一組件檢索該消息并執(zhí)行相關(guān)操作。
許多生產(chǎn)者和消費者都可以使用隊列,但一條信息只能有一組消費者處理一次。因此,這種消息收發(fā)模式通常稱為一對一或點對點通信。如果消息需要由多個消費者進(jìn)行處理,可以將將消息隊列與發(fā)布/訂閱結(jié)合起來使用。
2、消息隊列協(xié)議
AMQP、MQTT和STOMP是三種最常見、最流行的基于TCP/IP的消息傳遞協(xié)議。
2.1 AMQP
AMQP,即高級消息隊列協(xié)議(Advanced Message Queuing Protocol),一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
核心角色如下:
-
Message(消息):消息服務(wù)器處理消息的原子單元,包括一個內(nèi)容頭,一組屬性和一個內(nèi)容體。
消息有優(yōu)先級,高優(yōu)先級的消息在等待同一消息隊列時會比低優(yōu)先級的消息先發(fā)送,而且當(dāng)消息必須被丟棄時,低優(yōu)先級的消息優(yōu)先被丟棄。
使用AMQP協(xié)議,消息服務(wù)器不能修改內(nèi)容體和內(nèi)容頭,但可以在內(nèi)容頭上添加額外信息。 - PubLisher(消息生產(chǎn)者):發(fā)送消息
- Consumer(消息消費者):消費消息
- Broker(消息代理):消息隊列服務(wù)器,負(fù)責(zé)接收客戶端連接,路由消息。
- Queue(消息隊列):Broker中的一個角色,一個Broker中可以有多個Queue,負(fù)責(zé)保存消息直到發(fā)送給不同的消費者。算是消息的容器。一個消息可以被投入一個或多個隊列中,每個隊列的消息都會等待消費者連接到這個隊列并被取走。
- Exchange(交換路由):Broker中的一個角色,負(fù)責(zé)接收生產(chǎn)者發(fā)送的消息,并路由給服務(wù)器中的隊列??梢员焕斫獬梢粋€規(guī)則表,指明消息該被投到哪個隊列中。
- Channel(信道):信道是一條獨立的雙向數(shù)據(jù)流通道。為了解決操作系統(tǒng)無法承受每秒建立特別多的TCP連接。
生產(chǎn)者發(fā)送消息時,必須指定消息要被路由到哪些個消息隊列中。當(dāng)消息到消息隊列中,消息隊列會嘗試將消息傳給消費者,如果失敗,消息隊列會存儲消息并等待消費者。如果沒有消費者,消息隊列將選擇性的將消息返回給生產(chǎn)者。如果消息別消費掉,消息隊列會刪除消息,刪除的過程或者是及時的,或者是等到消費者消費結(jié)果后才刪除的。
RabbitMQ是AMQP消息隊列最有名的開源實現(xiàn),當(dāng)然RabbitMQ同時還可以通過插件支持STOMP、MQTT等協(xié)議接入。Kafka、RocketMQ均使用自定義的協(xié)議。
2.2 MQTT
MQTT,即消息隊列遙測傳輸(Message Queuing Telemetry Transport)。由IBM開發(fā),現(xiàn)在被廣泛用于物聯(lián)網(wǎng)公司。因為他的特點就是輕量,簡單,開放和易于實現(xiàn)。所以它常用于很多計算能力有限、帶寬低、網(wǎng)絡(luò)不可靠的遠(yuǎn)程通信應(yīng)用場景。
核心角色如下:
- Publisher(發(fā)布者):消息發(fā)布客戶端
- Subscriber(訂閱者):消息訂閱客戶端
- Broker(消息代理):消息服務(wù)器端
- Application Message(應(yīng)用消息):指通過網(wǎng)絡(luò)傳輸?shù)膽?yīng)用數(shù)據(jù),一般包括主題和負(fù)載。
- Topic(主題):應(yīng)用消息的類型,一般消息發(fā)布者會確定消息的主題,訂閱者根據(jù)自己實際情況選擇不同的主題進(jìn)行消息訂閱消費。
- Payload(負(fù)載):消息訂閱者具體接收的內(nèi)容。
MQTT協(xié)議是通過交換預(yù)定義的MQTT控制報文來通信的,控制報文內(nèi)容由三部分組成:固定報頭,可變報頭和消息體。固定報頭通過標(biāo)識不同位的值來確定報文類型,包括發(fā)布訂閱的一些完成狀態(tài)等;可變報頭的內(nèi)容根據(jù)控制報文類型不同而不同,常作為包的標(biāo)識符;消息體也是根據(jù)不同的消息類型有著不同的內(nèi)容。
MQTT協(xié)議中,客戶端和服務(wù)端是通過請求-應(yīng)答模式通信的??蛻舳税l(fā)送一條控制報文數(shù)據(jù)給服務(wù)器,服務(wù)器再發(fā)送一條控制報文數(shù)據(jù)給客戶端。
MQTT在發(fā)布消息時,有三種Qos等級:
- 至多一次(0級)
- 至少一次(1級)
- 只有一次(2級)
至多一次等級最低,客戶端只需要將消息發(fā)出去即可,這種等級很低,用于消息不重要但特別多,為了減輕通信壓力,就不顧質(zhì)量,只看數(shù)量了。
至少一次等級中等,客戶端要保證發(fā)出去的消息至少一次被服務(wù)端接收到,所以要收到服務(wù)端的回應(yīng),否則一直發(fā),這種等級一般用于服務(wù)端有冪等處理,所以不怕重復(fù)消費,還要保證消息不會丟失。
只有一次等級最高,客戶端先發(fā)消息過去,然后本地記錄一個我已發(fā)送,但不確定你是否收到的狀態(tài),然后服務(wù)端接收到消息后,回給客戶端一個我已接收的報文,同時服務(wù)端記錄一個我不確定你知不知道我已接收的狀態(tài),然后客戶端收到這個已接收的消息后,就確定服務(wù)端收到這個消息了,于是把自己本地記錄的已發(fā)送未確定的狀態(tài)刪除,同時再給客戶端發(fā)送一個我已經(jīng)知道你收到的報文,服務(wù)端收到這個報文,也會把自己之前記錄的狀態(tài)刪掉,整個一條報文只有一次的通信才算完成,這種等級就比較嚴(yán)格了,但質(zhì)量上去了,相對低等級的,數(shù)量就會相對小些,但可靠就是王道,不多不少才是最好的。
只有一次的發(fā)送和確定,其實思想和三次握手差不多,都是兩端互相確認(rèn)的過程,所以會一來一回的。如果傳輸過程中出現(xiàn)丟包,都會由發(fā)送者重發(fā)上一條消息。
2.3 STOMP
STOMP,即流文本定向消息協(xié)議(Streaming Text Orientated Messaging Protocal),是一個相對簡單的文本消息傳輸協(xié)議,主要特點就是簡單易懂,沒有特別多的套路。
核心角色如下:
- 客戶端:既可以是生產(chǎn)者,也可以是消費者
- 服務(wù)端:消息中心
ActiveMQ以及它的下一代實現(xiàn)Apache Apollo,是STOMP協(xié)議的典型實現(xiàn)。
3、JMS
3.1 概述
JMS(Java MessageService)實際上是一套JAVA API接口,是由Sun公司早期提出的消息標(biāo)準(zhǔn),旨在為java應(yīng)用提供統(tǒng)一的消息操作,包括create、send、receive等。
嚴(yán)格來講,JMS并不屬于消息協(xié)議,而是一種規(guī)范,是對AMQP,MQTT,STOMP等協(xié)議更高一層的抽象。從使用角度看,JMS和JDBC擔(dān)任差不多的角色,用戶都是根據(jù)相應(yīng)的接口可以和實現(xiàn)了JMS的服務(wù)進(jìn)行通信,進(jìn)行相關(guān)的操作。
JMS通常包含如下一些角色:
JMS provider:實現(xiàn)了JMS接口的消息中間件,如ActiveMQ
JMS client:生產(chǎn)或者消費消息的應(yīng)用
JMS producer/publisher:JMS消息生產(chǎn)者
JMS consumer/subscriber:JMS消息消費者
JMS message:消息,在各個JMS client傳輸?shù)膶ο?/p>
JMS queue:Provider存放等待被消費的消息的地方
JMS topic:一種提供多個訂閱者消費消息的一種機制;在MQ中常常被提到,topic模式
消息如何從producer端達(dá)到consumer端由message-routing來決定。在JMS中,消息路由非常簡單,由producer和consumer鏈接到同一個queue(p2p)或者topic(pub/sub)來實現(xiàn)消息的路由。JMSconsumer同時支持message selector(消息選擇器),通過消息選擇器,consumer可以只消費那些通過了selector篩選的消息。
3.2 通信模型
JMS具有兩種通信模式:
Point-to-Point Messaging Domain (點對點)
Publish/Subscribe Messaging Domain (發(fā)布/訂閱模式)
在JMS API出現(xiàn)之前,大部分產(chǎn)品使用“點對點”和“發(fā)布/訂閱”中的任一方式來進(jìn)行消息通訊。JMS定義了這兩種消息發(fā)送模型的規(guī)范,它們相互獨立。任何JMS的提供者可以實現(xiàn)其中的一種或兩種模型,這是它們自己的選擇。JMS規(guī)范提供了通用接口保證我們基于JMS API編寫的程序適用于任何一種模型。
3.2.1 點對點模型

在點對點通信模式中,應(yīng)用程序由消息隊列,發(fā)送方,接收方組成。每個消息都被發(fā)送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。
- 每個消息只要一個消費者
- 發(fā)送者和接收者在時間上是沒有時間的約束,也就是說發(fā)送者在發(fā)送完消息之后,不管接收者有沒有接受消息,都不會影響發(fā)送方發(fā)送消息到消息隊列中。
- 發(fā)送方不管是否在發(fā)送消息,接收方都可以從消息隊列中去到消息
- 接收方在接收完消息之后,需要向消息隊列應(yīng)答成功
3.2.2 發(fā)布/訂閱模型

在發(fā)布/訂閱消息模型中,發(fā)布者發(fā)布一個消息,該消息通過topic傳遞給所有的客戶端。該模式下,發(fā)布者與訂閱者都是匿名的,即發(fā)布者與訂閱者都不知道對方是誰。并且可以動態(tài)的發(fā)布與訂閱Topic。Topic主要用于保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。
- 一個消息可以傳遞個多個訂閱者(即:一個消息可以有多個接受方)
- 發(fā)布者與訂閱者具有時間約束,針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態(tài)。
- 為了緩和這樣嚴(yán)格的時間相關(guān)性,JMS允許訂閱者創(chuàng)建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發(fā)布者的消息。
3.3 接收消息
在JMS中,消息的產(chǎn)生和消息是異步的。對于消費來說,JMS的消息者可以通過兩種方式來消費消息。
- 同步(Synchronous)
在同步消費信息模式模式中,訂閱者/接收方通過調(diào)用 receive()方法來接收消息。在receive()方法中,線程會阻塞直到消息到達(dá)或者到指定時間后消息仍未到達(dá)。 - 異步(Asynchronous)
使用異步方式接收消息的話,消息訂閱者需注冊一個消息監(jiān)聽者,類似于事件監(jiān)聽器,只要消息到達(dá),JMS服務(wù)提供者會通過調(diào)用監(jiān)聽器的onMessage()遞送消息。
3.4 編程模型

- Connection Factories:創(chuàng)建Connection對象的工廠,針對兩種不同的jms消息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查找ConnectionFactory對象。客戶端使用一個連接工廠對象連接到JMS服務(wù)提供者,它創(chuàng)建了JMS服務(wù)提供者和客戶端之間的連接。JMS客戶端(如發(fā)送者或接受者)會在JNDI名字空間中搜索并獲取該連接。使用該連接,客戶端能夠與目的地通訊,往隊列或話題發(fā)送/接收消息。
- Destination:目的地指明消息被發(fā)送的目的地以及客戶端接收消息的來源。JMS使用兩種目的地,隊列和話題
- Connection:表示在客戶端和JMS系統(tǒng)之間建立的鏈接(對TCP/IP socket的包裝)。Connection可以產(chǎn)生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種類型:QueueConnection和TopicConnection。
- Session:對消息進(jìn)行操作的接口,可以通過session創(chuàng)建生產(chǎn)者、消費者、消息等。Session 提供了事務(wù)的功能,如果需要使用session發(fā)送/接收多個消息時,可以將這些發(fā)送/接收動作放到一個事務(wù)中。
- Producter:消息生產(chǎn)者由Session創(chuàng)建,用于往目的地發(fā)送消息。生產(chǎn)者實現(xiàn)MessageProducer接口,我們可以為目的地、隊列或話題創(chuàng)建生產(chǎn)者。
- Consumer:消息消費者由Session創(chuàng)建,用于接收被發(fā)送到Destination的消息。
- MessageListener:消息監(jiān)聽器。如果注冊了消息監(jiān)聽器,一旦消息到達(dá),將自動調(diào)用監(jiān)聽器的onMessage方法。