兩個系統(tǒng)大數(shù)據(jù)量對接手記

在前幾天做了一個需求:外圍系統(tǒng)下發(fā)業(yè)務(wù)數(shù)據(jù)到我方系統(tǒng)做業(yè)務(wù)處理。當(dāng)時對方負(fù)責(zé)人說最多每次只有6萬數(shù)據(jù)量,他們分1000條數(shù)據(jù)一個包傳輸?shù)轿曳较到y(tǒng)。

實現(xiàn)方式

提供實時的rest 接口

這種方式是寫好處理程序,暴露出去,提供給外圍系統(tǒng)rest接口。完成開發(fā)后交付測試。然后對方不按約定出牌,2000條一個包,一共發(fā)了50幾個包,最終結(jié)果就是把我方系統(tǒng)測試機(jī)跑掛了。所以說最重要的是保證自己系統(tǒng)的健壯性,需求往往是變化的。這種實時方式只適合少量的數(shù)據(jù),太大的數(shù)據(jù)會對系統(tǒng)產(chǎn)生影響。于是我修改模式,換為了第二種方式。

消息隊列中間存儲

整體流程

  • 接收外圍系統(tǒng)數(shù)據(jù),先不做處理,500條一次存到消息隊列。
  • 消費者處理消息隊列中的數(shù)據(jù),每處理一個就提交事務(wù),這樣避免了在第一種實時方法因為提交事務(wù)時間長了而超時。
  • 消費設(shè)計:建一個線程池,最大為10,后續(xù)為等待線程,消費者使用線程池啟動線程,每個線程放500數(shù)據(jù)。這種方式根據(jù)不同的消息隊列做不同的設(shè)計,有的消息隊列自帶并發(fā)消費模式,這時可以使用自帶的模式,不用開線程池。

代碼實現(xiàn)

客戶為我提供了rocketmq集群的隊列,我直接用,省去了我在服務(wù)器安裝等操作。這里直接上主要代碼。

  • 消息生成者
    單例模式,通過spring @value注入消息隊列的集群地址。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * rocketmq消息隊列生產(chǎn)者
 *
 */
@Component
public class Producer {
   // 定義group
    private static final String GROUP = "";

    private static String namesrvAddr;

    private static DefaultMQProducer producer = new DefaultMQProducer(GROUP);
    private static int initialState = 0;

    private Producer() {

    }

    public static DefaultMQProducer getDefaultMQProducer() {
        if (producer == null) {
            producer = new DefaultMQProducer(GROUP);
        }
        if (initialState == 0) {
            producer.setNamesrvAddr(getNamesrvAddr());
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
                return null;
            }

            initialState = 1;
        }
        return producer;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    @Value("${rocketmq.namesrvAddr}")
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}

  • 消息消費者
    單例模式
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * rocketmq消息隊列消費者
 *
 */
@Component
public class Consumer {
   // 要和生產(chǎn)者的group相同
    private static final String GROUP = "";
    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
    private static int initialState = 0;
    private static String namesrvAddr;

    private Consumer() {

    }

