kafka kafka生產(chǎn)者

起因:在實(shí)際項(xiàng)目開(kāi)發(fā)過(guò)程中,需要使用RabbitMQ來(lái)實(shí)現(xiàn)消息隊(duì)列的功能,在運(yùn)用過(guò)之后,也去學(xué)一學(xué)kafka,了解一下他們之間的差別,吃一吃架構(gòu)方面的相關(guān)內(nèi)容,提升自己。


1. kafka生產(chǎn)者分析

1.1. 生產(chǎn)者分區(qū)的原則

為什么要分區(qū)

  • 提升了水平擴(kuò)展能力

  • 提供并發(fā)能力

分區(qū)的原則

  • 指明partition的情況下,直降將指明的值作為partition的值(指定存放分區(qū))

  • 沒(méi)有指明partition,但有key的情況下,會(huì)將key的hash值與topic的partition數(shù)量進(jìn)行取余得到partition值

  • 什么都沒(méi)有指定,在第一發(fā)消息的時(shí)候,系統(tǒng)會(huì)隨機(jī)生成一個(gè)整數(shù)來(lái)對(duì)topic的partition數(shù)量進(jìn)行取余得到partition值,后面每次都對(duì)這個(gè)已經(jīng)生成的隨機(jī)數(shù)進(jìn)行+1,這就得到了round-robin算法了

1.2. Kafka副本的復(fù)制方案

1.2.1. 副本的復(fù)制方式分析

Kafka內(nèi)部發(fā)送響應(yīng)的機(jī)制:為了保證producer的數(shù)據(jù)能夠可靠的發(fā)送并保存到topic上,topic的每個(gè)partition收到發(fā)送的數(shù)據(jù)后,都需要向生產(chǎn)者發(fā)送ACK,如果生產(chǎn)者收到ACK,就會(huì)進(jìn)行下一輪發(fā)送,如果沒(méi)有收到就會(huì)重新發(fā)送

副本的復(fù)制是如何復(fù)制的?

Producer--->leader(follower1,follower2)

這個(gè)情況下應(yīng)該如何向Producer發(fā)送ACK

方案一:確保半數(shù)以上的follower完成同步,就發(fā)送ACK,優(yōu)點(diǎn)是延遲低,在選舉新的leader的時(shí)候,如果容忍n臺(tái)節(jié)點(diǎn)故障,就需要2n+1個(gè)副本

方案二:完成全部follower的同步,才發(fā)送ACK,缺點(diǎn)是延遲高,在選舉新的leader的時(shí)候,如果容忍n臺(tái)節(jié)點(diǎn)故障,只就需要n+1個(gè)副本

kafka使用方案二作為follower的同步方式

  • 如果選擇方案一:雖然網(wǎng)絡(luò)延遲低,但數(shù)據(jù)一致性無(wú)法保障,而且需要2n+1個(gè)副本,副本過(guò)多就會(huì)導(dǎo)致數(shù)據(jù)冗余過(guò)大,造成很大浪費(fèi)

  • 雖然方案二延遲高,但對(duì)于kafka來(lái)說(shuō)影響不大

1.2.2. 通過(guò)ISR優(yōu)化副本同步

先看一下topic的詳細(xì)信息

Topic: topicfirst       PartitionCount: 5       ReplicationFactor: 1    Configs: 
Topic: topicfirst       Partition: 0    Leader: 11      Replicas: 11    Isr: 11
Topic: topicfirst       Partition: 1    Leader: 11      Replicas: 11    Isr: 11
Topic: topicfirst       Partition: 2    Leader: 11      Replicas: 11    Isr: 11
Topic: topicfirst       Partition: 3    Leader: 11      Replicas: 11    Isr: 11
Topic: topicfirst       Partition: 4    Leader: 11      Replicas: 11    Isr: 11

在kafka采用第二種方案進(jìn)行副本復(fù)制后進(jìn)行ACK響應(yīng),會(huì)等待所有follower完成同步,這個(gè)時(shí)候如果有一個(gè)follower因?yàn)槟撤N原因無(wú)法訪問(wèn),這個(gè)時(shí)候leader就要一直等著這個(gè)follower來(lái)完成同步才能發(fā)ACK給producer

Kafka的解決方案:Leader維護(hù)了一個(gè)動(dòng)態(tài)的in-sync replica set(ISR)

  • 副本同步機(jī)制

    • 作用是和leader保持同步的follower集合

    • 當(dāng)ISR中的follower完成數(shù)據(jù)同步之后,leader就會(huì)給follower發(fā)送ack(數(shù)據(jù)是由folloer主動(dòng)從leader那里拉取過(guò)來(lái)的)

  • ISR是一個(gè)動(dòng)態(tài)同步集合:從ISR中移除follower的條件是當(dāng)follower長(zhǎng)時(shí)間未向leader拉取數(shù)據(jù),該follower就會(huì)被剔除出ISR,時(shí)間閥值由:replica.lag.time.max.ms=10000 決定 單位ms

    replica.lag.max.messages 這個(gè)是leader和follower的消息數(shù)的差,超過(guò)就剔除出ISR,這個(gè)參數(shù)在0.9版本已經(jīng)移除

  • 當(dāng)leader發(fā)生故障了,就會(huì)從ISR中選舉新leader

2. Kafka生產(chǎn)者的ACK機(jī)制(可靠性)

ACK(在rabbitmq里面,我們producer和broker的一個(gè)反饋是什么?callback,return)

對(duì)于kafka不太重要的數(shù)據(jù)是不是就不需要可靠性很高了

