概述
阿里云消息服務(wù)(Message Service,原 MQS)是阿里云商用的消息中間件服務(wù),基于阿里的飛天系統(tǒng),具有大規(guī)模、高可靠、高可用以及較強(qiáng)的消息堆積能力。由于項(xiàng)目架構(gòu)中,對(duì)數(shù)據(jù)的及時(shí)性沒有過高的要求,故采用消息服務(wù)來解耦接口的數(shù)據(jù)同步功能。
場景描述
在項(xiàng)目中有個(gè)前端管理界面稱為manager模塊,主要功能是配置管理一些配置數(shù)據(jù),支持新增、修改、刪除以及批量新增、批量修改、批量刪除功能,甚至批量從excel文件中導(dǎo)入的功能。對(duì)數(shù)據(jù)操作的同時(shí),需要將數(shù)據(jù)同步到各個(gè)agent的模塊中,agent模塊需要對(duì)這些數(shù)據(jù)進(jìn)行入庫、入redis的操作,系統(tǒng)架構(gòu)圖如下。

在不使用消息服務(wù)前,通過接口的方式,在manager模塊增刪改的時(shí)候,將修改的數(shù)據(jù)實(shí)時(shí)同步到各個(gè)agent中。這會(huì)出現(xiàn)一個(gè)問題,如果數(shù)據(jù)量較小的話,同步速度還是可以理解,如果同步量過大的話,會(huì)出現(xiàn)很明顯的卡頓情況??赡芎芏嗯笥褧?huì)問,為何不做異步尼? 異步的確也是一種解決方法,不過當(dāng)某一條數(shù)據(jù)同步失敗的時(shí)候,我們不好獲取這些異常,并且無法追蹤這些異常并進(jìn)行回滾操作,因?yàn)樾枰3謒anager模塊與agent模塊同步回滾,只要agent模塊失敗,manager模塊必須要回滾。由于對(duì)這些數(shù)據(jù)的實(shí)時(shí)性沒有太高的要求,為此引入消息隊(duì)列。
架構(gòu)設(shè)計(jì)
阿里云消息服務(wù)MNS 已經(jīng)提供隊(duì)列(queue)和主題(topic)兩種模型。其中隊(duì)列提供的是一對(duì)多的共享消息消費(fèi)模型,采用客戶端主動(dòng)拉?。≒ull)模式;主題模型提供一對(duì)多的廣播消息消費(fèi)模型,并且采用服務(wù)端主動(dòng)推送(Push)模式。上面兩種模型基本能滿足我們大多數(shù)應(yīng)用場景。
推送模式的好處是即時(shí)性能比較好,但是需要暴露客戶端地址來接收服務(wù)端的消息推送。有些情況下,比如企業(yè)內(nèi)網(wǎng),我們無法暴露推送地址,希望改用拉?。≒ull)的方式。雖然MNS不直接提供這種消費(fèi)模型,但是我們可以結(jié)合主題和隊(duì)列來實(shí)現(xiàn)一對(duì)多的拉取消息消費(fèi)模型。
阿里的官方文檔中,其廣播拉取消息模型最佳實(shí)踐正好符合項(xiàng)目的架構(gòu)模型,故采用其模型。下圖是廣播拉取消息模型。

根據(jù)該模型的改造,改造了符合當(dāng)前項(xiàng)目場景的業(yè)務(wù)模型。

