Spring學(xué)習(xí)之消息

這里主要說的是異步消息,異步消息時(shí)一個(gè)應(yīng)用程序向另一個(gè)應(yīng)用程序間接發(fā)送消息的一種方式,這種方式無需等待對(duì)方的響應(yīng)。

JMS

JMS(Java Message Service)是一個(gè)Java標(biāo)準(zhǔn),定義了使用消息代理的通用API。借助JMS,所有遵從規(guī)范的實(shí)現(xiàn)都使用通用的接口,這就類似于JDBC為數(shù)據(jù)庫操作提供了通用的接口一樣。

JMS支持點(diǎn)對(duì)點(diǎn)消息模型和發(fā)布-訂閱模型。

點(diǎn)對(duì)點(diǎn)消息模型

在點(diǎn)對(duì)點(diǎn)消息模型中,消息發(fā)送者將消息發(fā)送到隊(duì)列中,接收者從隊(duì)列中取出消息。

點(diǎn)對(duì)點(diǎn)消息模型

消息隊(duì)列對(duì)消息發(fā)送者和消息接收者進(jìn)行了解耦。雖然隊(duì)列可以有多個(gè)接收者,但是每一條消息只能被一個(gè)接收者取走。

發(fā)布-訂閱消息模型

在發(fā)布-訂閱消息模型中,消息會(huì)發(fā)送給一個(gè)主題,多個(gè)接收者都可以監(jiān)聽一個(gè)主題。與隊(duì)列不同,該主題的訂閱者都會(huì)接受到此消息的副本。

發(fā)布-訂閱消息模型

ActiveMQ就是遵從JMS規(guī)范的框架。

AMQP

AMQP(Advanced Message Queuing Protocol)是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
相比JMS,AMQP的優(yōu)勢(shì)在于它具有更加靈活和透明的消息模型。除了JMS支持的消息模型,AMQP還能夠讓我們以其他的多種方式來發(fā)送消息,這是通過將消息的生產(chǎn)者與存放消息的隊(duì)列解耦實(shí)現(xiàn)的。

AMQP消息模型

AMQP的生產(chǎn)者不會(huì)直接將消息發(fā)布到隊(duì)列中。AMQP在消息的生產(chǎn)者以及傳遞消息的隊(duì)列之間引入了一種間接的機(jī)制:Exchange。

AMQP消息模型

消息的生產(chǎn)者將信息發(fā)布到一個(gè)Exchange。Exchange會(huì)綁定到一個(gè)或多個(gè)隊(duì)列上,它負(fù)責(zé)將信息路由到隊(duì)列上。消息的消費(fèi)者會(huì)從隊(duì)列中提取數(shù)據(jù)并進(jìn)行處理。

AMQP定義了四中不同類型的Exchange,每一種都有不同的路由算法,這些算法決定了是否要將信息放到隊(duì)列中:

  • Direct:如果消息的routing key 與binding的routing key 直接匹配的話,消息將會(huì)路由到該隊(duì)列上;
  • Topic: 如果消息的routing key 與binding的routing key 符合通配符匹配的話,消息將會(huì)路由到該隊(duì)列上;
  • Headers: 如果消息參數(shù)表中的頭信息和值與binding參數(shù)表中相匹配,消息將會(huì)路由到該隊(duì)列上;
  • Fanout: 不管消息的routing key 和參數(shù)表的頭信息/值是什么,消息將會(huì)路由到所有隊(duì)列上。

RabbitMQ

RabbitMQ是一個(gè)流行的開源消息代理,它實(shí)現(xiàn)了AMQP。Spring AMQP為RabbitMQ提供了支持,包括RabbitMQ鏈接工廠,模板以及Spring配置命名空間。

安裝&啟動(dòng)服務(wù)

首先我們需要安裝RabbitMQ并啟動(dòng)服務(wù)。
Download
安裝的話可以搜索一下教程,網(wǎng)上很多。安裝好之后,配置環(huán)境,然后我們通過命令行輸入rabbitmq-server啟動(dòng)服務(wù)。出現(xiàn)如下界面表示服務(wù)已啟動(dòng):


  ##  ##
  ##  ##      RabbitMQ 3.7.7. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro.log
                    /usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro_upgrade.log

              Starting broker...
 completed with 3 plugins.

Spring配置

首先是pom.xml中導(dǎo)入依賴:

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

其次創(chuàng)建一個(gè)配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <!--導(dǎo)入配置文件-->
    <context:property-placeholder location="classpath:rabbit.properties"/>

    <!-- 連接配置 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${mq.host}"
                               username="${mq.username}"
                               password="${mq.password}"
                               port="${mq.port}"
                               virtual-host='/'/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!--創(chuàng)建一個(gè)隊(duì)列-->
    <rabbit:queue id="spittleAlertQueue" name="spittle.alerts" />

    <!--配置rabbitTemplate-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--隊(duì)列監(jiān)聽器-->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="spittleAlertQueue" ref="queueListener"/>
    </rabbit:listener-container>

    <!--注冊(cè)相關(guān)bean-->
    <bean id="alertService" class="com.hubert.rabbit.AlertServiceImpl">
        <constructor-arg ref="rabbitTemplate"/>
    </bean>
    <bean id="queueListener" class="com.hubert.rabbit.QueueListener"/>
</beans>

rabbit.properties

mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672

bean

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueListener implements MessageListener {

    Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Override
    public void onMessage(Message msg) {
        logger.info("receive");
        try {
            logger.info(msg.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class AlertServiceImpl {
    private RabbitTemplate rabbit;

    public AlertServiceImpl(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }

    public void sendSpittleAlert() {
        rabbit.convertAndSend("spittle.alerts", "object");
    }
}

啟動(dòng)

最后是在main方法中發(fā)送消息:

public class RabbitMain {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("rabbitmq-config.xml");
        AlertServiceImpl service = (AlertServiceImpl) applicationContext.getBean("alertService");
        service.sendSpittleAlert();
    }
}

順利的話我們就會(huì)看到處理消息的日志:

INFO com.hubert.rabbit.QueueListener - (Body:'object' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=spittle.alerts, receivedDelay=null, deliveryTag=12, messageCount=0, consumerTag=amq.ctag-rLNiWGT6LCAMD3li8xv1qA, consumerQueue=spittle.alerts])

這個(gè)過程并不是那么順利,可能會(huì)出現(xiàn)鏈接錯(cuò)誤,權(quán)限被拒的情況,一般都是RabbitMQ的用戶權(quán)限或者Virtual Host的權(quán)限的問題。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,211評(píng)論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,694評(píng)論 19 139
  • 本文基于《Spring實(shí)戰(zhàn)(第4版)》所寫。 異步消息是一個(gè)應(yīng)用程序向另一個(gè)應(yīng)用程序間接發(fā)送消息的一種方式,這種方...
    陽光的技術(shù)小棧閱讀 1,925評(píng)論 0 1
  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,514評(píng)論 2 34
  • 關(guān)于消息隊(duì)列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,690評(píng)論 51 787

友情鏈接更多精彩內(nèi)容