1、消費(fèi)者組
消費(fèi)者(Consumer)負(fù)責(zé)訂閱 Kafka 中的主題( Topic),并且從訂閱的主題上拉取消息。 與其他一些消息中間件不同的是:在 Kafka 的消費(fèi)理念中還有一層消費(fèi)組( Consumer Group) 的概念,每個消費(fèi)者都有 一個對應(yīng)的消費(fèi)組。當(dāng)消息發(fā)布到主題后,只會被投遞給訂閱它的每個消費(fèi)組中的一個消費(fèi)者,不同消費(fèi)組之間互不影響。
按照 Kafka默認(rèn)的規(guī)則,每一個消費(fèi)者會被分配到1個分區(qū), 每個消費(fèi)者只能消費(fèi)所分配到的分區(qū)中的消息。換言之 ,每一個分區(qū)只能被一個消費(fèi)組中的一個消費(fèi)者所消費(fèi) 。 消費(fèi)者與消費(fèi)組這種模型可以讓整體的消費(fèi)能力具備橫向伸縮性,我們可以增加(或減少)消費(fèi)者的個數(shù)來提高 (或降低)整體的消費(fèi)能力 。 對于分區(qū)數(shù)固定的情況,一昧地增加消費(fèi)者并不會讓消費(fèi)能 一直得到提升,如果消費(fèi)者過多,出現(xiàn)了消費(fèi)者的個數(shù)大于分區(qū)個數(shù)的情況, 就會有消費(fèi)者分配不到任何分區(qū)。
對于消息中間件而 言,一般有兩種消息投遞模式:
- 點(diǎn)對點(diǎn)(P2P,Point-to-Point)模式
- 發(fā)布/訂閱( Pub/Sub)模式
點(diǎn)對點(diǎn)模式是基于隊列的,消息生產(chǎn)者發(fā)送消息到隊列,消息消費(fèi)者從隊列中接收消息。發(fā)布訂閱模式定義了如何向一個內(nèi)容節(jié)點(diǎn)發(fā)布和訂閱消息,這個內(nèi)容節(jié)點(diǎn)稱為主題( Topic),主題可以認(rèn)為是消息傳遞的中介,消息發(fā)布者將消息發(fā)布到某個 主題,而消息訂閱者從主題中訂閱消息。主題使得消息的訂閱者和發(fā)布者互相保持獨(dú)立,不需要進(jìn)行接觸即可保證消息的傳遞,發(fā)布/訂閱模式在消息的一對多廣播時采用。 Kafka 同時支持兩種消息投遞模式,而這正是得益于消費(fèi)者與消費(fèi)組模型的契合:
- 如果所有的消費(fèi)者都隸屬于同一個消費(fèi)組,那么所有的消息都會被均衡地投遞給每一個消費(fèi)者,即每條消息只會被一個 消費(fèi)者處 理,這就相當(dāng)于點(diǎn)對點(diǎn)模式的應(yīng)用 。
- 如果所有的消費(fèi)者都隸屬于不同的消費(fèi)組,那么所有的消息都會被廣播給所有的消費(fèi) 者,即每條消息會被所有的消費(fèi)者處理,這就相當(dāng)于發(fā)布/訂閱模式的應(yīng)用。
消費(fèi)組是一個邏輯上的概念,它將旗下的消費(fèi)者歸為一類 ,每一個消費(fèi)者只隸屬于 一個消 費(fèi)組。每一個消費(fèi)組都會有一個固定的名稱,消費(fèi)者在進(jìn)行消費(fèi)前需要指定其所屬消費(fèi)組的名稱,這個可以通過消費(fèi)者客戶端參數(shù) group.id來配置。
消費(fèi)者并非邏輯上的概念,它是實(shí)際的應(yīng)用實(shí)例,它可以是一個錢程,也可以是一個進(jìn)程。 同一個消費(fèi)組內(nèi)的消費(fèi)者既可以部署在同 一 臺機(jī)器上,也可以部署在不同的機(jī)器上。
2、位移提交
對于 Kafka中的分區(qū)而言,它的每條消息都有唯一的 offset,用來表示消息在分區(qū)中對應(yīng)的位置。 對于消費(fèi)者而言,它也有一個 offset的概念,消費(fèi)者使用 offset 來表示消費(fèi)到分區(qū)中某個消息所在的位置。
在每次調(diào)用 poll()方法時,它返回的是還沒有被消費(fèi)過的消息集,要做到這一點(diǎn),就需要記錄上一次消費(fèi)時的消費(fèi)位移 。 并且這個消費(fèi)位移必須做持久化保存,而不是單單保存在內(nèi)存中,否則消費(fèi)者重啟之后就無法知曉之前的消費(fèi)位移。再考慮一種情況,當(dāng)有新的消費(fèi)者加入時,那么必然會有再均衡的動作 , 對于同一分區(qū)而言,它可能在再均衡動作之后分配給新的消費(fèi)者 , 如果不持久化保存消費(fèi)位移,那么這個新的消費(fèi)者也無法知曉之前的消費(fèi)位移 。
消費(fèi)位移存儲在 Kafka 內(nèi) 部的主題 _consumer_offsets 中。這里把將消費(fèi)位移存儲起來(持久化)的動作稱為“提交”,消費(fèi)者在消費(fèi)完消息之后需要執(zhí)行消費(fèi)位移的提交。
對于位移提交的具體時機(jī)的把握很有講究,有可能會造成重復(fù)消費(fèi)和消息丟失的現(xiàn)象 。

