activeMQ 了解一下(三)——重發(fā)機(jī)制+項(xiàng)目應(yīng)用

  • 項(xiàng)目場景:

推送通知給第三方,并得到第三方的feedback,json交互

  • 問題:

推送過程遇到網(wǎng)絡(luò)異常?第三方異常?數(shù)據(jù)格式錯誤?等等問題怎么辦

【這就是重試機(jī)制了,即什么時候觸發(fā)+觸發(fā)后做什么】

  • 希望達(dá)成的目的:

1.網(wǎng)絡(luò)異常,代碼異常,對方響應(yīng)碼為失敗等無關(guān)業(yè)務(wù)數(shù)據(jù)的推送失敗,重試發(fā)送消息,上限為3次
2.數(shù)據(jù)異常,針對消息里數(shù)據(jù)解析后對方給回失敗響應(yīng),不重發(fā),記錄發(fā)送失敗

  • 實(shí)現(xiàn)方式:

1,使用activemq,mq的好處見參見文章activeMQ了解一下(一)。且其有重發(fā)機(jī)制,剛好適合本場景(配置mq)
2,每次重發(fā),都記錄次數(shù);視情況記錄發(fā)送結(jié)果為失敗/成功(數(shù)據(jù)庫記錄)

搭建mq見文章activeMQ了解一下(二),本文只說如何配置重發(fā)機(jī)制

【第一步,配置xml】

spring-activemq.xml如下

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    xmlns:context="http://www.springframework.org/schema/context"  
    xmlns:tx="http://www.springframework.org/schema/tx"  
    xmlns:mongo="http://www.springframework.org/schema/data/mongo"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans    
           http://www.springframework.org/schema/beans/spring-beans-3.2.xsd    
           http://www.springframework.org/schema/aop     
           http://www.springframework.org/schema/aop/spring-aop-3.2.xsd    
           http://www.springframework.org/schema/tx    
           http://www.springframework.org/schema/tx/spring-tx-3.2.xsd    
           http://www.springframework.org/schema/context    
           http://www.springframework.org/schema/context/spring-context-3.2.xsd">  

    <context:component-scan base-package="com.latech"/>
    <!-- 重發(fā)機(jī)制  -->
    <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
        <!--是否在每次嘗試重新發(fā)送失敗后,增長這個等待時間 -->
        <property name="useExponentialBackOff" value="true"></property>
        <!--重發(fā)次數(shù),默認(rèn)為6次   這里設(shè)置為2次 -->
        <property name="maximumRedeliveries" value="2"></property>
        <!--重發(fā)時間間隔,默認(rèn)為5秒,設(shè)置為1秒 -->
        <property name="initialRedeliveryDelay" value="1000"></property>
        <!--第一次失敗后重新發(fā)送之前等待1秒,第二次失敗再等待1 * 2秒,這里的2就是value -->
        <property name="backOffMultiplier" value="2"></property>
        <!--最大傳送延遲,最大重發(fā)時間間隔時,以后每次重連時間間隔都為最大重連時間間隔。 -->
        <property name="maximumRedeliveryDelay" value="10000"></property>
    </bean>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL"><value>tcp://192.168.28.2:61616?</value></property>
                <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />
            </bean>
        </property>
        <property name="maxConnections" value="100"></property>
    </bean>

    <!--使用緩存可以提升效率-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <property name="targetConnectionFactory" ref="jmsFactory"/>
        <property name="sessionCacheSize" value="100"/>
    </bean>
    
    <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它發(fā)送、接收消息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>    
    <!-- 定義推送中獎消息隊(duì)列(Queue) -->
    <bean id="awardMsgDestinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg name="name" value="awardMsgDestinationQueue"/>
    </bean>  
    <!-- 配置監(jiān)聽者(Queue) -->
    <bean id="awardMsgQueueListener" class="com.latech.notify.consumer.AwardMsgQueueListener" />
    <!-- 配置多個消息監(jiān)聽容器,配置連接工廠,監(jiān)聽的目標(biāo)是defaultDestinationQueue,監(jiān)聽器是上面定義的監(jiān)聽器 -->
    <bean id="queueListenerContainer1" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="awardMsgDestinationQueue" />
        <property name="messageListener" ref="awardMsgQueueListener" />
        <property name="sessionTransacted" value="true"/>
    </bean>
</beans>

敲重點(diǎn):

  • 要配置重發(fā)機(jī)制
  • 監(jiān)聽器要開啟session事務(wù),見配置最后一行

【第二步,代碼里觸發(fā)】

public class AwardMsgQueueListener implements MessageListener{
    
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    
    @Autowired
    private IOrderService orderService;
    
