本文基于《Spring實(shí)戰(zhàn)(第4版)》所寫。
異步消息是一個(gè)應(yīng)用程序向另一個(gè)應(yīng)用程序間接發(fā)送消息的一種方式,這種方式無需等待對(duì)方的響應(yīng)。借助Spring,我們有多個(gè)實(shí)現(xiàn)異步消息的可選方案:在Spring中使用Java消息服務(wù)(Java Message Service, JMS)和高級(jí)消息隊(duì)列協(xié)議(Advanced Message Queuing Protocol, AMQP)發(fā)送和接收消息。除了基本的消息發(fā)送和接收之外,我們還會(huì)看到Spring對(duì)消息驅(qū)動(dòng)POJO的支持,它是一種與EJB的消息驅(qū)動(dòng)Bean(message-driven bean, MDB)類似的消息接收方式。
在異步消息中有兩個(gè)主要的概念:消息代理(message broker)和目的地(destination)。當(dāng)一個(gè)應(yīng)用發(fā)送消息時(shí),會(huì)將消息交給一個(gè)消息代理。消息代理可以確保消息被投遞到指定的目的地,同時(shí)解放發(fā)送者,使其能夠繼續(xù)進(jìn)行其他的業(yè)務(wù)。
盡管不同的消息系統(tǒng)會(huì)提供不同的消息路由模式,但是有兩種通用的目的地:隊(duì)列(queue)和主題(topic)。每種類型都與特定的消息模型相關(guān)聯(lián),分別是點(diǎn)對(duì)點(diǎn)模型(隊(duì)列)和發(fā)布/訂閱模型(主題)。
點(diǎn)對(duì)點(diǎn)消息模型
在點(diǎn)對(duì)點(diǎn)模型中,每一條消息都有一個(gè)發(fā)送者和一個(gè)接收者,如下圖。當(dāng)消息代理得到消息時(shí),它將消息放入一個(gè)隊(duì)列中。當(dāng)接收者請(qǐng)求隊(duì)列中的下一條消息時(shí),消息會(huì)從隊(duì)列中取出,并投遞給接收者。因?yàn)橄⑼哆f后會(huì)從隊(duì)列中刪除,這樣就可以保證消息只能投遞給一個(gè)接收者。

通??梢允褂脦讉€(gè)接收者來處理隊(duì)列中的消息。不過,每個(gè)接收者都會(huì)處理自己所接收到的消息。如果想要提高應(yīng)用的消息處理能力,我們只需簡(jiǎn)單地為隊(duì)列添加新的監(jiān)視器就可以了。
發(fā)布—訂閱消息模型
在發(fā)布—訂閱消息模型中,消息會(huì)發(fā)送給一個(gè)主題。與隊(duì)列類似,多個(gè)接收者都可以監(jiān)聽一個(gè)主題。但是,與隊(duì)列不同的是,消息不再是只投遞給一個(gè)接收者,而是主題的所有訂閱者都會(huì)接收到此消息的副本,如下圖