副本機(jī)制 主分片--副本分片

producer發(fā)送給broker-->partition(leader)-->replication(2)

這個(gè)時(shí)候,我們思考一個(gè)生產(chǎn)者的ACK機(jī)制,p roducer通過(guò)一個(gè)配置項(xiàng)目ACKS

  • acks = 0 : producer只要給到broker就返回ack,當(dāng)broker接收到數(shù)據(jù)后,如果broker故障導(dǎo)致數(shù)據(jù)丟失

  • acks =1 : partition的leader落盤(pán)成功后才返回ACK,不關(guān)心follower,當(dāng)我們的partition的leader掛掉后數(shù)據(jù)就無(wú)法同步到follower(leader掛了,要選舉生成新的leader)

  • acks = -1 : 所有ISR中的分區(qū)都同步成功才會(huì)返回ACK給producer

kafka的producer在沒(méi)有接收到ACK后會(huì)重試,這個(gè)重試是有次數(shù)的,這個(gè)次數(shù)是你配置的

3. Kafka分布式保存數(shù)據(jù)一致性問(wèn)題

producer有一個(gè)重試機(jī)制,如果數(shù)據(jù)沒(méi)有接收到ACK的情況下,重新再次發(fā)送

場(chǎng)景分析:如果有一個(gè)leader,兩個(gè)follower,當(dāng)leader宕機(jī)了

[圖片上傳失敗...(image-655708-1602662879949)]

LEO(Log End Offset):每個(gè)副本最后一個(gè)offset

HW(High Watermark):所有副本中最小的那個(gè)LEO(7)

數(shù)據(jù)一致性的執(zhí)行細(xì)節(jié):

1、follower故障

follower發(fā)生故障就會(huì)被剔除出ISR,待follower恢復(fù)后,follower會(huì)讀取本次磁盤(pán)上上次記錄的HW(7),將log文件中高于HW部分截取掉,從HW開(kāi)始向leader進(jìn)行同步,待follower的LEO大于等于Partition副本的HW,當(dāng)follower追上leader以后,就可以重入ISR

2、leader故障

leader故障之后,會(huì)從ISR中選一個(gè)follower成為leader,為保證多個(gè)副本間的數(shù)據(jù)一致,將所有的副本各自的高于HW的數(shù)據(jù)部分截取掉,從新的leader同步數(shù)據(jù)

注意:這個(gè)只能保證數(shù)據(jù)一致性,不能保證數(shù)據(jù)不丟失或者不重復(fù)

4. Kafka的Exactly Once實(shí)現(xiàn)

將producer的ack設(shè)置為-1,保證數(shù)據(jù)producer到partitons的數(shù)據(jù)不丟失,就是At Least Once,如果將ack設(shè)置為0,可以保證每條消息只會(huì)發(fā)送一次,即At Most Once

At Least Once可以保證數(shù)據(jù)不丟失,但不能保證數(shù)據(jù)不重復(fù),At Most Once可以保證數(shù)據(jù)不重復(fù),但不能保證數(shù)據(jù)不丟失

Exactly Once = At Least Once + 冪等性

At Least Once 可以通過(guò)Producer的ACKS設(shè)置為-1來(lái)解決,在kafka的v0.11(含)之后引入了一個(gè)新特性:producer端的冪等性,無(wú)論P(yáng)roducer發(fā)給broker多少次,只要數(shù)據(jù)重復(fù),broker只會(huì)持久化一條給到topic

在Producer端通過(guò)參數(shù) enable.idempotence 設(shè)置為true即可,相當(dāng)于開(kāi)起了producer端的冪等性:Producer在初始化的時(shí)候會(huì)被分配一個(gè)PID,發(fā)往同一個(gè)Partition的消息會(huì)附帶Sequence Number。broker端會(huì)對(duì)

<PID,Partition,Sequence Number>做主鍵緩存,當(dāng)有相同主鍵信息只會(huì)持久化一條了

但是:系統(tǒng)只要重啟就會(huì)更新PID,在不同的Partition上會(huì)有不同的主鍵,所以Producer的冪等無(wú)法保證跨分區(qū)跨會(huì)話的Exactly Once

5. Kafka生產(chǎn)者的事務(wù)機(jī)制

kafka的數(shù)據(jù)可以有很多的partition

場(chǎng)景:當(dāng)producer個(gè)p0,p1,p2寫(xiě)入數(shù)據(jù),0-10,1-15,正要給2分區(qū)寫(xiě)數(shù)據(jù)broker掛了,如果acks=1,有主分區(qū)沒(méi)有寫(xiě)入完成,producer會(huì)重試發(fā)送

在kafka的v0.11版本引入了transactionID,將transactionID和PID綁定并保存事務(wù)狀態(tài)到一個(gè)內(nèi)部的topic中,當(dāng)服務(wù)重啟后該事務(wù)狀態(tài)還能獲取

6. Kafka發(fā)送消息的流程

kafka的producer發(fā)送消息采用的是異步發(fā)送模式,一個(gè)main一個(gè)sender還有一個(gè)線程共享變量(RecordAccumulator)

[圖片上傳失敗...(image-4ad6e3-1602662879948)]

batch.size : 數(shù)據(jù)積累到多大以后,sender才會(huì)發(fā)送

linger.ms : 如果一直沒(méi)有達(dá)到batch.size,sender會(huì)等待linger.ms時(shí)間后就發(fā)送


不要以為每天把功能完成了就行了,這種思想是要不得的,互勉~!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容