spring-boot 集成 rabbitmq

http://blog.csdn.net/zl18310999566/article/details/54341057

RabbitMQ

介紹

RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),用erlang語言開發(fā)。RabbitMQ是AMQP(高級(jí)消息隊(duì)列協(xié)議)的標(biāo)準(zhǔn)實(shí)現(xiàn)。

概念

Broker:簡(jiǎn)單來說就是消息隊(duì)列服務(wù)器實(shí)體。

Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。

Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。

Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。

Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。

vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。

producer:消息生產(chǎn)者,就是投遞消息的程序。

consumer:消息消費(fèi)者,就是接受消息的程序。

channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。

使用過程

客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel。

客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性。

客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性。

客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。

客戶端投遞消息到exchange。

exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。

exchange也有幾個(gè)類型,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會(huì)投遞到隊(duì)列。對(duì)key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號(hào)”#”匹配一個(gè)或多個(gè)詞,符號(hào)”*”匹配正好一個(gè)詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。

RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會(huì)選擇持久化。消息隊(duì)列持久化包括3個(gè)部分:

exchange持久化,在聲明時(shí)指定durable => 1

queue持久化,在聲明時(shí)指定durable => 1

消息持久化,在投遞時(shí)指定delivery_mode => 2(1是非持久化)

如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個(gè)持久化,一個(gè)非持久化,就不允許建立綁定。

以上內(nèi)容抄自百度百科,RabbitMQ 百度百科

其它關(guān)于RabbitMQ的介紹可以看http://www.cnblogs.com/qiyebao/p/4205626.html

Spring-boot 集成 rabbitmq

添加maven依賴

[html]?view plain?copy?print?

??

org.springframework.boot??

spring-boot-starter-amqp??


簡(jiǎn)單實(shí)現(xiàn)

配置

在application.properties中增加如下配置

[plain]?view plain?copy?print?

spring.rabbitmq.addresses=127.0.0.1:5672??

spring.rabbitmq.username=guest??

spring.rabbitmq.password=guest??

spring.rabbitmq.publisher-confirms=true??

spring.rabbitmq.virtual-host=/??

rabbitmq端口說明:5672-amqp,25672-clustering,61613-stomp,1883-mqtt

消息生產(chǎn)者

[java]?view plain?copy?print?

package?com.rabbitmq.send;??


import?org.springframework.amqp.rabbit.core.RabbitTemplate;??

import?org.springframework.beans.factory.annotation.Autowired;??

import?org.springframework.stereotype.Component;??


@Component??

public?class?Sender?{??


@Autowired??

private?RabbitTemplate?rabbitTemplate;??


public?void?send(String?msg)?{??

this.rabbitTemplate.convertAndSend("foo",?msg);??

????}??

}??

消息監(jiān)聽者

[java]?view plain?copy?print?

package?com.rabbitmq.listener;??


import?org.slf4j.Logger;??

import?org.slf4j.LoggerFactory;??

import?org.springframework.amqp.core.Queue;??

import?org.springframework.amqp.rabbit.annotation.RabbitHandler;??

import?org.springframework.amqp.rabbit.annotation.RabbitListener;??

import?org.springframework.context.annotation.Bean;??

import?org.springframework.context.annotation.Configuration;??

import?org.springframework.messaging.handler.annotation.Payload;??


@Configuration??

@RabbitListener(queues?=?"foo")??

public?class?Listener?{??


private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(Listener.class);??


@Bean??

public?Queue?fooQueue()?{??

return?new?Queue("foo");??

????}??


@RabbitHandler??

public?void?process(@Payload?String?foo)?{??

LOGGER.info("Listener:?"?+?foo);??

????}??

}??

測(cè)試Controller

[java]?view plain?copy?print?

package?com.rabbitmq.controller;??


import?com.rabbitmq.send.Sender;??

import?org.springframework.beans.factory.annotation.Autowired;??

import?org.springframework.web.bind.annotation.GetMapping;??

import?org.springframework.web.bind.annotation.RestController;??


import?javax.servlet.http.HttpServletRequest;??


@RestController??

