http://blog.csdn.net/zl18310999566/article/details/54341057
RabbitMQ
RabbitMQ是流行的開源消息隊(duì)列系統(tǒng),用erlang語言開發(fā)。RabbitMQ是AMQP(高級(jí)消息隊(duì)列協(xié)議)的標(biāo)準(zhǔn)實(shí)現(xiàn)。
Broker:簡(jiǎn)單來說就是消息隊(duì)列服務(wù)器實(shí)體。
Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
Queue:消息隊(duì)列載體,每個(gè)消息都會(huì)被投入到一個(gè)或多個(gè)隊(duì)列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個(gè)關(guān)鍵字進(jìn)行消息投遞。
vhost:虛擬主機(jī),一個(gè)broker里可以開設(shè)多個(gè)vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費(fèi)者,就是接受消息的程序。
channel:消息通道,在客戶端的每個(gè)連接里,可建立多個(gè)channel,每個(gè)channel代表一個(gè)會(huì)話任務(wù)。
客戶端連接到消息隊(duì)列服務(wù)器,打開一個(gè)channel。
客戶端聲明一個(gè)exchange,并設(shè)置相關(guān)屬性。
客戶端聲明一個(gè)queue,并設(shè)置相關(guān)屬性。
客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
客戶端投遞消息到exchange。
exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進(jìn)行消息路由,將消息投遞到一個(gè)或多個(gè)隊(duì)列里。
exchange也有幾個(gè)類型,完全根據(jù)key進(jìn)行投遞的叫做Direct交換機(jī),例如,綁定時(shí)設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會(huì)投遞到隊(duì)列。對(duì)key進(jìn)行模式匹配后進(jìn)行投遞的叫做Topic交換機(jī),符號(hào)”#”匹配一個(gè)或多個(gè)詞,符號(hào)”*”匹配正好一個(gè)詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機(jī),它采取廣播模式,一個(gè)消息進(jìn)來時(shí),投遞到與該交換機(jī)綁定的所有隊(duì)列。
RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會(huì)選擇持久化。消息隊(duì)列持久化包括3個(gè)部分:
exchange持久化,在聲明時(shí)指定durable => 1
queue持久化,在聲明時(shí)指定durable => 1
消息持久化,在投遞時(shí)指定delivery_mode => 2(1是非持久化)
如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個(gè)持久化,一個(gè)非持久化,就不允許建立綁定。
以上內(nèi)容抄自百度百科,RabbitMQ 百度百科
其它關(guān)于RabbitMQ的介紹可以看http://www.cnblogs.com/qiyebao/p/4205626.html
[html]?view plain?copy?print?
??
org.springframework.boot??
spring-boot-starter-amqp??
在application.properties中增加如下配置
[plain]?view plain?copy?print?
spring.rabbitmq.addresses=127.0.0.1:5672??
spring.rabbitmq.username=guest??
spring.rabbitmq.password=guest??
spring.rabbitmq.publisher-confirms=true??
spring.rabbitmq.virtual-host=/??
rabbitmq端口說明:5672-amqp,25672-clustering,61613-stomp,1883-mqtt
[java]?view plain?copy?print?
package?com.rabbitmq.send;??
import?org.springframework.amqp.rabbit.core.RabbitTemplate;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.stereotype.Component;??
@Component??
public?class?Sender?{??
@Autowired??
private?RabbitTemplate?rabbitTemplate;??
public?void?send(String?msg)?{??
this.rabbitTemplate.convertAndSend("foo",?msg);??
????}??
}??
[java]?view plain?copy?print?
package?com.rabbitmq.listener;??
import?org.slf4j.Logger;??
import?org.slf4j.LoggerFactory;??
import?org.springframework.amqp.core.Queue;??
import?org.springframework.amqp.rabbit.annotation.RabbitHandler;??
import?org.springframework.amqp.rabbit.annotation.RabbitListener;??
import?org.springframework.context.annotation.Bean;??
import?org.springframework.context.annotation.Configuration;??
import?org.springframework.messaging.handler.annotation.Payload;??
@Configuration??
@RabbitListener(queues?=?"foo")??
public?class?Listener?{??
private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(Listener.class);??
@Bean??
public?Queue?fooQueue()?{??
return?new?Queue("foo");??
????}??
@RabbitHandler??
public?void?process(@Payload?String?foo)?{??
LOGGER.info("Listener:?"?+?foo);??
????}??
}??
[java]?view plain?copy?print?
package?com.rabbitmq.controller;??
import?com.rabbitmq.send.Sender;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.web.bind.annotation.GetMapping;??
import?org.springframework.web.bind.annotation.RestController;??
import?javax.servlet.http.HttpServletRequest;??
@RestController??
public?class?RabbitmqController?{??
@Autowired??
private?Sender?sender;??
@GetMapping("/send")??
public?String?send(HttpServletRequest?request,?String?msg)?{??
????????sender.send(msg);??
return?"Send?OK.";??
????}??
}??
啟動(dòng)服務(wù),在瀏覽器輸入?http://127.0.0.1:8080/send?msg=this%20is%20a%20test,點(diǎn)擊回車可以在控制臺(tái)看到如下輸出
[plain]?view plain?copy?print?
INFO?5559?---?[cTaskExecutor-1]?c.rabbitmq.listener.Listener??:?Listener:?this?is?a?test??
[SimpleAsyncTaskExecutor-1]?INFO??c.rabbitmq.listener.Listener?-?Listener:?this?is?a?test??
增加回調(diào)處理,這里不再使用application.properties默認(rèn)配置的方式,會(huì)在程序中顯示的使用文件中的配置信息。
[java]?view plain?copy?print?
package?com.rabbitmq.config;??
import?org.springframework.amqp.rabbit.connection.CachingConnectionFactory;??
import?org.springframework.amqp.rabbit.connection.ConnectionFactory;??
import?org.springframework.amqp.rabbit.core.RabbitTemplate;??
import?org.springframework.beans.factory.annotation.Value;??
import?org.springframework.beans.factory.config.ConfigurableBeanFactory;??
import?org.springframework.context.annotation.Bean;??
import?org.springframework.context.annotation.Configuration;??
import?org.springframework.context.annotation.Scope;??
@Configuration??
public?class?AmqpConfig?{??
public?static?final?String?FOO_EXCHANGE???=?"callback.exchange.foo";??
public?static?final?String?FOO_ROUTINGKEY?=?"callback.routingkey.foo";??
public?static?final?String?FOO_QUEUE??????=?"callback.queue.foo";??
@Value("${spring.rabbitmq.addresses}")??
private?String?addresses;??
@Value("${spring.rabbitmq.username}")??
private?String?username;??
@Value("${spring.rabbitmq.password}")??
private?String?password;??
@Value("${spring.rabbitmq.virtual-host}")??
private?String?virtualHost;??
@Value("${spring.rabbitmq.publisher-confirms}")??
private?boolean?publisherConfirms;??
@Bean??
public?ConnectionFactory?connectionFactory()?{??
CachingConnectionFactory?connectionFactory?=new?CachingConnectionFactory();??
????????connectionFactory.setAddresses(addresses);??
????????connectionFactory.setUsername(username);??
????????connectionFactory.setPassword(password);??
????????connectionFactory.setVirtualHost(virtualHost);??
/**?如果要進(jìn)行消息回調(diào),則這里必須要設(shè)置為true?*/??
????????connectionFactory.setPublisherConfirms(publisherConfirms);??
return?connectionFactory;??
????}??
@Bean??
/**?因?yàn)橐O(shè)置回調(diào)類,所以應(yīng)是prototype類型,如果是singleton類型,則回調(diào)類為最后一次設(shè)置?*/??
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)??
public?RabbitTemplate?rabbitTemplate()?{??
RabbitTemplate?template?=new?RabbitTemplate(connectionFactory());??
return?template;??
????}??
}??
[java]?view plain?copy?print?
package?com.rabbitmq.send;??
import?com.rabbitmq.config.AmqpConfig;??
import?org.slf4j.Logger;??
import?org.slf4j.LoggerFactory;??
import?org.springframework.amqp.rabbit.core.RabbitTemplate;??
import?org.springframework.amqp.rabbit.support.CorrelationData;??
import?org.springframework.beans.factory.annotation.Autowired;??
import?org.springframework.stereotype.Component;??
import?java.util.UUID;??
@Component??
public?class?Sender?implements?RabbitTemplate.ConfirmCallback?{??
private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(Sender.class);??
private?RabbitTemplate?rabbitTemplate;??
@Autowired??
public?Sender(RabbitTemplate?rabbitTemplate)?{??
this.rabbitTemplate?=?rabbitTemplate;??
this.rabbitTemplate.setConfirmCallback(this);??
????}??
public?void?send(String?msg)?{??
CorrelationData?correlationData?=new?CorrelationData(UUID.randomUUID().toString());??
LOGGER.info("send:?"?+?correlationData.getId());??
this.rabbitTemplate.convertAndSend(AmqpConfig.FOO_EXCHANGE,?AmqpConfig.FOO_ROUTINGKEY,?msg,?correlationData);??
????}??
/**?回調(diào)方法?*/??
@Override??
public?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{??
LOGGER.info("confirm:?"?+?correlationData.getId());??
????}??
}??
[java]?view plain?copy?print?
package?com.rabbitmq.listener;??
import?com.rabbitmq.config.AmqpConfig;??
import?org.slf4j.Logger;??
import?org.slf4j.LoggerFactory;??
import?org.springframework.amqp.core.Binding;??
import?org.springframework.amqp.core.BindingBuilder;??
import?org.springframework.amqp.core.DirectExchange;??
import?org.springframework.amqp.core.Queue;??
import?org.springframework.amqp.rabbit.annotation.RabbitHandler;??
import?org.springframework.amqp.rabbit.annotation.RabbitListener;??
import?org.springframework.context.annotation.Bean;??
import?org.springframework.context.annotation.Configuration;??
import?org.springframework.messaging.handler.annotation.Payload;??
@Configuration??
@RabbitListener(queues?=?AmqpConfig.FOO_QUEUE)??
public?class?Listener?{??
private?static?final?Logger?LOGGER?=?LoggerFactory.getLogger(Listener.class);??
/**?設(shè)置交換機(jī)類型??*/??
@Bean??
public?DirectExchange?defaultExchange()?{??
/**
?????????*?DirectExchange:按照routingkey分發(fā)到指定隊(duì)列
?????????*?TopicExchange:多關(guān)鍵字匹配
?????????*?FanoutExchange:?將消息分發(fā)到所有的綁定隊(duì)列,無routingkey的概念
?????????*?HeadersExchange?:通過添加屬性key-value匹配
?????????*/??
return?new?DirectExchange(AmqpConfig.FOO_EXCHANGE);??
????}??
@Bean??
public?Queue?fooQueue()?{??
return?new?Queue(AmqpConfig.FOO_QUEUE);??
????}??
@Bean??
public?Binding?binding()?{??
/**?將隊(duì)列綁定到交換機(jī)?*/??
return?BindingBuilder.bind(fooQueue()).to(defaultExchange()).with(AmqpConfig.FOO_ROUTINGKEY);??
????}??
@RabbitHandler??
public?void?process(@Payload?String?foo)?{??
LOGGER.info("Listener:?"?+?foo);??
????}??
}??
或者使用下面的代碼來代替@RabbitHandler注解的process方法
[java]?view plain?copy?print?
@Bean??
public?SimpleMessageListenerContainer?messageContainer()?{??
SimpleMessageListenerContainer?container?=new?SimpleMessageListenerContainer(connectionFactory());??
????container.setQueues(fooQueue());??
container.setExposeListenerChannel(true);??
container.setMaxConcurrentConsumers(1);??
container.setConcurrentConsumers(1);??
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//設(shè)置確認(rèn)模式手工確認(rèn)??
container.setMessageListener(new?ChannelAwareMessageListener()?{??
@Override??
public?void?onMessage(Message?message,?Channel?channel)?throws?Exception?{??
byte[]?body?=?message.getBody();??
LOGGER.info("Listener?onMessage?:?"?+?new?String(body));??
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);?//確認(rèn)消息成功消費(fèi)??
????????}??
????});??
return?container;??
}??