上圖中,當(dāng)前一次 poll()操作所拉取的消息集為[x+2,x+7], x+2代表上一次提交的消費(fèi)位移,說明己經(jīng)完成了 x+1 之前(包括 x+1 在內(nèi))的所有消息的消費(fèi),x+5 表示當(dāng)前正在處理的位置。 如果拉取到消息之后就進(jìn)行了位移提 交 ,即提交了 x+8,那么當(dāng)前消費(fèi) x+5 的時候遇到了異常, 在故障恢復(fù)之后,我們重新拉取的消息是從 x+8 開始的。也就是說, x+5 至 x+7 之間的消息并未能被消費(fèi),如此便發(fā)生了消息丟失的現(xiàn)象。
再考慮另外一種情形,位移提交的動作是在消費(fèi)完所有拉取到的消息之后才執(zhí)行的,那么當(dāng)消費(fèi) x+5 的時候遇到了異常,在故障恢復(fù)之后,我們重新拉取的消息是從 x+2 開始的。也就是說, x+2 至 x+4 之間的消息又重新消費(fèi)了 一遍,故而又發(fā)生了重復(fù)消費(fèi)的現(xiàn)象 。
在 Kafka 中默認(rèn)的消費(fèi)位移的提交方式是自動提交,這個由消費(fèi)者客戶端參數(shù) enable.auto.commit 配置,默認(rèn)值為 true。當(dāng)然這個默認(rèn)的自動提交不是每消費(fèi)一條消息就提交一次,而是定期提交,這個定期的周期時間由客戶端參數(shù) auto.commit.interval.ms 配置,默認(rèn)值為 5 秒。
自動提交消費(fèi)位移的方式非常簡便,它免去了復(fù)雜的位移提交邏輯,讓編碼更簡潔,但隨之而來的是重復(fù)消費(fèi)和消息丟失的問題。
自動提交是延時提交,重復(fù)消費(fèi)可以理解,那么消息丟失又是在什么情形下會發(fā)生的呢?

