阿里云之消息服務(wù)實(shí)戰(zhàn)

概述

阿里云消息服務(wù)(Message Service,原 MQS)是阿里云商用的消息中間件服務(wù),基于阿里的飛天系統(tǒng),具有大規(guī)模、高可靠、高可用以及較強(qiáng)的消息堆積能力。由于項(xiàng)目架構(gòu)中,對(duì)數(shù)據(jù)的及時(shí)性沒有過高的要求,故采用消息服務(wù)來解耦接口的數(shù)據(jù)同步功能。

場景描述

在項(xiàng)目中有個(gè)前端管理界面稱為manager模塊,主要功能是配置管理一些配置數(shù)據(jù),支持新增、修改、刪除以及批量新增、批量修改、批量刪除功能,甚至批量從excel文件中導(dǎo)入的功能。對(duì)數(shù)據(jù)操作的同時(shí),需要將數(shù)據(jù)同步到各個(gè)agent的模塊中,agent模塊需要對(duì)這些數(shù)據(jù)進(jìn)行入庫、入redis的操作,系統(tǒng)架構(gòu)圖如下。


初版架構(gòu)圖.png

在不使用消息服務(wù)前,通過接口的方式,在manager模塊增刪改的時(shí)候,將修改的數(shù)據(jù)實(shí)時(shí)同步到各個(gè)agent中。這會(huì)出現(xiàn)一個(gè)問題,如果數(shù)據(jù)量較小的話,同步速度還是可以理解,如果同步量過大的話,會(huì)出現(xiàn)很明顯的卡頓情況??赡芎芏嗯笥褧?huì)問,為何不做異步尼? 異步的確也是一種解決方法,不過當(dāng)某一條數(shù)據(jù)同步失敗的時(shí)候,我們不好獲取這些異常,并且無法追蹤這些異常并進(jìn)行回滾操作,因?yàn)樾枰3謒anager模塊與agent模塊同步回滾,只要agent模塊失敗,manager模塊必須要回滾。由于對(duì)這些數(shù)據(jù)的實(shí)時(shí)性沒有太高的要求,為此引入消息隊(duì)列。

架構(gòu)設(shè)計(jì)

阿里云消息服務(wù)MNS 已經(jīng)提供隊(duì)列(queue)和主題(topic)兩種模型。其中隊(duì)列提供的是一對(duì)多的共享消息消費(fèi)模型,采用客戶端主動(dòng)拉?。≒ull)模式;主題模型提供一對(duì)多的廣播消息消費(fèi)模型,并且采用服務(wù)端主動(dòng)推送(Push)模式。上面兩種模型基本能滿足我們大多數(shù)應(yīng)用場景。
推送模式的好處是即時(shí)性能比較好,但是需要暴露客戶端地址來接收服務(wù)端的消息推送。有些情況下,比如企業(yè)內(nèi)網(wǎng),我們無法暴露推送地址,希望改用拉?。≒ull)的方式。雖然MNS不直接提供這種消費(fèi)模型,但是我們可以結(jié)合主題和隊(duì)列來實(shí)現(xiàn)一對(duì)多的拉取消息消費(fèi)模型。

阿里的官方文檔中,其廣播拉取消息模型最佳實(shí)踐正好符合項(xiàng)目的架構(gòu)模型,故采用其模型。下圖是廣播拉取消息模型。


廣播拉取消息模型.png

根據(jù)該模型的改造,改造了符合當(dāng)前項(xiàng)目場景的業(yè)務(wù)模型。


架構(gòu)圖.png

接口說明

Java SDK(1.1.5)中的CloudPullTopic 默認(rèn)支持上述解決方案。其中MNSClient 提供下面兩個(gè)接口來快速創(chuàng)建CloudPullTopic:

public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)

其中,TopicMeta 是創(chuàng)建topic的meta 設(shè)置, queueNameList里指定topic消息推送的隊(duì)列名列表;needCreateQueue表明queueNameList是否需要?jiǎng)?chuàng)建;queueMetaTemplate是創(chuàng)建queue需要的queue meta 參數(shù)設(shè)置;