public?class?RabbitmqController?{??


@Autowired??

private?Sender?sender;??


@GetMapping("/send")??

public?String?send(HttpServletRequest?request,?String?msg)?{??

????????sender.send(msg);??

return?"Send?OK.";??

????}??

}??

測(cè)試

啟動(dòng)服務(wù),在瀏覽器輸入?http://127.0.0.1:8080/send?msg=this%20is%20a%20test,點(diǎn)擊回車可以在控制臺(tái)看到如下輸出

[plain]?view plain?copy?print?

INFO?5559?---?[cTaskExecutor-1]?c.rabbitmq.listener.Listener??:?Listener:?this?is?a?test??

[SimpleAsyncTaskExecutor-1]?INFO??c.rabbitmq.listener.Listener?-?Listener:?this?is?a?test??

帶ConfirmCallback的使用

增加回調(diào)處理,這里不再使用application.properties默認(rèn)配置的方式,會(huì)在程序中顯示的使用文件中的配置信息。

配置

[java]?view plain?copy?print?

package?com.rabbitmq.config;??


import?org.springframework.amqp.rabbit.connection.CachingConnectionFactory;??

import?org.springframework.amqp.rabbit.connection.ConnectionFactory;??

import?org.springframework.amqp.rabbit.core.RabbitTemplate;??

import?org.springframework.beans.factory.annotation.Value;??

import?org.springframework.beans.factory.config.ConfigurableBeanFactory;??

import?org.springframework.context.annotation.Bean;??

import?org.springframework.context.annotation.Configuration;??

import?org.springframework.context.annotation.Scope;??


@Configuration??

public?class?AmqpConfig?{??


public?static?final?String?FOO_EXCHANGE???=?"callback.exchange.foo";??

public?static?final?String?FOO_ROUTINGKEY?=?"callback.routingkey.foo";??

public?static?final?String?FOO_QUEUE??????=?"callback.queue.foo";??


@Value("${spring.rabbitmq.addresses}")??

private?String?addresses;??

@Value("${spring.rabbitmq.username}")??

private?String?username;??

@Value("${spring.rabbitmq.password}")??

private?String?password;??

@Value("${spring.rabbitmq.virtual-host}")??

private?String?virtualHost;??

@Value("${spring.rabbitmq.publisher-confirms}")??

private?boolean?publisherConfirms;??


@Bean??

public?ConnectionFactory?connectionFactory()?{??

CachingConnectionFactory?connectionFactory?=new?CachingConnectionFactory();??

????????connectionFactory.setAddresses(addresses);??

????????connectionFactory.setUsername(username);??

????????connectionFactory.setPassword(password);??

????????connectionFactory.setVirtualHost(virtualHost);??

/**?如果要進(jìn)行消息回調(diào),則這里必須要設(shè)置為true?*/??

????????connectionFactory.setPublisherConfirms(publisherConfirms);??

return?connectionFactory;??

????}??


@Bean??

/**?因?yàn)橐O(shè)置回調(diào)類,所以應(yīng)是prototype類型,如果是singleton類型,則回調(diào)類為最后一次設(shè)置?*/??

@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)??

public?RabbitTemplate?rabbitTemplate()?{??

RabbitTemplate?template?=new?RabbitTemplate(connectionFactory());??

return?template;??

????}??


}??

消息生產(chǎn)者

[java]?view plain?copy?print?

package?com.rabbitmq.send;??


import?com.rabbitmq.config.AmqpConfig;??

import?org.slf4j.Logger;??

import?org.slf4j.LoggerFactory;??

import?org.springframework.amqp.rabbit.core.RabbitTemplate;??

import?org.springframework.amqp.rabbit.support.CorrelationData;??

import?org.springframework.beans.factory.annotation.Autowired;??

import?org.springframework.stereotype.Component;??


import?java.util.UUID;??


@Component??