異步消息的優(yōu)點(diǎn):
- 無需等待:客戶端不需要等待,可以繼續(xù)執(zhí)行其他任務(wù)。
- 面向消息和解耦:發(fā)送異步消息是以數(shù)據(jù)為中心的,客戶端不必了解遠(yuǎn)程服務(wù)的任何規(guī)范。
- 位置獨(dú)立:只要服務(wù)能夠從隊(duì)列或主題中獲取消息即可,消息客戶端根本不需要關(guān)注服務(wù)來自哪里。
- 確保投遞:當(dāng)發(fā)送異步消息時(shí),客戶端可以相信消息會(huì)被投遞。即使在消息發(fā)送時(shí),服務(wù)無法使用,消息也會(huì)被儲(chǔ)存起來,直到服務(wù)重新可以使用為止。
使用JMS發(fā)送消息
Java消息服務(wù)(Java Message Service , JMS)是一個(gè)Java標(biāo)準(zhǔn),定義了使用消息代理的通用API。
Spring通過基于模版的抽象為JMS功能提供了支持,這個(gè)模版也就是JmsTemplate。Spring還提供了消息驅(qū)動(dòng)POJO的理念:這是一個(gè)簡(jiǎn)單的Java對(duì)象,它能夠以異步的方式響應(yīng)隊(duì)列或主題上的到達(dá)的消息。
在Spring中搭建消息代理
ActiveMQ是一個(gè)偉大的開源消息代理產(chǎn)品,也是使用JMS進(jìn)行異步消息傳遞的最佳選擇。ActiveMQ可在http://activemq.apache.org下載二進(jìn)制發(fā)行包。下載后可以在bin目錄下,找到對(duì)應(yīng)的子目錄,子目錄中有相應(yīng)的啟動(dòng)ActiveMQ的腳本。例如,OS X系統(tǒng)可在“bin/macosx”目錄下運(yùn)行“activemq start”。運(yùn)行腳本后,ActiveMQ就準(zhǔn)備好了,可以在瀏覽器輸入“http://localhost:8161/admin/”進(jìn)入ActiveMQ管理頁,用戶名密碼應(yīng)該都是admin
創(chuàng)建連接工廠
我們必須配置JMS連接工廠,讓它知道如何連接到ActiveMQ。配置如下:
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory"
p:brokerURL="tcp:localhost:61616" />
也可以使用ActiveMQ的命名空間配置
<amq:connectionFactory id="connectionFactory" brokerURL="tcp://localhost:61616" />
amq是ActiveMQ的命名空間,使用前我們必須確保在Spring的配置文件中聲明了amq命名空間:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
.....
</beans>
聲明目的地
例如,下面的<bean>聲明定義了一個(gè)ActiveMQ隊(duì)列:
<bean id="queue" class="org.apache.activemq.command.ActiveMQQueue"
c:_="spitter.queue" />
命名空間配置如下:
<amq:queue id="spittleQueue" physicalName="spitter.queue" />
同樣,下面<bean>聲明定義了一個(gè)ActiveMQ主題:
<bean id="topic" class="org.apache.activemq.command.ActiveMQTopic"
c:_="spitter.queue" />
命名空間配置如下:
<amq:topic id="spittleTopic" physicalName="spitter.topic" />
使用Spring的JMS模版
針對(duì)如何消除冗長(zhǎng)和重復(fù)的JMS代碼,Spring給出的解決方案是JmsTemplate。JmsTemplate可以創(chuàng)建連接、獲得會(huì)話以及發(fā)送和接收消息。
另外,JmsTemplate可以處理拋出的笨拙的JMSException異常。如果在使用JmsTemplate時(shí)拋出JMSException異常,JmsTemplate將捕獲該異常,然后拋出一個(gè)非檢查型異常,該異常是Spring自帶的JmsException異常的子類。
為了使用JmsTemplate,我們需要在Spring的配置文件中將它聲明為一個(gè)bean,除了配置了工廠,還配置了默認(rèn)隊(duì)列。如下的XML可以完成這項(xiàng)工作:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
c:_0-ref="connectionFactory" p:defaultDestination-ref="queue"/>
發(fā)送消息
為了在Spittle創(chuàng)建的時(shí)候異步發(fā)送spittle提醒,讓我們?yōu)镾pittr應(yīng)用引入AlertService:
package spittr.web;
import spittr.model.Spittle;
public interface AlertService {
void sendSpittleAlert(Spittle spittle);
}
然后實(shí)現(xiàn)該接口,方法中使用JmsOperation將Spittle對(duì)象發(fā)送給消息隊(duì)列。
package spittr.web;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsOperations;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;
import spittr.model.Spittle;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
@Service
public class AlertServiceImpl implements AlertService {
private JmsOperations jmsOperations;
//注入JMS模版
@Autowired
public AlertServiceImpl(JmsOperations jmsOperations) {
this.jmsOperations = jmsOperations;
}
@Override
public void sendSpittleAlert(final Spittle spittle) {
//創(chuàng)建并發(fā)送消息,如在配置jmsTemplate時(shí),配置了默認(rèn)目的地,可在此步驟省略代碼中目的地配置。
jmsOperations.send("spitter.queue", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(spittle);
}
});
}
}
在發(fā)消息時(shí),對(duì)消息進(jìn)行轉(zhuǎn)換
除了send()方法,JmsTemplate還提供了convertAndSend()方法。與send()方法不同,convertAndSend()方法并不需要 MessageCreator作為參數(shù)。這是因?yàn)閏onvertAndSend()會(huì)使用內(nèi)置的消息轉(zhuǎn)換器(message converter)為我們創(chuàng)建信息。
當(dāng)我們使用convertAndSend()時(shí),代碼可以減少到方法體只包含一行代碼:
@Override
public void sendSpittleAlert(final Spittle spittle) {
// 使用消息轉(zhuǎn)換器創(chuàng)建消息,默認(rèn)情況下會(huì)使用SimpleMessageConverter
jmsOperations.convertAndSend(spittle);
}
這個(gè)接口之所以簡(jiǎn)單,是因?yàn)镾pring已經(jīng)提供了多個(gè)實(shí)現(xiàn),我們沒有必要?jiǎng)?chuàng)建自定義的實(shí)現(xiàn)。如下表所示
| 消息轉(zhuǎn)換器 | 功能 |
|---|---|
| MappingJacksonMessageConverter | 使用Jackson JSON庫(kù)實(shí)現(xiàn)消息與JSON格式之間的相互轉(zhuǎn)換 |
| MappingJackson2MessageConverter | 使用Jackson2 JSON庫(kù)實(shí)現(xiàn)消息與JSON格式之間的相互轉(zhuǎn)換 |
| MarshallingMessageConverter | 使用JAXB庫(kù)實(shí)現(xiàn)消息與XML格式之間的相互轉(zhuǎn)換 |
| SimpleMessageConverter | 實(shí)現(xiàn)String與TextMessage之間的相互轉(zhuǎn)換,字節(jié)數(shù)組與ByteMessage之間的相互轉(zhuǎn)換,Map與MapMessage之間的相互轉(zhuǎn)換以及Serializable對(duì)象與ObjectMessage之間的相互轉(zhuǎn)換 |
默認(rèn)情況下,JmsTemplate在convertAndSend()方法中會(huì)使用SimpleMessageConverter。如果想使用其他轉(zhuǎn)換器,可以重寫這種行為。例如,想使用JSON消息的話
<bean id="messageConverter" class="org.springframework.jms.support.converter.MappingJacksonMessageConverter" />
然后,我們可以將其注入到JmsTemplate中,如下所示:
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"
c:_-ref="connectionFactory"
p:defaultDestinationName="spittle.queue"
p:messageConverter-ref="messageConverter" />
同步接收消息
使用JmsTemplate接收消息
方法一,使用ObjectMessage,如下
public Spittle receiveSpittleAlert(){
try{
// 接收消息
ObjectMessage receivedMessage = (ObjectMessage) jmsOperations.receive();
// 獲得對(duì)象
return (Spittle) receivedMessage.getObject();
}
catch (JMSException jmsException) {
// 拋出轉(zhuǎn)換后的異常
throw JmsUtils.convertJmsAccessException(jmsException);
}
}
方法二,使用消息轉(zhuǎn)換器,如下
public Spittle receiveSpittleAlert(){
return (Spittle) jmsOperations.receiveAndConvert();
}
創(chuàng)建消息驅(qū)動(dòng)的POJO(消息監(jiān)聽器)
Spring提供了以POJO的方式處理消息的能力,這些消息來自于JMS的隊(duì)列或主題中。例如,基于POJO是想SpittleAlertHandler就足以做到這一點(diǎn)。
package spittr.web;
import spittr.model.Spittle;
public class SpittleAlertHandler {
public void handleSpittleAlert(Spittle spittle) {
// ... implementation goes here ... 處理方法
}
}
為POJO賦予消息接收能力的訣竅是在Spring中把它配置為消息監(jiān)聽器。Spring的jms命名空間為我們提供了所需要的一切。首先,讓我們先把處理器聲明為bean:
<bean id="spittleHandler" class="spittr.web.SpittleAlertHandler" />
然后,為了把SpittleAlertHandler轉(zhuǎn)變?yōu)橄Ⅱ?qū)動(dòng)的POJO,我們需要把這個(gè)bean聲明為消息監(jiān)聽器:
<jms:listener-container >
<jms:listener destination="spitter.queue" ref="spittleHandler" method="handleSpittleAlert"/>
</jms:listener-container>
在這里,我們?cè)谙⒈O(jiān)聽器中包含了一個(gè)消息監(jiān)聽器。消息監(jiān)聽器是一個(gè)特殊的bean,它可以監(jiān)控JMS目的地并等待消息到達(dá)。一旦有消息到達(dá),它取出信息,然后把消息傳給任意一個(gè)對(duì)此消息感興趣的消息監(jiān)聽器,如下圖展示了這個(gè)交互過程。

