在Consumer消費(fèi)的時候總有幾個疑問:
- 消費(fèi)完成后,這個消費(fèi)進(jìn)度存在哪里
- 消費(fèi)完成后,還沒保存消費(fèi)進(jìn)度就掛了,會不會導(dǎo)致重復(fù)消費(fèi)
Consumer
消費(fèi)進(jìn)度保存
消費(fèi)完成后,會返回一個ConsumeConcurrentlyStatus.CONSUME_SUCCESS告訴MQ消費(fèi)成功,以MessageListener的consumeMessage為入口分析。
消費(fèi)的時候,是以ConsumeRequest類為Runnable對象,在線程池中進(jìn)行處理的,即ConsumeRequest的run方法會處理這個狀態(tài)
@Override
public void run() {
//....
status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
// 如果這個ProcessQueue廢棄了,則不處理
if (!processQueue.isDropped()) {
ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
}
}
在消費(fèi)完成后,將status交給processConsumeResult處理,代碼如下
public void processConsumeResult(//
final ConsumeConcurrentlyStatus status, //
final ConsumeConcurrentlyContext context, //
final ConsumeRequest consumeRequest//
) {
//....消費(fèi)成功或者失敗的處理
// 將這批消息從ProcessQueue中移除,代表消費(fèi)完畢,并返回當(dāng)前ProcessQueue中的消息最小的offset
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
// 更新消費(fèi)進(jìn)度
this.defaultMQPushConsumerImpl.getOffsetStore()
.updateOffset(consumeRequest.getMessageQueue(), offset, true);
}
}
在分析ProcessQueue的時候,說過removeMessage返回有兩種情況:
- 如果移除這批消息之后已經(jīng)沒有消息了,那么返回ProcessQueue中最大的offset+1
- 如果還有消息,那么返回treeMap中最小的key,即未消費(fèi)的消息中最小的offset
getOffsetStore返回RemoteBrokerOffsetStore,看下其實現(xiàn)
@Override
public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
if (mq != null) {
// 通過MessageQueue獲取本地的對應(yīng)的消費(fèi)進(jìn)度
AtomicLong offsetOld = this.offsetTable.get(mq);
if (null == offsetOld) {
offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
}
if (null != offsetOld) {
//increaseOnly 為false則直接覆蓋
//increaseOnly為true則會判斷更新的值比老的值大才會進(jìn)行更新
if (increaseOnly) {
MixAll.compareAndIncreaseOnly(offsetOld, offset);
} else {
offsetOld.set(offset);
}
}
}
}
這里的increaseOnly參數(shù)根據(jù)不同的情況傳入不同的值,有些情況下會出現(xiàn)并發(fā)修改的情況,那么需要傳入true,內(nèi)部會進(jìn)行CAS的操作,能保證正確的賦值,而一些場景下,只需要進(jìn)行直接覆蓋或者說沒有并發(fā)修改的問題那么傳入false就行了。
消費(fèi)進(jìn)度持久化
offsetTable是一個Map,其保存了消費(fèi)進(jìn)度,這只一個內(nèi)存的結(jié)構(gòu),在Consumer啟動的時候,會啟動一個定時任務(wù)將本地的數(shù)據(jù)同步到broker,每persistConsumerOffsetInterval(默認(rèn)為5)秒進(jìn)行一次操作
// mqs為需要持久化的隊列集合
public void persistAll(Set<MessageQueue> mqs) {
if (null == mqs || mqs.isEmpty())
return;
final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
if (mqs != null && !mqs.isEmpty()) {
// 遍歷本地的消費(fèi)進(jìn)度
for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
MessageQueue mq = entry.getKey();
AtomicLong offset = entry.getValue();
if (offset != null) {
// 如果該隊列在需要持久化的隊列中
if (mqs.contains(mq)) {
try {
// 將消費(fèi)進(jìn)度發(fā)送到broker
this.updateConsumeOffsetToBroker(mq, offset.get());
} catch (Exception e) {
log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
}
} else {//廢棄的消費(fèi)進(jìn)度
unusedMQ.add(mq);
}
}
}
}
// 如果有廢棄的MQ,則將其消費(fèi)進(jìn)度廢棄
if (!unusedMQ.isEmpty()) {
for (MessageQueue mq : unusedMQ) {
this.offsetTable.remove(mq);
}
}
}
傳入的是當(dāng)前Consumer分配的MessageQueue列表,rebalance之后,可能分配的MessageQueue已經(jīng)變化,所以offsetTable里有些消費(fèi)進(jìn)度的隊列時不需要的,所以將它的消費(fèi)進(jìn)度廢棄
updateConsumeOffsetToBroker方法就是簡單的網(wǎng)絡(luò)請求,將offset發(fā)送給Broker
消費(fèi)進(jìn)度提交
除了定時提交消費(fèi)進(jìn)度之外,在拉取消息的時候,會順便將本地的消費(fèi)進(jìn)度一起傳到broker,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代碼
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
// 集群消費(fèi)模式
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
// 通過offsetStore獲取當(dāng)前消費(fèi)進(jìn)度
// ReadOffsetType.READ_FROM_MEMORY表示從本地獲取(即offsetTable)
commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
if (commitOffsetValue > 0) {//
// 傳給Broker,讓其判斷是否需要保存消費(fèi)進(jìn)度
commitOffsetEnable = true;
}
}
// 構(gòu)造一些標(biāo)志位,這里主要看commitOffsetEnable值
// 將commitOffsetEnable放到一個int類型的值中,讓broker判斷是否需要保存消費(fèi)進(jìn)度
int sysFlag = PullSysFlag.buildSysFlag(//
commitOffsetEnable, // commitOffset
true, // suspend
subExpression != null, // subscription
classFilter // class filter
);
//....
// 通過拉取消息請求,將commitOffsetValue和sysFlag傳給broker
this.pullAPIWrapper.pullKernelImpl(//
pullRequest.getMessageQueue(), // 1
subExpression, // 2
subscriptionData.getSubVersion(), // 3
pullRequest.getNextOffset(), // 4
this.defaultMQPushConsumer.getPullBatchSize(), // 5
sysFlag, // 6
commitOffsetValue, // 7
BrokerSuspendMaxTimeMillis, // 8
ConsumerTimeoutMillisWhenSuspend, // 9
CommunicationMode.ASYNC, // 10
pullCallback// 11
);
具體broker對消費(fèi)進(jìn)度的處理看后面分析
Broker
消費(fèi)進(jìn)度保存
RocketMQ的網(wǎng)絡(luò)請求都有一個RequestCode,更新消費(fèi)進(jìn)度的Code為UPDATE_CONSUMER_OFFSET,通過查到其使用的地方,找到對應(yīng)的Processor為ClientManageProcessor,其processRequest處理對應(yīng)的請求
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
switch (request.getCode()) {
case RequestCode.HEART_BEAT:
return this.heartBeat(ctx, request);
case RequestCode.UNREGISTER_CLIENT:
return this.unregisterClient(ctx, request);
case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
return this.getConsumerListByGroup(ctx, request);
case RequestCode.UPDATE_CONSUMER_OFFSET:
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
default:
break;
}
return null;
}
更新消費(fèi)進(jìn)度的方法為updateConsumerOffset,里面解析了請求體之后又調(diào)用了ConsumerOffsetManager.commitOffset方法
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;
this.commitOffset(clientHost, key, queueId, offset);
}
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
if (null == map) {
map = new ConcurrentHashMap<Integer, Long>(32);
map.put(queueId, offset);
this.offsetTable.put(key, map);
} else {
Long storeOffset = map.put(queueId, offset);
if (storeOffset != null && offset < storeOffset) {
log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
clientHost, key, queueId, offset, storeOffset);
}
}
}
邏輯也很簡單就不多說了,有意思的是,Broker的保存消費(fèi)進(jìn)度的結(jié)構(gòu)和Consumer類似,Broker多了一個維度,因為Broker接收的是所有消費(fèi)者的進(jìn)度,而Consumer保存的是自己的
在Consumer的消費(fèi)進(jìn)度上報到Broker之后,Broker只是保存到內(nèi)存,這并不可靠,大概也能猜出,和Consumer一樣,也有一個定時任務(wù)將消費(fèi)進(jìn)度持久化。這時,先看下ConsumerOffsetManager這個類的繼承關(guān)系,他的父類是ConfigManager,這個東西很重要,是幾個重要配置信息持久化類,看下其繼承關(guān)系:

