RabbitMQ的整體概括
RabbitMQ是對于AMQP(高級消息隊列協(xié)議)的具體實現(xiàn),是一個用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息的網(wǎng)絡通信協(xié)議。
應用解耦
此模型表示,消息中間件broker是消息的容器,它從生產(chǎn)者接收消息,并根據(jù)路由規(guī)則把消息投遞給消費者。需要注意的是,生產(chǎn)者不是直接將消息投遞給broker,而是通過路由鍵(RoutingKey)投遞給交換機(Exchange),交換機綁定具體的隊列(Queue),而消費者消費的消息只與隊列有關(guān),消費將會有推拉兩種模式。
圖示如下:
AMQP協(xié)議模型
AMQP 協(xié)議層角色相關(guān)的概念:
- 生產(chǎn)者(producer):產(chǎn)生消息的應用,能夠傳遞消息到消息中間件的應用。
- 消息中間件(brokers):消息傳遞的中間載體,即我們今天的主角 RabbitMQ。
- 消費者(consumers):接收并處理消息的應用。從消息中間件獲取消息并處理。
- 連接(Connection):生產(chǎn)者或消費者和消息中間件之間需要建立起連接。AMQP 應用層協(xié)議使用的是能夠提供可靠投遞的 TCP 連接,AMQP 的連接通常是長連接,AMQP 使用認證機制并且提供 TLS(SSL)保護。當我們的生產(chǎn)者 或 消費者 不再需要連接到消息中間件的的時候,需要優(yōu)雅的釋放掉它們與消息中間件 TCP 連接,而不是直接將 TCP 連接關(guān)閉。
- 信道(channel):通常情況下生產(chǎn)者 或 消費者 需要與 消息中間件之間建立多個連接。無論怎樣,同時開啟多個 TCP 連接都是不合適的,因為這樣做會消耗掉過多的系統(tǒng)資源。AMQP 協(xié)議提供了信道(channel)這個概念來處理多連接,可以把通道理解成共享一個 TCP 連接的多個輕量化連接。一個特定通道上的通訊與其他通道上的通訊是完全隔離的,因此每個 AMQP 方法都需要攜帶一個通道號,這樣客戶端就可以指定此方法是為哪個信道準備的。
消息中間件相關(guān)的概念:
- 虛擬主機(vHosts):虛擬主機概念,一個 Virtual Host 里面可以有若干個 Exchange 和 Queue,我們可以控制用戶在 Virtual Host 的權(quán)限。
- 用戶(User):最直接了當?shù)恼J證方式,誰可以使用當前的消息中間件。
- 交換機(Exchange):交換機接收生產(chǎn)者發(fā)出的消息并且路由到由交換機類型和被稱作綁定(bindings)的規(guī)則所決定的到隊列中,交換機不存儲消息。
- 消息(message):生產(chǎn)者產(chǎn)生的和消費者處理的消息。
- 路由鍵(routing key):路由關(guān)鍵字,交換機 exchange 的路由規(guī)則利用這個關(guān)鍵字進行消息投遞到消息隊列。(路由鍵長度不能超過 255 個字節(jié))
- 綁定(Binding):Binding 可以理解為交換機 Exchange 路由消息到消息隊列的路由規(guī)則關(guān)系(即消息隊列和交換機的綁定)。當交換機 Exchange 收到生產(chǎn)者傳遞的消息 Message 時會解析其 Routing Key,Exchange 根據(jù) Routing Key 與交換機類型 Exchange Type 將 Message 路由到消息隊列中去。
SpringBoot整合RabbitMQ
簡單了解了rabbitmq的概念,我們來實踐一下與springboot的整合。
首先引入jar包。
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>1.7.9.RELEASE</version>
</dependency>
配置連接工廠
/**
* 創(chuàng)建連接工廠
* @return
*/
@Bean(name = "adapterConnectionFactory")
@Order(value = 2)
public ConnectionFactory adapterConnectionFactory() {
//創(chuàng)建連接工廠
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
//設置集群方式
connectionFactory.setAddresses(rabbitMQProperties.getAddress());
//connectionFactory.setHost(rabbitMQProperties.getHost());設置單節(jié)點方式
//設置端口
connectionFactory.setPort(rabbitMQProperties.getPort());
//設置用戶名
connectionFactory.setUsername(rabbitMQProperties.getUsername());
//設置密碼
connectionFactory.setPassword(rabbitMQProperties.getPassword());
//設置虛擬主機
connectionFactory.setVirtualHost(rabbitMQProperties.getVirtualHost());
//消息確認機制confirm-callback或return-callback,成功后confirm,失敗后回調(diào)
connectionFactory.setPublisherReturns(true);
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
消息發(fā)送的組件RabbitTemplate
/**
* 創(chuàng)建消息發(fā)送組件
* @return
*/
@Bean(name = "adapterRabbitTemplate")
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(adapterConnectionFactory());
//exchange根據(jù)路由鍵匹配不到對應的queue時將會調(diào)用basic.return將消息返還給生產(chǎn)者
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
//消息成功發(fā)送到broker
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
ApiLog.info("mq message send (ACK)status =", ack);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
//消息發(fā)送失敗
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丟失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
return rabbitTemplate;
}
監(jiān)聽器容器工廠SimpleRabbitListenerContainerFactory
/**
* 消費者監(jiān)聽
*
* @return
*/
@Bean(name = "singleListenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(adapterConnectionFactory());
factory.setMessageConverter(new Jackson2JsonMessageConverter());
//單臺并發(fā)消費者數(shù)量
factory.setConcurrentConsumers(10);
//單臺并發(fā)消費的最大消費者數(shù)量
factory.setMaxConcurrentConsumers(30);
//預取消費數(shù)量,unacked數(shù)量超過這個值broker將不會接收消息
factory.setPrefetchCount(5);
//有事務時處理的消息數(shù)
factory.setTxSize(1);
//消息確認機制
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
//構(gòu)建retryConfig,用于在JavaConfig的模式下讀取并發(fā)參數(shù)
RabbitProperties.AmqpContainer config = rabbitMQProperties.getListener().getSimple();
RabbitProperties.ListenerRetry retryConfig = config.getRetry();
RetryInterceptorBuilder<?> builder = (retryConfig.isStateless()
? RetryInterceptorBuilder.stateless()
: RetryInterceptorBuilder.stateful());
//最大重試次數(shù),消費者異常之后
builder.maxAttempts(retryConfig.getMaxAttempts());
builder.backOffOptions(retryConfig.getInitialInterval(),
retryConfig.getMultiplier(), retryConfig.getMaxInterval());
MessageRecoverer recoverer = (this.messageRecoverer != null
? this.messageRecoverer : new RejectAndDontRequeueRecoverer());
builder.recoverer(recoverer);
factory.setAdviceChain(builder.build());
return factory;
}
重試參數(shù)說明,其他參數(shù)略
spring.rabbitmq.listener.simple.retry.enabled=true
//消費者異常之后的最大重試次數(shù),JavaConfig方式需顯示構(gòu)建retryConfig
spring.rabbitmq.listener.simple.retry.max-attempts=4
spring.rabbitmq.listener.simple.retry.initial-interval=2000
spring.rabbitmq.listener.simple.default-requeue-rejected=true
最佳實踐
初始化
@Autowired
private ConnectionFactory connectionFactory;
@Value("${mq.queue.callback_queue}")
private String callbackQueueKey;
@Value("${mq.exchange}")
private String exchange;
@PostConstruct
public void init(){
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
//聲明exchange
Exchange topicExchange = new TopicExchange(exchange);
admin.declareExchange(topicExchange);
//聲明queue
Queue callbackQueue = new Queue(callbackQueueKey, true);
admin.declareQueue(callbackQueue);
//Binding
admin.declareBinding(BindingBuilder.bind(callbackQueue)
.to(topicExchange)
.with(callbackQueueKey).noargs());
}
生產(chǎn)者
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setExchange(exchange);
rabbitTemplate.setRoutingKey(callbackQueueKey);
rabbitTemplate.convertAndSend(callBackRequest, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
MessageProperties properties = message.getMessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
properties.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Obj.class);
return message;
}
});
消費者
@RabbitListener(queues = "queueName", containerFactory = "singleListenerContainer")
public void consumeMessage(@Payload Object obj, Channel channel, Message message) {
doSometing...
}
需要注意的是,消費者的異常沒有捕獲或拋出,或者catch塊里出現(xiàn)異常,在消息確認機制是AUTO的前提下將會無限重試進入死循環(huán),這個時候可以設置最大重試次數(shù)或手動進行ack來處理。
如果需要手動ack,需要實現(xiàn)ChannelAwareMessageListener
@Override
public void onMessage(Message message, Channel channel) throws Exception {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
以上只是項目使用中的簡單闡述,除此還有exchange的4種模式,死信隊列,消息堆積,可靠性投遞機制等待補充。