使用AMQP實(shí)現(xiàn)消息功能
AMQP具有多項(xiàng)JMS所不具備的優(yōu)勢(shì)。首先,AMQP為消息定義了線路層的協(xié)議,而JMS所定義的是API規(guī)范。JMS的API協(xié)議能夠確保所有的實(shí)現(xiàn)都能通過的API來使用,但是并不能保證某個(gè)JMS實(shí)現(xiàn)所發(fā)送的消息能夠被另外不同的JMS實(shí)現(xiàn)所使用。而AMQP的線路層協(xié)議規(guī)范了消息的格式,消息在生產(chǎn)者和消費(fèi)者間傳送的時(shí)候會(huì)遵循這個(gè)格式。這個(gè)AMQP在互相協(xié)作方面就要優(yōu)于JMS—它不僅能跨不同的AMQP實(shí)現(xiàn),還能跨語言和平臺(tái)。
相比JMS,AMQP另外一個(gè)明顯的優(yōu)勢(shì)在于它具有更加靈活和透明的消息模型。使用JMS的話,只有兩種消息模型可供選擇:點(diǎn)對(duì)點(diǎn)和發(fā)布-訂閱。這兩種模型在AMQP當(dāng)然都是可以實(shí)現(xiàn)的,但AMQP還能夠讓我們以其他的多種方式來發(fā)送消息,這是通過將消息的生產(chǎn)者與存放消息的隊(duì)列解耦實(shí)現(xiàn)的。
AMQP的生產(chǎn)者并不會(huì)直接將消息發(fā)布到隊(duì)列中。AMQP在消息的生產(chǎn)者以及傳遞信息的隊(duì)列之間引入了一種間接的機(jī)制:Exchange。這種關(guān)系如下圖