public?class?Sender?implements?RabbitTemplate.ConfirmCallback?{??


private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(Sender.class);??


private?RabbitTemplate?rabbitTemplate;??


@Autowired??

public?Sender(RabbitTemplate?rabbitTemplate)?{??

this.rabbitTemplate?=?rabbitTemplate;??

this.rabbitTemplate.setConfirmCallback(this);??

????}??


public?void?send(String?msg)?{??

CorrelationData?correlationData?=new?CorrelationData(UUID.randomUUID().toString());??

LOGGER.info("send:?"?+?correlationData.getId());??

this.rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE,?AmqpConfig.FOO_ROUTINGKEY,?msg,?correlationData);??

????}??


/**?回調(diào)方法?*/??

@Override??

public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{??

LOGGER.info("confirm:?"?+?correlationData.getId());??

????}??

}??

消息監(jiān)聽者

[java]?view plain?copy?print?

package?com.rabbitmq.listener;??


import?com.rabbitmq.config.AmqpConfig;??

import?org.slf4j.Logger;??

import?org.slf4j.LoggerFactory;??

import?org.springframework.amqp.core.Binding;??

import?org.springframework.amqp.core.BindingBuilder;??

import?org.springframework.amqp.core.DirectExchange;??

import?org.springframework.amqp.core.Queue;??

import?org.springframework.amqp.rabbit.annotation.RabbitHandler;??

import?org.springframework.amqp.rabbit.annotation.RabbitListener;??

import?org.springframework.context.annotation.Bean;??

import?org.springframework.context.annotation.Configuration;??

import?org.springframework.messaging.handler.annotation.Payload;??


@Configuration??

@RabbitListener(queues?=?AmqpConfig.FOO_QUEUE)??

public?class?Listener?{??


private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(Listener.class);??


/**?設(shè)置交換機(jī)類型??*/??

@Bean??

public?DirectExchange?defaultExchange()?{??

/**

?????????*?DirectExchange:按照routingkey分發(fā)到指定隊(duì)列

?????????*?TopicExchange:多關(guān)鍵字匹配

?????????*?FanoutExchange:?將消息分發(fā)到所有的綁定隊(duì)列,無routingkey的概念

?????????*?HeadersExchange?:通過添加屬性key-value匹配

?????????*/??

return?new?DirectExchange(AmqpConfig.FOO_EXCHANGE);??

????}??


@Bean??

public?Queue?fooQueue()?{??

return?new?Queue(AmqpConfig.FOO_QUEUE);??

????}??


@Bean??

public?Binding?binding()?{??

/**?將隊(duì)列綁定到交換機(jī)?*/??

return?BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);??

????}??


@RabbitHandler??

public?void?process(@Payload?String?foo)?{??

LOGGER.info("Listener:?"?+?foo);??

????}??

}??

或者使用下面的代碼來代替@RabbitHandler注解的process方法

[java]?view plain?copy?print?

@Bean??

public?SimpleMessageListenerContainer?messageContainer()?{??

SimpleMessageListenerContainer?container?=new?SimpleMessageListenerContainer(connectionFactory());??

????container.setQueues(fooQueue());??

container.setExposeListenerChannel(true);??

container.setMaxConcurrentConsumers(1);??

container.setConcurrentConsumers(1);??

container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//設(shè)置確認(rèn)模式手工確認(rèn)??

container.setMessageListener(new?ChannelAwareMessageListener()?{??


@Override??

public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{??

byte[]?body?=?message.getBody();??

LOGGER.info("Listener?onMessage?:?"?+?new?String(body));??

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);?//確認(rèn)消息成功消費(fèi)??

????????}??

????});??

return?container;??

}??

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評(píng)論 2 11
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,578評(píng)論 19 139
  • http://blog.csdn.net/u011493599/article/details/62892490 ...
    sherlock_6981閱讀 7,418評(píng)論 0 3
  • 安裝下面是有Windows和Linux不同系統(tǒng)下的詳細(xì)安裝步驟,這了不再做介紹安裝步驟這里需要注意RabbitMQ...
    RalapHao閱讀 1,669評(píng)論 0 3
  • RabbitMQ作為AMQP的代表性產(chǎn)品,在項(xiàng)目中大量使用。結(jié)合現(xiàn)在主流的spring boot,極大簡(jiǎn)化了開發(fā)過...
    SamHxm閱讀 26,262評(píng)論 6 19

友情鏈接更多精彩內(nèi)容