RabbitMQ學習筆記 - RabbitMQ整合SpringBoot

RabbitMQ整合SpringBoot

你好!歡迎來到Java成長筆記,主要是用于相互交流,相互學習,也希望分享能幫到大家,如有錯誤之處,希望指正,謝謝!

使用SpringBoot整合RabbitMQ能夠使配置更簡單,使用起來更方便,也是線上版本使用最多的配置方式。

引入相應依賴

主要依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

消息生產端

生產端配置說明

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672   # 連接地址
    username: nihao # 對應用戶名稱
    password: 123456 # 對應用戶密碼
    virtual-host: / # 虛擬主機 默認是/
    connection-timeout: 15000 # 連接超時時間
    publisher-confirms: true # 開啟監聽Broker端給我們返回的確認
    publisher-returns: true # 開啟不可達的消息進行后續的處理
    template:
      mandatory: true # 消息不可達不會自動刪除 默認false為自動刪除

生產端RabbitTemplate注入配置說明

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitConfig {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate rabbitTemplate () {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        final RabbitTemplate rabbitTemplate = new RabbitTemplate (connectionFactory);
        // 如果設(shè)置ReturnCallback,mandatory必須設(shè)置為true,如果為false,消息不可達(dá)會(huì)被刪除
        rabbitTemplate.setMandatory(true);
        // 設(shè)置確認(rèn)請(qǐng)求
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback(){
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.error("correlationData:{}, ack:{}", JSON.toJSONString(correlationData), ack);
                if(!ack){
                    log.error("異常處理:{}", System.currentTimeMillis());
                }
            }
        });
        // 設(shè)置消息不可達(dá) 后續(xù)處理
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(org.springframework.amqp.core.Message message,
                                        int replyCode, String replyText,
                                        String exchange, String routingKey) {
                log.error("exchange:{}, routingKey:{}, replyCode:{}, replyText:{}", exchange, routingKey, replyCode, replyText);
            }
        });
        return rabbitTemplate;
    }
}

生產端代碼

// 相應(yīng)接口
import com.show.model.User;
import java.util.Map;

/**
 * Producer-side contract for publishing messages to RabbitMQ.
 */
public interface RabbitService {

    /**
     * Sends an arbitrary payload.
     *
     * @param exchange   exchange to publish to
     * @param queue      queue name supplied by the caller
     * @param routingKey routing key for the message
     * @param message    payload to send
     * @param properties extra key/value properties accompanying the message
     */
    void sendRabbitMsg(String exchange, String queue, String routingKey, Object message, Map<String, Object> properties);

    /**
     * Sends a {@link User} object as the payload.
     *
     * @param exchange   exchange to publish to
     * @param queue      queue name supplied by the caller
     * @param routingKey routing key for the message
     * @param user       user object to send
     * @param properties extra key/value properties accompanying the message
     */
    void sendRabbitObjectMsg(String exchange, String queue, String routingKey, User user, Map<String, Object> properties);

}

// 接口實(shí)現(xiàn)
import com.show.model.User;
import com.show.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
import java.util.UUID;

/**
 * {@link RabbitService} implementation that publishes through the injected
 * {@link RabbitTemplate}, attaching a fresh {@link CorrelationData} id to every
 * message.
 */
@Service(value = "rabbitService")
@Slf4j
public class RabbitServiceImpl implements RabbitService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Generates a correlation-data id.
     *
     * @return a random UUID rendered as text with the dashes removed
     */
    public String getUUID () {
        // UUID.randomUUID() never returns null, so no Optional wrapper is needed;
        // replace() is used because "-" is a literal, not a regular expression.
        return UUID.randomUUID().toString().replace("-", "");
    }

    /**
     * Wraps {@code message} and {@code properties} in a spring-messaging
     * {@link Message} and publishes it.
     *
     * @param exchange   exchange to publish to
     * @param queue      not used by this implementation
     * @param routingKey routing key for the message
     * @param message    payload to send
     * @param properties entries copied into the message headers
     */
    @Override
    public void sendRabbitMsg(String exchange, String queue, String routingKey, Object message, Map<String, Object> properties) {
        final MessageHeaders messageHeaders = new MessageHeaders(properties);
        final Message msg = MessageBuilder.createMessage(message, messageHeaders);
        final CorrelationData correlationData = new CorrelationData(this.getUUID());
        rabbitTemplate.convertAndSend(exchange, routingKey, msg, correlationData);
    }

    /**
     * Publishes a {@link User} as the message payload, sending
     * {@code properties} along as message headers.
     *
     * @param exchange   exchange to publish to
     * @param queue      not used by this implementation
     * @param routingKey routing key for the message
     * @param user       user object to send
     * @param properties entries copied into the outgoing message headers;
     *                   may be {@code null}, in which case no header is added
     */
    @Override
    public void sendRabbitObjectMsg(String exchange, String queue, String routingKey, User user, Map<String, Object> properties) {
        final CorrelationData correlationData = new CorrelationData(this.getUUID());
        // The post-processor runs after the payload has been converted, so the
        // caller's properties are sent with the message instead of being dropped.
        rabbitTemplate.convertAndSend(exchange, routingKey, user, converted -> {
            if (properties != null) {
                properties.forEach((name, value) -> converted.getMessageProperties().setHeader(name, value));
            }
            return converted;
        }, correlationData);
    }

}

// 對(duì)應(yīng)測(cè)試類
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.RabbitConfig;
import com.show.service.RabbitService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.UUID;

