consumer在消費(fèi)時(shí),會(huì)設(shè)置從哪里開始消費(fèi)。
默認(rèn)是CONSUME_FROM_LAST_OFFSET
設(shè)置的值如代碼所示。
public enum ConsumeFromWhere {
/**
* 一個(gè)新的訂閱組第一次啟動(dòng)從隊(duì)列的最后位置開始消費(fèi)<br>
* 后續(xù)再啟動(dòng)接著上次消費(fèi)的進(jìn)度開始消費(fèi)
*/
CONSUME_FROM_LAST_OFFSET,
@Deprecated
CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
@Deprecated
CONSUME_FROM_MIN_OFFSET,
@Deprecated
CONSUME_FROM_MAX_OFFSET,
/**
* 一個(gè)新的訂閱組第一次啟動(dòng)從隊(duì)列的最前位置開始消費(fèi)<br>
* 后續(xù)再啟動(dòng)接著上次消費(fèi)的進(jìn)度開始消費(fèi)
*/
CONSUME_FROM_FIRST_OFFSET,
/**
* 一個(gè)新的訂閱組第一次啟動(dòng)從指定時(shí)間點(diǎn)開始消費(fèi)<br>
* 后續(xù)再啟動(dòng)接著上次消費(fèi)的進(jìn)度開始消費(fèi)<br>
* 時(shí)間點(diǎn)設(shè)置參見DefaultMQPushConsumer.consumeTimestamp參數(shù)
*/
CONSUME_FROM_TIMESTAMP,
}
這里要注意代碼注釋。這個(gè)參數(shù)只對(duì)一個(gè)新的consumeGroup第一次啟動(dòng)時(shí)有效。
就是說,如果是一個(gè)consumerGroup重啟,他只會(huì)從自己上次消費(fèi)到的offset,繼續(xù)消費(fèi)。這個(gè)參數(shù)是沒用的。 而判斷是不是一個(gè)新的ConsumerGroup是在broker端判斷。
要知道,消費(fèi)到哪個(gè)offset最先是存在Consumer本地的,定時(shí)和broker同步自己的消費(fèi)offset。broker在判斷是不是一個(gè)新的consumergroup,就是查broker端有沒有這個(gè)consumergroup的offset記錄。
另外,對(duì)于一個(gè)新的queue,這個(gè)參數(shù)也是沒用的,都是從0開始消費(fèi)。
所以,讓我們困惑的一個(gè)問題我已經(jīng)設(shè)置了CONSUME_FROM_LAST_OFFSET,為什么還是重復(fù)消費(fèi)了。
可能你這不是新的consumergroup,也可能是個(gè)新的Queue。