分別是訂閱關(guān)系管理,消費(fèi)進(jìn)度管理,Topic信息管理,和延遲隊列信息管理,這4個配置信息都需要通過ConfigManager去持久化和加載,看下ConfigManager的幾個方法
public abstract class ConfigManager {
// 將對象轉(zhuǎn)換成json串
public abstract String encode();
//將文件里內(nèi)容(json格式)的轉(zhuǎn)換成對象
public boolean load() {
String fileName = null;
// 獲取文件地址
fileName = this.configFilePath();
// 將文件里的內(nèi)容讀取出來
String jsonString = MixAll.file2String(fileName);
// json轉(zhuǎn)換成指定對象的數(shù)據(jù)
this.decode(jsonString);
}
// 配置文件地址
public abstract String configFilePath();
// 與load類似
private boolean loadBak() {
String fileName = null;
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName + ".bak");
this.decode(jsonString);
return true;
}
// json轉(zhuǎn)換成指定對象的數(shù)據(jù)
public abstract void decode(final String jsonString);
// 將對象里的數(shù)據(jù)轉(zhuǎn)換成json并持久化到configFilePath()文件中
public synchronized void persist() {
String jsonString = this.encode(true);
String fileName = this.configFilePath();
MixAll.string2File(jsonString, fileName);
}
public abstract String encode(final boolean prettyFormat);
那么ConsumerOffsetManager會實現(xiàn)encode和decode方法并在某個地方定時調(diào)用persist方法,查看其使用的地方,找到BrokerController的initialize方法,有段定時任務(wù)如下:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
可以看到,每flushConsumerOffsetInterval(默認(rèn)5000)毫秒會進(jìn)行一次持久化
拉取消息的時候保存消費(fèi)進(jìn)度
拉取消息的Code為RequestCode.PULL_MESSAGE,對應(yīng)的Processor為PullMessageProcessor,找到其中消費(fèi)進(jìn)度處理的地方
// 上面說的consumer傳過來的commitOffsetEnable
// 當(dāng)Consumer本地消費(fèi)進(jìn)度大于0的時候這個參數(shù)為true
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.
// brokerAllowSuspend在處理消息請求的時候為true,hold請求自己處理是false
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
// Master才需要保存進(jìn)度,slave只是同步broker的消息
storeOffsetEnable = storeOffsetEnable
&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
this.brokerController.getConsumerOffsetManager().commitOffset(
RemotingHelper.parseChannelRemoteAddr(channel),
requestHeader.getConsumerGroup(),
requestHeader.getTopic(),
requestHeader.getQueueId(),
requestHeader.getCommitOffset());//consumer傳上來的offset
}
總的來說:
當(dāng)broker為master的時候,且Consumer消費(fèi)進(jìn)度大于0則在拉取消息的時候順便將消費(fèi)進(jìn)度保存到broker
問題分析
重復(fù)消費(fèi)問題
在ProcessQueue的removeMessage的第二種情況有個問題,假設(shè)有如下情況:
批量拉取了4條消息ABCD,分別對應(yīng)的offset為400|401|402|403,此時consumeBatchSize(批量消費(fèi)數(shù)量,默認(rèn)為1,即一條一條消費(fèi)),那么會分4個線程去消費(fèi)這幾個消息,出現(xiàn)下面消費(fèi)次序
消費(fèi)D -> removeMessage -> 返回400(情況2)
消費(fèi)C -> removeMessage -> 返回400(情況2)
消費(fèi)B -> removeMessage -> 返回400(情況2)
消費(fèi)A -> removeMessage -> 返回404(情況1)
在消費(fèi)A之前,本地消費(fèi)進(jìn)度持久化到Broker之后,應(yīng)用宕機(jī)了,那么此時Broker保存的是offset=400(準(zhǔn)確來說,在消費(fèi)完A且保存消費(fèi)進(jìn)度到broker之前,offset都是400)。那么會有什么問題呢?
先假設(shè)消費(fèi)完DCB且消費(fèi)進(jìn)度上傳完成宕機(jī),然后重啟應(yīng)用,這時候會先從broker獲取應(yīng)該從哪里消費(fèi)(),因為DCB消費(fèi)完成后都是保存400這個消費(fèi)進(jìn)度,那么返回的是400,這時候consumer會請求offset為400的消費(fèi),到這里,已經(jīng)重復(fù)消費(fèi)了DCB。
消費(fèi)進(jìn)度保存在哪里
- consumer保存在內(nèi)存,定時上傳broker
- broker保存在內(nèi)存,定時刷新到磁盤文件
注:以上沒有特別聲明的都是并發(fā)消費(fèi)模式
整體流程圖
