這里主要說的是異步消息,異步消息時一個應(yīng)用程序向另一個應(yīng)用程序間接發(fā)送消息的一種方式,這種方式無需等待對方的響應(yīng)。
JMS
JMS(Java Message Service)是一個Java標準,定義了使用消息代理的通用API。借助JMS,所有遵從規(guī)范的實現(xiàn)都使用通用的接口,這就類似于JDBC為數(shù)據(jù)庫操作提供了通用的接口一樣。
JMS支持點對點消息模型和發(fā)布-訂閱模型。
點對點消息模型
在點對點消息模型中,消息發(fā)送者將消息發(fā)送到隊列中,接收者從隊列中取出消息。

消息隊列對消息發(fā)送者和消息接收者進行了解耦。雖然隊列可以有多個接收者,但是每一條消息只能被一個接收者取走。
發(fā)布-訂閱消息模型
在發(fā)布-訂閱消息模型中,消息會發(fā)送給一個主題,多個接收者都可以監(jiān)聽一個主題。與隊列不同,該主題的訂閱者都會接受到此消息的副本。

ActiveMQ就是遵從JMS規(guī)范的框架。
AMQP
AMQP(Advanced Message Queuing Protocol)是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標準高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標準,為面向消息的中間件設(shè)計?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
相比JMS,AMQP的優(yōu)勢在于它具有更加靈活和透明的消息模型。除了JMS支持的消息模型,AMQP還能夠讓我們以其他的多種方式來發(fā)送消息,這是通過將消息的生產(chǎn)者與存放消息的隊列解耦實現(xiàn)的。
AMQP消息模型
AMQP的生產(chǎn)者不會直接將消息發(fā)布到隊列中。AMQP在消息的生產(chǎn)者以及傳遞消息的隊列之間引入了一種間接的機制:Exchange。

消息的生產(chǎn)者將信息發(fā)布到一個Exchange。Exchange會綁定到一個或多個隊列上,它負責(zé)將信息路由到隊列上。消息的消費者會從隊列中提取數(shù)據(jù)并進行處理。
AMQP定義了四中不同類型的Exchange,每一種都有不同的路由算法,這些算法決定了是否要將信息放到隊列中:
- Direct:如果消息的routing key 與binding的routing key 直接匹配的話,消息將會路由到該隊列上;
- Topic: 如果消息的routing key 與binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上;
- Headers: 如果消息參數(shù)表中的頭信息和值與binding參數(shù)表中相匹配,消息將會路由到該隊列上;
- Fanout: 不管消息的routing key 和參數(shù)表的頭信息/值是什么,消息將會路由到所有隊列上。
RabbitMQ
RabbitMQ是一個流行的開源消息代理,它實現(xiàn)了AMQP。Spring AMQP為RabbitMQ提供了支持,包括RabbitMQ鏈接工廠,模板以及Spring配置命名空間。
安裝&啟動服務(wù)
首先我們需要安裝RabbitMQ并啟動服務(wù)。
Download
安裝的話可以搜索一下教程,網(wǎng)上很多。安裝好之后,配置環(huán)境,然后我們通過命令行輸入rabbitmq-server啟動服務(wù)。出現(xiàn)如下界面表示服務(wù)已啟動:
## ##
## ## RabbitMQ 3.7.7. Copyright (C) 2007-2018 Pivotal Software, Inc.
########## Licensed under the MPL. See http://www.rabbitmq.com/
###### ##
########## Logs: /usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro.log
/usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro_upgrade.log
Starting broker...
completed with 3 plugins.
Spring配置
首先是pom.xml中導(dǎo)入依賴:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
其次創(chuàng)建一個配置文件:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--導(dǎo)入配置文件-->
<context:property-placeholder location="classpath:rabbit.properties"/>
<!-- 連接配置 -->
<rabbit:connection-factory id="connectionFactory"
host="${mq.host}"
username="${mq.username}"
password="${mq.password}"
port="${mq.port}"
virtual-host='/'/>
<rabbit:admin connection-factory="connectionFactory"/>
<!--創(chuàng)建一個隊列-->
<rabbit:queue id="spittleAlertQueue" name="spittle.alerts" />
<!--配置rabbitTemplate-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
<!--隊列監(jiān)聽器-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="spittleAlertQueue" ref="queueListener"/>
</rabbit:listener-container>
<!--注冊相關(guān)bean-->
<bean id="alertService" class="com.hubert.rabbit.AlertServiceImpl">
<constructor-arg ref="rabbitTemplate"/>
</bean>
<bean id="queueListener" class="com.hubert.rabbit.QueueListener"/>
</beans>
rabbit.properties
mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672
bean
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class QueueListener implements MessageListener {
Logger logger = LoggerFactory.getLogger(QueueListener.class);
@Override
public void onMessage(Message msg) {
logger.info("receive");
try {
logger.info(msg.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;
public class AlertServiceImpl {
private RabbitTemplate rabbit;
public AlertServiceImpl(RabbitTemplate rabbit) {
this.rabbit = rabbit;
}
public void sendSpittleAlert() {
rabbit.convertAndSend("spittle.alerts", "object");
}
}
啟動
最后是在main方法中發(fā)送消息:
public class RabbitMain {
public static void main(String[] args) {
ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("rabbitmq-config.xml");
AlertServiceImpl service = (AlertServiceImpl) applicationContext.getBean("alertService");
service.sendSpittleAlert();
}
}
順利的話我們就會看到處理消息的日志:
INFO com.hubert.rabbit.QueueListener - (Body:'object' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=spittle.alerts, receivedDelay=null, deliveryTag=12, messageCount=0, consumerTag=amq.ctag-rLNiWGT6LCAMD3li8xv1qA, consumerQueue=spittle.alerts])
這個過程并不是那么順利,可能會出現(xiàn)鏈接錯誤,權(quán)限被拒的情況,一般都是RabbitMQ的用戶權(quán)限或者Virtual Host的權(quán)限的問題。