生產(chǎn)者manager代碼

作為數(shù)據(jù)生產(chǎn)者的manager端,bean配置文件中配置如下代碼的實(shí)例bean,目的為不斷的插入數(shù)據(jù)到隊(duì)列為消費(fèi)者各個(gè)agent提供數(shù)據(jù)源。

    @Value("${aliyun.mns.accessKeyId}")
    private String accessKeyId;

    @Value("${aliyun.mns.accessKeySecret}")
    private String accessKeySecret;

    @Value("${aliyun.mns.accountEndpoint}")
    private String accountEndpoint;

    @Value("${aliyun.mns.queueNameProducers}")
    private String queueNameProducers;

    @Value("${aliyun.mns.topicNameProducer}")
    private String topicName;
    @Bean
    public MNSClient mnsClient() {
        CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
        MNSClient client = account.getMNSClient();
        return client;
    }

    @Bean
    public CloudPullTopic cloudPullTopic(@Qualifier("mnsClient")MNSClient mnsClient){

        String[] queueArr = queueNameProducers.split(",");
        List list = Arrays.asList(queueArr);

        Vector<String> queueNameList = new Vector<>();
        queueNameList.addAll(list);

        for (String queueName : queueNameList) {
            if (!mnsClient.getQueueRef(queueName).isQueueExist()) {
                QueueMeta qMeta = new QueueMeta();
                qMeta.setQueueName(queueName);
                qMeta.setPollingWaitSeconds(WAIT_SECONDS);
                mnsClient.createQueue(qMeta);
            }
        }

        TopicMeta topicMeta = new TopicMeta();
        topicMeta.setTopicName(topicName);

        return mnsClient.createPullTopic(topicMeta,queueNameList,false,null);
    }

如上代碼中,對(duì)當(dāng)前隊(duì)列做了一個(gè)為空判斷,如果第一個(gè)隊(duì)列不存在,采用新增隊(duì)列的方式:createPullTopic(topicMeta,queueNameList,true,queueMetaTemplate);
其中第三個(gè)參數(shù)true代表當(dāng)前是重新創(chuàng)建隊(duì)列的方式。
同步到隊(duì)列的統(tǒng)一數(shù)據(jù)格式:

/**
 * 結(jié)果類
 *
 * @author huwk
 * @date 2018/12/5
 */
public class SynModel {

    /**
     * 操作方法
     */
    private String action;

    /**
     * 返回對(duì)象
     */
    private Object result;

    public SynModel(){}

    public SynModel(String action, Object result) {
        this.action = action;
        this.result = result;
    }

    public String getAction() {
        return action;
    }

    public void setAction(String action) {
        this.action = action;
    }

    public Object getResult() {
        return result;
    }

    public void setResult(Object result) {
        this.result = result;
    }
}

因?yàn)闃I(yè)務(wù)需要,考慮到隊(duì)列中會(huì)推入不同的格式的數(shù)據(jù),故統(tǒng)一定義一個(gè)實(shí)體類,統(tǒng)一下數(shù)據(jù)結(jié)構(gòu),action表示具體操作的類型,result表示放入的實(shí)體對(duì)象。
如下是發(fā)送消息代碼:

/* 同步隊(duì)列 */
        try {
            /*封裝對(duì)象*/
            EnterpriseWhite white = new EnterpriseWhite();
            white.setStatus(status);
            white.setEnterpriseId(enterpriseId);
            white.setComment(comment);
            white.setCreateTime(new Date());

            SynModel synModel = new SynModel(QueueAction.CreateEnterpriseWhite.name(),white);
            /*發(fā)送消息*/
            publishObjectMessage(synModel);

        } catch (Exception e) {
            logger.error("數(shù)據(jù)接口異常", e);
        }

