【Kafka源碼】KafkaConsumer

[TOC]


KafkaConsumer是從kafka集群消費(fèi)消息的客戶(hù)端。這是kafka的高級(jí)消費(fèi)者,而SimpleConsumer是kafka的低級(jí)消費(fèi)者。何為高級(jí)?何為低級(jí)?

我們所謂的高級(jí),就是可以自動(dòng)處理kafka集群的失敗信息,也可以適應(yīng)kafka集群中消息的分區(qū)遷移。這個(gè)客戶(hù)端也可以與服務(wù)端進(jìn)行交互,使用消費(fèi)者分組負(fù)載平衡消費(fèi),下面我們具體會(huì)講解。

消費(fèi)者與對(duì)應(yīng)的broker保持TCP連接,來(lái)獲取數(shù)據(jù)。使用完成后關(guān)閉消費(fèi)者連接失敗,會(huì)泄露連接。這個(gè)消費(fèi)者不是線(xiàn)程安全的。

一、偏移量和消費(fèi)者位置Offsets And Consumer position

Kafka在分區(qū)中為每條記錄維護(hù)了一個(gè)數(shù)字形式的偏移量。這個(gè)偏移量是數(shù)據(jù)在分區(qū)中的唯一值,也可以表示為消費(fèi)者在分區(qū)中的偏移量。例如,一個(gè)消費(fèi)者的偏移量為5,表示偏移量為0到4的消息已經(jīng)被消費(fèi)過(guò)。關(guān)于消費(fèi)者使用的偏移量,有兩個(gè)比較重要的概念。

1.1 TopicPartition

消費(fèi)者的偏移量表示消費(fèi)者下一個(gè)需要消費(fèi)的消息的偏移量。這個(gè)值會(huì)比當(dāng)前消費(fèi)者在那個(gè)分區(qū)剛剛消費(fèi)的消息偏移量大一。這個(gè)值在下面情況下會(huì)自動(dòng)增長(zhǎng):消費(fèi)者調(diào)用了poll(long)并且獲取到了消息。

1.2 committed position提交偏移量

這個(gè)committed position表示最新的被安全保存的偏移量。如果當(dāng)前過(guò)程中失敗然后重啟了,這個(gè)是重啟后消費(fèi)的偏移量起點(diǎn)。消費(fèi)者有三種方式來(lái)提交偏移量:

  • 自動(dòng)定時(shí)提交
  • 同步提交。手動(dòng)提交偏移量,使用的方法是commitSync(),這個(gè)方法會(huì)一直被阻塞,直到偏移量被成功提交了,或者在提交過(guò)程中發(fā)生了嚴(yán)重的錯(cuò)誤。
  • 異步提交。使用的方法是commitAsync(OffsetCommitCallback),在成功或者發(fā)生嚴(yán)重錯(cuò)誤后,會(huì)觸發(fā)OffsetCommitCallback方法。

二、消費(fèi)分組和主題訂閱Consumer Groups and Topic Subscriptions

Kafka使用消費(fèi)分組的概念,允許一個(gè)處理池來(lái)將消費(fèi)和處理過(guò)程分開(kāi)。這些處理可以在同一臺(tái)機(jī)器上運(yùn)行,也可以分布在多臺(tái)機(jī)器上,來(lái)提供擴(kuò)展性和容錯(cuò)。

每個(gè)Kafka消費(fèi)者都可以配置自己所屬的消費(fèi)分組,并且可以通過(guò)接口subscribe(Collection, ConsumerRebalanceListener)動(dòng)態(tài)設(shè)置訂閱的主題。Kafka會(huì)把每條消息傳遞到分組中的某個(gè)運(yùn)行過(guò)程。這是通過(guò)平衡消費(fèi)分組中的每個(gè)消費(fèi)者對(duì)應(yīng)的分區(qū)來(lái)實(shí)現(xiàn)的,最終實(shí)現(xiàn)的是每個(gè)分組正好被分配到分組的某個(gè)消費(fèi)者。所以如果一個(gè)主題有4個(gè)分區(qū),一個(gè)消費(fèi)分組有兩個(gè)消費(fèi)者process,每個(gè)process會(huì)消費(fèi)兩個(gè)分區(qū)。

消費(fèi)分組中的成員是動(dòng)態(tài)的:如果某個(gè)process掛了,分配給他的分區(qū)會(huì)被分到組中其他的process。類(lèi)似的,如果一個(gè)新的消費(fèi)者加入了分組,分區(qū)會(huì)遷移到新的分組上。這被稱(chēng)為分組平衡。需要注意的是,當(dāng)訂閱的主題中新的分區(qū)出現(xiàn)的時(shí)候,相同的情況也會(huì)出現(xiàn):分組不斷地檢測(cè)新的分區(qū),平衡分組,最終每個(gè)分區(qū)都被分配到某個(gè)組成員上。

