背景
rocketmq支持順序消費(fèi),是很多業(yè)務(wù)中要用的一個(gè)場(chǎng)景,我就好奇他是怎么實(shí)現(xiàn)的,需要了解背后的原理,是怎么支持順序消費(fèi)的,這樣有問(wèn)題的時(shí)候我們才能快速的定位問(wèn)題,這是一個(gè)合格的架構(gòu)師必備的能力。
分配MessageQueue
rocketmq 在啟動(dòng)消費(fèi)時(shí),會(huì)對(duì)topic的mq進(jìn)行reblance,如果是新分配的message queue,如果是順序消費(fèi),即isorder為true。則需要先對(duì)該
message queue 獲取分布式鎖,獲取成功才能真正開(kāi)始消費(fèi),代碼入心:
boolean allMQLocked = true;
List<PullRequest> pullRequestList = new ArrayList<>();
for (MessageQueue mq : mqSet) {
if (!this.processQueueTable.containsKey(mq)) {
//新分配的message queue 如果是順序消費(fèi),需要先獲取鎖,獲取成功
//則創(chuàng)建messagequeue 開(kāi)始拉起數(shù)據(jù),否則不能消費(fèi)給mq。
if (isOrder && !this.lock(mq)) {
log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
//如果獲取失敗,則不消費(fèi)這個(gè)mq。
allMQLocked = false;
continue;
}
//如果是順序消費(fèi),只有獲取成功,才開(kāi)始消費(fèi)的準(zhǔn)備工作。
this.removeDirtyOffset(mq);
ProcessQueue pq = createProcessQueue(topic);
pq.setLocked(true);
long nextOffset = this.computePullFromWhere(mq);
if (nextOffset >= 0) {
ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
if (pre != null) {
log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
} else {
log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
PullRequest pullRequest = new PullRequest();
pullRequest.setConsumerGroup(consumerGroup);
pullRequest.setNextOffset(nextOffset);
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequestList.add(pullRequest);
changed = true;
}
} else {
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
}
獲取鎖
獲取鎖的代碼不需要看,我們只需要關(guān)心下請(qǐng)求參數(shù)即可,因?yàn)殛P(guān)鍵實(shí)現(xiàn)在broker端:
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.getMqSet().add(mq);
try {
Set<MessageQueue> lockedMq =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
for (MessageQueue mmqq : lockedMq) {
ProcessQueue processQueue = this.processQueueTable.get(mmqq);
if (processQueue != null) {
processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
boolean lockOK = lockedMq.contains(mq);
log.info("message queue lock {}, {} {}", lockOK ? "OK" : "Failed", this.consumerGroup, mq);
return lockOK;
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mq, e);
}
順序消費(fèi)獲取鎖的代碼可用看出,需要告訴broker端三個(gè)參數(shù):
- consumer group 消費(fèi)分組。
- 客戶端id,即consumer的標(biāo)識(shí)
- mq,即message queue 是對(duì)那個(gè)queue的順序消費(fèi)。
請(qǐng)求類型是LOCK_BATCH_MQ,broker server 會(huì)用默認(rèn)的processor來(lái)處理這個(gè)請(qǐng)。如果沒(méi)有獲取到鎖,則lockedMq是空的,沒(méi)有直,則返回false,所以接下來(lái),我們看下服務(wù)端是怎么做的,來(lái)保證這個(gè)順序消費(fèi)。
Broker鎖實(shí)現(xiàn)
broker server 處理LOCK_BATCH_MQ 的請(qǐng)求時(shí)通過(guò)defaultRequestProcessorPair來(lái)負(fù)責(zé)處理,defaultRequestProcessorPair是AdminBrokerProcessor,實(shí)現(xiàn)邏輯在lockBatchMQ方法,代碼如下:
private RemotingCommand lockBatchMQ(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
LockBatchRequestBody requestBody = LockBatchRequestBody.decode(request.getBody(), LockBatchRequestBody.class);
Set<MessageQueue> lockOKMQSet = new HashSet<>();
//根據(jù)group和mq,嘗試對(duì)沒(méi)有被其他consumer鎖定會(huì)加鎖,只有沒(méi)有枷鎖的messagequeue,或者其他的鎖已經(jīng)過(guò)期了,才能上鎖。
//selfLockOKMQSet 是成功獲取鎖的message queue
Set<MessageQueue> selfLockOKMQSet = this.brokerController.getRebalanceLockManager().tryLockBatch(
requestBody.getConsumerGroup(),
requestBody.getMqSet(),
requestBody.getClientId());
//看是否要請(qǐng)求其他的server,客戶端發(fā)起的時(shí)false,broker發(fā)起的是true
if (requestBody.isOnlyThisBroker() || !brokerController.getBrokerConfig().isLockInStrictMode()) {
lockOKMQSet = selfLockOKMQSet;
} else {
//設(shè)置OnlyThisBroker為true,讓其他的server接到請(qǐng)求時(shí)不再請(qǐng)求其他的server了
requestBody.setOnlyThisBroker(true);
//獲取副本數(shù)
int replicaSize = this.brokerController.getMessageStoreConfig().getTotalReplicas();
//計(jì)算過(guò)半quorum
int quorum = replicaSize / 2 + 1;
if (quorum <= 1) {
//如果就一個(gè),則不需要再請(qǐng)求其他的broker server
lockOKMQSet = selfLockOKMQSet;
} else {
//有多個(gè)副本,對(duì)所有broker嘗試加鎖。
final ConcurrentMap<MessageQueue, Integer> mqLockMap = new ConcurrentHashMap<>();
//先對(duì)本地加鎖的mq 標(biāo)記為1
for (MessageQueue mq : selfLockOKMQSet) {
if (!mqLockMap.containsKey(mq)) {
mqLockMap.put(mq, 0);
}
mqLockMap.put(mq, mqLockMap.get(mq) + 1);
}
BrokerMemberGroup memberGroup = this.brokerController.getBrokerMemberGroup();
if (memberGroup != null) {
Map<Long, String> addrMap = new HashMap<>(memberGroup.getBrokerAddrs());
addrMap.remove(this.brokerController.getBrokerConfig().getBrokerId());
final CountDownLatch countDownLatch = new CountDownLatch(addrMap.size());
requestBody.setMqSet(selfLockOKMQSet);
requestBody.setOnlyThisBroker(true);
for (Long brokerId : addrMap.keySet()) {
try {
this.brokerController.getBrokerOuterAPI().lockBatchMQAsync(addrMap.get(brokerId),
requestBody, 1000, new LockCallback() {
@Override
public void onSuccess(Set<MessageQueue> lockOKMQSet) {
for (MessageQueue mq : lockOKMQSet) {
if (!mqLockMap.containsKey(mq)) {
mqLockMap.put(mq, 0);
}
//加鎖成功,對(duì)加鎖次數(shù)加1
mqLockMap.put(mq, mqLockMap.get(mq) + 1);
}
countDownLatch.countDown();
}
@Override
public void onException(Throwable e) {
LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
countDownLatch.countDown();
}
});
} catch (Exception e) {
LOGGER.warn("lockBatchMQAsync on {} failed, {}", addrMap.get(brokerId), e);
countDownLatch.countDown();
}
}
try {
countDownLatch.await(2000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.warn("lockBatchMQ exception on {}, {}", this.brokerController.getBrokerConfig().getBrokerName(), e);
}
}
//計(jì)算哪些mq是成功實(shí)現(xiàn)過(guò)半加鎖的,返回給客戶端
for (MessageQueue mq : mqLockMap.keySet()) {
if (mqLockMap.get(mq) >= quorum) {
lockOKMQSet.add(mq);
}
}
}
}
上面的代碼挺多,主要是實(shí)現(xiàn)了兩個(gè)關(guān)鍵點(diǎn),分別是對(duì)本地mq 加鎖,和對(duì)其他的broker server 獲取鎖,計(jì)算加鎖成功的broker server是否過(guò)半,過(guò)半則成功,否則失敗。
- 對(duì)本地message queue 加鎖
看本broker server 的message queue 嘗試獲取鎖,能加鎖成功的條件是沒(méi)有加鎖的mq,或者已經(jīng)加鎖了,但是已經(jīng)過(guò)期了,其他的都是被其他的客戶端鎖定中,關(guān)鍵代碼如下:
private boolean isLocked(final String group, final MessageQueue mq, final String clientId) {
ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
if (groupValue != null) {
LockEntry lockEntry = groupValue.get(mq);
if (lockEntry != null) {
//檢查clientid和是否過(guò)期
boolean locked = lockEntry.isLocked(clientId);
if (locked) {
lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
}
return locked;
}
}
return false;
}
- 分布式鎖
分布式我們都知道需要通過(guò)zk,redis,consul等實(shí)現(xiàn),但是rocketmq并沒(méi)有這樣做,個(gè)人理解是rocketmq 不想因?yàn)檫@個(gè)問(wèn)題要依賴其他的外部組件,因?yàn)橐蕾囈粋€(gè)組件你還要對(duì)依賴組件的穩(wěn)定性,所以自己巧妙的實(shí)現(xiàn)了對(duì)所有broker server message queue 加鎖時(shí),應(yīng)用了leader選舉的思想,因?yàn)閎roker肯定是集群部署,不同的客戶端同時(shí)發(fā)起順序消費(fèi)時(shí),很有可能鏈接的不同的broker server,如果只對(duì)單broker server判斷獲取鎖成功是有問(wèn)題的,通過(guò)對(duì)所有的broker server都獲取鎖,如果有一半以上獲取鎖成功,則肯定是只有一個(gè)客戶端能獲取到鎖,類似leader選舉的思路,是值得學(xué)習(xí)的地方。
定期刷新鎖
順序消費(fèi)的這個(gè)鎖也是一個(gè)鎖租約的機(jī)制,到了時(shí)間不續(xù)租,就釋放了,所以broker分布式鎖除了兩看consumer的客戶端id,還有一個(gè)時(shí)間的限制,如果客戶端出現(xiàn)問(wèn)題,沒(méi)有主動(dòng)更新鎖的時(shí)間,則會(huì)被其他的客戶端獲取到鎖,續(xù)租也有可能是和其他的客戶端并發(fā)的,所以就有可能鎖續(xù)租失敗,失敗了就不能消費(fèi)這個(gè)message queue了,所以在消費(fèi)的時(shí)候需要檢查是否持有鎖,更新是通過(guò)一個(gè)定時(shí)任務(wù)更新的,時(shí)間周期為20秒一次,通過(guò)rocketmq.client.rebalance.lockInterval 變量控制。
還有一個(gè)值得注意的是,一個(gè)topic有多個(gè)message queue,兩個(gè)客戶端同時(shí)發(fā)起順序消費(fèi)時(shí),在獲取分布式鎖時(shí),有可能兩個(gè)分別獲得部分mq的鎖,rocketmq的順序是保證在mq級(jí)別的。
分發(fā)消息
獲取到對(duì)應(yīng)message queue的鎖后,就可以創(chuàng)建pullRequest請(qǐng)求到隊(duì)列messageRequestQueue 中,這時(shí)候拉消息的線程就會(huì)被換醒,去拉消息,拉到消息后,會(huì)把消息緩存在一個(gè)treeMap中,這個(gè)和并發(fā)消費(fèi)是一樣的,添加到treeMap中,返回結(jié)果判斷是否需要提交新的ConsumeRequest task,如果前面的消費(fèi)任務(wù)已經(jīng)消費(fèi)完了,則會(huì)返回true,即需要提交新的ConsumeRequest,代碼如下:
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
順序消費(fèi)在分發(fā)的時(shí)候,不像并發(fā)消費(fèi)一樣,默認(rèn)一個(gè)請(qǐng)求提交一個(gè)ConsumeRequest task到線程執(zhí)行,來(lái)實(shí)現(xiàn)并發(fā)消費(fèi)。
順序消費(fèi)如果沒(méi)有入在消費(fèi)的判斷,在把消息加入到processQueue時(shí)會(huì)判斷有沒(méi)有線程在消費(fèi),如果有,則不能提交消費(fèi)任務(wù),只有沒(méi)有線程消費(fèi)的時(shí)候,才創(chuàng)建一個(gè)ConsumeRequest task到線程池執(zhí)行, 因?yàn)橛刑峤灰粋€(gè)任務(wù)后,會(huì)不斷的從processQueue 的treemap 里獲取message,如果獲取不到了,才把consuming的標(biāo)記設(shè)置為false,下次拉到消息時(shí),就重新提交一個(gè)新的ConsumeRequest。
ConsumeRequest 的run 方法如下:
public void run() {
.....
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| this.processQueue.isLocked() && !this.processQueue.isLockExpired()) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}
//consumeBatchSize 默認(rèn)是1,從tree map里取出一批消息,默認(rèn)是一條消息
final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);
ConsumeOrderlyStatus status = null;
ConsumeMessageContext consumeMessageContext = null;
//.... hook partion
long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//這里需要加鎖,一定是等前面一條消息處理完后,才能繼續(xù)消費(fèi)下一條消息。
this.processQueue.getConsumeLock().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//執(zhí)行業(yè)務(wù)的消費(fèi)代碼
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",
UtilAll.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue), e);
hasException = true;
} finally {
this.processQueue.getConsumeLock().unlock();
}
//去掉部分代碼
long consumeRT = System.currentTimeMillis() - beginTimestamp;
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
代碼有點(diǎn)多,省掉了部分非關(guān)鍵的代碼,ConsumeRequest 的run 方法主要干了如下幾件事情:
- 首先獲取鎖,這個(gè)鎖是以message queue為單位的,就是為每個(gè)message queue 創(chuàng)建了一個(gè)object,通過(guò)對(duì)synchronized 對(duì)object 加鎖,防止并發(fā)執(zhí)行。
- 檢查processqueue 是否還被鎖住,就是前面說(shuō)的,會(huì)定期更新鎖,即續(xù)租成功,就還是locked,如果失敗,則不能消費(fèi)。
- 檢查消費(fèi)的時(shí)間,如果持續(xù)消費(fèi)超過(guò)了1分鐘,說(shuō)明消費(fèi)有瓶頸,則等10毫秒再繼續(xù)消費(fèi)。
- 取消息,從msgTreeMap里獲取消息,默認(rèn)是一次獲取1條,這里還有對(duì)這條消息做了一個(gè)暫存,存在consumingMsgOrderlyTreeMap里面,是用來(lái)消費(fèi)成功后,做commit offset的。
5.獲取 processqueue的consumer lock,拿到鎖后,即開(kāi)始執(zhí)行業(yè)務(wù)的消費(fèi)代碼,這里的鎖不是很理解,順序消費(fèi)的task 同時(shí)只有一個(gè)線程在運(yùn)行,前面已經(jīng)對(duì)message queue加了一個(gè)大鎖。
6.執(zhí)行業(yè)務(wù)的消費(fèi)代碼,獲取消費(fèi)結(jié)果。
7.處理消費(fèi)結(jié)果,如果成功的情況下,會(huì)更新本地的offset,這里不更新到broker server端,還是統(tǒng)一通過(guò)定時(shí)任務(wù)上報(bào)給broker server的。
總結(jié)時(shí)刻
本文對(duì)rocketmq 的順序消費(fèi)模式的代碼擼了一遍,讓我們了解了順序消費(fèi)背后的原理和邏輯,即是怎么保證客戶端能順序消費(fèi)消息的,主要有下幾點(diǎn):
- 順序消費(fèi)時(shí)group級(jí)別對(duì)message queue保證有順序。
- 開(kāi)始消費(fèi)message queue前需要獲取分布式鎖,這里和選舉leader一樣的思路,通過(guò)對(duì)集群的broker都獲取鎖,有一半獲取成功就說(shuō)明加鎖成功。
- 順序消費(fèi)時(shí)拉到消息后,只提交一個(gè)ConsumeRequest任務(wù),甚至有可能不提交,如果前面一個(gè)還在消費(fèi)的情況下,通過(guò)一個(gè)ConsumeRequest來(lái)循環(huán)從msgTree里獲取,默認(rèn)一次取一條消息,來(lái)執(zhí)行業(yè)務(wù)的消費(fèi)代碼,也就是單線程在執(zhí)行,雖然是線程池。
- 每消費(fèi)完一條消息,更新一次消費(fèi)的offset。
注:目前看機(jī)會(huì)中,關(guān)注基礎(chǔ)架構(gòu),中間件,高并發(fā)系統(tǒng)建設(shè)和治理。