使用了一個(gè)QueueAction枚舉類型,定義了各種操作某種實(shí)體的類型,這樣下次消費(fèi)者去取數(shù)據(jù)的時(shí)候,根據(jù)其類型可以給指定的消費(fèi)者消費(fèi)。
抽取的推送數(shù)據(jù)給隊(duì)列的公共方法

/**
     * 抽取解析對(duì)象以及發(fā)送消息方法
     *
     * @param synModel
     * @throws JsonProcessingException
     */
    private void publishObjectMessage(SynModel synModel) throws JsonProcessingException {
        /*轉(zhuǎn)換為json字符串*/
        String objStr = objectMapper.writeValueAsString(synModel);

        /*主題消息封裝*/
        TopicMessage topicMessage = new Base64TopicMessage();
        topicMessage.setBaseMessageBody(objStr);

        /*發(fā)送主題消息*/
        cloudPullTopic.publishMessage(topicMessage);
    }

消費(fèi)者agent代碼

定義一個(gè)線程類,讓其在項(xiàng)目啟動(dòng)的時(shí)候就開啟線程。agent模塊根據(jù)FIFO,根據(jù)不同的SynModel類型去從消息隊(duì)列中取消息并消費(fèi)掉,完成一個(gè)完整的消息隊(duì)列的過程。

@Component
public class ManagerSynDataEngine extends Thread {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private ObjectMapper objectMapper;

    @Autowired
    private MNSClient mnsClient;

    @Autowired
    @Qualifier("managerSynDataTreadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @Override
    public void run() {
        try {
            MessageReceiver receiver = new MessageReceiver(mnsClient, queueName);
            while (true) {
                Message message = receiver.receiveMessage();

                String messageBody = message.getMessageBody();

                logger.info("隊(duì)列中的消息: " + messageBody);
                // 當(dāng)線程池內(nèi)的線程全部繁忙時(shí),暫停向線程池派發(fā)任務(wù),等待線程釋放
                while (threadPoolTaskExecutor.getActiveCount() >= threadPoolTaskExecutor.getMaxPoolSize()) {
                    logger.warn("ManagerSynDataEngine thread busy!");
                    Thread.sleep(1000);
                }

                threadPoolTaskExecutor.execute(() -> {
                    try {
                        SynModel synModel = objectMapper.readValue(messageBody, SynModel.class);
                        Class clazz = QueueAction.valueOf(synModel.getAction()).getClazz();
                        Object object = objectMapper.convertValue(synModel.getResult(), clazz);
                        operate(synModel, object);

                    } catch (Exception e) {
                        logger.error("寫入數(shù)據(jù)庫出錯(cuò)。" + "隊(duì)列名:;" + "消息詳情:" + messageBody, e);
                    } finally {
                        /*刪除隊(duì)列元素*/
                        mnsClient.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
                    }
                });
            }
        } catch (Exception e) {
            logger.error("解析對(duì)象錯(cuò)誤", e);
        }
    }
/**
* 業(yè)務(wù)代碼
**/
private void operate(SynModel synModel, Object object) throws Exception {
        /*業(yè)務(wù)代碼*/
        if (QueueAction.CreateEnterpriseWhite.name().equals(synModel.getAction())) {
            enterpriseWhiteService.insert((EnterpriseWhite) object);
        } else if (QueueAction.UpdateEnterpriseWhite.name().equals(synModel.getAction())) {
            enterpriseWhiteService.updateById((EnterpriseWhite) object);
        } else if (QueueAction.DeleteEnterpriseWhite.name().equals(synModel.getAction())) {
            enterpriseWhiteService.deleteByEnterpriseId(((EnterpriseWhite) object).getEnterpriseId());
        }
    }
}

附錄

長輪詢模式

MNS提供了LongPolling類型的ReceiveMessage的方法,只需要在ReceiveMessage的時(shí)候把WaitSecond設(shè)為一個(gè)1-30之間的數(shù)就可以了。使用LongPolling可以讓Request一直掛在Server上,等到有Message的時(shí)候才返回,在保證了第一時(shí)間收到消息的同時(shí)也避免用戶發(fā)送大量無效Request。LongPolling也是MNS的推薦用法。
LongPolling是需要掛HTTP層的長連接在Server上,而對(duì)于Server來說,HTTP層的長連接的資源是比較有限的。為了避免受到一些惡意攻擊,所以MNS對(duì)單用戶的LongPolling連接數(shù)是有限制的。

這里使用了阿里的推薦的最佳實(shí)踐長輪詢模式,防止當(dāng)隊(duì)列中沒有數(shù)據(jù)時(shí)候,大量線程去請(qǐng)求的問題,減少不必要的調(diào)用量,節(jié)省成本。

/**
 * 隊(duì)列消息接收
 *
 * @author huwk
 * @date 2018/12/4
 */
public class MessageReceiver {

