應(yīng)用程序使用 KafkaConsumer向 Kafka 訂閱 Topic 接收消息,首先理解 Kafka 中消費(fèi)者(consumer)和消費(fèi)者組(consumer group)的概念和特性。
KafkaConsumer
消費(fèi)者和消費(fèi)者組
當(dāng)生產(chǎn)者向 Topic 寫入消息的速度超過了消費(fèi)者(consumer)的處理速度,導(dǎo)致大量的消息在 Kafka 中淤積,此時需要對消費(fèi)者進(jìn)行橫向伸縮,用多個消費(fèi)者從同一個主題讀取消息,對消息進(jìn)行分流。
Kafka 的消費(fèi)者都屬于消費(fèi)者組(consumer group)。一個組中的 consumer 訂閱同樣的 topic,每個 consumer 接收 topic 一些分區(qū)(partition)中的消息。同一個分區(qū)不能被一個組中的多個 consumer 消費(fèi)。
假設(shè)現(xiàn)在有一個 Topic 有4個分區(qū),有一個消費(fèi)者組訂閱了這個 Topic,隨著組中的消費(fèi)者數(shù)量從1個增加到5個時,Topic 中分區(qū)被讀取的情況:

如果組中 consumer 的數(shù)量超過分區(qū)數(shù),多出的 consumer 會被閑置。因此,如果想提高消費(fèi)者的并行處理能力,需要設(shè)置足夠多的 partition 數(shù)量。
除了通過增加 consumer 來橫向伸縮單個應(yīng)用程序外,還會出現(xiàn)多個應(yīng)用程序從同一個 Topic 讀取數(shù)據(jù)的情況。這也是 Kafka 設(shè)計的主要目標(biāo)之一:讓 Topic 中的數(shù)據(jù)能夠滿足各種應(yīng)用場景的需求。
如果要每個應(yīng)用程序都可以獲取到所有的消息,而不只是其中的一部分,只要保證每個應(yīng)用程序有自己的 consumer group,就可以獲取到 Topic 所有的消息:

