Kafka消費(fèi)者

1 消費(fèi)者概念

1.1 消費(fèi)者與消費(fèi)者組

應(yīng)用程序--->kafka--->應(yīng)用程序

生產(chǎn)者 ? ? 主題 ? ?? 消費(fèi)者

1. 上游應(yīng)用程序?qū)?shù)據(jù)發(fā)送到主題中再由下游應(yīng)用程序讀取、驗(yàn)證數(shù)據(jù)。

2. 出現(xiàn)的可能性情況:生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度超過消費(fèi)者驗(yàn)證數(shù)據(jù)的速度

這個(gè)時(shí)候就可以使用消費(fèi)者組,由消費(fèi)者組訂閱主題,消費(fèi)者組中的每個(gè)消費(fèi)者分別消費(fèi)部分分區(qū)的數(shù)據(jù),實(shí)現(xiàn)消費(fèi)端的橫向擴(kuò)展

3. 需要注意的地方:消費(fèi)者組中的消費(fèi)者數(shù)量應(yīng)不超過分區(qū)數(shù),避免造成資源浪費(fèi)

1.2 消費(fèi)者組和分區(qū)再均衡

1. 什么是再均衡?

? ? 分區(qū)的所有權(quán)限由一個(gè)消費(fèi)者轉(zhuǎn)向另一個(gè)消費(fèi)者,這種行為就是再均衡。

2. 優(yōu)勢(shì)?

? ? 實(shí)現(xiàn)了消費(fèi)者組的高可用性和伸縮性

3. 不足?

? ? 再均衡期間,消費(fèi)者無法讀取消息,造成整個(gè)群組一小段時(shí)間處于不可用的狀態(tài)

? ? 當(dāng)分區(qū)被重新分給另一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的讀取狀態(tài)會(huì)丟失,有可能還需要刷新緩存,從而拖慢應(yīng)用程序

4. 消費(fèi)者與消費(fèi)者組的通信

? ? 消費(fèi)者通過向被指派為`群組協(xié)調(diào)器`的broker發(fā)送心跳來維持他們和群組的從屬關(guān)系以及他們對(duì)分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常時(shí)間間隔發(fā)送心跳,就認(rèn)為活躍,否則就會(huì)進(jìn)行再均衡。

5. 如果一個(gè)消費(fèi)者發(fā)生崩潰,并停止讀取消息,群組協(xié)調(diào)器會(huì)等待幾秒鐘,確認(rèn)他死亡了才會(huì)觸發(fā)再均衡。在此期間,死掉的消費(fèi)者不會(huì)讀取分區(qū)中的消息。在清理消費(fèi)者是,消費(fèi)者會(huì)通知協(xié)調(diào)器,此時(shí)會(huì)立即觸發(fā)一次再均衡。

6. 如何分配分區(qū)?

? ? 當(dāng)有消費(fèi)者加入群組時(shí),他會(huì)向群組協(xié)調(diào)器發(fā)送一個(gè)JoinGroup請(qǐng)求。第一個(gè)加入群組的消費(fèi)者成為“群主”。群主從協(xié)調(diào)器那里獲得群組的成員列表,并負(fù)責(zé)給每個(gè)消費(fèi)者分配分區(qū)。

2 創(chuàng)建消費(fèi)者

2.1 創(chuàng)建所需的必選屬性

屬性描述

bootstrap.serversbroker的地址清單,地址格式:host:port

key.serializer鍵的序列化器

value.serializer值的序列化器

2.2 創(chuàng)建代碼示例

producer.properties

bootstrap.servers=linux121:9092,linux122:9092,linux123:9092

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

kafka_customer

Propertiesprops=newProperties();

props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

2.3 訂閱主題

consumer.subscribe(Collections.singletonList("linux-test-01"));

2.4 輪詢

while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records)

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

}

2.5 配置信息

配置描述

fetch.min.bytes指定消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí),如果可用的數(shù)據(jù)量小于該配置指定的數(shù)據(jù)量,會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才會(huì)把他返回給消費(fèi)者。好處:可以降低消費(fèi)者和broker的負(fù)載。

fetch.max.wait.ms指定broker的等待時(shí)間。默認(rèn)500ms如果沒有足夠的數(shù)據(jù)流入kafka,消費(fèi)者獲取最小數(shù)據(jù)量又無法得到滿足,最導(dǎo)致500ms的延遲。

max.partition.fetch.bytes指定服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。默認(rèn)為1M,即poll方法從每個(gè)分區(qū)里返回的記錄最多不超過該屬性的指定值。在位消費(fèi)者分配內(nèi)存時(shí),可以給他多分配一些,因?yàn)槿绻航M中的消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū),而該值又必須大于brocker可以接受的最大消息的字節(jié)數(shù),否則會(huì)使消費(fèi)者一直處于掛起狀態(tài)。另外,還需要考慮消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁調(diào)用poll方法來避免會(huì)話過期和發(fā)生分區(qū)在均衡。如果單次調(diào)用poll返回的數(shù)據(jù)太多,消費(fèi)者需要用更多的時(shí)間來處理數(shù)據(jù),可能無法及時(shí)進(jìn)行下一次輪詢來避免會(huì)話過期。