接口說明
Java SDK(1.1.5)中的CloudPullTopic 默認(rèn)支持上述解決方案。其中MNSClient 提供下面兩個(gè)接口來快速創(chuàng)建CloudPullTopic:
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
其中,TopicMeta 是創(chuàng)建topic的meta 設(shè)置, queueNameList里指定topic消息推送的隊(duì)列名列表;needCreateQueue表明queueNameList是否需要?jiǎng)?chuàng)建;queueMetaTemplate是創(chuàng)建queue需要的queue meta 參數(shù)設(shè)置;
生產(chǎn)者manager代碼
作為數(shù)據(jù)生產(chǎn)者的manager端,bean配置文件中配置如下代碼的實(shí)例bean,目的為不斷的插入數(shù)據(jù)到隊(duì)列為消費(fèi)者各個(gè)agent提供數(shù)據(jù)源。
@Value("${aliyun.mns.accessKeyId}")
private String accessKeyId;
@Value("${aliyun.mns.accessKeySecret}")
private String accessKeySecret;
@Value("${aliyun.mns.accountEndpoint}")
private String accountEndpoint;
@Value("${aliyun.mns.queueNameProducers}")
private String queueNameProducers;
@Value("${aliyun.mns.topicNameProducer}")
private String topicName;
@Bean
public MNSClient mnsClient() {
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
MNSClient client = account.getMNSClient();
return client;
}
@Bean
public CloudPullTopic cloudPullTopic(@Qualifier("mnsClient")MNSClient mnsClient){
String[] queueArr = queueNameProducers.split(",");
List list = Arrays.asList(queueArr);
Vector<String> queueNameList = new Vector<>();
queueNameList.addAll(list);
for (String queueName : queueNameList) {
if (!mnsClient.getQueueRef(queueName).isQueueExist()) {
QueueMeta qMeta = new QueueMeta();
qMeta.setQueueName(queueName);
qMeta.setPollingWaitSeconds(WAIT_SECONDS);
mnsClient.createQueue(qMeta);
}
}
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
return mnsClient.createPullTopic(topicMeta,queueNameList,false,null);
}
如上代碼中,對(duì)當(dāng)前隊(duì)列做了一個(gè)為空判斷,如果第一個(gè)隊(duì)列不存在,采用新增隊(duì)列的方式:createPullTopic(topicMeta,queueNameList,true,queueMetaTemplate);
其中第三個(gè)參數(shù)true代表當(dāng)前是重新創(chuàng)建隊(duì)列的方式。
同步到隊(duì)列的統(tǒng)一數(shù)據(jù)格式:
/**
* 結(jié)果類
*
* @author huwk
* @date 2018/12/5
*/
public class SynModel {
/**
* 操作方法
*/
private String action;
/**
* 返回對(duì)象
*/
private Object result;
public SynModel(){}
public SynModel(String action, Object result) {
this.action = action;
this.result = result;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
因?yàn)闃I(yè)務(wù)需要,考慮到隊(duì)列中會(huì)推入不同的格式的數(shù)據(jù),故統(tǒng)一定義一個(gè)實(shí)體類,統(tǒng)一下數(shù)據(jù)結(jié)構(gòu),action表示具體操作的類型,result表示放入的實(shí)體對(duì)象。
如下是發(fā)送消息代碼:
/* 同步隊(duì)列 */
try {
/*封裝對(duì)象*/
EnterpriseWhite white = new EnterpriseWhite();
white.setStatus(status);
white.setEnterpriseId(enterpriseId);
white.setComment(comment);
white.setCreateTime(new Date());
SynModel synModel = new SynModel(QueueAction.CreateEnterpriseWhite.name(),white);
/*發(fā)送消息*/
publishObjectMessage(synModel);
} catch (Exception e) {
logger.error("數(shù)據(jù)接口異常", e);
}
使用了一個(gè)QueueAction枚舉類型,定義了各種操作某種實(shí)體的類型,這樣下次消費(fèi)者去取數(shù)據(jù)的時(shí)候,根據(jù)其類型可以給指定的消費(fèi)者消費(fèi)。
抽取的推送數(shù)據(jù)給隊(duì)列的公共方法
/**
* 抽取解析對(duì)象以及發(fā)送消息方法
*
* @param synModel
* @throws JsonProcessingException
*/
private void publishObjectMessage(SynModel synModel) throws JsonProcessingException {
/*轉(zhuǎn)換為json字符串*/
String objStr = objectMapper.writeValueAsString(synModel);
/*主題消息封裝*/
TopicMessage topicMessage = new Base64TopicMessage();
topicMessage.setBaseMessageBody(objStr);
/*發(fā)送主題消息*/
cloudPullTopic.publishMessage(topicMessage);
}
消費(fèi)者agent代碼
定義一個(gè)線程類,讓其在項(xiàng)目啟動(dòng)的時(shí)候就開啟線程。agent模塊根據(jù)FIFO,根據(jù)不同的SynModel類型去從消息隊(duì)列中取消息并消費(fèi)掉,完成一個(gè)完整的消息隊(duì)列的過程。
@Component
public class ManagerSynDataEngine extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private ObjectMapper objectMapper;
@Autowired
private MNSClient mnsClient;
@Autowired
@Qualifier("managerSynDataTreadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Override
public void run() {
try {
MessageReceiver receiver = new MessageReceiver(mnsClient, queueName);
while (true) {
Message message = receiver.receiveMessage();
String messageBody = message.getMessageBody();
logger.info("隊(duì)列中的消息: " + messageBody);
// 當(dāng)線程池內(nèi)的線程全部繁忙時(shí),暫停向線程池派發(fā)任務(wù),等待線程釋放
while (threadPoolTaskExecutor.getActiveCount() >= threadPoolTaskExecutor.getMaxPoolSize()) {
logger.warn("ManagerSynDataEngine thread busy!");
Thread.sleep(1000);
}
threadPoolTaskExecutor.execute(() -> {
try {
SynModel synModel = objectMapper.readValue(messageBody, SynModel.class);
Class clazz = QueueAction.valueOf(synModel.getAction()).getClazz();
Object object = objectMapper.convertValue(synModel.getResult(), clazz);
operate(synModel, object);
} catch (Exception e) {
logger.error("寫入數(shù)據(jù)庫出錯(cuò)。" + "隊(duì)列名:;" + "消息詳情:" + messageBody, e);
} finally {
/*刪除隊(duì)列元素*/
mnsClient.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
}
});
}
} catch (Exception e) {
logger.error("解析對(duì)象錯(cuò)誤", e);
}
}
/**
* 業(yè)務(wù)代碼
**/
private void operate(SynModel synModel, Object object) throws Exception {
/*業(yè)務(wù)代碼*/
if (QueueAction.CreateEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.insert((EnterpriseWhite) object);
} else if (QueueAction.UpdateEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.updateById((EnterpriseWhite) object);
} else if (QueueAction.DeleteEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.deleteByEnterpriseId(((EnterpriseWhite) object).getEnterpriseId());
}
}
}
附錄
長輪詢模式
MNS提供了LongPolling類型的ReceiveMessage的方法,只需要在ReceiveMessage的時(shí)候把WaitSecond設(shè)為一個(gè)1-30之間的數(shù)就可以了。使用LongPolling可以讓Request一直掛在Server上,等到有Message的時(shí)候才返回,在保證了第一時(shí)間收到消息的同時(shí)也避免用戶發(fā)送大量無效Request。LongPolling也是MNS的推薦用法。
LongPolling是需要掛HTTP層的長連接在Server上,而對(duì)于Server來說,HTTP層的長連接的資源是比較有限的。為了避免受到一些惡意攻擊,所以MNS對(duì)單用戶的LongPolling連接數(shù)是有限制的。
這里使用了阿里的推薦的最佳實(shí)踐長輪詢模式,防止當(dāng)隊(duì)列中沒有數(shù)據(jù)時(shí)候,大量線程去請(qǐng)求的問題,減少不必要的調(diào)用量,節(jié)省成本。
/**
* 隊(duì)列消息接收
*
* @author huwk
* @date 2018/12/4
*/
public class MessageReceiver {
private Logger logger = LoggerFactory.getLogger(getClass());
public static final int WAIT_SECONDS = 30;
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
protected Object lockObj;
protected String queueName;
protected CloudQueue cloudQueue;
public MessageReceiver(MNSClient mnsClient, String queue) {
cloudQueue = mnsClient.getQueueRef(queue);
cloudQueue.popMessage();
queueName = queue;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
}
public boolean setPolling() {
synchronized (lockObj) {
Boolean ret = sPollingMap.get(queueName);
if (ret == null || !ret) {
sPollingMap.put(queueName, true);
return true;
}
return false;
}
}
public void clearPolling() {
synchronized (lockObj) {
sPollingMap.put(queueName, false);
lockObj.notifyAll();
logger.info("喚醒所有線程開始工作!");
}
}
public Message receiveMessage() {
boolean polling = false;
while (true) {
synchronized (lockObj) {
Boolean p = sPollingMap.get(queueName);
if (p != null && p) {
try {
logger.info(" 線程睡眠!");
polling = false;
lockObj.wait();
} catch (InterruptedException e) {
logger.error("MessageReceiver 中斷! 隊(duì)列名為 " + queueName);
return null;
}
}
}
try {
Message message = null;
if (!polling) {
message = cloudQueue.popMessage();
if (message == null) {
polling = true;
continue;
}
} else {
if (setPolling()) {
logger.info("線程" + " Polling!");
} else {
continue;
}
do {
logger.info("線程" + " 保持 Polling!");
try {
message = cloudQueue.popMessage(WAIT_SECONDS);
} catch (Exception e) {
logger.error("線程異常 polling popMessage: " + e);
}
} while (message == null);
clearPolling();
}
return message;
} catch (Exception e) {
// it could be network exception
logger.error("popMessage時(shí)發(fā)生異常: " + e);
}
}
}
}
當(dāng)隊(duì)列為空的時(shí)候,只會(huì)保持一個(gè)線程去請(qǐng)求,并且保持長連接為30秒,如果30秒內(nèi)有數(shù)據(jù)后,喚醒所有的線程去消費(fèi),一旦發(fā)現(xiàn)隊(duì)列為空,繼續(xù)讓所有線程等待,只保留一個(gè)線程。
至此,一個(gè)簡單的消息服務(wù)實(shí)戰(zhàn)結(jié)束,后續(xù)還會(huì)有更多的高級(jí)應(yīng)用,一步步等待去開辟。
不會(huì)寫文章的程序員不是好的吉他手o( ̄︶ ̄)o