橫向伸縮 Kafka 消費(fèi)者和消費(fèi)者群組并不會對性能造成負(fù)面影響。
分區(qū)再均衡
一個消費(fèi)者組內(nèi)的 consumer 共同讀取 Topic 的分區(qū)。
- 當(dāng)一個 consumer 加入組時,讀取的是原本由其他 consumer 讀取的分區(qū)。
- 當(dāng)一個 consumer 離開組時(被關(guān)閉或發(fā)生崩潰),原本由它讀取的分區(qū)將由組里的其他 consumer 來讀取。
- 當(dāng) Topic 發(fā)生變化時,比如添加了新的分區(qū),會發(fā)生分區(qū)重分配。
分區(qū)的所有權(quán)從一個消費(fèi)者轉(zhuǎn)移到另一個消費(fèi)者,這樣的行為被稱為再均衡(rebalance)。再均衡非常重要,為消費(fèi)者組帶來了高可用性和伸縮性,可以放心的增加或移除消費(fèi)者。
再均衡期間,消費(fèi)者無法讀取消息,造成整個 consumer group 一小段時間的不可用。另外,當(dāng)分區(qū)被重新分配給另一個消費(fèi)者時,當(dāng)前的讀取狀態(tài)會丟失。
消費(fèi)者通過向作為組協(xié)調(diào)器(GroupCoordinator)的 broker(不同的組可以有不同的協(xié)調(diào)器)發(fā)送心跳來維持和群組以及分區(qū)的關(guān)系。心跳表明消費(fèi)者在讀取分區(qū)里的消息。消費(fèi)者會在輪詢消息或提交偏移量(offset)時發(fā)送心跳。如果消費(fèi)者停止發(fā)送心跳的時間足夠長,會話就會過期,組協(xié)調(diào)器認(rèn)為消費(fèi)者已經(jīng)死亡,會觸發(fā)一次再均衡。
在 Kafka 0.10.1 的版本中,對心跳行為進(jìn)行了修改,由一個獨立的線程負(fù)責(zé)心跳。
消費(fèi) Kafka
創(chuàng)建 Kafka 消費(fèi)者
在讀取消息之前,需要先創(chuàng)建一個 KafkaConsumer 對象。創(chuàng)建 KafkaConsumer 對象與創(chuàng)建 KafkaProducer 非常相似,創(chuàng)建 KafkaConsumer 示例:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092, broker2:9092");
// group.id,指定了消費(fèi)者所屬群組
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");
KafkaConsumer<String, String> consumer =
new KafkaConsumer<String, String>(props);
訂閱主題
創(chuàng)建了消費(fèi)者之后,需要訂閱 Topic,subscribe() 方法接受一個主題列表作為參數(shù):
// topic name is “customerCountries”
consumer.subscribe(Collections.singletonList("customerCountries"));
subscribe() 也可以接收一個正則表達(dá)式,匹配多個主題(如果有新的名稱匹配的主題創(chuàng)建,會立即觸發(fā)一次再均衡,消費(fèi)者就可以讀取新添加的主題)。在 Kafka 和其他系統(tǒng)之間復(fù)制數(shù)據(jù)時,使用正則表達(dá)式的方式訂閱多個主題是很常見的做法。
// 訂閱所有 test 前綴的 Topic:
consumer.subscribe("test.*");
消息輪詢
消息輪詢是消費(fèi)者的核心,通過輪詢向服務(wù)器請求數(shù)據(jù)。消息輪詢 API 會處理所有的細(xì)節(jié),包括群組協(xié)調(diào)、分區(qū)再均衡、發(fā)送心跳和獲取數(shù)據(jù),開發(fā)者只需要處理從分區(qū)返回的數(shù)據(jù)。消費(fèi)者代碼的主要部分如下所示:
try {
while (true) {
// 100 是超時時間(ms),在該時間內(nèi) poll 會等待服務(wù)器返回數(shù)據(jù)
ConsumerReccords<String, String> records = consumer.poll(100);
// poll 返回一個記錄列表。
// 每條記錄都包含了記錄所屬主題的信息、記錄所在分區(qū)的信息、記錄在分區(qū)里的偏移量,以及記錄的鍵值對。
for (ConsumerReccord<String, String> record : records) {
log.debug("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
int updatedCount = 1;
if (custCountryMap.countainsValue(record.value())) {
updatedCount = custCountryMap.get(record.value() ) + 1;
custCountryMap.put(record.value(), updatedCount);
JSONObject json = new JSONObject(custCountryMap);
System.out.println(json.toString());
}
}
} finally {
// 關(guān)閉消費(fèi)者,網(wǎng)絡(luò)連接和 socket 也會隨之關(guān)閉,并立即觸發(fā)一次再均衡
consumer.close();
}
在第一次調(diào)用新消費(fèi)者的 poll() 方法時,會負(fù)責(zé)查找 GroupCoordinator,然后加入群組,接受分配的分區(qū)。如果發(fā)生了再均衡,整個過程也是在輪詢期間進(jìn)行的。心跳也是從輪詢里發(fā)送出去的。
消費(fèi)者配置
Kafka 與消費(fèi)者相關(guān)的配置大部分參數(shù)都有合理的默認(rèn)值,一般不需要修改,不過有一些參數(shù)與消費(fèi)者的性能和可用性有很大關(guān)系。接下來介紹這些重要的屬性。
1. fetch.min.bytes
指定消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。服務(wù)器在收到消費(fèi)者的數(shù)據(jù)請求時,如果可用的數(shù)據(jù)量小于 fetch.min.bytes,那么會等到有足夠的可用數(shù)據(jù)時才返回給消費(fèi)者。
合理的設(shè)置可以降低消費(fèi)者和 broker 的工作負(fù)載,在 Topic 消息生產(chǎn)不活躍時,減少處理消息次數(shù)。如果沒有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率卻很高,需要調(diào)高該屬性的值。如果消費(fèi)者的數(shù)量比較多,調(diào)高該屬性的值也可以降低 broker 的工作負(fù)載。
2. fetch.max.wait.ms
指定在 broker 中的等待時間,默認(rèn)是500ms。如果沒有足夠的數(shù)據(jù)流入 Kafka,消費(fèi)者獲取的數(shù)據(jù)量的也沒有達(dá)到 fetch.min.bytes,最終導(dǎo)致500ms的延遲。
如果要降低潛在的延遲(提高 SLA),可以調(diào)低該屬性的值。fetch.max.wait.ms 和 fetch.min.bytes 有一個滿足條件就會返回數(shù)據(jù)。
3. max.parition.fetch.bytes
指定了服務(wù)器從每個分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù),默認(rèn)值是1MB。也就是說 KafkaConsumer#poll() 方法從每個分區(qū)里返回的記錄最多不超過 max.parition.fetch.bytes 指定的字節(jié)。
如果一個主題有20個分區(qū)和5個消費(fèi)者(同一個組內(nèi)),那么每個消費(fèi)者需要至少4MB 的可用內(nèi)存(每個消費(fèi)者讀取4個分區(qū))來接收記錄。如果組內(nèi)有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)。
max.parition.fetch.bytes 必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(max.message.size)大,否則消費(fèi)者可能無法讀取這些消息,導(dǎo)致消費(fèi)者一直重試。
另一個需要考慮的因素是消費(fèi)者處理數(shù)據(jù)的時間。消費(fèi)者需要頻繁調(diào)用 poll() 方法來避免會話過期和發(fā)生分區(qū)再均衡,如果單次調(diào)用 poll() 返回的數(shù)據(jù)太多,消費(fèi)者需要更多的時間來處理,可能無法及時進(jìn)行下一個輪詢來避免會話過期。如果出現(xiàn)這種情況,可以把 max.parition.fetch.bytes 值改小或者延長會話過期時間。
4. session.timeout.ms
指定了消費(fèi)者與服務(wù)器斷開連接的最大時間,默認(rèn)是3s。如果消費(fèi)者沒有在指定的時間內(nèi)發(fā)送心跳給 GroupCoordinator,就被認(rèn)為已經(jīng)死亡,會觸發(fā)再均衡,把它的分區(qū)分配給其他消費(fèi)者。
該屬性與 heartbeat.interval.ms 緊密相關(guān),heartbeat.interval.ms 指定了 poll() 方法向協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms 指定了消費(fèi)者最長多久不發(fā)送心跳。所以,一般需要同時修改這兩個屬性,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一,如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s。
調(diào)低屬性的值可以更快地檢測和恢復(fù)崩潰的節(jié)點,不過長時間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。調(diào)高屬性的值,可以減少意外的再均衡,不過檢測節(jié)點崩潰需要更長的時間。
5. auto.offset.reset
指定了消費(fèi)者在讀取一個沒有偏移量(offset)的分區(qū)或者偏移量無效的情況下(因消費(fèi)者長時間失效,包含偏移量的記錄已經(jīng)過時井被刪除)該作何處理,默認(rèn)值是 latest,表示在 offset 無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)(在消費(fèi)者啟動之后生成的記錄)。
另一個值是 earliest,消費(fèi)者將從起始位置讀取分區(qū)的記錄。
6. enable.auto.commit
指定了消費(fèi)者是否自動提交偏移量,默認(rèn)值是 true,自動提交。
設(shè)為 false 可以程序自己控制何時提交偏移量。如果設(shè)為 true,需要通過配置 auto.commit.interval.ms 屬性來控制提交的頻率。
7. partition.assignment.strategy
分區(qū)分配給組內(nèi)消費(fèi)者的策略,根據(jù)給定的消費(fèi)者和 Topic,決定哪些分區(qū)應(yīng)該被分配給哪個消費(fèi)者。Kafka 有兩個默認(rèn)的分配策略:
Range,把 Topic 的若干個連續(xù)的分區(qū)分配給消費(fèi)者。
假設(shè) consumer1 和 consumer2(c1、c2 代替)訂閱了 topic1 和 topic2(t1、t2 代替),每個 Topic 都有3個分區(qū)。那么 c1 可能分配到 t1-part-0、t1-part-1、t2-part-0 和 t2-part1,而 c2 可能分配到 t1-part-2 和 t2-part-2。只要使用了Range策略,而且分區(qū)數(shù)量無法被消費(fèi)者數(shù)量整除,就會出現(xiàn)這種情況。RoundRobin,把所有分區(qū)逐個分配給消費(fèi)者。
上面的例子如果使用 RoundRobin 策略,那么 c1 可能分配到 t1-part-0、t1-part-2 和 t2-part-1,c2 可能分配到 t1-part-1、t2-part-0 和 t2-part-2。一般來說,RoundRobin策略會給所有消費(fèi)者分配大致相同的分區(qū)數(shù)。
默認(rèn)值是 org.apache.kafka.clients.consumer.RangeAssignor,這個類實現(xiàn)了 Range 策略,org.apache.kafka.clients.consumer.RoundRobinAssignor 是 RoundRobin 策略的實現(xiàn)類。還可以使用自定義策略,屬性值設(shè)為自定義類的名字。
8. client.id
broker 用來標(biāo)識從客戶端發(fā)送過來的消息,可以是任意字符串,通常被用在日志、度量指標(biāo)和配額中。
9. max.poll.records
用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,幫助控制在輪詢里需要處理的數(shù)據(jù)量。
10. receive.buffer.bytes 和 send.buffer.bytes
分別指定了 TCP socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小。如果設(shè)為-1就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當(dāng)增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
提交和偏移量
每次調(diào)用 poll() 方法,總是返回 Kafka 中還沒有被消費(fèi)者讀取過的記錄,使用偏移量(offset)來記錄消費(fèi)者讀取的分區(qū)的位置。
更新分區(qū)當(dāng)前位置的操作叫做“提交(commit)”,消費(fèi)者是如何提交偏移量的呢?
消費(fèi)者向一個特殊的 Topic:_consumer_offset 發(fā)送消息,消息包含每個分區(qū)的偏移量。偏移量只有在消費(fèi)者發(fā)生崩潰或者有新的消費(fèi)者加入群組觸發(fā)再均衡時有用。完成再均衡之后,消費(fèi)者可能分配到新的分區(qū),為了能夠繼續(xù)之前的工作,消費(fèi)者需要讀取每個分區(qū)最后一次提交的 offset,然后從 offset 指定的地方繼續(xù)處理。
如果提交的 offset 大于客戶端處理的最后一個消息偏移量,那么處于兩個偏移量之間的消息會丟失。反之則會消息重復(fù)。