可以看到,消息的生產(chǎn)者將信息發(fā)布到一個(gè)Exchange。Exchange會(huì)綁定到一個(gè)或多個(gè)隊(duì)列上它負(fù)責(zé)將信息路由到隊(duì)列上。信息的消費(fèi)者會(huì)從隊(duì)列中提取數(shù)據(jù)并進(jìn)行處理。
AMQP定義了四種不同類型的Exchange,每一種都有不同的路由算法,這些算法決定了是否要將信息放到隊(duì)列中。根據(jù)Exchange的算法不同,它可能會(huì)使用消息的routing key和/或參數(shù),并將其與Exchange和隊(duì)列之間binding的routing key和參數(shù)進(jìn)行對(duì)比。如果對(duì)比結(jié)果滿足相應(yīng)的算法,那么消息將會(huì)路由到隊(duì)列上。否則的話,將不會(huì)路由到隊(duì)列上。
四種標(biāo)準(zhǔn)的AMQP Exchange如下所示:
- Direct: 如果消息的routing key與binding的routing key直接匹配的話,消息將會(huì)路由到該隊(duì)列上;
- Topic: 如果消息的routing key與binding的routing key附和通配符匹配的話,消息將會(huì)路由到該隊(duì)列上;
- Headers: 如果消息參數(shù)表中的頭信息和值都與binding參數(shù)表中相匹配,消息將會(huì)路由到該隊(duì)列上;
- Fanout: 不管消息的routing key和參數(shù)表和頭信息/值是什么,消息將會(huì)路由到所有隊(duì)列上。
生產(chǎn)者將信息發(fā)送給Exchange并帶有一個(gè)routing key,消費(fèi)者從隊(duì)列中獲取消息。
配置Spring支持AMQP消息
使用Spring AMQP前也要配置一個(gè)連接工廠,需要配置RabbitMQ連接工廠。RabbitMQ是一個(gè)流行的開源消息代理,它實(shí)現(xiàn)了AMQP。
使用之前,需要安裝RabbitMQ,安裝地址:http://www.rabbitmq.com/download.html。由于RabbitMQ用erlang語言開發(fā),所有安裝RabbitMQ前需要先安裝Erlang,安裝地址:http://www.erlang.org/downloads
安裝成功后,進(jìn)入/usr/local/sbin,執(zhí)行./rabbitmq-server,來啟動(dòng)rabbitmq。用戶名和密碼默認(rèn)都是guest,可瀏覽器直接登錄http://localhost:15672/查看rabbitmq的管理平臺(tái)。
配置RabbitMQ連接工廠最簡(jiǎn)單的方式就是使用Spring AMQP所提供的rabbit配置命名空間。為了使用這項(xiàng)功能,需要確保在Spring配置文件中已經(jīng)聲明了該模式:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:mvc="http://www.springframework.org/schema/mvc"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
....
</beans>
接下來,配置工廠信息
<!--配置連接工廠, 默認(rèn)地址localhost,默認(rèn)端口5672,并且用戶名和密碼均為guest-->
<rabbit:connection-factory id="connectionFactory" host="localhost" port="5672" username="guest" password="guest"/>
除了連接工廠以外,我們還要考慮使用其他的幾個(gè)配置元素。接下來,看一下如何創(chuàng)建隊(duì)列、Exchange已經(jīng)binding。
聲明隊(duì)列、Exchange已經(jīng)binding
rabbit命名空間包含了多個(gè)元素,幫助我們聲明隊(duì)列、Exchange以及將他們結(jié)合在一起的binding。下表中列出了這些元素。
| 元素 | 作用 |
|---|---|
| <queue> | 創(chuàng)建一個(gè)隊(duì)列 |
| <fanout-exchange> | 創(chuàng)建一個(gè)fanout類型的Exchange |
| <header-exchange> | 創(chuàng)建一個(gè)header類型的Exchange |
| <topic-exchange> | 創(chuàng)建一個(gè)topic類型的Exchange |
| <direct-exchange> | 創(chuàng)建一個(gè)direct類型的Exchange |
| <bindings> <binding /> </bindings> | 元素定義一個(gè)或多個(gè)元素的集合。元素創(chuàng)建Exchange和隊(duì)列之間的binding |
這些配置元素要與<admin> 元素一起使用。<admin>元素會(huì)創(chuàng)建一個(gè)RabbitMQ管理組件,它會(huì)自動(dòng)創(chuàng)建上述這些元素所聲明的隊(duì)列、Exchange以及binding。
例如,如果你希望聲明為spittle.alerts的隊(duì)列,只需要在Spring配置中添加如下代碼:
<rabbit:admin connection-factory="connectionFactory" />
<rabbit:queue id="spittleAlertQueue" name="spittle.alerts" />
<rabbit:fanout-exchange name="spittle.alert.exchange" >
<rabbit:bindings>
<rabbit:binding queue="spittleAlertQueue"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
使用RabbitTemplate發(fā)送消息
配置RabbitTemplate的最簡(jiǎn)單方式是使用rabbit命名空間的<template>元素,如下所示:
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"/>
現(xiàn)在,要發(fā)送消息的話,我們只需要將模版bean注入到AlertServiceImpl中,并使用它來發(fā)送Spittle。如下的程序清單展示了一個(gè)新版本的AlertServiceImpl,它使用RabbitTemplate代替JmsTemplate來發(fā)送Spittle提醒。
package spittr.web;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import spittr.model.Spittle;
@Service
public class AlertServiceImpl implements AlertService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendSpittleAlert(Spittle spittle) {
rabbitTemplate.convertAndSend("spittle.alert.exchange",
"spittle.alerts",
spittle);
}
}
RabbitTemplate有多個(gè)重載版本的convertAndSend()方法??梢酝瑫r(shí)省略Exchange名稱和routing key:
rabbitTemplate.convertAndSend(spittle);
那么可以在<template>元素上借助exchange和routing-key屬性配置不同的默認(rèn)值:
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spittle.alert.exchange"
routing-key="spittle.alerts" />
convertAndSend()也是需要一個(gè)消息轉(zhuǎn)換器的幫助來完成任務(wù),默認(rèn)的消息轉(zhuǎn)換器是SimpleMessageConverter。Spring AMQP還提供了其他幾個(gè)有用的消息轉(zhuǎn)換器,其中包括使用JSON和XML數(shù)據(jù)的消息轉(zhuǎn)換器。
接收AMQP消息
Spring AMQP也是提供了兩種方式來獲取消息。
使用RabbitTemplate來接收消息
<template>的配置與發(fā)送消息一致。接收方法如下:
Spittle spittle = (Spittle) rabbit.receiveAndConvert("spittle.alerts");
如果<template>的配置已經(jīng)隊(duì)列名稱,可以省略代碼中的隊(duì)列名稱。
定義消息驅(qū)動(dòng)的AMQP POJO
如果想在消息驅(qū)動(dòng)POJO中異步地消費(fèi)使用Spittle對(duì)象,首先要解決的問題就是這個(gè)POJO本身。如下的SpittleAlertHandler扮演了這個(gè)角色:
package spittr.web;
import spittr.model.Spittle;
public class SpittleAlertHandler {
public void handleSpittleAlert(Spittle spittle) {
// ... implementation goes here ...
}
}
我們還需要在Spring應(yīng)用上下文中將SpittleAlertHandler聲明為一個(gè)bean:
<bean id="spittleHandler" class="spittr.web.SpittleAlertHandler" />
最后,我們需要聲明一個(gè)監(jiān)聽器容器和監(jiān)聽器,當(dāng)消息到達(dá)的時(shí)候,能夠調(diào)用SpittleAlertHandler。
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="spittleHandler" method="handleSpittleAlert" queue-names="spittle.alerts" />
</rabbit:listener-container>