1 消費(fèi)者概念
1.1 消費(fèi)者與消費(fèi)者組
應(yīng)用程序--->kafka--->應(yīng)用程序
生產(chǎn)者 ? ? 主題 ? ?? 消費(fèi)者
1. 上游應(yīng)用程序?qū)?shù)據(jù)發(fā)送到主題中再由下游應(yīng)用程序讀取、驗(yàn)證數(shù)據(jù)。
2. 出現(xiàn)的可能性情況:生產(chǎn)者生產(chǎn)數(shù)據(jù)的速度超過消費(fèi)者驗(yàn)證數(shù)據(jù)的速度
這個(gè)時(shí)候就可以使用消費(fèi)者組,由消費(fèi)者組訂閱主題,消費(fèi)者組中的每個(gè)消費(fèi)者分別消費(fèi)部分分區(qū)的數(shù)據(jù),實(shí)現(xiàn)消費(fèi)端的橫向擴(kuò)展
3. 需要注意的地方:消費(fèi)者組中的消費(fèi)者數(shù)量應(yīng)不超過分區(qū)數(shù),避免造成資源浪費(fèi)
1.2 消費(fèi)者組和分區(qū)再均衡
1. 什么是再均衡?
? ? 分區(qū)的所有權(quán)限由一個(gè)消費(fèi)者轉(zhuǎn)向另一個(gè)消費(fèi)者,這種行為就是再均衡。
2. 優(yōu)勢(shì)?
? ? 實(shí)現(xiàn)了消費(fèi)者組的高可用性和伸縮性
3. 不足?
? ? 再均衡期間,消費(fèi)者無法讀取消息,造成整個(gè)群組一小段時(shí)間處于不可用的狀態(tài)
? ? 當(dāng)分區(qū)被重新分給另一個(gè)消費(fèi)者時(shí),消費(fèi)者當(dāng)前的讀取狀態(tài)會(huì)丟失,有可能還需要刷新緩存,從而拖慢應(yīng)用程序
4. 消費(fèi)者與消費(fèi)者組的通信
? ? 消費(fèi)者通過向被指派為`群組協(xié)調(diào)器`的broker發(fā)送心跳來維持他們和群組的從屬關(guān)系以及他們對(duì)分區(qū)的所有權(quán)關(guān)系。只要消費(fèi)者以正常時(shí)間間隔發(fā)送心跳,就認(rèn)為活躍,否則就會(huì)進(jìn)行再均衡。
5. 如果一個(gè)消費(fèi)者發(fā)生崩潰,并停止讀取消息,群組協(xié)調(diào)器會(huì)等待幾秒鐘,確認(rèn)他死亡了才會(huì)觸發(fā)再均衡。在此期間,死掉的消費(fèi)者不會(huì)讀取分區(qū)中的消息。在清理消費(fèi)者是,消費(fèi)者會(huì)通知協(xié)調(diào)器,此時(shí)會(huì)立即觸發(fā)一次再均衡。
6. 如何分配分區(qū)?
? ? 當(dāng)有消費(fèi)者加入群組時(shí),他會(huì)向群組協(xié)調(diào)器發(fā)送一個(gè)JoinGroup請(qǐng)求。第一個(gè)加入群組的消費(fèi)者成為“群主”。群主從協(xié)調(diào)器那里獲得群組的成員列表,并負(fù)責(zé)給每個(gè)消費(fèi)者分配分區(qū)。
2 創(chuàng)建消費(fèi)者
2.1 創(chuàng)建所需的必選屬性
屬性描述
bootstrap.serversbroker的地址清單,地址格式:host:port
key.serializer鍵的序列化器
value.serializer值的序列化器
2.2 創(chuàng)建代碼示例
producer.properties
bootstrap.servers=linux121:9092,linux122:9092,linux123:9092
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka_customer
Propertiesprops=newProperties();
props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);
2.3 訂閱主題
consumer.subscribe(Collections.singletonList("linux-test-01"));
2.4 輪詢
while(true) {
ConsumerRecords<String,String>records=consumer.poll(100);
for(ConsumerRecord<String,String>record:records)
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());
}
2.5 配置信息
配置描述
fetch.min.bytes指定消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí),如果可用的數(shù)據(jù)量小于該配置指定的數(shù)據(jù)量,會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才會(huì)把他返回給消費(fèi)者。好處:可以降低消費(fèi)者和broker的負(fù)載。
fetch.max.wait.ms指定broker的等待時(shí)間。默認(rèn)500ms如果沒有足夠的數(shù)據(jù)流入kafka,消費(fèi)者獲取最小數(shù)據(jù)量又無法得到滿足,最導(dǎo)致500ms的延遲。
max.partition.fetch.bytes指定服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。默認(rèn)為1M,即poll方法從每個(gè)分區(qū)里返回的記錄最多不超過該屬性的指定值。在位消費(fèi)者分配內(nèi)存時(shí),可以給他多分配一些,因?yàn)槿绻航M中的消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū),而該值又必須大于brocker可以接受的最大消息的字節(jié)數(shù),否則會(huì)使消費(fèi)者一直處于掛起狀態(tài)。另外,還需要考慮消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁調(diào)用poll方法來避免會(huì)話過期和發(fā)生分區(qū)在均衡。如果單次調(diào)用poll返回的數(shù)據(jù)太多,消費(fèi)者需要用更多的時(shí)間來處理數(shù)據(jù),可能無法及時(shí)進(jìn)行下一次輪詢來避免會(huì)話過期。
session.timeout.ms指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間。需要同時(shí)修改該屬性和heartbeat.interval.ms,heartbeat.interval.ms必須小于該屬性,一般是該屬性的1/3。該屬性的值越小,可以越快的檢測(cè)和恢復(fù)崩潰的節(jié)點(diǎn),不過長(zhǎng)時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的再均衡。把該屬性的值設(shè)置的大一些,可以減少意外的再均衡,不過檢測(cè)節(jié)點(diǎn)崩潰需要更長(zhǎng)的時(shí)間。
auto.offset.reset指定消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效(因消費(fèi)者長(zhǎng)時(shí)間失效,報(bào)刊偏移量的記錄已經(jīng)過時(shí)并被刪除)的情況下該作何處理,值{latest(默認(rèn),從最近的消息開始讀?。?,earliest(從開始讀?。﹠
enable.auto.commit指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)為true,可以通過auto.commit.interval.ms來控制提交頻率
partition.assignment.strategy設(shè)置選擇分區(qū)策略。Range:把主題的若干個(gè)連續(xù)的分區(qū)分配給消費(fèi)者。RoundRobin:把主題中所有分區(qū)逐個(gè)分配給消費(fèi)者。
max.poll.records用于控制單詞調(diào)用call()方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢里需要處理的數(shù)據(jù)量。
receive.buffer.bytes和send.buffer.bytes設(shè)置socket在讀寫數(shù)據(jù)時(shí)用到的TCP緩沖區(qū)的大小。如果為-1,則使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或者消費(fèi)者與broker處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和較低的寬帶。
2.6 提交和偏移量
2.6.1 自動(dòng)提交
poll方法中返回的是生產(chǎn)者寫進(jìn)去但是還沒有被消費(fèi)者所消費(fèi)的這部分?jǐn)?shù)據(jù)
消費(fèi)者發(fā)消息(包含分區(qū)的偏移量)給_consumer_offset這個(gè)主題
自動(dòng)提交設(shè)置:enable.auto.commit-->true
自動(dòng)提交產(chǎn)生的問題:當(dāng)當(dāng)前處理的數(shù)據(jù)偏移量大于提交的數(shù)據(jù)偏移量的話會(huì)造成數(shù)據(jù)重復(fù)
解決辦法:縮短提交偏移量的時(shí)間差,auto.commit.interval.ms
2.6.2 提交當(dāng)前偏移量
publicstaticvoidmain(String[]args)throwsIOException{
Propertiesprops=newProperties();
props.load(Demo1_Kafka_Consumer.class.getClassLoader().getResourceAsStream("consumer.properties"));
KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("linux-test-01"));
while(true) {
ConsumerRecords<String,String>records=consumer.poll(100);
for(ConsumerRecord<String,String>record:records) {
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());
try{
consumer.commitSync();
}catch(Exceptione) {
e.printStackTrace();
? ? ? ? ?? }
? ? ?? }
?? }
}
不足:在broker對(duì)提交請(qǐng)求做出回應(yīng)之前,應(yīng)用程序會(huì)一直阻塞,這樣會(huì)限制應(yīng)用程序的吞吐量。
2.6.3 異步提交
while(true) {
ConsumerRecords<String,String>records=consumer.poll(100);
for(ConsumerRecord<String,String>record:records) {
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());
?? }
// 提交最后一個(gè)偏移量,然后繼續(xù)做其他的事
consumer.commitAsync();
}
在成功提交后碰到無法恢復(fù)的錯(cuò)誤之前,commitSync()會(huì)一直重試,但是commitAsync不會(huì),這個(gè)也是他的不足之處。
不重試的原因:他收到服務(wù)器相應(yīng)的時(shí)候可能有一個(gè)更大的偏移量已經(jīng)提交成功了。
commitAsync支持回調(diào),在broker做出響應(yīng)時(shí)執(zhí)行回調(diào),經(jīng)常被用于記錄提交錯(cuò)誤或生成度量指標(biāo)。
while(true) {
ConsumerRecords<String,String>records=consumer.poll(100);
for(ConsumerRecord<String,String>record:records) {
System.out.printf("offset = %d, key = %s, value = %s ,partition = %d%n",record.offset(),record.key(),record.value(),record.partition());
?? }
// consumer.commitAsync();
consumer.commitAsync((offsets,exception)->{
if(exception!=null) {
System.out.println("Commit failed for offsets {}"+offsets+exception);
? ? ?? }
?? });
}