在前幾天做了一個需求:外圍系統(tǒng)下發(fā)業(yè)務(wù)數(shù)據(jù)到我方系統(tǒng)做業(yè)務(wù)處理。當(dāng)時對方負(fù)責(zé)人說最多每次只有6萬數(shù)據(jù)量,他們分1000條數(shù)據(jù)一個包傳輸?shù)轿曳较到y(tǒng)。
實現(xiàn)方式
提供實時的rest 接口
這種方式是寫好處理程序,暴露出去,提供給外圍系統(tǒng)rest接口。完成開發(fā)后交付測試。然后對方不按約定出牌,2000條一個包,一共發(fā)了50幾個包,最終結(jié)果就是把我方系統(tǒng)測試機(jī)跑掛了。所以說最重要的是保證自己系統(tǒng)的健壯性,需求往往是變化的。這種實時方式只適合少量的數(shù)據(jù),太大的數(shù)據(jù)會對系統(tǒng)產(chǎn)生影響。于是我修改模式,換為了第二種方式。
消息隊列中間存儲
整體流程
- 接收外圍系統(tǒng)數(shù)據(jù),先不做處理,500條一次存到消息隊列。
- 消費者處理消息隊列中的數(shù)據(jù),每處理一個就提交事務(wù),這樣避免了在第一種實時方法因為提交事務(wù)時間長了而超時。
- 消費設(shè)計:建一個線程池,最大為10,后續(xù)為等待線程,消費者使用線程池啟動線程,每個線程放500數(shù)據(jù)。這種方式根據(jù)不同的消息隊列做不同的設(shè)計,有的消息隊列自帶并發(fā)消費模式,這時可以使用自帶的模式,不用開線程池。
代碼實現(xiàn)
客戶為我提供了rocketmq集群的隊列,我直接用,省去了我在服務(wù)器安裝等操作。這里直接上主要代碼。
- 消息生成者
單例模式,通過spring @value注入消息隊列的集群地址。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* rocketmq消息隊列生產(chǎn)者
*
*/
@Component
public class Producer {
// 定義group
private static final String GROUP = "";
private static String namesrvAddr;
private static DefaultMQProducer producer = new DefaultMQProducer(GROUP);
private static int initialState = 0;
private Producer() {
}
public static DefaultMQProducer getDefaultMQProducer() {
if (producer == null) {
producer = new DefaultMQProducer(GROUP);
}
if (initialState == 0) {
producer.setNamesrvAddr(getNamesrvAddr());
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
return null;
}
initialState = 1;
}
return producer;
}
public static String getNamesrvAddr() {
return namesrvAddr;
}
@Value("${rocketmq.namesrvAddr}")
public void setNamesrvAddr(String namesrvAddr) {
this.namesrvAddr = namesrvAddr;
}
}
- 消息消費者
單例模式
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* rocketmq消息隊列消費者
*
*/
@Component
public class Consumer {
// 要和生產(chǎn)者的group相同
private static final String GROUP = "";
private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
private static int initialState = 0;
private static String namesrvAddr;
private Consumer() {
}
public static DefaultMQPushConsumer getDefaultMQPushConsumer() {
if (consumer == null) {
consumer = new DefaultMQPushConsumer(GROUP);
}
if (initialState == 0) {
consumer.setNamesrvAddr(getNamesrvAddr());
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// 設(shè)置并發(fā)數(shù)量
consumer.setConsumeThreadMin(5);
consumer.setConsumeThreadMax(10);
initialState = 1;
}
return consumer;
}
public static String getNamesrvAddr() {
return namesrvAddr;
}
@Value("${rocketmq.namesrvAddr}")
public void setNamesrvAddr(String namesrvAddr) {
Consumer.namesrvAddr = namesrvAddr;
}
}
- 消息存儲
封裝發(fā)送方法,其中topic需要到rocketmq控制臺配置,tag自定義,生產(chǎn)和消費保持一致即可。
private void sendMsg(List newList) {
// 獲取消息生產(chǎn)者
DefaultMQProducer producer = Producer.getDefaultMQProducer();
try {
Message msg = new Message(
TOPIC,
TAG,
JSON.toJSONString(newList).getBytes());
SendResult sendResult = producer.send(msg);
LOGGER.info("sendResult:{}", sendResult);
} catch (MQClientException e) {
LOGGER.info("-----發(fā)送失?。? + e.toString());
} catch (RemotingException e) {
LOGGER.info("-----發(fā)送失?。? + e.toString());
} catch (MQBrokerException e) {
LOGGER.info("-----發(fā)送失?。? + e.toString());
} catch (InterruptedException e) {
LOGGER.info("-----發(fā)送失敗:" + e.toString());
}
}
接收到數(shù)據(jù),分500一次,調(diào)用封裝的發(fā)送方法即可存到消息隊列。
- 消息消費
設(shè)置消費者為開機(jī)啟動,只要有消息就消費
import com.alibaba.fastjson.JSONArray;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
/**
* 消費開啟執(zhí)行
*
*/
@Component
public class ConsumerInit implements CommandLineRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInit.class);
//和生產(chǎn)者保持一致
private static final String TAG = "";
//和控制臺配置保持一致
private static final String TOPIC = "";
@Autowired
private FixedAssetsService fixedAssetsService;
@Override
public void run(String... strings) throws Exception {
receiveMsg();
}
private void receiveMsg() {
// 獲取消息生產(chǎn)者
DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();
// 訂閱主體
try {
consumer.subscribe(TOPIC, TAG);
//MessageListenerConcurrently 并行消費
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* * 默認(rèn)msgs里唯獨一條消息,能夠通過設(shè)置consumeMessageBatchMaxSize參數(shù)來批量接收消息
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
MessageExt msg = msgs.get(0);
if (msg.getTopic().equals(TOPIC)) {
if (msg.getTags() != null && msg.getTags().equals(TAG)) {
String message = null;
try {
message = new String(msg.getBody(),"UTF-8");
} catch (UnsupportedEncodingException e) {
LOGGER.info("message轉(zhuǎn)換失敗");
}
List<TransferSapResultVO> list = JSONArray.parseArray(message, TransferSapResultVO.class);
// 消費消息
fixedAssetsService.consumerMqMessage(list);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/**
* Consumer對象在使用之前必須要調(diào)用start初始化。初始化一次就可以<br>
*/
consumer.start();
LOGGER.info("Consumer Started.");
} catch (MQClientException e) {
LOGGER.info("消費消息錯誤" + e.toString());
}
}
}
- 測試結(jié)果
根據(jù)上述步驟,主要的流程代碼已經(jīng)展示。主要就是接收消息-存儲消息-并發(fā)消費消息。 開發(fā)完成后交付測試。和第一種方式一樣2000條數(shù)據(jù)一個包,50幾個包,系統(tǒng)很穩(wěn)定,速度比第一種方式快了至少3倍,數(shù)據(jù)也能正確處理完成。
這種方式在性能,速度等方面都非常良好。但是好景不長,說推過來的10幾萬數(shù)據(jù)少了3條,讓我檢查為什么少。我第一反應(yīng)是難道是丟包了? 如果是丟包,我500條一個包,只能少500的整數(shù)倍啊。然后百度發(fā)現(xiàn)rocketmq的安全性是可以保證的,幾乎不會出現(xiàn)丟包的情況。然后我問對方技術(shù)是不是根本就沒有傳這三條數(shù)據(jù),但對方就是咬死說傳了。為了驗證我說重新在傳輸一次,結(jié)果和第一次一樣,還是丟了這3條。我自己感覺是對方肯定沒有傳,但這種消息隊列的方式的缺點就暴露了出來,缺少監(jiān)控。于是又切換到第三種方式。
中間表模式
整體流程
- 外圍系統(tǒng)插入到我方中間表。插入過程不用我自己管,插入不走程序接口,不會影響系統(tǒng)性能。
- 做定時任務(wù)。每30分鐘并發(fā)消費中間表數(shù)據(jù)。已經(jīng)消費的更新消費標(biāo)識為1。其中消費標(biāo)識為0為未消費,1為已消費。
- 做定時任務(wù)。每7天執(zhí)行一次。刪除一個星期前已經(jīng)消費的數(shù)據(jù)。
- 消費設(shè)計:每30分鐘查詢所有未消費的數(shù)據(jù),并發(fā)進(jìn)行消費。此時有一個問題,如果前一次沒有消費完的數(shù)據(jù),在下一次任務(wù)又會被查詢出來,出現(xiàn)了重復(fù)消費的問題。自己的解決方式是,增加批次號字段,設(shè)定消費標(biāo)志為2 是處理中。
-- 第一步查詢出所有未消費的數(shù)據(jù)。
-- 設(shè)置批次號和消費標(biāo)志為2(批量跟新)。
-- 查詢當(dāng)前批次的數(shù)據(jù)。
-- 消費數(shù)據(jù)。完成后把標(biāo)志置為1。
這種方式有了監(jiān)控的功能,如果說少了數(shù)據(jù),直接在中間表中查,看是否外圍系統(tǒng)推送了數(shù)據(jù)。
總結(jié)
當(dāng)數(shù)據(jù)量較少時,可以用第一種實時的方式,比較方便。
數(shù)據(jù)量大且不需要監(jiān)控功能用第二種方式較好。
數(shù)據(jù)量大且需要監(jiān)控功能用第三種方式。
這次需求自己相當(dāng)于開發(fā)了3次。用了3種模式。 積累了經(jīng)驗。以后在做這種需求時,兩個問題, 數(shù)據(jù)量大不大? 是否需要監(jiān)控?