Spring Boot RabbitMQ實踐

背景

我們現(xiàn)在有兩個主要的系統(tǒng)一個是活動系統(tǒng)一個是獎品系統(tǒng),活動系統(tǒng)會調(diào)用獎品系統(tǒng)發(fā)放獎勵。

最開始兩個之間只通過http直接調(diào)用,優(yōu)點:開發(fā)成本低,沒有多余組件引入;發(fā)放獎勵實時返回;活動系統(tǒng)不需要管獎品是否還有剩余庫存。缺點:這樣就導(dǎo)致上游活動系統(tǒng)強依賴于下游的獎品系統(tǒng),如果一旦獎品系統(tǒng)掛掉,我們活動系統(tǒng)也就不可用了;這里還有個bug,在調(diào)用獎品系統(tǒng)發(fā)放獎勵,獎品系統(tǒng)發(fā)放成功了,但是活動系統(tǒng)請求超時了,就會導(dǎo)致提示客戶的沒有獎品了,但是實際獎品又發(fā)放了。

訪問量上來后發(fā)直接走http肯定是不行的,所以引入了MQ將將兩個系統(tǒng)隔離開,優(yōu)點:所有發(fā)放流程異步執(zhí)行,活動系統(tǒng)響應(yīng)更快了;這兩個系統(tǒng)就變成弱引用關(guān)系,即使獎品系統(tǒng)掛掉,活動系統(tǒng)仍能正常運行;不會出現(xiàn)上面說的bug了;缺點:發(fā)放獎勵將會有延遲;引入MQ增加了項目復(fù)雜度,我們必須去考慮消息的丟失,重復(fù)消費等問題;活動系統(tǒng)需要知道獎品的庫存情況。

解決方案

針對上面使用MQ發(fā)放獎勵會遇到的問題,我們可以通過面的方案來解決。

消息的丟失問題

在數(shù)據(jù)庫創(chuàng)建一張異常消息表。

  • 在發(fā)消息的時候如果出現(xiàn)異常,直接將消息記錄到異常消息表,等待后臺跑批,進行補償發(fā)放。
  • 在發(fā)消息的時候,如果發(fā)送消息的ack回調(diào)沒沒有發(fā)送成功,將進行消息重發(fā),如果重發(fā)3次還是失敗,該消息就記錄到異常消息表,等待后臺跑批,進行補償發(fā)放。消息的重復(fù)發(fā)送可以使用RabbitMQ的ConfirmCallback、ReturnCallback機制來實現(xiàn)。
  • 在消費端處理消息(調(diào)用獎品系統(tǒng)發(fā)放獎勵)的時候,如果出現(xiàn)異常也將消息放到異常消息表中,等待后臺跑批,進行補償發(fā)放。如果將異常消息保存到數(shù)據(jù)庫時發(fā)生了異常,則將消息放到死信隊列,等待后臺跑批,進行補償發(fā)放。

這樣子雖然還是不能完全杜絕消息丟失,但是絕大部分情況下是沒有問題的。

重復(fù)消費問題

為每個消息生成業(yè)務(wù)流水號,將流水號和發(fā)放里的參數(shù)一起發(fā)送到獎品系統(tǒng),獎品系統(tǒng)在發(fā)放獎勵的時候先判斷這個流水號是否存在,存在就表示該獎品已經(jīng)發(fā)過來直接返回發(fā)放成功,如果沒有就進行發(fā)放獎勵操作。

活動系統(tǒng)需要知道獎品的庫存情況。

我們在配置活動的時候會將獎品的庫存放到我們活動系統(tǒng),在發(fā)MQ消息之前回去判斷是否有剩余庫存,如果沒有直接返回獎勵領(lǐng)完了,如果有才回去發(fā)MQ消息。扣減庫存可以參考基于redis實現(xiàn)的扣減庫存

活動流程圖

下面是引入MQ過后我們系統(tǒng)的流程圖


MQ解耦系統(tǒng)間的依賴關(guān)系
生產(chǎn)者.png
消費者.png

生產(chǎn)者端實現(xiàn)

/**
 * Rabbit 發(fā)送消息
 *
 * @author yuhao.wang
 */