概念上,你可以把消費(fèi)分組想象成一個(gè)單獨(dú)的邏輯上的消費(fèi)者,恰巧有多個(gè)消費(fèi)進(jìn)程。作為一個(gè)多訂閱的系統(tǒng),kafka天然支持某個(gè)主題有多個(gè)消費(fèi)分組,而數(shù)據(jù)不會(huì)重復(fù)。

這些功能對(duì)于一個(gè)消息系統(tǒng)來(lái)說(shuō)很普通。和傳統(tǒng)的消息隊(duì)列不同,你可以同時(shí)有很多的分組。在傳統(tǒng)消息系統(tǒng)中,每個(gè)消費(fèi)者都會(huì)有自己的消費(fèi)分組,所以每個(gè)消費(fèi)者會(huì)訂閱主題下的所有記錄,也就是會(huì)收到所有的消息。

而且,當(dāng)分組重新分配自動(dòng)出現(xiàn)時(shí),會(huì)通過(guò)ConsumerRebalanceListener通知消費(fèi)者,然后消費(fèi)者自身處理一些應(yīng)用級(jí)的邏輯,比如狀態(tài)清除,手動(dòng)提交offset等等。

對(duì)消費(fèi)者來(lái)說(shuō),還可以手動(dòng)分配分區(qū),使用的方法是assign(類(lèi)似于SimpleConsumer)。在這種情況下,動(dòng)態(tài)分區(qū)調(diào)整和消費(fèi)分組協(xié)調(diào)功能會(huì)被禁用。

三、檢測(cè)消費(fèi)者失敗Detecting Consumer Failures

訂閱一批主題之后,消費(fèi)者在調(diào)用poll的時(shí)候,會(huì)自動(dòng)加入分組。poll是用于確保消費(fèi)者的存活。只要消費(fèi)者不停地調(diào)用poll,那么他就會(huì)一直存在于分組中,并且不斷地收到對(duì)應(yīng)分區(qū)推送給他的消息。另外,poll方法也會(huì)定時(shí)發(fā)送心跳給服務(wù)端,當(dāng)你停止調(diào)用poll時(shí),心跳也會(huì)停止。如果server超過(guò)session.timeout時(shí)間沒(méi)有收到心跳,消費(fèi)者會(huì)被踢出分組,分區(qū)也會(huì)重新分配。這是為了防止消費(fèi)者掛掉之后,還占用分區(qū)的情況發(fā)生(這種情況下分組中的其他消費(fèi)者無(wú)法消費(fèi)到消息)。為了繼續(xù)存在于分組中,消費(fèi)者必須調(diào)用poll方法證明還活著。

這個(gè)設(shè)計(jì)的目的還在于,一個(gè)poll循環(huán)中的消息處理過(guò)程的時(shí)間必須是有界的,那樣心跳才能在session.timeout之前發(fā)出去。消費(fèi)者提供兩個(gè)配置來(lái)控制這種行為:

  • session.timeout.ms:通過(guò)增加這個(gè)值,消費(fèi)者可以有更多的時(shí)間來(lái)處理poll返回的一批消息。唯一的缺點(diǎn)就是服務(wù)端要耗費(fèi)更多的時(shí)間來(lái)檢測(cè)消費(fèi)者是否存活,這可能會(huì)導(dǎo)致分組平衡的延遲。然后,這不會(huì)影響close方法的調(diào)用,因?yàn)橐坏┱{(diào)用了這個(gè)方法,消費(fèi)者會(huì)發(fā)送一個(gè)明確的消息給服務(wù)端,離開(kāi)分組,分組平衡會(huì)被立即觸發(fā)。
  • max.poll.records:一個(gè)poll循環(huán)的處理時(shí)間應(yīng)該和消息的數(shù)量成正比。所以應(yīng)該設(shè)置一次最多處理多少條數(shù)據(jù)。默認(rèn)情況下,這個(gè)值沒(méi)有限制消息的數(shù)量。

三、舉例

當(dāng)前消費(fèi)客戶(hù)端提供了多種方法來(lái)消費(fèi),下面是幾個(gè)例子。

3.1 自動(dòng)提交Automatic Offset Committing

這個(gè)方法說(shuō)明了kafka消費(fèi)客戶(hù)端的簡(jiǎn)單使用,依賴(lài)于自動(dòng)提交offset。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}