/**
 * Spring Boot integration tests that publish sample messages through
 * {@link RabbitService}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = EProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class EProducerApplicationTests {

    /** Formatter for the "sendTime" property attached to each message. */
    private static final SimpleDateFormat SEND_TIME_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Resource
    private RabbitService rabbitService;

    /** Publishes a plain text message. */
    @Test
    public void sendRabbitMsg() throws Exception {
        rabbitService.sendRabbitMsg(
                RabbitConfig.PRODUCER_EXCHANGE_NAME,
                RabbitConfig.PRODUCER_QUEUE_NAME,
                RabbitConfig.ROUTING_KEY,
                "Hello RabbitMQ",
                newProperties());
    }

    /** Publishes a {@link User} object message. */
    @Test
    public void sendRabbitObjMsg() throws Exception {
        final User simon = new User("simon", "simon", 22, new BigDecimal(100));
        rabbitService.sendRabbitObjectMsg(
                RabbitConfig.PRODUCER_OBJ_EXCHANGE_NAME,
                RabbitConfig.PRODUCER_OBJ_QUEUE_NAME,
                RabbitConfig.ROUTING_KEY,
                simon,
                newProperties());
    }

    /**
     * Builds the property map sent with every test message: a random
     * "number" and the current time as "sendTime".
     */
    private static Map<String, Object> newProperties() {
        final String number = UUID.randomUUID().toString();
        final String sendTime = SEND_TIME_FORMAT.format(new Date());
        return ImmutableMap.of("number", number, "sendTime", sendTime);
    }

}

消息消費端

消費端配置說明

spring:
  rabbitmq:
    addresses: 127.0.0.1:5672 # 設置連接地址
    username: nihao # 對應用戶名稱
    password: 123456 # 對應用戶密碼
    virtual-host: /  # 虛擬主機 默認是/
    connection-timeout: 15000 # 連接超時時間
    listener:
      simple:
        acknowledge-mode: manual  # 設置簽收模式
        concurrency: 5  # 默認處理線程數量
        max-concurrency: 10 # 最大處理線程數量

消費端注解說明

// @RabbitListener is a composite annotation; its nested annotations are configured as follows
// @QueueBinding declares the binding between the exchange and the queue
// @Queue sets the properties of the bound queue
//  value   name of the bound queue
//  durable whether the bound queue is durable
// @Exchange sets the exchange to bind
//  value   name of the exchange
//  durable whether the exchange is durable
//  type    exchange type
//  ignoreDeclarationExceptions whether declaration exceptions are ignored
// key sets the routing-key rule that matches the exchange to the queue
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(
                value = RabbitConfig.PRODUCER_QUEUE_NAME,
                durable = RabbitConfig.DURABLE
        ),
        exchange = @Exchange(
                value = RabbitConfig.PRODUCER_EXCHANGE_NAME,
                durable = RabbitConfig.DURABLE,
                type = RabbitConfig.TYPE,
                ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
        key = RabbitConfig.PRODUCER_ROUTING_KEY
)
)
@RabbitHandler

消費端代碼

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.model.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * Consumer-side listeners. Each method declares its own queue/exchange
 * binding and acknowledges the delivery manually once it has been handled.
 */
@Slf4j
@Component
public class RabbitService {

    /**
     * Handles a delivery received as a spring-messaging {@link Message}.
     *
     * @param message the received message; its headers carry the delivery tag
     * @param channel the channel on which the delivery is acknowledged
     * @throws Exception if acknowledging the delivery fails
     */
    // NOTE(review): @RabbitHandler is normally paired with a class-level
    // @RabbitListener; it looks redundant next to a method-level one - confirm.
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(
                    value = RabbitConfig.PRODUCER_QUEUE_NAME,
                    durable = RabbitConfig.DURABLE
            ),
            exchange = @Exchange(
                    value = RabbitConfig.PRODUCER_EXCHANGE_NAME,
                    durable = RabbitConfig.DURABLE,
                    type = RabbitConfig.TYPE,
                    ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
            key = RabbitConfig.PRODUCER_ROUTING_KEY
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        // Receiving a message is normal operation, so it is logged at INFO rather than ERROR.
        log.info("Message:{}", JSON.toJSONString(message));
        final Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        // Business processing goes here.
        // Manual ACK for this single delivery (multiple = false).
        channel.basicAck(deliveryTag, false);
    }

    /**
     * Handles a delivery whose payload is a {@link User}.
     *
     * @param user    the payload
     * @param channel the channel on which the delivery is acknowledged
     * @param headers the message headers, including the delivery tag
     * @throws Exception if acknowledging the delivery fails
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(
                    value = RabbitConfig.PRODUCER_OBJ_QUEUE_NAME,
                    durable = RabbitConfig.DURABLE
            ),
            exchange = @Exchange(
                    value = RabbitConfig.PRODUCER_OBJ_EXCHANGE_NAME,
                    durable = RabbitConfig.DURABLE,
                    type = RabbitConfig.TYPE,
                    ignoreDeclarationExceptions = RabbitConfig.IGNOREDECLARATIONEXCEPTIONS),
            key = RabbitConfig.PRODUCER_ROUTING_KEY
    )
    )
    @RabbitHandler
    public void onOrderMessage(@Payload User user, Channel channel, @Headers Map<String, Object> headers) throws Exception {
        // Receiving a message is normal operation, so it is logged at INFO rather than ERROR.
        log.info("User:{}", JSON.toJSONString(user));
        final Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        // Business processing goes here.
        // Manual ACK for this single delivery (multiple = false).
        channel.basicAck(deliveryTag, false);
    }

}

本章完結,后續還會持續更新,分享Java成長筆記,希望我們能一起成長。如果你覺得我的分享有用,記得點贊和關注哦!這對我是最好的鼓勵。謝謝!

PS:轉載請注明出處!

最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容