session.timeout.ms指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間。需要同時(shí)修改該屬性和heartbeat.interval.ms,heartbeat.interval.ms必須小于該屬性,一般是該屬性的1/3。該屬性的值越小,可以越快的檢測(cè)和恢復(fù)崩潰的節(jié)點(diǎn),不過長(zhǎng)時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。把該屬性的值設(shè)置的大一些,可以減少意外的再均衡,不過檢測(cè)節(jié)點(diǎn)崩潰需要更長(zhǎng)的時(shí)間。

auto.offset.reset指定消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效(因消費(fèi)者長(zhǎng)時(shí)間失效,報(bào)刊偏移量的記錄已經(jīng)過時(shí)并被刪除)的情況下該作何處理,值{latest(默認(rèn),從最近的消息開始讀?。?,earliest(從開始讀?。﹠

enable.auto.commit指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)為true,可以通過auto.commit.interval.ms來控制提交頻率

partition.assignment.strategy設(shè)置選擇分區(qū)策略。Range:把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。RoundRobin:把主題中所有分區(qū)逐個(gè)分配給消費(fèi)者。

max.poll.records用于控制單詞調(diào)用call()方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量。

receive.buffer.bytes和send.buffer.bytes設(shè)置socket在讀寫數(shù)據(jù)時(shí)用到的TCP緩沖區(qū)的大小。如果為-1,則使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或者消費(fèi)者與broker處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和較低的寬帶。

2.6 提交和偏移量

2.6.1 自動(dòng)提交

poll方法中返回的是生產(chǎn)者寫進(jìn)去但是還沒有被消費(fèi)者所消費(fèi)的這部分?jǐn)?shù)據(jù)

消費(fèi)者發(fā)消息(包含分區(qū)的偏移量)給_consumer_offset這個(gè)主題

自動(dòng)提交設(shè)置:enable.auto.commit-->true

自動(dòng)提交產(chǎn)生的問題:當(dāng)當(dāng)前處理的數(shù)據(jù)偏移量大于提交的數(shù)據(jù)偏移量的話會(huì)造成數(shù)據(jù)重復(fù)

解決辦法:縮短提交偏移量的時(shí)間差,auto.commit.interval.ms

2.6.2 提交當(dāng)前偏移量

publicstaticvoidmain(String[]args)throwsIOException{

Propertiesprops=newProperties();

props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("linux-test-01"));

while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records) {

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

try{

consumer.commitSync();

}catch(Exceptione) {

e.printStackTrace();

? ? ? ? ?? }

? ? ?? }

?? }

}

不足:在broker對(duì)提交請(qǐng)求做出回應(yīng)之前,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量。

2.6.3 異步提交

while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records) {

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

?? }

// 提交最后一個(gè)偏移量,然后繼續(xù)做其他的事

consumer.commitAsync();

}

在成功提交后碰到無法恢復(fù)的錯(cuò)誤之前,commitSync()會(huì)一直重試,但是commitAsync不會(huì),這個(gè)也是他的不足之處。


不重試的原因:他收到服務(wù)器相應(yīng)的時(shí)候可能有一個(gè)更大的偏移量已經(jīng)提交成功了。

commitAsync支持回調(diào),在broker做出響應(yīng)時(shí)執(zhí)行回調(diào),經(jīng)常被用于記錄提交錯(cuò)誤或生成度量指標(biāo)。


while(true) {

ConsumerRecords<String,String>records=consumer.poll(100);

for(ConsumerRecord<String,String>record:records) {

System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());

?? }

// consumer.commitAsync();

consumer.commitAsync((offsets,exception)->{

if(exception!=null) {

System.out.println("Commit failed for offsets {}"+offsets+exception);

? ? ?? }

?? });

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 此篇開始進(jìn)入kafka的另外一側(cè):消費(fèi)者。kafka中的消費(fèi)者比生產(chǎn)者要復(fù)雜的多,里面涉及到的消費(fèi)組,偏移量等概念...
    紹圣閱讀 2,052評(píng)論 0 0
  • 1、消費(fèi)者和消費(fèi)者組 消費(fèi)者負(fù)責(zé)訂閱Kafka中的主題,并從訂閱的主題中拉取消息。與其他消息中間件不同的是:Kaf...
    rookie_yuqi閱讀 719評(píng)論 0 0
  • 上一節(jié)講到了如何通過構(gòu)建ProducerRecord對(duì)象,選擇對(duì)應(yīng)的序列化方式發(fā)送數(shù)據(jù)到Kafka,這一節(jié)我們來講...
    二向箔與歌者閱讀 622評(píng)論 0 0
  • commit offset時(shí)可以附帶一個(gè)string類型的metadata用于添加一些有關(guān)信息 也可以附帶一個(gè)lo...
    意夢(mèng)春秋閱讀 1,009評(píng)論 0 1
  • 前言 讀完本文,你將了解到如下知識(shí)點(diǎn): kafka 的消費(fèi)者 和 消費(fèi)者組 如何正確使用kafka consume...
    zwb_jianshu閱讀 909評(píng)論 0 0

友情鏈接更多精彩內(nèi)容