配置enable.auto.commit表示是自動(dòng)提交offset,并且提交的頻率為auto.commit.interval.ms。服務(wù)器可以通過(guò)bootstrap.servers來(lái)配置,可以不用配置全部的服務(wù)器,因?yàn)闀?huì)自動(dòng)發(fā)現(xiàn)集群中所有的服務(wù)器。當(dāng)然不建議只配置一個(gè),因?yàn)槿绻@個(gè)掛掉了,就找不到其他的機(jī)器了。

反序列化配置表示如何把二進(jìn)制消息轉(zhuǎn)換為消息對(duì)象。例如,使用string反序列化,表示消息的key和value都是字符串。

3.2 手動(dòng)提交偏移量Manual Offset Control

與自動(dòng)提交不同,我們可以通過(guò)配置來(lái)控制消息什么時(shí)候消費(fèi)完成,并提交偏移量。在一條消息需要多個(gè)任務(wù)處理,所有任務(wù)完成后才能提交偏移量的場(chǎng)景下,需要手動(dòng)提交。在下面的例子中,我們會(huì)一次消費(fèi)一批數(shù)據(jù),然后把他們放到內(nèi)存中,當(dāng)消息達(dá)到一定的數(shù)量時(shí),我們會(huì)把他們插入數(shù)據(jù)庫(kù)中。如果這種情況下,我們配置為自動(dòng)提交,那么就會(huì)出現(xiàn)消息被消費(fèi),但是實(shí)際上并沒(méi)有插入到數(shù)據(jù)庫(kù)的情況。為了避免這種情況,我們必須在消息插入數(shù)據(jù)庫(kù)之后,手動(dòng)提交偏移量。這也會(huì)出現(xiàn)另一種情況,就是消息插入數(shù)據(jù)庫(kù)成功,但是在提交偏移量的過(guò)程中失敗。這種情況下,其他的消費(fèi)者會(huì)繼續(xù)讀取偏移量,然后重新執(zhí)行批量插入數(shù)據(jù)庫(kù)的操作。這么使用的話(huà),kafka提供的是“至少一次”的消息保證,也就是消息至少會(huì)被傳遞一次,但是消費(fèi)失敗的情況下會(huì)重復(fù)消費(fèi)。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}

上面的例子使用的是同步提交commitSync()方法。在某些情況下,你可能需要對(duì)消息消費(fèi)有更加精確的控制,下面的例子中,我們按照分區(qū)提交偏移量。

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + &quot;: &quot; + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
  consumer.close();
}

注意:提交的偏移量應(yīng)該是下一次消費(fèi)的消息的偏移量,所以commitSync(offsets)這個(gè)方法中的內(nèi)容,應(yīng)該是當(dāng)前消息偏移量加上一。

3.3 手動(dòng)分配分區(qū)Manual Partition Assignment

前面的例子中,消費(fèi)者訂閱主題,然后服務(wù)端動(dòng)態(tài)分配了分區(qū)給消費(fèi)者。在某些情況下,我們需要精確控制消費(fèi)的分區(qū),例如:

消費(fèi)者維護(hù)了與分區(qū)相關(guān)的本地狀態(tài)(例如本地磁盤(pán)鍵值存儲(chǔ)),那么他應(yīng)該只讀取特定分區(qū)的數(shù)據(jù)。如果消費(fèi)者本身是高可用的,在掛掉之后會(huì)自動(dòng)重啟(可能正在使用集群管理框架,比如YARN,Mesos或者AWS,或者作為流式處理框架)。這種情況下,kafka沒(méi)必要檢測(cè)消費(fèi)者的存活,重新分配分區(qū),因?yàn)橄M(fèi)進(jìn)程會(huì)在同樣的機(jī)器上重啟。

為了使用這種模式,我們不能使用subscribe(Collection),而是應(yīng)該使用assign(Collection)方法,來(lái)指定一批分區(qū)。

String topic = "foo";                                    
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));  

這種情況下,不會(huì)使用到分組協(xié)調(diào)器,所以消費(fèi)者掛掉的情況,也不會(huì)重新分配分區(qū)。所以每個(gè)消費(fèi)者都是獨(dú)立的,為了避免偏移量提交沖突,每個(gè)消費(fèi)者的分組信息應(yīng)該是唯一的。

3.4 在kafka外存儲(chǔ)偏移量Storing Offsets Outside Kafka