@Service
public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
    private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);

    /**
     * Rabbit MQ 客戶端
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 系統(tǒng)配置
     */
    @Autowired
    private SystemConfig systemConfig;

    /**
     * 發(fā)送MQ消息
     *
     * @param exchangeName 交換機名稱
     * @param routingKey   路由名稱
     * @param message      發(fā)送消息體
     */
    public void sendMessage(String exchangeName, String routingKey, Object message) {
        Assert.notNull(message, "message 消息體不能為NULL");
        Assert.notNull(exchangeName, "exchangeName 不能為NULL");
        Assert.notNull(routingKey, "routingKey 不能為NULL");

        // 獲取CorrelationData對象
        CorrelationData correlationData = this.correlationData(message);
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);

        logger.info("發(fā)送MQ消息,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
        // 發(fā)送消息
        this.convertAndSend(exchangeName, routingKey, message, correlationData);
    }

    /**
     * 用于實現(xiàn)消息發(fā)送到RabbitMQ交換器后接收ack回調(diào)。
     * 如果消息發(fā)送確認失敗就進行重試。
     *
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
        // 消息回調(diào)確認失敗處理
        if (!ack && correlationData instanceof CorrelationData) {
            CorrelationData correlationDataExtends = (CorrelationData) correlationData;

            //消息發(fā)送失敗,就進行重試,重試過后還不能成功就記錄到數(shù)據(jù)庫
            if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) {
                logger.info("MQ消息發(fā)送失敗,消息重發(fā),消息ID:{},重發(fā)次數(shù):{},消息體:{}", correlationDataExtends.getId(),
                        correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));

                // 將重試次數(shù)加一
                correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);

                // 重發(fā)發(fā)消息
                this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                        correlationDataExtends.getMessage(), correlationDataExtends);
            } else {
                //消息重試發(fā)送失敗,將消息放到數(shù)據(jù)庫等待補發(fā)
                logger.warn("MQ消息重發(fā)失敗,消息入庫,消息ID:{},消息體:{}", correlationData.getId(),
                        JSON.toJSONString(correlationDataExtends.getMessage()));

                // TODO 保存消息到數(shù)據(jù)庫
            }
        } else {
            logger.info("消息發(fā)送成功,消息ID:{}", correlationData.getId());
        }
    }

    /**
     * 用于實現(xiàn)消息發(fā)送到RabbitMQ交換器,但無相應(yīng)隊列與交換器綁定時的回調(diào)。
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.error("MQ消息發(fā)送失敗,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息體:{}",
                replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody()));

        // TODO 保存消息到數(shù)據(jù)庫
    }

    /**
     * 消息相關(guān)數(shù)據(jù)(消息ID)
     *
     * @param message
     * @return
     */
    private CorrelationData correlationData(Object message) {

        return new CorrelationData(UUID.randomUUID().toString(), message);
    }

    /**
     * 發(fā)送消息
     *
     * @param exchange        交換機名稱
     * @param routingKey      路由key
     * @param message         消息內(nèi)容
     * @param correlationData 消息相關(guān)數(shù)據(jù)(消息ID)
     * @throws AmqpException
     */
    private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException {
        try {
            rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
        } catch (Exception e) {
            logger.error("MQ消息發(fā)送異常,消息ID:{},消息體:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);

            // TODO 保存消息到數(shù)據(jù)庫
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }
}

生產(chǎn)者端使用ConfirmCallback和ReturnCallback回調(diào)機制,最大限度的保證消息不丟失,對原有CorrelationData類進行擴展,來實現(xiàn)消息的重發(fā),具體請看源碼。

消費者端實現(xiàn)


/**
 * 發(fā)放優(yōu)惠券的MQ處理
 *
 * @author yuhao.wang
 */
@Service
@ConditionalOnClass({RabbitTemplate.class})
public class SendMessageListener {

    private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class);

    @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON)
    public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
        logger.info("[{}]處理發(fā)放優(yōu)惠券獎勵消息隊列接收數(shù)據(jù),消息ID:{},消息體:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON,
                message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage));

        try {
            // 參數(shù)校驗
            Assert.notNull(sendMessage, "sendMessage 消息體不能為NULL");

            // TODO 處理消息

            // 確認消息已經(jīng)消費成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            logger.error("MQ消息處理異常,消息ID:{},消息體:{}", message.getMessageProperties().getCorrelationIdString(),
                    JSON.toJSONString(sendMessage), e);

            try {
                // TODO 保存消息到數(shù)據(jù)庫

                // 確認消息已經(jīng)消費成功
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e1) {
                logger.error("保存異常MQ消息到數(shù)據(jù)庫異常,放到死性隊列,消息ID:{}", message.getMessageProperties().getCorrelationIdString());
                // 確認消息將消息放到死信隊列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    }
}

消費者端主要做了消息消費失敗的容錯處理。

源碼

https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

spring-boot-student-rabbitmq 工程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容