上圖中,拉取線程 A 不斷地拉取消息并存入本地緩存,比如在 BlockingQueue 中,另一個處理線程 B 從緩存中讀取消息并進(jìn)行相應(yīng)的邏輯處 理。假設(shè)目前進(jìn)行到了第 y+1 次拉取,以及第 m 次位移提交的時候,也就是 x+6 之前的位移已經(jīng)確認(rèn)提交了,處理線程 B 卻還正在消費(fèi) x+3 的消息 。 此時如果處理線程 B 發(fā)生了異常,待其恢復(fù)之后會從第 m 此位移提交處,也就是 x+6 的位置開始拉取消息,那么 x+3 至 x+6 之間的消 息就沒有得到相應(yīng)的處理,這樣便發(fā)生消息丟失的現(xiàn)象 。
自動位移提交的方式在正常情況下不會發(fā)生消息丟失或重復(fù)消費(fèi)的現(xiàn)象,但是在編程的世界里異常無可避免,與此同時,自動位移提交也無法做到精確的位移管理。故此,在 Kafka 中還提供了手動位移提交的方式。
手動提交可以細(xì)分為同步提交和異步提交,對應(yīng)于 KafkaConsumer 中的 commitSync()和 commitAsync()兩種類型的方法。
final int minBatchSize = 200;
List<ConsumerRecord> buffer = new ArrayList<>();
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= minBatchSize) {
// do some logical processing with buffer .
consumer.commitSync();
buffer.clear();
}
}
}
上面的示例中將拉取到的消息存入緩存 buffer,等到積累到足夠多的時候再做相應(yīng)的批量處理,之后再做批量提交。
commitSync()方法會根據(jù) poll()方法拉取的最新位移來進(jìn)行提交,只要沒有發(fā)生不可恢復(fù)的錯誤,它就會阻塞消費(fèi)者線程直至位移提交完成。
如果在業(yè)務(wù)邏輯處理完之后,并且在同步位移提交前,程序出現(xiàn)了崩漬,那么待恢復(fù)之后又只能從上一次位移提交的地方拉取消息,由此在兩次位移提交的窗口中出現(xiàn)了重復(fù)消費(fèi)的現(xiàn)象。 如果想尋求更細(xì)粒度的、更精準(zhǔn)的提交,那么就需要使用 commitSync() 的另一個含參方法:
final int minBatchSize = 200;
List<ConsumerRecord> buffer = new ArrayList<>();
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= minBatchSize) {
// do some logical processing with buffer .
long offset = record.offset();
TopicPartition partition = new TopicPartition(record.topic(), record.partition());
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
}
}
}
在實(shí)際應(yīng)用中,很少會有這種每消費(fèi)一條消息就提交一次消費(fèi)位移的場景。 commitSync()方法本身是同步執(zhí)行的,會耗費(fèi)一定的性能,而一次一提交的方式會嚴(yán)重拖累性能 。更多時候是按照分區(qū)的粒度劃分提交位移的界限:
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
// do some logical processing .
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(
Collections.singletonMap(partition, new OffsetAndMetadata(lastConsumedOffset + 1)));
}
}
}
與 commitSync()方法相反,commitAsync()在執(zhí)行的時候消費(fèi)者線程不會被阻塞,可能在提交消費(fèi)位移的結(jié)果還未返回之前就開始了新一次的拉取操作 。異步提交可以使消費(fèi)者的性能得到一定的增強(qiáng)。
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// do some logical processing.
}
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception == null) {
System.out.println(offsets);
} else {
log.error("fail to commit offsets");
}
}
});
}
commitAsync()提交的時候同樣會有失敗的情況發(fā)生,那么我們應(yīng)該怎么處理呢?最簡單的方式是重試,問題的關(guān)鍵也在這里。 如果某一次異步提交的消費(fèi)位移為 x,但是提交失敗了,然后下一次又異步提交了消費(fèi)位移為 x+y,這次成功了。如果這里引入了重試機(jī)制, 前一次的異步提交的消費(fèi)位移在重試的時候提交成功了,那么此時的消費(fèi)位移又變?yōu)榱?x。如 果此時發(fā)生異常(或者再均衡) , 那么恢復(fù)之后的消費(fèi)者(或者新的消費(fèi)者)就會從 x 處開始 消費(fèi)消息,這樣就發(fā)生了重復(fù)消費(fèi)的問題。
為此我們可以設(shè)置一個遞增的序號來維護(hù)異步提交的順序,每次位移提交之后就增加序號相對應(yīng)的值。在遇到位移提交失敗需要重試的時候,可以檢查所提交的位移和序號的值的大小, 如果前者小于后者,則說明有更大的位移己經(jīng)提交了,不需要再進(jìn)行本次重試;如果兩者相同, 則說明可以進(jìn)行重試提交。除非程序編碼錯誤 ,否則不會出現(xiàn)前者大于后者的情況 。
試想一下,當(dāng)一個新的消費(fèi)組建立的時候,它根本沒有可以查找的消費(fèi)位移,或者消費(fèi)組內(nèi)的一個新消費(fèi)者訂閱了一個新的主題,它也沒有可以查找的消費(fèi)位移。這時候該從哪里開始消費(fèi)呢?

