RabbitMQ 高可用優(yōu)化

RabbitMQ的主要作用基本上可以用8個(gè)字概括,削峰填谷異步解耦。但是引入MQ我們也不得不考慮引入MQ后帶來的一些問題,如消息丟失。

在一些業(yè)務(wù)場景不一樣,處理方式也就不一樣,比如發(fā)短信,日志收集我們主要看吞吐量所以對消息丟失容忍度較高,這類場景基本上不用花太多時(shí)間在消息丟失問題上。另外一種,如我們用MQ來做分布式事務(wù),續(xù)保計(jì)算,提成的計(jì)算,這類業(yè)務(wù)對消息丟失容忍度較底,所以我們一定要考慮消息丟失的問題。這次分享的內(nèi)容是怎么來最大限制的防止消息丟失,順帶提一下消息的重發(fā)和重復(fù)消費(fèi)。

RabbitMQ 模型圖

RabbitMQ 模型.jpg

ConfirmCallback和ReturnCallback

在這個(gè)里我們主要實(shí)現(xiàn)了ConfirmCallback和ReturnCallback兩個(gè)接口。這兩個(gè)接口主要是用來發(fā)送消息后回調(diào)的。因?yàn)閞abbit發(fā)送消息是只管發(fā),至于發(fā)沒發(fā)成功,發(fā)送方法不管。

  • ConfirmCallback:當(dāng)消息成功到達(dá)exchange的時(shí)候觸發(fā)的ack回調(diào)。
  • ReturnCallback:當(dāng)消息成功到達(dá)exchange,但是沒有隊(duì)列與之綁定的時(shí)候觸發(fā)的ack回調(diào)。發(fā)生網(wǎng)絡(luò)分區(qū)會(huì)出現(xiàn)這種情況。

在這里一定要把這兩個(gè)開關(guān)打開, publisher-confirms="true" publisher-returns="true"。

生產(chǎn)者端使用ConfirmCallback和ReturnCallback回調(diào)機(jī)制,最大限度的保證消息不丟失,對原有CorrelationData類進(jìn)行擴(kuò)展,來實(shí)現(xiàn)消息的重發(fā),具體請看源碼。

消息的日志鏈路跟蹤

使用MQ來解耦服務(wù),異步化處理一些復(fù)雜耗時(shí)邏輯,但是也帶來了一個(gè)問題。由于異步化以后,排查問題就很不方便了,根本不知道這個(gè)消息什么時(shí)候消費(fèi),消費(fèi)的日志也很不好排查。所以引入了Slf4j MDC機(jī)制將主線程的日志鏈路和消息的日志鏈路連起來,方便MQ問題的排查。

RabbitSender

import com.alibaba.fastjson.JSON;
import com.wlqq.insurance.common.enums.MetricNameEnum;
import com.wlqq.insurance.common.enums.SystemTypeEnum;
import com.wlqq.insurance.common.log.core.FisLoggerFactory;
import com.wlqq.insurance.common.mq.CorrelationData;
import com.wlqq.insurance.common.service.AlertService;
import org.slf4j.Logger;
import org.slf4j.MDC;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.UUID;

/**
 * Rabbit 發(fā)送消息
 *
 * @author yuhao.wang
 */
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
    private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);

    @Value("${mq.retry.count}")
    private int mqRetryCount;

    /**
     * 告警服務(wù)
     */
    @Autowired
    private AlertService alertService;

    /**
     * Rabbit MQ 客戶端
     */
    private RabbitTemplate rabbitTemplate;

    /**
     * 發(fā)送MQ消息,異步
     *
     * @param exchangeName 交換機(jī)名稱
     * @param routingKey   路由名稱
     * @param message      發(fā)送消息體
     */
    public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
        Assert.notNull(message, "message 消息體不能為NULL");
        Assert.notNull(exchangeName, "exchangeName 不能為NULL");
        Assert.notNull(routingKey, "routingKey 不能為NULL");
        // 獲取CorrelationData對象
        CorrelationData correlationData = this.correlationData(message, message.getMessageId());
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("發(fā)送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
        // 發(fā)送消息
        this.convertAndSend(exchangeName, routingKey, message, correlationData);
    }

    /**
     * RPC方式,發(fā)送MQ消息
     *
     * @param exchangeName 交換機(jī)名稱
     * @param routingKey   路由名稱
     * @param message      發(fā)送消息體
     */
    public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
        Assert.notNull(message, "message 消息體不能為NULL");
        Assert.notNull(exchangeName, "exchangeName 不能為NULL");
        Assert.notNull(routingKey, "routingKey 不能為NULL");
        // 獲取CorrelationData對象
        CorrelationData correlationData = this.correlationData(message, message.getMessageId());
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("發(fā)送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);

        rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
    }

    /**
     * 用于實(shí)現(xiàn)消息發(fā)送到RabbitMQ交換器后接收ack回調(diào)。
     * 如果消息發(fā)送確認(rèn)失敗就進(jìn)行重試。
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
        CorrelationData correlationDataExtends = null;
        if (correlationData instanceof CorrelationData) {
            correlationDataExtends = (CorrelationData) correlationData;
            if (correlationDataExtends.getMdcContainer() != null) {
                // 日志鏈路跟蹤
                MDC.setContextMap(correlationDataExtends.getMdcContainer());
            }
        }

        // 消息回調(diào)確認(rèn)失敗處理
        if (!ack) {
            if (correlationDataExtends != null) {
                //消息發(fā)送失敗,就進(jìn)行重試,重試過后還不能成功就記錄到數(shù)據(jù)庫
                if (correlationDataExtends.getRetryCount() < mqRetryCount) {
                    logger.info("MQ消息發(fā)送失敗,消息重發(fā),消息ID:{},重發(fā)次數(shù):{},消息體:{}", correlationDataExtends.getId(),
                            correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));

                    // 將重試次數(shù)加一
                    correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);

                    // 重發(fā)發(fā)消息
                    this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                            correlationDataExtends.getMessage(), correlationDataExtends);
                } else {
                    //消息重試發(fā)送失敗,將消息放到數(shù)據(jù)庫等待補(bǔ)發(fā)
                    logger.error("MQ消息重發(fā)失敗,消息ID:{},消息體:{}", correlationData.getId(),
                            JSON.toJSONString(correlationDataExtends.getMessage()));

                    alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
                            correlationDataExtends.getExchange(), null);
                }
            }
        } else {
            logger.info("消息發(fā)送成功,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 用于實(shí)現(xiàn)消息發(fā)送到RabbitMQ交換器,但無相應(yīng)隊(duì)列與交換器綁定時(shí)的回調(diào)。
     * 在腦裂的情況下會(huì)出現(xiàn)這種情況。
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        // 反序列化消息
        Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
        if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
            // 日志鏈路跟蹤
            MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
        }

        logger.error("MQ消息發(fā)送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息體:{}",
                replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));

        alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
    }

    /**
     * 消息相關(guān)數(shù)據(jù)(消息ID)
     *
     * @param message   消息體
     * @param messageId 消息ID
     * @return
     */
    private CorrelationData correlationData(Object message, String messageId) {
        // 消息ID默認(rèn)使用UUID
        if (StringUtils.isEmpty(messageId)) {
            messageId = UUID.randomUUID().toString();
        }
        return new CorrelationData(messageId, message);
    }

    /**
     * 發(fā)送消息
     *
     * @param exchange        交換機(jī)名稱
     * @param routingKey      路由key
     * @param message         消息內(nèi)容
     * @param correlationData 消息相關(guān)數(shù)據(jù)(消息ID)
     * @throws AmqpException
     */
    private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("MQ消息發(fā)送異常,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);

            alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }
}