    private Logger logger = LoggerFactory.getLogger(getClass());
    public static final int WAIT_SECONDS = 30;

    protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
    protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();

    protected Object lockObj;
    protected String queueName;
    protected CloudQueue cloudQueue;

    public MessageReceiver(MNSClient mnsClient, String queue) {
        cloudQueue = mnsClient.getQueueRef(queue);
        cloudQueue.popMessage();
        queueName = queue;

        synchronized (sLockObjMap) {
            lockObj = sLockObjMap.get(queueName);
            if (lockObj == null) {
                lockObj = new Object();
                sLockObjMap.put(queueName, lockObj);
            }
        }
    }

    public boolean setPolling() {
        synchronized (lockObj) {
            Boolean ret = sPollingMap.get(queueName);
            if (ret == null || !ret) {
                sPollingMap.put(queueName, true);
                return true;
            }
            return false;
        }
    }

    public void clearPolling() {
        synchronized (lockObj) {
            sPollingMap.put(queueName, false);
            lockObj.notifyAll();
            logger.info("喚醒所有線程開始工作!");
        }
    }

    public Message receiveMessage() {
        boolean polling = false;
        while (true) {
            synchronized (lockObj) {
                Boolean p = sPollingMap.get(queueName);
                if (p != null && p) {
                    try {
                        logger.info(" 線程睡眠!");
                        polling = false;
                        lockObj.wait();
                    } catch (InterruptedException e) {
                        logger.error("MessageReceiver 中斷! 隊(duì)列名為 " + queueName);
                        return null;
                    }
                }
            }

            try {
                Message message = null;
                if (!polling) {
                    message = cloudQueue.popMessage();
                    if (message == null) {
                        polling = true;
                        continue;
                    }
                } else {
                    if (setPolling()) {
                        logger.info("線程" + " Polling!");
                    } else {
                        continue;
                    }
                    do {
                        logger.info("線程" + " 保持 Polling!");
                        try {
                            message = cloudQueue.popMessage(WAIT_SECONDS);
                        } catch (Exception e) {
                            logger.error("線程異常 polling popMessage: " + e);
                        }
                    } while (message == null);
                    clearPolling();
                }
                return message;
            } catch (Exception e) {
                // it could be network exception
                logger.error("popMessage時(shí)發(fā)生異常: " + e);
            }
        }
    }
}

當(dāng)隊(duì)列為空的時(shí)候,只會(huì)保持一個(gè)線程去請(qǐng)求,并且保持長連接為30秒,如果30秒內(nèi)有數(shù)據(jù)后,喚醒所有的線程去消費(fèi),一旦發(fā)現(xiàn)隊(duì)列為空,繼續(xù)讓所有線程等待,只保留一個(gè)線程。

至此,一個(gè)簡單的消息服務(wù)實(shí)戰(zhàn)結(jié)束,后續(xù)還會(huì)有更多的高級(jí)應(yīng)用,一步步等待去開辟。

不會(huì)寫文章的程序員不是好的吉他手o( ̄︶ ̄)o

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容