    @Override
    @SuppressWarnings("unchecked")
    public void onMessage(Message message) {
        TextMessage tm = (TextMessage) message;
        Integer sendTimes = 0;
        String msg="",data = "",notifyUrl="",channel="";
        try {
            msg = tm.getText();
        } catch (JMSException e1) {
            logger.error("從消息隊(duì)列獲取消息出現(xiàn)異常,請檢查");
            e1.printStackTrace();
            return;
        }
        logger.info("接收到的消息為:"+msg);
        try{
            Map<String, Map<String, Object>> msgMap = JSONObject.parseObject(msg, new TypeReference<Map<String,Map<String,Object>>>(){});
            notifyUrl = msgMap.keySet().iterator().next();
            data = JSONObject.toJSONString(msgMap.get(notifyUrl));
//          notifyUrl = "http://127.0.0.1:10003/order/receiveMsg.json";
            Map<String, Object> dataMap = JSONObject.parseObject(data,new TypeReference<Map<String,Object>>(){});
            channel = (String)dataMap.get("channel");
            CloseableHttpClient httpClient = HttpClients.createDefault();
            HttpPost httpPost= new HttpPost(notifyUrl);
            httpPost.setHeader("Content-type", "application/json");
            httpPost.setEntity(new StringEntity(data));
            CloseableHttpResponse response = httpClient.execute(httpPost);
            if(200 !=response.getStatusLine().getStatusCode()){
                //請求失敗時,記錄推送次數(shù),狀態(tài)仍為推送中,不提交事務(wù)
                logger.error("推送失敗,statusCode為:"+response.getStatusLine().getStatusCode());
                this.updateOrderAfterFail(data);
                throw new RuntimeException();
            }else{
                HttpEntity responseEntity = response.getEntity();
                if(responseEntity == null || responseEntity.getContent()==null){
                    logger.error("推送商戶中獎信息成功,但返回內(nèi)容為空,url:"+notifyUrl);
                    this.updateOrderAfterFail(data);
                    throw new RuntimeException("推送商戶中獎信息成功,但返回內(nèi)容為空,url:"+notifyUrl);
                }else{
                    StringBuilder entityStringBuilder = new StringBuilder();  
                    BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(responseEntity.getContent(),"UTF-8"), 8 * 1024);  
                    String line = null;  
                    while ((line = bufferedReader.readLine()) != null) {  
                        entityStringBuilder.append(line);  
                    } 
                    String result = entityStringBuilder.toString();
                    Map<String,Object> resultMap = JSONObject.parseObject(result,new TypeReference<Map<String,Object>>(){});
                    if(!resultMap.containsKey("data") || !resultMap.containsKey("security") || !resultMap.containsKey("response")){
                        logger.error("推送商戶中獎信息成功,但返回內(nèi)容不合規(guī),響應(yīng)數(shù)據(jù)中可能不包含“data,security,reponse”中的一個或多個");
                        this.updateOrderAfterFail(data);
                        throw new RuntimeException("推送商戶中獎信息成功,但返回內(nèi)容不合規(guī),響應(yīng)數(shù)據(jù)中可能不包含“data,security,reponse”中的一個或多個");
                    }
                    if("11111".equals((String)resultMap.get("response"))){
                        logger.error("推送商戶中獎信息成功,但返回內(nèi)容響應(yīng)碼為11111");
                        this.updateOrderAfterFail(data);
                        throw new RuntimeException("推送商戶中獎信息成功,但返回內(nèi)容響應(yīng)碼為11111");
                    }
                    List<Map<String,String>> orderList = (List<Map<String,String>>)resultMap.get("data");
                    for(Map<String,String> order : orderList){
                        List<OrderInfo> orderResult = orderService.getByOrderNumAndChannel(order.get("orderNum"),channel);
                        String serialNumber = orderResult.get(0).getSerialNumber();
                        OrderInfoVo existOrder = orderService.queryBySerialNumber(serialNumber);
                        OrderInfo orderInfo = new OrderInfo();
                        orderInfo.setSerialNumber(serialNumber);
                        orderInfo.setSendAwardTimes(existOrder.getSendAwardTimes()+1);
                        orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_SUCCESS.getCode());
                        if("111111".equals(order.get("code"))){
                            orderInfo.setSendAwardFlag(SendAwardFlagEnum.SEND_FAIL.getCode());
                        }
                        orderService.updateByOrderInfo(orderInfo);
                    }
                }
            }
        } catch (Exception e){
            logger.error("推送消息異常捕捉:"+e.getMessage());
            e.printStackTrace();
            this.updateOrderAfterFail(data);
            throw new RuntimeException();//拋出此異常,觸發(fā)重發(fā)機(jī)制
        }
    }

敲重點(diǎn):

在需要重發(fā)時,拋出RuntimeException異常,會自動觸發(fā)重發(fā)機(jī)制
由于還涉及到其他可能的異常,所以整體代碼try..catch..最后統(tǒng)一throw

不過呢,現(xiàn)在還存在一個問題,配置里設(shè)置了重發(fā)次數(shù)為2,即包括初次發(fā),一共發(fā)三次。但是最后數(shù)據(jù)庫顯示是最后發(fā)了6次的,6次是mq默認(rèn)的次數(shù),說明那個配置沒生效,暫時還未解決這個問題

另外,消息隊(duì)列里的消息這里是同步發(fā)送的,即按順序一條條的來。前一條發(fā)送失敗并重發(fā)時候,后一條是等待的。也可以配置異步發(fā)送,這里暫不做研究

PS:這里用到的消費(fèi)者監(jiān)聽器時最簡單基礎(chǔ)的,還有一種SessionAwareMessageListener,重發(fā)機(jī)制的配置會略有不同,待嘗試

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容