CorrelationData

import lombok.Data;
import org.slf4j.MDC;

import java.util.Map;

/**
 * 發(fā)送消息的相關(guān)數(shù)據(jù)
 *
 * @author yuhao.wang
 */
@Data
public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {


    /**
     * MDC容器
     * 獲取父線程MDC中的內(nèi)容,做日志鏈路
     */
    private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();

    /**
     * 消息體
     */
    private volatile Object message;

    /**
     * 交換機(jī)名稱
     */
    private String exchange;

    /**
     * 路由key
     */
    private String routingKey;

    /**
     * 重試次數(shù)
     */
    private int retryCount = 0;

    public CorrelationData(String id) {
        super(id);
    }

    public CorrelationData(String id, Object data) {
        this(id);
        this.message = data;
    }
}

Message

/**
 * MQ消息的父類消息體
 *
 * @author yuhao.wang
 */
@Data
public class Message implements Serializable {
    private static final long serialVersionUID = -4731326195678504565L;

    /**
     * MDC容器
     * 獲取父線程MDC中的內(nèi)容,做日志鏈路
     */
    private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();

    /**
     * 消息ID(消息的唯一標(biāo)示)
     */
    private String messageId;
}

AbstractConsumer

/**
 * 默認(rèn)消費(fèi)者
 *
 * @author yuhao.wang3
 */
public abstract class AbstractConsumer implements MessageListener {
    private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);

    @Override
    public void onMessage(Message msg) {
        String body = null;

        try {
            // 日志鏈路跟蹤邏輯
            body = new String(msg.getBody(), "utf-8");
            DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
            Map<String, String> container = message.getMdcContainer();
            if (container != null) {
                // 日志鏈路跟蹤
                MDC.setContextMap(message.getMdcContainer());
            }
        } catch (Exception e) {
            LOGGER.warn("沒有找到MQ消息日志鏈路數(shù)據(jù),無法做日志鏈路追蹤");
        }

        try {
            // 處理消息邏輯
            doMessage(msg);
            LOGGER.info("成功處理MQ消息, 消息體:{}", body);
        } catch (Exception e) {
            LOGGER.error("處理MQ消息異常 {}, 消息體:{}", JSON.toJSONString(msg), body, e);
        }
    }

    /**
     * 處理消息的實(shí)現(xiàn)方法
     *
     * @param msg
     */
    public abstract void doMessage(Message msg);
}

源碼

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-rabbitmq 工程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 來源 RabbitMQ是用Erlang實(shí)現(xiàn)的一個(gè)高并發(fā)高可靠AMQP消息隊(duì)列服務(wù)器。支持消息的持久化、事務(wù)、擁塞控...
    jiangmo閱讀 10,513評(píng)論 2 34
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,899評(píng)論 13 425
  • “ 消息隊(duì)列已經(jīng)逐漸成為企業(yè)IT系統(tǒng)內(nèi)部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終一致性等一系列...
    落羽成霜丶閱讀 4,293評(píng)論 1 41
  • 消息隊(duì)列設(shè)計(jì)精要 消息隊(duì)列已經(jīng)逐漸成為企業(yè)IT系統(tǒng)內(nèi)部通信的核心手段。它具有低耦合、可靠投遞、廣播、流量控制、最終...
    meng_philip123閱讀 1,582評(píng)論 1 25
  • 從A市做完采訪,我獨(dú)自驅(qū)車回家的時(shí)候,已是黃昏。 太陽收斂起了灼熱的光芒,藏匿到了山的另一頭。天空呈現(xiàn)出灰暗的藍(lán)色...
    胡岱閱讀 1,005評(píng)論 7 6

友情鏈接更多精彩內(nèi)容