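Preamble
This post is mainly about long polling in RocketMQ. I honestly don't know why it's called long polling... that's what everyone else calls it, so I do too. As long as the meaning is clear, that's fine.
Main text
RocketMQ consumers work in pull mode: they repeatedly ask the Broker for messages to consume. In the source code this is implemented with a background thread that pulls nonstop (it first takes a PullRequest from a queue, of course, and blocks while the queue is empty). Not long after I first studied the consumer's pull code, a question came up:
- If there are no messages for a long time and the consumer keeps sending requests, won't that put a heavy load on the broker?
I didn't think much about it at the time and only found the answer later. So let's first review how the Broker handles a pull when there are no messages. The first step, of course, is to try to fetch messages (the code below is in PullMessageProcessor.java):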
// .... other code omitted
final GetMessageResult getMessageResult =
    this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
        requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), subscriptionData);
// .... other code omitted
Then look at how the result is evaluated:
switch (getMessageResult.getStatus()) {
    // .... other code omitted
    case NO_MATCHED_LOGIC_QUEUE:
    case NO_MESSAGE_IN_QUEUE:
        if (0 != requestHeader.getQueueOffset()) {
            // .... other code omitted
        } else {
            response.setCode(ResponseCode.PULL_NOT_FOUND);
        }
        break;
    case OFFSET_FOUND_NULL:
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
    case OFFSET_OVERFLOW_ONE:
        response.setCode(ResponseCode.PULL_NOT_FOUND);
        break;
}
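Those are the cases where no message was found. The response is what gets sent back to the consumer, and here it is given a specific code.
Next, look at how the response code is handled: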
switch (response.getCode()) {
    case ResponseCode.PULL_NOT_FOUND:
        if (brokerAllowSuspend && hasSuspendFlag) {
            // If long polling is enabled on the broker, hold the request for the timeout sent by
            // the consumer (30s by default); otherwise use the short polling time (1s)
            long pollingTimeMills = suspendTimeoutMillisLong;
            if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
            }
            String topic = requestHeader.getTopic();
            long offset = requestHeader.getQueueOffset();
            int queueId = requestHeader.getQueueId();
            // Wrap everything about this request, including the channel, into a PullRequest and
            // store it in pullRequestTable, i.e. hold the current request
            PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
                this.brokerController.getMessageStore().now(), offset, subscriptionData);
            this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
            response = null;
            break;
        }
}
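See the response = null? When the response is null, nothing is sent back to the consumer, so the consumer cannot run its callback.
That raises two more questions:
- What difference does it make to the consumer whether it gets a response or not?
- If no response is sent now, when will the consumer get one?
Start with the first question. The answer is in pullMessageAsync in MQClientAPIImpl (the source pulls asynchronously, so this async method is the one that gets called; the synchronous case behaves differently):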
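To make the hold step concrete, here is a minimal sketch of the decision that branch makes. It is not RocketMQ code: HoldDecisionSketch, HeldRequest, holdTable, chooseHoldMillis and handleNotFound are hypothetical names invented for illustration, and the 30s and 1s values are simply the defaults mentioned in the code comment. The only point is that a null return means the request was parked instead of answered.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for the broker-side hold step; none of these names are RocketMQ APIs.
class HoldDecisionSketch {

    // A parked pull request: the offset it asked for, when it was parked, how long it may wait.
    static final class HeldRequest {
        final long pullFromOffset;
        final long suspendTimestamp;
        final long timeoutMillis;

        HeldRequest(long pullFromOffset, long suspendTimestamp, long timeoutMillis) {
            this.pullFromOffset = pullFromOffset;
            this.suspendTimestamp = suspendTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Parked requests grouped by "topic@queueId".
    static final Map<String, List<HeldRequest>> holdTable = new ConcurrentHashMap<>();

    // Long polling on: use the consumer's suspend timeout. Off: use the short polling time.
    static long chooseHoldMillis(boolean longPollingEnabled, long consumerSuspendMillis, long shortPollingMillis) {
        return longPollingEnabled ? consumerSuspendMillis : shortPollingMillis;
    }

    // Returns the code to send right away, or null when the request was parked.
    static String handleNotFound(String topic, int queueId, long offset,
                                 boolean allowSuspend, boolean longPollingEnabled, long nowMillis) {
        if (!allowSuspend) {
            return "PULL_NOT_FOUND";
        }
        long holdMillis = chooseHoldMillis(longPollingEnabled, 30_000L, 1_000L);
        holdTable.computeIfAbsent(topic + "@" + queueId, k -> new CopyOnWriteArrayList<>())
                 .add(new HeldRequest(offset, nowMillis, holdMillis));
        return null; // nothing is written to the channel for now
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println("suspend allowed:  " + handleNotFound("TopicA", 0, 5L, true, true, now));
        System.out.println("suspend refused:  " + handleNotFound("TopicA", 0, 5L, false, true, now));
        System.out.println("requests on hold: " + holdTable.get("TopicA@0").size());
    }
}
```

Returning null rather than a special "held" code mirrors the response = null trick: the caller needs no extra branch, it just skips writing when there is nothing to write.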
this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response);
                assert pullResult != null;
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                pullCallback.onException(e);
            }
        } else {
            // ....
        }
    }
});
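The response == null branch is set aside in this discussion. First, what happens when a response does arrive: the callback pullCallback is invoked with the PullResult parsed from the response. The callback is defined in the pullMessage method of DefaultMQPushConsumerImpl: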
// other code omitted
PullCallback pullCallback = new PullCallback() {
    @Override
    public void onSuccess(PullResult pullResult) {
        if (pullResult != null) {
            switch (pullResult.getPullStatus()) {
                case FOUND:
                    if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                        DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    } else {
                        if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                            DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                        } else {
                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                        }
                    }
                    break;
                case NO_NEW_MSG:
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
                case NO_MATCHED_MSG:
                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                    break;
            }
        }
    }
    @Override
    public void onException(Throwable e) {
        DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PullTimeDelayMillsWhenException);
    }
};
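When a response is received, the callback ends up calling executePullRequestImmediately or executePullRequestLater (which also calls executePullRequestImmediately underneath, just after a delay). Both put the pullRequest back into the queue. So the answer to the first question is:
- If the pull succeeds, there's no problem: the messages are consumed, the pullRequest goes back into the queue, and the background thread takes it out again to request the next messages.
- If the pull fails to find a message, the Broker does not respond, so the callback never runs and the pullRequest is not put back. The background thread stays blocked on the queue until a response finally comes back; only then is the pullRequest returned to the queue and the next attempt made.
Now for the second question. For this one it helps to first read the article "Introduction to ConsumeQueue and Its Build Process". In the periodic ConsumeQueue build process there are these few lines: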
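The two cases above can be modeled with a blocking queue and a delayed callback. This is a hypothetical sketch, not the client's real code: PullLoopSketch, runRounds and the fake broker are invented names, and a single scheduled executor stands in for the Broker answering after a hold period. It only shows that while the one PullRequest is in flight, take() blocks and no further pulls are sent.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the consumer's pull loop; not RocketMQ code.
class PullLoopSketch {

    // Sends `rounds` pulls and returns how many were sent.
    // The fake broker answers each pull only after brokerHoldMillis.
    static int runRounds(int rounds, long brokerHoldMillis) {
        BlockingQueue<String> pullRequestQueue = new LinkedBlockingQueue<>();
        ScheduledExecutorService fakeBroker = Executors.newSingleThreadScheduledExecutor();
        int sent = 0;
        try {
            pullRequestQueue.put("pullRequest-0");
            for (int i = 0; i < rounds; i++) {
                // Blocks while the only PullRequest is still held by the broker.
                final String pullRequest = pullRequestQueue.take();
                sent++;
                // The "callback": once the broker answers, the request goes back into the queue.
                fakeBroker.schedule(() -> { pullRequestQueue.offer(pullRequest); },
                        brokerHoldMillis, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a pull request", e);
        } finally {
            fakeBroker.shutdownNow(); // drop the last pending answer, the demo is over
        }
        return sent;
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int sent = runRounds(3, 200L);
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("pulls sent: " + sent + ", elapsed ms: " + elapsedMillis);
    }
}
```

Because the request is re-queued only from the callback, the pull rate is bounded by how fast the broker answers, which is why holding the request on the broker side also throttles the consumer.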
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
        && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(), dispatchRequest.getQueueId(),
        dispatchRequest.getConsumeQueueOffset() + 1, dispatchRequest.getTagsCode());
}
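Whenever a message is written to the CommitLog (that is, a Producer has sent a message), the ConsumeQueue gets built and the code above is invoked. So what does this arriving method actually do? Stepping into it, the core is a call to notifyMessageArriving in PullRequestHoldService: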
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode) {
    String key = this.buildKey(topic, queueId);
    ManyPullRequest mpr = this.pullRequestTable.get(key);
    if (mpr != null) {
        List<PullRequest> requestList = mpr.cloneListAndClear();
        if (requestList != null) {
            List<PullRequest> replayList = new ArrayList<PullRequest>();
            for (PullRequest request : requestList) {
                long newestOffset = maxOffset;
                if (newestOffset <= request.getPullFromThisOffset()) {
                    newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQuque(topic, queueId);
                }
                Long tmp = tagsCode;
                if (newestOffset > request.getPullFromThisOffset()) {
                    if (tagsCode == null) {
                        // tmp = getLatestMessageTagsCode(topic, queueId,
                        // maxOffset);
                    }
                    if (this.messageFilter.isMessageMatched(request.getSubscriptionData(), tmp)) {
                        try {
                            this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
                                request.getRequestCommand());
                        } catch (RemotingCommandException e) {
                            log.error("", e);
                        }
                        continue;
                    }
                }
                // If the hold timeout has elapsed, process the request once more;
                // if it still finds nothing, the response is returned directly
                if (System.currentTimeMillis() >= (request.getSuspendTimestamp() + request.getTimeoutMillis())) {
                    try {
                        this.brokerController.getPullMessageProcessor().excuteRequestWhenWakeup(request.getClientChannel(),
                            request.getRequestCommand());
                    } catch (RemotingCommandException e) {
                        log.error("", e);
                    }
                    continue;
                }
                replayList.add(request);
            }
            if (!replayList.isEmpty()) {
                mpr.addPullRequest(replayList);
            }
        }
    }
}
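The logic is simple. It first takes the PullRequests held under this topic + queue id out of pullRequestTable. If new messages have arrived past the requested offset and the tag matches, it calls PullMessageProcessor.this.processRequest (via excuteRequestWhenWakeup) to process the consumer's request again as if the broker had just received it, and sends back the response. Requests whose hold time has expired are processed the same way, and the rest go back into the table. If this second call still finds no message, the Request is not held again; the response is returned directly.
With that, the second question is answered too, and the whole long polling mechanism is clear. To summarize:
- The consumer keeps taking requests from the PullRequest queue and asking the broker for messages. When the broker responds, the consumer handles the response and puts the PullRequest back into the queue for the next pull.
- When the broker finds no messages, it holds the request. As ReputMessageService keeps building the ConsumeQueue, the held requests are taken out and processed a second time.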
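Stripped of the networking, the wake-up pass is a partition of the held requests into "process again now" and "keep holding". The sketch below is a simplified model under stated assumptions: WakeUpSketch, Held and notifyArriving are hypothetical names, the tag filter is left out entirely, and the current time is passed in as a parameter so the behavior is easy to check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the wake-up pass; tag filtering is omitted.
class WakeUpSketch {

    static final class Held {
        final String id;
        final long pullFromOffset;   // offset the consumer asked for
        final long suspendTimestamp; // when the request was parked
        final long timeoutMillis;    // how long it may stay parked

        Held(String id, long pullFromOffset, long suspendTimestamp, long timeoutMillis) {
            this.id = id;
            this.pullFromOffset = pullFromOffset;
            this.suspendTimestamp = suspendTimestamp;
            this.timeoutMillis = timeoutMillis;
        }
    }

    // Returns the requests to process again and leaves only the still-waiting ones in `held`.
    static List<Held> notifyArriving(List<Held> held, long maxOffset, long nowMillis) {
        List<Held> wake = new ArrayList<>();
        List<Held> keep = new ArrayList<>();
        for (Held h : held) {
            boolean newMessage = maxOffset > h.pullFromOffset;
            boolean timedOut = nowMillis >= h.suspendTimestamp + h.timeoutMillis;
            if (newMessage || timedOut) {
                wake.add(h); // either way the request is answered now and not parked again
            } else {
                keep.add(h);
            }
        }
        held.clear();
        held.addAll(keep);
        return wake;
    }

    public static void main(String[] args) {
        List<Held> held = new ArrayList<>();
        held.add(new Held("has-new-message", 5L, 0L, 30_000L));
        held.add(new Held("still-waiting", 10L, 0L, 30_000L));
        held.add(new Held("timed-out", 10L, 0L, 1_000L));
        for (Held h : notifyArriving(held, 6L, 2_000L)) {
            System.out.println("woken: " + h.id);
        }
        System.out.println("still held: " + held.size());
    }
}
```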
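For readers who want to see the whole cycle in one place, here is a toy end-to-end model of the summary above. Everything in it is an assumption made for illustration: LongPollBrokerSketch, Waiter, pull and append are hypothetical names, an in-memory list stands in for one queue, and CompletableFuture.completeOnTimeout (Java 9 or later) stands in for the hold timeout. It is a sketch of the idea, not of how RocketMQ implements it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Toy long-polling broker for a single queue; hypothetical, not RocketMQ code.
class LongPollBrokerSketch {

    private static final class Waiter {
        final long offset;
        final CompletableFuture<String> future;

        Waiter(long offset, CompletableFuture<String> future) {
            this.offset = offset;
            this.future = future;
        }
    }

    private final List<String> messages = new ArrayList<>(); // stands in for one queue
    private final List<Waiter> waiters = new ArrayList<>();  // held pull requests

    // Answers at once if the offset exists; otherwise holds the request up to holdMillis.
    synchronized CompletableFuture<String> pull(long offset, long holdMillis) {
        if (offset < messages.size()) {
            return CompletableFuture.completedFuture(messages.get((int) offset));
        }
        CompletableFuture<String> future = new CompletableFuture<>();
        waiters.add(new Waiter(offset, future));
        return future.completeOnTimeout("PULL_NOT_FOUND", holdMillis, TimeUnit.MILLISECONDS);
    }

    // A producer writes a message; held requests that can now be served are answered.
    synchronized void append(String message) {
        messages.add(message);
        List<Waiter> stillWaiting = new ArrayList<>();
        for (Waiter w : waiters) {
            if (w.offset < messages.size()) {
                // No effect if the hold already timed out and the future is complete.
                w.future.complete(messages.get((int) w.offset));
            } else {
                stillWaiting.add(w);
            }
        }
        waiters.clear();
        waiters.addAll(stillWaiting);
    }

    public static void main(String[] args) {
        LongPollBrokerSketch broker = new LongPollBrokerSketch();
        CompletableFuture<String> held = broker.pull(0, 5_000L);
        System.out.println("answered immediately? " + held.isDone());
        broker.append("hello");
        System.out.println("after append: " + held.join());
        System.out.println("idle pull: " + broker.pull(1, 100L).join());
    }
}
```

One simplification to be aware of: waiters whose hold has timed out stay in the list until a later append sweeps them, which is fine for a demo but would need cleanup in anything long-running.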