實(shí)際上kafka會根據(jù)消費(fèi)者客戶端參數(shù)auto.offset.reset 來決定,默認(rèn)值為“ latest”,表示從分區(qū)末尾開始消費(fèi)消息。按照默認(rèn)的配置,消費(fèi)者會從 9 開始進(jìn)行消費(fèi),更加確切地說是從 9 開始拉取消息 。如果將 auto.offset.reset 參數(shù)配置為“ earliest”,那么消費(fèi)者會從起始處,也就是 0 開始消費(fèi)。
除了查找不到消費(fèi)位移,位移越界也會觸發(fā) auto.offset.reset 參數(shù)的執(zhí)行。
3、消息回溯
有些時候,我們 需要一種更細(xì)粒度的掌控,可以讓我們從特定的位移處開始拉取消息,而 KafkaConsumer 中的 seek()方法正好提供了這個功能,讓我們得以回溯消費(fèi)。
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {// 如采不為 0,則說明已經(jīng)成功分配到了分區(qū)
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment(); //獲取消費(fèi)者分配到的分區(qū)信息
}
for (TopicPartition tp : assignment) {
consumer.seek(tp, 10); //從每個分區(qū)offset為10的地方開始消費(fèi)
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// consume the record .
}
如果消費(fèi)組內(nèi)的消費(fèi)者在啟動的時候能夠找到消費(fèi)位移,除非發(fā)生位移越界,否 則 auto.offset.reset 參數(shù)并不會奏效, 此時如果想指定從開頭或末尾開始消費(fèi),就需要 seek() 方法的幫助了。
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
consumer.poll(Duration.ofMillis(100));
assignment = consumer.assignment();
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
for (TopicPartition tp : assignment) {
consumer.seek(tp, offsets.get(tp)); // 從每個分區(qū)的末尾開始消費(fèi)
}
// Map<TopicPartition, Long> offsets = consumer.beginningOffsets(assignment);
// for (TopicPartition tp : assignment) {
// consumer.seek(tp, offsets.get(tp)); // 從每個分區(qū)的起點(diǎn)開始消費(fèi)
// }
需要注意的是, 一個分區(qū)的起始位置起初是 0,但并不代表每時每刻都為 0, 因?yàn)槿罩厩謇淼膭幼鲿謇砼f的數(shù)據(jù),所以分區(qū)的起始位置會自然而然地增加。
配合這兩個方法我們就可以從分區(qū)的開頭或末尾開始消費(fèi)。其實(shí) KafkaConsumer 中直接提供了 seekToBeginning() 方法和 seekToEnd()方法來實(shí)現(xiàn)這兩個功能,這兩個方法 的具體定義如下:
public void seekToBeginning(Collection<TopicPartition> partitions);
public void seekToEnd(Collection<TopicPartition> partitions);
有時候我們并不知道特定的消費(fèi)位置,卻知道一個相關(guān)的時間點(diǎn),比如我們想要消費(fèi)昨天 8 點(diǎn)之后的消息,這個需求更符合正常的思維邏輯。此時我們無法直接使用 seek()方法來追溯到相應(yīng)的位置。 KafkaConsumer同樣考慮到了這種情況,它提供了一個 offsetsForTimes()方法,通 過 timestamp 來查詢與此對應(yīng)的分區(qū)位置。
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartitioη, Long> t工mestampsToSearch,
Duration timeout);
Kafka 中的消費(fèi)位移是存儲在一個內(nèi)部主題 中的, 而 seek()方法可 以突破這 一限制:消費(fèi)位移可以保存在任意的存儲介質(zhì)中,例如數(shù)據(jù)庫、文件系統(tǒng)等。以數(shù)據(jù) 庫為例,我們將消費(fèi)位移保存在其中的一個表中,在下次消費(fèi)的時候可以讀取存儲在數(shù)據(jù)表中的消費(fèi)位移并通過 seek()方法指向這個具體的位置:
consumer.subscribe(Arrays.asList(topic)); // 省略 poll ( )方法及 assignment 的邏輯
for (TopicPartition tp : assignment) {
long offset = getOffsetFromDB(tp); // 從 DB 中讀取消費(fèi)位移 consumer.seek(tp, offset) ;
}
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
// process the record .
long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 將消費(fèi)位移存儲在 DB 中
storeOffsetToDB(partition, lastConsumedOffset + 1);
}
}
}
seek()方法為我們提供從特定位置讀取消息的能力,我們可以通過這個方法來向前跳過若 干消息,也可以通過這個方法來 向后回溯若干消息,這樣為消息的消費(fèi)提供了很大的靈活性。 seek()方法也為我們提供了將消費(fèi)位移保存在外部存儲介質(zhì)中的能力,還可以配合再均衡監(jiān)聽器 來提供更加精準(zhǔn)的消費(fèi)能力。
4、再均衡
再均衡是指分區(qū)的所屬權(quán)從一個消費(fèi)者轉(zhuǎn)移到另一消費(fèi)者的行為,它為消費(fèi)組具備高可用性和伸縮性提供保障,使我們可以既方便又安全地刪除消費(fèi)組內(nèi)的消費(fèi)者或往消費(fèi)組內(nèi)添加消費(fèi)者。
不過在再均衡發(fā)生期間,消費(fèi)組內(nèi)的消費(fèi)者是無法讀取消息的。 也就是說,在再均衡發(fā) 生期間的這一小段時間內(nèi),消費(fèi)組會變得不可用 。
另外,當(dāng) 一個分區(qū)被重新分配給另一個消費(fèi)者時,消費(fèi)者當(dāng)前的狀態(tài)也會丟失。比如消費(fèi)者消費(fèi)完某個分區(qū)中的一部分消息時還沒有來得及提交消費(fèi)位移就發(fā)生了再均衡操作 , 之后這個分區(qū)又被分配給了消費(fèi)組內(nèi)的另一個消費(fèi)者, 原來被消費(fèi)完的那部分消息又被重新消費(fèi)一遍,也就是發(fā)生了重復(fù)消費(fèi)。
kafka提供了再均衡監(jiān)聽器接口,用來設(shè)定發(fā)生再 均衡動作前后的一些準(zhǔn)備或收尾的動作。
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
//這個方法會在再均衡開始之前和消費(fèi)者停止讀取消息之后被調(diào)用
consumer.commitSync(currentOffsets);
currentOffsets.clear();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
//這個方法會在重新分配分區(qū)之后和消費(fèi)者開始讀取消費(fèi)之前被調(diào)用
// do nothing.
}
});
try {
while (isRunning.get()) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
// process the record .
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1));
consumer.commitAsync(currentOffsets, null);
}
}
} finally {
consumer.close();
}
上面的代碼將消費(fèi)位移暫存到一個局部變量 currentOffsets 中,這樣在正常消費(fèi)的時候 可以通過 commitAsync()方法來異步提交消費(fèi)位移,在發(fā)生再均衡動作之前可以通過再均衡監(jiān)聽 器的 onPartitionsRevoked()回調(diào)執(zhí)行 commitSync()方法同步提交消費(fèi)位移,以盡量避免一些不 要的重復(fù)消費(fèi)。
5、消費(fèi)者攔截器
kafka中不僅對生產(chǎn)者配備了攔截器,消費(fèi)者也有相應(yīng)的攔截器的概念。消費(fèi)者攔截器主要在消費(fèi)到消息或在提交消費(fèi)位移時進(jìn)行一些定制化的操作。
下面使用消費(fèi)者攔截器來實(shí)現(xiàn)一個簡單的消息 TTL(Time to Live,即過期時)的功能:
public class ConsumerinterceptorTTL implements ConsumerInterceptor<String, String> {
private static final long EXPIRE_INTERVAL = 10 * 1000;
@Override
public void configure(Map<String, ?> arg0) {
}
@Override
public void close() {
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
//該方法會在會在提交完消費(fèi)位移之后調(diào)用,可以使用這個方法來記錄跟蹤所提交的位移信息
offsets.forEach((tp, offset) -> System.out.println(tp + ": " + offset.offset()));
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
//該方法會在 poll()方法返回之前調(diào)用,可以對消息進(jìn)行相應(yīng)的定制化操作,比如修改返回的消息內(nèi)容、按照某種規(guī)則過濾消息(可能會減少 poll()方法返回的消息的個數(shù)
//如果 onConsume()方法中拋出異常,那么會被捕獲并記錄到日志中,但是異常不會再向上傳遞。
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
if (now - record.timestamp() < EXPIRE_INTERVAL) {
newTpRecords.add(record);
if (!newTpRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
}
}
return new ConsumerRecords<>(newRecords);
}
}
不過使用這種功能時需要注意的是 : 在使用帶參數(shù)的位移提交時,有可能提交了錯誤的位移信息,因?yàn)榭赡芎凶畲笃屏康南幌M(fèi)者攔截器過濾 。
6、線程安全
KatkaProducer 是線程安全的,然而 KafkaConsumer 卻是非線程安全的 。
KafkaConsumer 中定義了一個 acquire()方法 ,用來檢測當(dāng)前是否只有一個線程在操作:
private final AtomicLong currentThread = new Atom工cLong(NO_CURRENT_THREAD); // KafkaConsumer 中的成員變量
private void acquire() {
long threadid = Thread.currentThread().getid();
if (threadid != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadid))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi- threaded access ");
refcount.incrementAndGet();
}
acquire()方法和我們通常所說的鎖(synchronized、 Lock等) 不同,它不會造成阻塞等待, 我們可以將其看作一個輕量級鎖,它僅通過線程操作計數(shù)標(biāo)記的方式來檢測線程是否發(fā)生了并發(fā)操作,以此保證只有一個線程在操作。 acquire()方法和 release()方法成對出現(xiàn),表示相應(yīng)的加鎖和解鎖操作。 release()方法也很簡單, 具體定義如下 :
private void release() {
if (refcount.decrementAndGet() == 0)
currentThread.set(NO_CURRENT_THREAD);
}
7、分區(qū)分配策略
Kafka 提供了消費(fèi)者客戶端參數(shù) partition.assignment.strategy 來設(shè) 置消費(fèi)者與訂閱主題之間的分區(qū)分配策略。默認(rèn)情況下,此參數(shù)的值為 org.apache.kafka.clients.consumer.RangeAssignor,即采用 RangeAssignor分配策略。除此之外, Kafka還提供了另 外兩種分配策略: RoundRobinAssignor 和 StickyAssignor。
RangeAssignor 分配策略的原理是按照消費(fèi)者總數(shù)和分區(qū)總數(shù)進(jìn)行整除運(yùn)算來獲得一個跨度,然后將分區(qū)按照跨度進(jìn)行平均分配, 以保證分區(qū)盡可能均勻地分配 給所 有的消費(fèi)者 。 對于每一個主題 , RangeAssignor 策略會將消費(fèi)組內(nèi)所有訂閱這個主題的消費(fèi)者按照名稱的字典序排序 , 然后為每個消費(fèi)者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費(fèi)者會被多分配一個分區(qū)。
假設(shè) n=分區(qū)數(shù)/消費(fèi)者數(shù)量, m=分區(qū)數(shù)%消費(fèi)者數(shù)量,那么前 m 個消費(fèi)者每個分配 n+l 個 分區(qū),后面的(消費(fèi)者數(shù)量-m)個消費(fèi)者每個分配 n個分區(qū)。
假設(shè)消費(fèi)組內(nèi)有 2個消費(fèi)者 C0和 C1,都訂閱了主題 t0和 t1,并且每個主題都有4個分區(qū), 那么訂閱的所有分區(qū)可以標(biāo)識為 : t0p0、 t0p1、 t0p2、 t0p3、 t1p0、 t1p1、 t1p2、 t1p3。最終的分配結(jié)果為 :
- 消費(fèi)者 C0:t0p0、 t0p1、 t1p1、 t1p1
- 消貨者 C1:t0p2、 t0p3、 t1p2、 t1p3
這樣分配得很均勻,那么這個分配策略能夠一直保持這種良好的特性嗎?我們不妨再來看另一種情況。假設(shè)上面例子中 2個主題都只有 3個分區(qū),那么訂閱的所有分區(qū)可以標(biāo)識為: t0p0、 t0p1、 t0p2、 t1p0、 t1p1、 t1p2。最終的分配結(jié)果為 :
- 消費(fèi)者 C0:t0p0、 t0p1、t1p0、 t1p1
- 消費(fèi)者 C1:t0p2、 t1p2
可以明顯地看到這樣的分配并不均勻,如果將類似的情形擴(kuò)大, 則有可能出現(xiàn)部分消費(fèi)者過載的情況。
對此我們再來看另一種 RoundRobinAssignor策略的分配效果如何。
RoundRobinAssignor 分配策略的原理是將消費(fèi)組內(nèi)所有消費(fèi)者及消費(fèi)者訂閱的所有主題的分區(qū)按照字典序 排序,然后通過輪詢方式逐個將分區(qū)依次分配給每個消費(fèi)者。
如果同一個消費(fèi)組內(nèi)所有的消費(fèi)者的訂閱信息都是相同的,那么 RoundRobinAssignor分配策略的分區(qū)分配會是均勻的。舉個例子,假設(shè)消費(fèi)組中有 2 個消費(fèi)者 C0 和 C1,都訂閱了主題 t0 和 t1,并且每個主題都有 3 個分區(qū) , 那么訂閱的所有分區(qū)可以標(biāo)識為: t0p0、 t0p1、 t0p2、 t1p0、 t1p1、 t1p2。最終的分配結(jié)果為 :
- 消費(fèi)者 C0:t0p0、 t0p2、t1p1
- 消費(fèi)者 C1:t0p1、t1p0、 t1p2
如果同一個消費(fèi)組內(nèi)的消費(fèi)者訂閱的信息是不相同的,那么在執(zhí)行分區(qū)分配的時候就不是完全的輪詢分配,有可能導(dǎo)致分區(qū)分配得不均勻。如果某個消費(fèi)者沒有訂閱消費(fèi)組內(nèi)的某個主 題,那么在分配分區(qū)的時候此消費(fèi)者將分配不到這個主題的任何分區(qū)。
舉個例子,假設(shè)消費(fèi)組內(nèi)有 3個消費(fèi)者,它們共訂閱了 3個主題,這 3個主題分別有 l、 2、 3個分區(qū),即整個消費(fèi)組訂閱了 t0p0、 t1p0、 t1p1、 t2p0、 t2p1、 t2p2這6個分區(qū)。 具體而言,消費(fèi)者C0訂閱的是主題t0,消費(fèi)者C1訂閱的是主題t0和t1。消費(fèi)者 C2 訂閱的是主題 t0、 t1 和 t2, 那么最終的分配結(jié)果為 :
- 消費(fèi)者C0:t0p0
- 消費(fèi)者C1:t1p0
- 消費(fèi)者C2:t1p1、t2p0、t2p1、t2p2
可以看到 RoundRobinAssignor策略也不是十分完美,這樣分配其實(shí)并不是最優(yōu)解,因?yàn)橥耆梢詫⒎謪^(qū) t1p1 分配給消費(fèi)者 C1。
再來看一下 StickyAssignor分配策略,它主要有兩個目的:
- 分區(qū)的分配要盡可能均勻
- 分區(qū)的分配盡可能與上次分配的保持相同
假設(shè)消費(fèi)組內(nèi)有3個消費(fèi)者,它們都訂閱了4個主題,并且每個主題有 2 個分區(qū) 。 也就是說,整個消費(fèi)組訂閱了 t0p0、 t0p1、 t1p0、 t1p1、 t2p0、 t2p1、 t3p0、 t3p1這8個分區(qū)。 最終的分配結(jié)果如下:
- 消費(fèi)者 C0:t0p0、t1p1、t3p0
- 消費(fèi)者 C1:t0p1、t2p0、t3p1
- 消費(fèi)者 C2:t1p0、t2p1
這樣初看上去似乎與采用 RoundRobinAssignor分配策略所分配的結(jié)果相同,但事實(shí)是否真的如此呢?再假設(shè)此時消費(fèi)者 Cl 脫離了消費(fèi)組,那么消費(fèi)組就會執(zhí)行再均衡操作,進(jìn)而消費(fèi)分區(qū)會重新分配。如果采用 RoundRobinAssignor分配策略,那么此時的分配結(jié)果如下:
- 消費(fèi)者 C0:t0p0、 t1p0、 t2p0、t3p0
- 消費(fèi)者 C2:t0p1、 t1p1、 t2p1、t3p1
RoundRobinAssignor分配策略會按照消費(fèi)者 C0和 C2進(jìn)行重新輪詢分配。 如果此時使用的是 StickyAssignor 分配策略,那么分配結(jié)果為:
- 消費(fèi)者 C0:t0p0、 t1p1、t3p0、t2p0
- 消費(fèi)者 C2:t1p0、 t2p1、t0p1、t3p1
可以看到分配結(jié)果中保留了上一次分配中對消費(fèi)者 C0和 C2 的所有分配結(jié)果,并將原來消 費(fèi)者 C1 的負(fù)擔(dān)分配給了剩余的兩個消費(fèi)者 C0 和 C2, 最終 C0 和 C2 的分配還保持了均衡 。
如果發(fā)生分區(qū)重分配,那么對于同一個分區(qū)而言,有可能之前的消費(fèi)者和新指派的消費(fèi)者不是同一個,之前消費(fèi)者進(jìn)行到 一半的處理還要在新指派的消費(fèi)者中再次復(fù)現(xiàn)一遍,這顯然很浪費(fèi)系統(tǒng)資源。 StickyAssignor 分配策略如同其名稱中的“ sticky” 一樣,讓分配策略具備一定 的“粘性”,盡可能地讓前后兩次分配相同,進(jìn)而減少系統(tǒng)資源的損耗及其他異常情況的發(fā)生 。
8、參數(shù)解析
- fetch.min.bytes
該參數(shù)用來配置 Consumer 在一次拉取請求中能從 Kafka 中拉取的最小數(shù)據(jù)量。Kafka在收到 Consumer 的拉取請求時,如果返回給 Consumer 的數(shù) 據(jù)量小于這個參數(shù)所配置的值,那么它就需要進(jìn)行等待,直到數(shù)據(jù)量滿足這個參數(shù)的配置大小。 可以適當(dāng)調(diào)大這個參數(shù)的值以提高一定的吞吐量,不過也會造成額外的延遲,對于延遲敏感的應(yīng)用可能就不可取了。
- fetch.max.bytes
用來配置 Consumer在一次拉取請求中從 Kafka 中拉取的最大數(shù)據(jù)量,默認(rèn)值為 50MB。該參數(shù)設(shè)定的不是絕對的最大值 ,如果在第一個非空分區(qū)中拉取的第一條消息大于該值, 那么該消息將仍然返回,以確保消費(fèi)者繼續(xù)工作。
- fetch.max.wait.ms
這個參數(shù)和 fetch.min.bytes 參數(shù)有關(guān),如果 Kafka 僅僅參考 fetch.min.bytes 參數(shù)的要求,那么有可能會一直阻塞等待而無法發(fā)送響應(yīng)給 Consumer, 顯然這是不合理的 。 fetch.max.wait.ms 參數(shù)用于指定 Kafka 的等待時間,默認(rèn)值為 500 (ms)。 這個參數(shù)的設(shè)定和 Consumer 與 Kafka 之 間的延遲也有關(guān)系, 如果業(yè)務(wù)應(yīng)用對延遲敏感,那么可以適 當(dāng)調(diào)小這個參數(shù) 。
- max.partition.fetch.bytes
這個參數(shù)用來配置從每個分區(qū)里返回給 Consumer的最大數(shù)據(jù)量 ,默認(rèn)值為1M。。
- max.poll.records
這個參數(shù)用來配置 Consumer 在 一 次拉取請求中拉取的最大消息數(shù),默認(rèn)值為 500 (條)。如果消息的大小都比較小,則可以適當(dāng)調(diào)大這個參數(shù)值來提升一定的消費(fèi)速度。
- connections.max.idle.ms
這個參數(shù)用來指定在多久之后關(guān)閉限制的連接,默認(rèn) 9 分鐘。
- exclude.internal.topics
Kafka 中有兩個內(nèi)部的主題:_consumer_offsets 和 _transaction_state, exclude.internal.topics 用來指定 Kafka 中的內(nèi)部主題是否可以向消費(fèi)者公開,默認(rèn)值為 true。如果設(shè)置為 true,那么只 能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式來訂閱內(nèi)部主題,設(shè)置為 false 則沒有這個限制。
- receive.buffer.bytes
這個參數(shù)用來設(shè)置 Socket 接收消息緩沖區(qū)的大小,默認(rèn)為64M。如果設(shè)置為 -1,則使用操作系統(tǒng)的默認(rèn)值。如果Consumer與Kafka處于不同的機(jī)房, 則可以適當(dāng)調(diào)大這個參數(shù)值。
- send.buffer.bytes
這個參數(shù)用來設(shè)置 Socket發(fā)送消息緩沖區(qū),默認(rèn)值為 128M 。如果設(shè)置為 -1,則使用操作系統(tǒng)的默認(rèn)值。
- request.timeout.ms
這個參數(shù)用來配置 Consumer 等待請求響應(yīng)的最長時間,默認(rèn)值為 30000 (ms)。
- isolation.level
這個參數(shù)用來配置消費(fèi)者的事務(wù)隔離級別。字符串類型,有效值為“read uncommitted和 “ read committed",表示消費(fèi)者所消費(fèi)到的位置,如果設(shè)置為“ read committed”,那么消費(fèi)者就會忽略事務(wù)未提交的消息,默認(rèn)情況下為 “read_uncommitted”,即可以消費(fèi)到 HW (High Watermark)處的位置。