所以,處理偏移量的方式對應(yīng)用程序會有很大的影響。KafkaConsumer API 提供了多種方式來提交偏移量。
自動提交
最簡單的方式是消費(fèi)者自動提交偏移量。如果 enable.auto.commit 設(shè)為 true,那 么每過一定時間間隔,消費(fèi)者會自動把從 poll() 方法接收到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認(rèn)是5s。
自動提交是在輪詢里進(jìn)行的。消費(fèi)者每次在進(jìn)行輪詢時會檢查是否需要提交偏移量,如果是,那么會提交從上一次輪詢返回的偏移量。
假設(shè)我們使用默認(rèn)的5s提交時間間隔,在最近一次提交之后的3s發(fā)生了再均衡,再 均衡之后,消費(fèi)者從最后一次提交的偏移量位置開始讀取消息。這個時候偏移量已經(jīng)落后了3s,這3s內(nèi)的數(shù)據(jù)已經(jīng)處理過,再次消費(fèi)是還會獲取到。通過調(diào)低提交時間間隔來更頻繁地提交偏移量,減小可能出現(xiàn)重復(fù)消費(fèi)的時間窗,不過這種情況是無法完全避免的。
在使用自動提交時,每次調(diào)用輪詢方法都會把上一次調(diào)用返回的偏移量提交上去,并不 知道具體哪些消息已經(jīng)被處理了,所以在再次調(diào)用之前最好確保所有當(dāng)前調(diào)用返回的消息都已經(jīng)處理完畢(在調(diào)用 close() 方法前也會進(jìn)行自動提交)。
在處理異?;蛱崆巴顺鲚喸儠r要格外小心。自動提交雖然方便,不過并沒有為開發(fā)者留有余地來避免重復(fù)處理消息。
提交當(dāng)前偏移量
KafkaConsumer API 提供的另一種提交偏移量的方式,程序主動觸發(fā)提交當(dāng)前偏移量,而不是基于時間間隔自動提交。
把 auto.commit.offset 設(shè)為 false,使用 commitSync() 方法提交偏移量最簡單也最可靠,該方法會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
需要注意,commitSync() 將會提交 poll() 返回的最新偏移量,在處理完所有記錄后調(diào)用 commitSync(),否則還是會有丟失消息的風(fēng)險。
commitSync() 提交偏移量的例子:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
// 處理消息的邏輯省略
}
try {
// poll 的數(shù)據(jù)全部處理完提交
consumer.commitSync();
} catch (CommitFailedException e) {
log.error("commit failed", e)
}
}
只要沒有發(fā)生不可恢復(fù)的錯誤,commitSync() 會一直嘗試直至提交成功。如果提交 失敗會拋出 CommitFailedException 異常。
異步提交
手動提交有一個不足之處,在 broker 對提交請求作出回應(yīng)之前,應(yīng)用程序會阻塞,這會影響應(yīng)用程序的吞吐量??梢允褂卯惒教峤坏姆绞剑坏却?broker 的響應(yīng)。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
// 異步提交
consumer.commitAsync();
}
在成功提交或發(fā)生無法恢復(fù)的錯誤之前,commitSync() 會一直嘗試直至提交成功,但是 commitAsync() 不會,這也是該方法的一個問題。之所以不進(jìn)行重試,是因為在收到服務(wù)器響應(yīng)之前,可能有一個更大的偏移量已經(jīng)提交成功。
假設(shè)我們發(fā)出一個請求提交偏移量2000,這個時候發(fā)生了短暫的通信問題,服務(wù)器收不到請求,與此同時,程序處理了另外一批消息,并成功提交了偏移量3000。如果 commitAsync() 重新嘗試提交偏移量2000,有可能將偏移量3000改為2000,這個時候如果發(fā)生再均衡,就會出現(xiàn)重復(fù)消息。
commitAsync() 支持回調(diào),在 broker 作出響應(yīng)時會執(zhí)行回調(diào)。回調(diào)經(jīng)常被用于記錄提交錯誤或生成度量指標(biāo),如果要用它來進(jìn)行重試,一定要注意提交的順序。
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
consumer.commitAsync(new OffsetCommitCallback() {
// 提交完成時回回調(diào)此函數(shù)
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
});
}
重試異步提交
可以使用一個單調(diào)遞增的序列號來維護(hù)異步提交的順序。在每次提交偏移量之后或在回調(diào)里提交偏移量時遞增序列號。在進(jìn)行重試前,先檢查回調(diào)的序列號和即將提交的偏移量是否相等,如果相等,說明沒有新的提交,那么可以安全地進(jìn)行重試。如果序列號比較大,說明有一個新的提交已經(jīng)發(fā)送出去了,放棄重試。
同步與異步混合提交
一般情況下,針對偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會有太大問題,如果因為臨時網(wǎng)絡(luò)問題導(dǎo)致的,那么后續(xù)的提交總會有成功的。但如果這是發(fā)生在關(guān)閉消費(fèi)者或再均衡前的最后一次提交,就要確保能夠提交成功。
在消費(fèi)者關(guān)閉前一般會組合使用 commitAsync() 和 commitSync():
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
// 異步提交
consumer.commitAsync();
}
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// 同步提交
consumer.commitSync();
} finally {
consumer.close();
}
}
在正常處理流程中,使用異步提交來提高性能,最后使用同步提交來保證位移提交成功。
提交特定的偏移量
一般提交偏移量的頻率與處理消息批次的頻率是一樣的。如果想要更頻繁地提交怎么辦?如果 poll() 方法返回一大批數(shù)據(jù),為了避免因再均衡引起的重復(fù)處理整批消息,想要在批次中間提交偏移量該怎么辦?
這種情況無法通過調(diào)用 commitSync() 或 commitAsync() 來實現(xiàn),只會提交最后一個偏移量,而此時該批次里的消息還沒有處理完。
KafkaConsumer API 允許在調(diào)用 commitSync() 和 commitAsync() 方法時傳進(jìn)去希望提交的分區(qū)和偏移量的 map。因為消費(fèi)者可能不只讀取一個分區(qū),需要跟蹤所有分區(qū)的偏移量,所以在這個層面上控制偏移量的提交會讓代碼變復(fù)雜。
// 記錄分區(qū)的 offset 信息
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic=%s, partition=%s, offset=%d, customer=%s, country=%s",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
// 省略消息處理邏輯 ...
// 記錄分區(qū)的 offset
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, "no metadata")
);
// 最多每處理 1000 條記錄就提交一次偏移量
if (count % 1000 == 0)
consumer.commitAsync(currentOffsets, null);
count++;
}
}
再均衡監(jiān)聽器
消費(fèi)者在退出和進(jìn)行分區(qū)再均衡之前,如果消費(fèi)者知道要失去對一個分區(qū)的所有權(quán),它可能需要提交最后一個已處理記錄的偏移量。KafakConsumer API 可以在消費(fèi)者新增分區(qū)或者失去分區(qū)時進(jìn)行處理,在調(diào)用 subscribe() 方法時傳入 ConsumerRebalanceListener 對象,該對象有兩個方法:
public void onPartitionRevoked(Collection partitions)
在消費(fèi)者停止消費(fèi)消費(fèi)后,在再均衡開始前調(diào)用。public void onPartitionAssigned(Collection partitions)
在分區(qū)分配給消費(fèi)者后,在消費(fèi)者開始讀取消息前調(diào)用。
下面來看一個的例子,在消費(fèi)者失去某個分區(qū)時提交 offset,以便其他消費(fèi)者可以接著消費(fèi)消息并處理:
// 記錄分區(qū)的 offset 信息
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
class HandleRebalance implements ConsumerRebalanceListener {
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
// 如果發(fā)生再均衡,即將失去分區(qū)所有權(quán)時提交偏移量。
// 提交的是最近處理過的偏移量,而不是批次中還在處理的最后一個偏移量。
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current offsets:"
+ currentOffsets);
consumer.commitSync(currentOffsets);
}
}
// ...
try {
// 把 ConsumerRebalanceListener 對象傳給 subscribe() 方法
consumer.subscribe(topics, new HandleRebalance());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()+1, "no metadata")
);
}
consumer.commitAsync(currentOffsets, null);
}
} catch (WakeupException e) {
// ignore
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
consumer.commitSync(currentOffsets);
} finally {
consumer.close();
}
}
從指定位移開始消費(fèi)
除了讀取最近一次提交的位置開始消費(fèi)數(shù)據(jù),有時候也需要從特定的偏移量處開始讀取消息。
如果想從分區(qū)起始位置開始消費(fèi),可以使用 seekToBeginning(TopicPartition tp);如果想從分區(qū)的最末端消費(fèi)最新的消息,可以使用 seekToEnd(TopicPartition tp)。Kafka 還支持從指定 offset 處開始消費(fèi)。最典型的一個是:offset 維護(hù)在其他系統(tǒng)(例如數(shù)據(jù)庫)中,并且以其他系統(tǒng)的值為準(zhǔn)。
考慮下面的場景:從 Kafka 中讀取消息進(jìn)行處理,最后把結(jié)果寫入數(shù)據(jù)庫,可能會按如下邏輯處理:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
record.offset());
processRecord(record);
storeRecordInDB(record);
consumer.commitAsync(currentOffsets);
}
}
看似正確的邏輯要注意的是,在持久化到數(shù)據(jù)庫成功后,提交位移到 Kafka 可能會失敗,出現(xiàn)不一致的情況,那么這可能會導(dǎo)致消息會重復(fù)處理。對于這種情況,我們需要將持久化到數(shù)據(jù)庫與提交 offset 實現(xiàn)為原子性操作,最簡單的做法,在保存記錄到數(shù)據(jù)庫的同時保存 offset 信息,在消費(fèi)者開始消費(fèi)時指定數(shù)據(jù)庫的 offset 開始消費(fèi)。
只需要通過 seek() 來指定分區(qū)位移開始消費(fèi)即可:
class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// 在分區(qū)被回收前提交數(shù)據(jù)庫事務(wù),保存消費(fèi)的記錄和位移
commitDBTransaction();
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// 在開始消費(fèi)前,從數(shù)據(jù)庫中獲取分區(qū)的位移,使用 seek() 指定開始消費(fèi)的偏移量
for(TopicPartition partition: partitions)
consumer.seek(partition, getOffsetFromDB(partition));
}
}
// ...
consumer.subscribe(topics, new SaveOffsetOnRebalance(consumer));
// 調(diào)用一次 poll() 方怯,讓消費(fèi)者加入到消費(fèi)者群組里,并獲取分配到的分區(qū)
consumer.poll(0);
// 然后馬上調(diào)用 seek() 方法定位分區(qū)的偏移量。
// seek() 方法只更新我們正在使用的位置,在下一次調(diào)用 poll() 時就可以獲得正確的消息。
// 如果 seek() 發(fā)生錯誤, poll() 就會拋出異常。
for (TopicPartition partition: consumer.assignment())
consumer.seek(partition, getOffsetFromDB(partition));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
// 保存記錄結(jié)果
storeRecordInDB(record);
// 保存位移信息
storeOffsetInDB(record.topic(), record.partition(), record.offset());
}
// 提交數(shù)據(jù)庫事務(wù)
commitDBTransaction();
}
優(yōu)雅退出
一般情況下,在主線程中循環(huán) poll() 消息并進(jìn)行處理。當(dāng)需要退出循環(huán)時,使用另一個線程調(diào)用 consumer.wakeup(),會使得 poll() 拋出 WakeupException。如果主線程正在處理消息,那么在下一次主線程調(diào)用 poll() 時會拋出異常。樣例代碼:
// 注冊 JVM 關(guān)閉時的回調(diào),當(dāng) JVM 關(guān)閉時調(diào)用
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
// 調(diào)用消費(fèi)者的 wakeup 方法通知主線程退出
consumer.wakeup();
try {
// 等待主線程退出
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
...
// 消費(fèi)主線程
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
// ...
}
consumer.commitSync();
}
} catch (WakeupException e) {
// ignore
} finally {
consumer.close();
}
消息序列化
Kafka 生產(chǎn)者將對象序列化成字節(jié)數(shù)組并發(fā)送到服務(wù)器,消費(fèi)者需要將字節(jié)數(shù)組轉(zhuǎn)換成對象(反序列化)。序列化與反序列化需要匹配,與生產(chǎn)者類似,推薦使用 Avro 序列化方式。
使用 Avro 反序列化
樣例代碼如下(與生產(chǎn)者實現(xiàn)類似):
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts"
KafkaConsumer consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId, url));
consumer.subscribe(Collections.singletonList(topic));
System.out.println("Reading topic:" + topic);
while (true) {
// 這里使用之前生產(chǎn)者使用的Avro生成的Customer類
ConsumerRecords<String, Customer> records = consumer.poll(1000);
for (ConsumerRecord<String, Customer> record: records) {
System.out.println("Current customer name is: " + record.value().getName());
}
consumer.commitSync();
}
獨立消費(fèi)者
一般情況下都是使用消費(fèi)者組(即使只有一個消費(fèi)者)來消費(fèi)消息的,這樣可以在增加或減少消費(fèi)者時自動進(jìn)行分區(qū)重平衡,這種方式是推薦的。
在知道主題和分區(qū)的情況下,也可以使用單個消費(fèi)者來進(jìn)行消費(fèi),需要實現(xiàn)給消費(fèi)者分配分區(qū),而不是讓消費(fèi)者訂閱主題。代碼樣例:
// 獲取主題下所有的分區(qū)
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos)
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
// 為消費(fèi)者指定分區(qū)
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record: records) {
// ...
}
consumer.commitSync();
}
}
除了需要主動獲取分區(qū)以及沒有分區(qū)重平衡,其他的處理邏輯是一樣的。需要注意的是,如果添加了新的分區(qū),這個消費(fèi)者是感知不到的,需要通過 consumer.partitionsFor() 來重新獲取分區(qū)。
《Kafka權(quán)威指南》