Spring學(xué)習(xí)之消息

這里主要說的是異步消息,異步消息時一個應(yīng)用程序向另一個應(yīng)用程序間接發(fā)送消息的一種方式,這種方式無需等待對方的響應(yīng)。

JMS

JMS(Java Message Service)是一個Java標準,定義了使用消息代理的通用API。借助JMS,所有遵從規(guī)范的實現(xiàn)都使用通用的接口,這就類似于JDBC為數(shù)據(jù)庫操作提供了通用的接口一樣。

JMS支持點對點消息模型和發(fā)布-訂閱模型。

點對點消息模型

在點對點消息模型中,消息發(fā)送者將消息發(fā)送到隊列中,接收者從隊列中取出消息。

點對點消息模型

消息隊列對消息發(fā)送者和消息接收者進行了解耦。雖然隊列可以有多個接收者,但是每一條消息只能被一個接收者取走。

發(fā)布-訂閱消息模型

在發(fā)布-訂閱消息模型中,消息會發(fā)送給一個主題,多個接收者都可以監(jiān)聽一個主題。與隊列不同,該主題的訂閱者都會接受到此消息的副本。

發(fā)布-訂閱消息模型

ActiveMQ就是遵從JMS規(guī)范的框架。

AMQP

AMQP(Advanced Message Queuing Protocol)是一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標準高級消息隊列協(xié)議,是應(yīng)用層協(xié)議的一個開放標準,為面向消息的中間件設(shè)計?;诖藚f(xié)議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產(chǎn)品,不同的開發(fā)語言等條件的限制。
相比JMS,AMQP的優(yōu)勢在于它具有更加靈活和透明的消息模型。除了JMS支持的消息模型,AMQP還能夠讓我們以其他的多種方式來發(fā)送消息,這是通過將消息的生產(chǎn)者與存放消息的隊列解耦實現(xiàn)的。

AMQP消息模型

AMQP的生產(chǎn)者不會直接將消息發(fā)布到隊列中。AMQP在消息的生產(chǎn)者以及傳遞消息的隊列之間引入了一種間接的機制:Exchange。

AMQP消息模型

消息的生產(chǎn)者將信息發(fā)布到一個Exchange。Exchange會綁定到一個或多個隊列上,它負責(zé)將信息路由到隊列上。消息的消費者會從隊列中提取數(shù)據(jù)并進行處理。

AMQP定義了四中不同類型的Exchange,每一種都有不同的路由算法,這些算法決定了是否要將信息放到隊列中:

  • Direct:如果消息的routing key 與binding的routing key 直接匹配的話,消息將會路由到該隊列上;
  • Topic: 如果消息的routing key 與binding的routing key 符合通配符匹配的話,消息將會路由到該隊列上;
  • Headers: 如果消息參數(shù)表中的頭信息和值與binding參數(shù)表中相匹配,消息將會路由到該隊列上;
  • Fanout: 不管消息的routing key 和參數(shù)表的頭信息/值是什么,消息將會路由到所有隊列上。

RabbitMQ

RabbitMQ是一個流行的開源消息代理,它實現(xiàn)了AMQP。Spring AMQP為RabbitMQ提供了支持,包括RabbitMQ鏈接工廠,模板以及Spring配置命名空間。

安裝&啟動服務(wù)

首先我們需要安裝RabbitMQ并啟動服務(wù)。
Download
安裝的話可以搜索一下教程,網(wǎng)上很多。安裝好之后,配置環(huán)境,然后我們通過命令行輸入rabbitmq-server啟動服務(wù)。出現(xiàn)如下界面表示服務(wù)已啟動:


  ##  ##
  ##  ##      RabbitMQ 3.7.7. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro.log
                    /usr/local/rabbitmq_server-3.7.7/var/log/rabbitmq/rabbit@JuliedeMacBook-Pro_upgrade.log

              Starting broker...
 completed with 3 plugins.

Spring配置

首先是pom.xml中導(dǎo)入依賴:

<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

其次創(chuàng)建一個配置文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd
    http://www.springframework.org/schema/context
    http://www.springframework.org/schema/context/spring-context.xsd">

    <!--導(dǎo)入配置文件-->
    <context:property-placeholder location="classpath:rabbit.properties"/>

    <!-- 連接配置 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="${mq.host}"
                               username="${mq.username}"
                               password="${mq.password}"
                               port="${mq.port}"
                               virtual-host='/'/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!--創(chuàng)建一個隊列-->
    <rabbit:queue id="spittleAlertQueue" name="spittle.alerts" />

    <!--配置rabbitTemplate-->
    <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>

    <!--隊列監(jiān)聽器-->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener queues="spittleAlertQueue" ref="queueListener"/>
    </rabbit:listener-container>

    <!--注冊相關(guān)bean-->
    <bean id="alertService" class="com.hubert.rabbit.AlertServiceImpl">
        <constructor-arg ref="rabbitTemplate"/>
    </bean>
    <bean id="queueListener" class="com.hubert.rabbit.QueueListener"/>
</beans>

rabbit.properties

mq.host=localhost
mq.username=guest
mq.password=guest
mq.port=5672

bean

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueListener implements MessageListener {

    Logger logger = LoggerFactory.getLogger(QueueListener.class);

    @Override
    public void onMessage(Message msg) {
        logger.info("receive");
        try {
            logger.info(msg.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class AlertServiceImpl {
    private RabbitTemplate rabbit;

    public AlertServiceImpl(RabbitTemplate rabbit) {
        this.rabbit = rabbit;
    }

    public void sendSpittleAlert() {
        rabbit.convertAndSend("spittle.alerts", "object");
    }
}

啟動

最后是在main方法中發(fā)送消息:

public class RabbitMain {
    public static void main(String[] args) {
        ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext("rabbitmq-config.xml");
        AlertServiceImpl service = (AlertServiceImpl) applicationContext.getBean("alertService");
        service.sendSpittleAlert();
    }
}

順利的話我們就會看到處理消息的日志:

INFO com.hubert.rabbit.QueueListener - (Body:'object' MessageProperties [headers={}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=spittle.alerts, receivedDelay=null, deliveryTag=12, messageCount=0, consumerTag=amq.ctag-rLNiWGT6LCAMD3li8xv1qA, consumerQueue=spittle.alerts])

這個過程并不是那么順利,可能會出現(xiàn)鏈接錯誤,權(quán)限被拒的情況,一般都是RabbitMQ的用戶權(quán)限或者Virtual Host的權(quán)限的問題。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,198評論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評論 19 139
  • 本文基于《Spring實戰(zhàn)(第4版)》所寫。 異步消息是一個應(yīng)用程序向另一個應(yīng)用程序間接發(fā)送消息的一種方式,這種方...
    陽光的技術(shù)小棧閱讀 1,914評論 0 1
  • 來源 RabbitMQ是用Erlang實現(xiàn)的一個高并發(fā)高可靠AMQP消息隊列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,505評論 2 34
  • 關(guān)于消息隊列,從前年開始斷斷續(xù)續(xù)看了些資料,想寫很久了,但一直沒騰出空,近來分別碰到幾個朋友聊這塊的技術(shù)選型,是時...
    預(yù)流閱讀 586,573評論 51 787

友情鏈接更多精彩內(nèi)容