    public static DefaultMQPushConsumer getDefaultMQPushConsumer() {
        if (consumer == null) {
            consumer = new DefaultMQPushConsumer(GROUP);
        }

        if (initialState == 0) {
            consumer.setNamesrvAddr(getNamesrvAddr());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 設(shè)置并發(fā)數(shù)量
            consumer.setConsumeThreadMin(5);
            consumer.setConsumeThreadMax(10);
            initialState = 1;
        }

        return consumer;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    @Value("${rocketmq.namesrvAddr}")
    public void setNamesrvAddr(String namesrvAddr) {
        Consumer.namesrvAddr = namesrvAddr;
    }
}
  • 消息存儲
    封裝發(fā)送方法,其中topic需要到rocketmq控制臺配置,tag自定義,生產(chǎn)和消費保持一致即可。
private void sendMsg(List newList) {
        // 獲取消息生產(chǎn)者
        DefaultMQProducer producer = Producer.getDefaultMQProducer();
        try {
            Message msg = new Message(
                    TOPIC,
                    TAG,
                    JSON.toJSONString(newList).getBytes());
            SendResult sendResult = producer.send(msg);
            LOGGER.info("sendResult:{}", sendResult);
        } catch (MQClientException e) {
            LOGGER.info("-----發(fā)送失?。? + e.toString());
        } catch (RemotingException e) {
            LOGGER.info("-----發(fā)送失?。? + e.toString());
        } catch (MQBrokerException e) {
            LOGGER.info("-----發(fā)送失?。? + e.toString());
        } catch (InterruptedException e) {
            LOGGER.info("-----發(fā)送失敗:" + e.toString());
        }
    }

接收到數(shù)據(jù),分500一次,調(diào)用封裝的發(fā)送方法即可存到消息隊列。

  • 消息消費
    設(shè)置消費者為開機(jī)啟動,只要有消息就消費
import com.alibaba.fastjson.JSONArray;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * 消費開啟執(zhí)行
 *
 */
@Component
public class ConsumerInit implements CommandLineRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInit.class);
   //和生產(chǎn)者保持一致
    private static final String TAG = "";
    //和控制臺配置保持一致
    private static final String TOPIC = "";
    @Autowired
    private FixedAssetsService fixedAssetsService;

    @Override
    public void run(String... strings) throws Exception {
        receiveMsg();
    }

    private void receiveMsg() {

        // 獲取消息生產(chǎn)者
        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

        // 訂閱主體
        try {
            consumer.subscribe(TOPIC, TAG);
            //MessageListenerConcurrently 并行消費
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默認(rèn)msgs里唯獨一條消息,能夠通過設(shè)置consumeMessageBatchMaxSize參數(shù)來批量接收消息
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals(TOPIC)) {
                        if (msg.getTags() != null && msg.getTags().equals(TAG)) {
                            String message = null;
                            try {
                                message = new String(msg.getBody(),"UTF-8");
                            } catch (UnsupportedEncodingException e) {
                                LOGGER.info("message轉(zhuǎn)換失敗");
                            }
                            List<TransferSapResultVO> list = JSONArray.parseArray(message, TransferSapResultVO.class);
                            // 消費消息
                            fixedAssetsService.consumerMqMessage(list);
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer對象在使用之前必須要調(diào)用start初始化。初始化一次就可以<br>
             */
            consumer.start();

            LOGGER.info("Consumer Started.");
        } catch (MQClientException e) {
            LOGGER.info("消費消息錯誤" + e.toString());
        }
    }
}
  • 測試結(jié)果
    根據(jù)上述步驟,主要的流程代碼已經(jīng)展示。主要就是接收消息-存儲消息-并發(fā)消費消息。 開發(fā)完成后交付測試。和第一種方式一樣2000條數(shù)據(jù)一個包,50幾個包,系統(tǒng)很穩(wěn)定,速度比第一種方式快了至少3倍,數(shù)據(jù)也能正確處理完成。

這種方式在性能,速度等方面都非常良好。但是好景不長,說推過來的10幾萬數(shù)據(jù)少了3條,讓我檢查為什么少。我第一反應(yīng)是難道是丟包了? 如果是丟包,我500條一個包,只能少500的整數(shù)倍啊。然后百度發(fā)現(xiàn)rocketmq的安全性是可以保證的,幾乎不會出現(xiàn)丟包的情況。然后我問對方技術(shù)是不是根本就沒有傳這三條數(shù)據(jù),但對方就是咬死說傳了。為了驗證我說重新在傳輸一次,結(jié)果和第一次一樣,還是丟了這3條。我自己感覺是對方肯定沒有傳,但這種消息隊列的方式的缺點就暴露了出來,缺少監(jiān)控。于是又切換到第三種方式。

中間表模式

整體流程

  • 外圍系統(tǒng)插入到我方中間表。插入過程不用我自己管,插入不走程序接口,不會影響系統(tǒng)性能。
  • 做定時任務(wù)。每30分鐘并發(fā)消費中間表數(shù)據(jù)。已經(jīng)消費的更新消費標(biāo)識為1。其中消費標(biāo)識為0為未消費,1為已消費。
  • 做定時任務(wù)。每7天執(zhí)行一次。刪除一個星期前已經(jīng)消費的數(shù)據(jù)。
  • 消費設(shè)計:每30分鐘查詢所有未消費的數(shù)據(jù),并發(fā)進(jìn)行消費。此時有一個問題,如果前一次沒有消費完的數(shù)據(jù),在下一次任務(wù)又會被查詢出來,出現(xiàn)了重復(fù)消費的問題。自己的解決方式是,增加批次號字段,設(shè)定消費標(biāo)志為2 是處理中。
    -- 第一步查詢出所有未消費的數(shù)據(jù)。
    -- 設(shè)置批次號和消費標(biāo)志為2(批量跟新)。
    -- 查詢當(dāng)前批次的數(shù)據(jù)。
    -- 消費數(shù)據(jù)。完成后把標(biāo)志置為1。

這種方式有了監(jiān)控的功能,如果說少了數(shù)據(jù),直接在中間表中查,看是否外圍系統(tǒng)推送了數(shù)據(jù)。

總結(jié)

當(dāng)數(shù)據(jù)量較少時,可以用第一種實時的方式,比較方便。
數(shù)據(jù)量大且不需要監(jiān)控功能用第二種方式較好。
數(shù)據(jù)量大且需要監(jiān)控功能用第三種方式。
這次需求自己相當(dāng)于開發(fā)了3次。用了3種模式。 積累了經(jīng)驗。以后在做這種需求時,兩個問題, 數(shù)據(jù)量大不大? 是否需要監(jiān)控?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,641評論 1 32
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進(jìn)程,并將多個進(jìn)程對硬件...
    drfung閱讀 3,756評論 0 5
  • 簡介 RocketMQ 特點 RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給 Ap...
    預(yù)流閱讀 39,498評論 7 55
  • 填滿杯子,和用裝滿的杯子炫耀是一回事。不用把其中一個看得那么“高”,也不須把其中一個看得那么“低”。 我并不是為了...
    aristotler閱讀 2,276評論 1 7
  • 記錄運動筆記的初衷,是在一段時間的健身后,得到了實實在在的好處,并希望根據(jù)自己的身體素質(zhì)慢慢找到適合自己的運動方式...
    薇雨依晴閱讀 650評論 5 9

友情鏈接更多精彩內(nèi)容