消費(fèi)者應(yīng)用可能不想把offset存到kafka中,所以kafka也提供了把offset存儲(chǔ)到其他地方的接口。這種情況下,就能自己實(shí)現(xiàn)消息只消費(fèi)一次的場(chǎng)景,比至少一次強(qiáng)很多。那么我們?nèi)绾问褂媚兀?/p>

  • 首先需要配置enable.auto.commit=false
  • 使用ConsumerRecord提供的偏移量來(lái)存儲(chǔ)你的offset
  • 重啟時(shí)使用seek(TopicPartition, long)來(lái)發(fā)現(xiàn)重啟前的offset

這種場(chǎng)景在手動(dòng)分配分區(qū)的情況下很簡(jiǎn)單。如果分區(qū)分配是自動(dòng)的,我們需要特殊處理分區(qū)分配改變的情況。這個(gè)可以通過(guò)提供ConsumerRebalanceListener實(shí)例,在調(diào)用subscribe(Collection, ConsumerRebalanceListener)和subscribe(Pattern, ConsumerRebalanceListener)時(shí)實(shí)現(xiàn)。

3.5 控制消費(fèi)者位點(diǎn)Controlling The Consumer's Position

大多數(shù)情況下,消費(fèi)者會(huì)簡(jiǎn)單的從頭到尾消費(fèi)消息,定時(shí)提交位點(diǎn)(自動(dòng)或手動(dòng))。然后,kafka允許消費(fèi)者手動(dòng)控制位點(diǎn),可以設(shè)置位點(diǎn)的位置。這意味著消費(fèi)者可以消費(fèi)已經(jīng)消費(fèi)過(guò)的消息,也可以跳過(guò)最新的消息。

kafka可以通過(guò)seek(TopicPartition, long)方法來(lái)指定消費(fèi)起點(diǎn),尋找早的或者新的位點(diǎn),也可以通過(guò)seekToBeginning(Collection)和seekToEnd(Collection)來(lái)指定。

3.6 消費(fèi)流量控制

如果一個(gè)消費(fèi)者被分配到了多個(gè)分區(qū),他會(huì)嘗試同時(shí)消費(fèi)所有的分區(qū),所有的分區(qū)的權(quán)重一樣。然而,在某些情況下,消費(fèi)者需要首先全速消費(fèi)某些特定的分區(qū),當(dāng)這個(gè)分區(qū)沒(méi)有消息后再消費(fèi)其他的分區(qū)。

例如,流式處理。消費(fèi)者同時(shí)從兩個(gè)主題消費(fèi),然后把消息合并。當(dāng)某個(gè)主題落后于另一個(gè)主題很多時(shí),消費(fèi)者應(yīng)該停止消費(fèi)快的那個(gè)主題,等慢的那個(gè)趕上來(lái)。再比如,有個(gè)主題有很多歷史數(shù)據(jù)需要被消費(fèi),這種情況下,消費(fèi)者應(yīng)該優(yōu)先消費(fèi)那些有最新消息的主題。

kafka支持動(dòng)態(tài)控制消費(fèi)流量,通過(guò)pause(Collection)和resume(Collection)方法。

四、多線(xiàn)程消費(fèi)Multi-threaded Processing

kafka消費(fèi)者不是線(xiàn)程安全的。所有的網(wǎng)絡(luò)IO操作都在發(fā)起調(diào)用的一個(gè)線(xiàn)程中執(zhí)行。他需要保證多線(xiàn)程時(shí)的線(xiàn)程安全。不同的操作會(huì)引起ConcurrentModificationException。

我們?cè)谕獠烤€(xiàn)程中可以調(diào)用wakeup()方法來(lái)停止當(dāng)前的操作。這種情況下,可能會(huì)從阻塞操作的線(xiàn)程拋出org.apache.kafka.common.errors.WakeupException異常。這可以用于在另一個(gè)線(xiàn)程中停止當(dāng)前的消費(fèi)者。

public class KafkaConsumerRunner implements Runnable {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer consumer;

    public void run() {
        try {
            consumer.subscribe(Arrays.asList("topic"));
            while (!closed.get()) {
                ConsumerRecords records = consumer.poll(10000);
                // Handle new records
            }
        } catch (WakeupException e) {
            // Ignore exception if closing
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    // Shutdown hook which can be called from a separate thread
    public void shutdown() {
        closed.set(true);
        consumer.wakeup();
    }
}

在另一個(gè)線(xiàn)程中,可以通過(guò)closed標(biāo)識(shí)來(lái)關(guān)閉或者啟動(dòng)消費(fèi)者。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容