Kafka詳解

Kafka 是一個(gè)java開(kāi)發(fā)的mq中間件,依賴于zookeper,有高可用,高吞吐量等特點(diǎn)。

優(yōu)勢(shì)

  • 可靠性:partition機(jī)制和replication機(jī)制,使消息的傳遞有著很高的可靠性
  • 穩(wěn)定性,支持集群
  • 高性能,高吞吐量,即使在TB的數(shù)據(jù)存儲(chǔ)情況下,仍然表現(xiàn)出很好的穩(wěn)定性
  • 支持消息廣播和單播,可以根據(jù)重設(shè)offset實(shí)現(xiàn)消息的重復(fù)消費(fèi)

角色

  • Broker:kafka集群由一個(gè)或多個(gè)kafka server組成,每個(gè)server即Broker。

  • Topic:邏輯概念。kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,一個(gè)Topic可以認(rèn)為是一類消息。

  • Partition:物理概念。每個(gè)topic將被分成一到多個(gè)partition(分區(qū)),每個(gè)partition在存儲(chǔ)層面就是一個(gè)append log文件。一個(gè)非常大的topic可以分成多個(gè)partition,分布到多個(gè)broker上。kafka只保證按一個(gè)partition中的順序?qū)⑾l(fā)給consumer,不保證一個(gè)topic的整體(多個(gè)partition間)的順序。

  • offset:任何發(fā)布到Partition的消息都會(huì)被直接追加到log文件的尾部,每條消息在文件中的位置稱為offset(偏移量),offset為一個(gè)long型數(shù)字,它是唯一標(biāo)記一條消息。kafka并沒(méi)有提供其他額外的索引機(jī)制來(lái)存儲(chǔ)offset,因此在kafka中幾乎不允許對(duì)消息進(jìn)行“隨機(jī)讀寫(xiě)”。

  • Producer:生成者。Producer將消息發(fā)布到指定的Topic,也可以指定Partition。

  • Consumer:消費(fèi)者。Consumer采用pull的形式從Producer拉取消息

  • Consumer Group:每個(gè) consumer 屬于一個(gè)特定的 consumer group(若不指定 group name 則屬于默認(rèn)的 group)。一個(gè) topic可以有多個(gè)CG,topic的消息會(huì)分發(fā)到所有的CG,但每個(gè)CG只會(huì)把消息發(fā)給該CG中的一個(gè) consumer。如果所有的consumer都具有相同的group, 即單播,消息將會(huì)在consumers之間負(fù)載均衡;如果所有的consumer都具有不同的group,那這就是"發(fā)布-訂閱",每條消息將會(huì)廣播給所有的consumer。

分區(qū)機(jī)制和文件存儲(chǔ)機(jī)制

如圖,kafka中的消息是以topic進(jìn)行分類的,生產(chǎn)者通過(guò)topic向kafka broker發(fā)送消息,消費(fèi)者通過(guò)topic讀取消息。然而topic在物理層面上又能夠以partition進(jìn)行分組,同一個(gè)topic下有多個(gè)不同的partition,每個(gè)partiton在物理上對(duì)應(yīng)一個(gè)目錄(文件夾),以topic名稱+有序序號(hào)的形式命名(序號(hào)從0開(kāi)始計(jì),最大為partition數(shù)-1)。partition是實(shí)際物理上的概念,而topic是邏輯上的概念。Patition 的設(shè)計(jì)使得Kafka的吞吐率可以水平擴(kuò)展。

每個(gè)分區(qū)文件夾下存儲(chǔ)這個(gè)分區(qū)的所有消息(.log)和索引文件(.index)。“.index”索引文件存儲(chǔ)大量的元數(shù)據(jù),“.log”數(shù)據(jù)文件存儲(chǔ)大量的消息,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。其中以“.index”索引文件中的元數(shù)據(jù)[3, 348]為例,在“.log”數(shù)據(jù)文件表示第3個(gè)消息,即在全局partition中表示170410+3=170413個(gè)消息,該消息的物理偏移地址為348。

image.png

那么如何從partition中通過(guò)offset查找message呢?以上圖為例,讀取offset=170418的消息,首先查找segment文件,其中 00000000000000000000.index為最開(kāi)始的文件,第二個(gè)文件為00000000000000170410.index(起始偏移為170410+1=170411),而第 三個(gè)文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個(gè)offset=170418就落到了第二個(gè)文件之中。其他 后續(xù)文件可以依次類推,以其實(shí)偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置。其次根據(jù) 00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進(jìn)行讀取。

Kafka中topic的每個(gè)partition有一個(gè)預(yù)寫(xiě)式的日志文件,雖然partition可以繼續(xù)細(xì)分為若干個(gè)segment文件,但是對(duì)于上層應(yīng)用來(lái)說(shuō)可以將 partition看成最小的存儲(chǔ)單元(一個(gè)有多個(gè)segment文件拼接的“巨型”文件),每個(gè)partition都由一些列有序的、不可變的消息組成,這些消息被連續(xù)的追加到partition中。

那如何保證消息均勻的分布到不同的partition中?

生產(chǎn)者在生產(chǎn)數(shù)據(jù)的時(shí)候,可以為每條消息指定Key,這樣消息被發(fā)送到broker時(shí),會(huì)根據(jù)分區(qū)規(guī)則選擇被存儲(chǔ)到哪一個(gè)分區(qū)中,如果分區(qū)規(guī)則設(shè)置的合理,那么所有的消息將會(huì)被均勻的分布到不同的分區(qū)中,這樣就實(shí)現(xiàn)了負(fù)載均衡和水平擴(kuò)展。分區(qū)規(guī)則可以自定義,比如將消息的key做了hashcode,然后和分區(qū)數(shù)(numPartitions)做模運(yùn)算,使得每一個(gè)key都可以分布到一個(gè)分區(qū)中。

高可用(High availability)

kafka的高可用就是依賴于上面的文件存儲(chǔ)結(jié)構(gòu)的,kafka能保證HA的策略有 data replication和leader election。

leader 機(jī)制

為了提高消息的可靠性,Kafka每個(gè)topic的partition有N個(gè)副本(replicas),其中N(大于等于1)是topic的復(fù)制因子(replica fator)的個(gè)數(shù)。這個(gè)時(shí)候每個(gè) partition下面就有可能有多個(gè) replica(replication機(jī)制,相當(dāng)于是partition的副本但是有可能存儲(chǔ)在其他的broker上),但是這多個(gè)replica并不一定分布在一個(gè)broker上,而這時(shí)候?yàn)榱烁玫脑趓eplica之間復(fù)制數(shù)據(jù),此時(shí)會(huì)選出一個(gè)leader,這個(gè)時(shí)候 producer會(huì)push消息到這個(gè)leader(leader機(jī)制),consumer也會(huì)從這個(gè)leader pull 消息,其他的 replica只是作為follower從leader復(fù)制數(shù)據(jù),leader負(fù)責(zé)所有的讀寫(xiě);如果沒(méi)有一個(gè)leader的話,所有的follower都去進(jìn)行讀寫(xiě) 那么NxN(N+1個(gè)replica之間復(fù)制消息)的互相同步數(shù)據(jù)就變得很復(fù)雜而且數(shù)據(jù)的一致性和有序性不能夠保證。

如何將所有Replica均勻分布到整個(gè)集群

為了實(shí)現(xiàn)更高的可用性,推薦在部署kafka的時(shí)候,能夠保證一個(gè)topic的partition數(shù)量大于broker的數(shù)量,而且還需要把follower均勻的分布在所有的broker上,而不是只分布在一個(gè) broker上。zookeeper 會(huì)對(duì)partition的leader follower等進(jìn)行管理。
Kafka分配Replica的算法如下:

將所有Broker(假設(shè)共n個(gè)Broker)和待分配的Partition排序
將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上
將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broke

leader election

當(dāng)Leader宕機(jī)了,怎樣在Follower中選舉出新的Leader?
一種非常常用的Leader Election的方式是“Majority Vote”(“少數(shù)服從多數(shù)”),但Kafka并未采用這種方式。
Kafka在Zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in-sync replicas),這個(gè)ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為L(zhǎng)eader的可能。

那么如何選取出leader:
最簡(jiǎn)單最直觀的方案是(誰(shuí)寫(xiě)進(jìn)去誰(shuí)就是leader),所有Follower都在Zookeeper上設(shè)置一個(gè)Watch,一旦Leader宕機(jī),其對(duì)應(yīng)的ephemeral znode會(huì)自動(dòng)刪除,此時(shí)所有Follower都嘗試創(chuàng)建該節(jié)點(diǎn),而創(chuàng)建成功者(Zookeeper保證只有一個(gè)能創(chuàng)建成功)即是新的Leader,其它Replica即為Follower。

Data Replication

消息commit

kafka在處理傳播消息的時(shí)候,Producer會(huì)發(fā)布消息到某個(gè)partition上,先通知找到這個(gè)partition的leader replica,無(wú)論這個(gè)partition的 Replica factor是多少,Producer 先把消息發(fā)送給replica的leader,然后Leader在接受到消息后會(huì)寫(xiě)入到Log,這時(shí)候這個(gè)leader的其余follower都會(huì)去leader pull數(shù)據(jù),這樣可保證follower的replica的數(shù)據(jù)順序和leader是一致的,follower在接受到消息之后寫(xiě)入到Log里面(同步),然后向leader發(fā)送ack確認(rèn),一旦Leader接收到了所有的ISR(與leader保持同步的Replica列表)中的follower的ack消息,這個(gè)消息就被認(rèn)為是 commit了,然后leader增加HW并且向producer發(fā)送ack消息,表示消息已經(jīng)發(fā)送完成。但是為了提高性能,每個(gè)follower在接受到消息之后就會(huì)直接返回給leader ack消息,而并非等數(shù)據(jù)寫(xiě)入到log里(異步),所以,可以認(rèn)為對(duì)于已經(jīng)commit的數(shù)據(jù),只可以保證消息已經(jīng)存在與所有的replica的內(nèi)存中,但是不保證已經(jīng)被持久化到磁盤(pán)中,所以進(jìn)而也就不能保證完全發(fā)生異常的時(shí)候,該消息能夠被consumer消費(fèi)掉,如果異常發(fā)生,leader 宕機(jī),而且內(nèi)存數(shù)據(jù)消失,此時(shí)重新選舉leader就會(huì)出現(xiàn)這樣的情況,但是由于考慮大這樣的情況實(shí)屬少見(jiàn),所以這種方式在性能和數(shù)據(jù)持久化上做了一個(gè)相對(duì)的平衡,consumer讀取消息也是從 leader,并且只有已經(jīng)commit之后的消息(offset小于HW)才會(huì)暴露給consumer。

消息確認(rèn)

kafka的存活條件包括兩個(gè)條件:

  1. kafka必須維持著與zookeeper的session(這個(gè)通過(guò)zookeeper的heartbeat機(jī)制來(lái)實(shí)現(xiàn))
  2. follower必須能夠及時(shí)的將數(shù)據(jù)從leader復(fù)制過(guò)去 ,不能夠“落后太多”。leader會(huì)跟蹤與其保持著同步的replica列表簡(jiǎn)稱ISR,(in-sync replica),如果一個(gè)follower宕機(jī)或是落后太多,leader就會(huì)把它從ISR中移除掉。這里指的落后太多是說(shuō) follower復(fù)制的消息落后的超過(guò)了預(yù)設(shè)值,(該值可在KAFKA_HOME/config/server.properties中通過(guò)replica.lag.max.messages配置,其默認(rèn)值是4000),或者follower超過(guò)一定時(shí)間(該值可在KAFKA_HOME/config/server.properties中通過(guò)replica.lag.time.max.ms來(lái)配置,其默認(rèn)值是10000)沒(méi)有向leader發(fā)起fetch請(qǐng)求。

一條消息只有被ISR里的所有Follower都從Leader復(fù)制過(guò)去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫(xiě)進(jìn)了Leader,還沒(méi)來(lái)得及被任何Follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(Consumer無(wú)法消費(fèi)這些數(shù)據(jù))。而對(duì)于Producer而言,它可以選擇是否等待消息commit,這可以通過(guò)request.required.acks來(lái)設(shè)置。

0---表示不進(jìn)行消息接收是否成功的確認(rèn);
1---表示當(dāng)Leader接收成功時(shí)確認(rèn);
-1---表示Leader和Follower都接收成功時(shí)確認(rèn);

持久性

kafka使用文件存儲(chǔ)消息,這就直接決定kafka在性能上嚴(yán)重依賴文件系統(tǒng)的本身特性。且無(wú)論任何 OS 下,對(duì)文件系統(tǒng)本身的優(yōu)化幾乎沒(méi)有可能。文件緩存/直接內(nèi)存映射等是常用的手段。 因?yàn)?kafka 是對(duì)日志文件進(jìn)行 append 操作,因此磁盤(pán)檢索的開(kāi)支是較小的;同時(shí)為了減少磁盤(pán)寫(xiě)入的次數(shù),broker會(huì)將消息暫時(shí)buffer起來(lái),當(dāng)消息的個(gè)數(shù)(或尺寸)達(dá)到一定閥值時(shí),再flush到磁盤(pán),這樣減少了磁盤(pán)IO調(diào)用的次數(shù)。

producer

指定partition

producer將會(huì)和Topic下所有partition leader保持socket連接;消息由producer直接通過(guò)socket發(fā)送到broker,中間不會(huì)經(jīng)過(guò)任何"路由層".事實(shí)上,消息被路由到哪個(gè)partition上,有producer決定.比如可以采用"random""key-hash""輪詢"等,如果一個(gè)topic中有多個(gè)partitions,那么在producer端實(shí)現(xiàn)"消息均衡分發(fā)"是必要的.

異步發(fā)送

producer.type的默認(rèn)值是sync,即同步的方式。這個(gè)參數(shù)指定了在后臺(tái)線程中消息的發(fā)送方式是同步的還是異步的。如果設(shè)置成異步的模式,可以運(yùn)行生產(chǎn)者以batch的形式push數(shù)據(jù),這樣會(huì)極大的提高broker的性能,但是這樣會(huì)增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)。

對(duì)于異步模式,還有4個(gè)配套的參數(shù),如下:

  • queue.buffering.max.ms 5000 啟用異步模式時(shí),producer緩存消息的時(shí)間。比如我們?cè)O(shè)置成1000時(shí),它會(huì)緩存1s的數(shù)據(jù)再一次發(fā)送出去,這樣可以極大的增加broker吞吐量,但也會(huì)造成時(shí)效性的降低。
  • queue.buffering.max.messages 10000 啟用異步模式時(shí),producer緩存隊(duì)列里最大緩存的消息數(shù)量,如果超過(guò)這個(gè)值,producer就會(huì)阻塞或者丟掉消息。
  • queue.enqueue.timeout.ms -1 當(dāng)達(dá)到上面參數(shù)時(shí)producer會(huì)阻塞等待的時(shí)間。如果設(shè)置為0,buffer隊(duì)列滿時(shí)producer不會(huì)阻塞,消息直接被丟掉;若設(shè)置為-1,producer會(huì)被阻塞,不會(huì)丟消息。
  • batch.num.messages 200 啟用異步模式時(shí),一個(gè)batch緩存的消息數(shù)量。達(dá)到這個(gè)數(shù)值時(shí),producer才會(huì)發(fā)送消息。(每次批量發(fā)送的數(shù)量)

以batch的方式推送數(shù)據(jù)可以極大的提高處理效率,kafka producer可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè)batch發(fā)送請(qǐng)求。batch的數(shù)量大小可以通過(guò)producer的參數(shù)(batch.num.messages)控制。通過(guò)增加batch的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤(pán)IO的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。在比較新的版本中還有batch.size這個(gè)參數(shù)。

consumer

  • consumer 采用pull的方式 從broker拉取數(shù)據(jù)。采用pull方式的優(yōu)點(diǎn)有consumer端可以根據(jù)自己的消費(fèi)能力適時(shí)的去fetch消息并處理,且可以控制消息消費(fèi)的進(jìn)度(offset);此外,消費(fèi)者可以良好的控制消息消費(fèi)的數(shù)量,batch fetch.

  • consumer端向broker發(fā)送fetch請(qǐng)求,并告知其獲取消息的offset;此后consumer將會(huì)獲得一定條數(shù)的消息;consumer端也可以重置offset來(lái)重新消費(fèi)消息.

  • kafka和JMS(Java Message Service)實(shí)現(xiàn)(activeMQ)不同的是:即使消息被消費(fèi),消息仍然不會(huì)被立即刪除.日志文件將會(huì)根據(jù)broker中的配置要求,保留一定的時(shí)間之后刪除;比如log文件保留2天,那么兩天后,文件會(huì)被清除,無(wú)論其中的消息是否被消費(fèi).kafka通過(guò)這種簡(jiǎn)單的手段,來(lái)釋放磁盤(pán)空間,以及減少消息消費(fèi)之后對(duì)文件內(nèi)容改動(dòng)的磁盤(pán)IO開(kāi)支。
    對(duì)于consumer而言,它需要保存消費(fèi)消息的offset,對(duì)于offset的保存和使用,有consumer來(lái)控制;當(dāng)consumer正常消費(fèi)消息時(shí),offset將會(huì)"線性"的向前驅(qū)動(dòng),即消息將依次順序被消費(fèi).事實(shí)上consumer可以使用任意順序消費(fèi)消息,它只需要將offset重置為任意值,offset將會(huì)保存在zookeeper中。
    kafka集群幾乎不需要維護(hù)任何consumer和producer狀態(tài)信息,這些信息有zookeeper保存;因此producer和consumer的客戶端實(shí)現(xiàn)非常輕量級(jí),它們可以隨意離開(kāi),而不會(huì)對(duì)集群造成額外的影響。

  • at most once: 消費(fèi)者fetch消息,然后保存offset,然后處理消息;當(dāng)client保存offset之后,但是在消息處理過(guò)程中出現(xiàn)了異常,導(dǎo)致部分消息未能繼續(xù)處理.那么此后"未處理"的消息將不能被fetch到,這就是"at most once".

  • at least once: 消費(fèi)者fetch消息,然后處理消息,然后保存offset.如果消息處理成功之后,但是在保存offset階段zookeeper異常導(dǎo)致保存操作未能執(zhí)行成功,這就導(dǎo)致接下來(lái)再次fetch時(shí)可能獲得上次已經(jīng)處理過(guò)的消息,這就是"at least once",原因offset沒(méi)有及時(shí)的提交給zookeeper,zookeeper恢復(fù)正常還是之前offset狀態(tài).

消息的順序性

Kafka分布式的單位是partition,同一個(gè)partition用一個(gè)log文件(追加寫(xiě)、offset讀),所以可以保證FIFO的順序。但是在多個(gè)Partition時(shí),不能保證Topic級(jí)別的數(shù)據(jù)有序性,除非創(chuàng)建Topic只指定1個(gè)partition,但這樣做就磨滅kafka高吞吐量的優(yōu)秀特性。

kafka為了提高Topic的并發(fā)吞吐能力,可以提高Topic的partition數(shù),并通過(guò)設(shè)置partition的replica來(lái)保證數(shù)據(jù)高可靠。

Kafka 中發(fā)送1條消息的時(shí)候,可以指定(topic, partition, key) 3個(gè)參數(shù),業(yè)務(wù)放使用producer插入數(shù)據(jù)時(shí),可以控制同一Key發(fā)到同一Partition,從而保證消息有序性。一個(gè)partition的消息只能被一個(gè)consumer消費(fèi)。

安裝

詳情參見(jiàn)官網(wǎng)http://kafka.apache.org/
安裝會(huì)依賴java、zookeeper。

brew install kafka

//安裝的配置文件位置
/usr/local/etc/kafka/server.properties
/usr/local/etc/kafka/zookeeper.properties

//啟動(dòng)zookeeper -daemon 守護(hù)模式
zookeeper-server-start  /usr/local/etc/kafka/zookeeper.properties &

//啟動(dòng)kafka
kafka-server-start /usr/local/etc/kafka/server.properties &

//創(chuàng)建topic  創(chuàng)建單分區(qū)單副本的 topic test:
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

//查看創(chuàng)建的topic
kafka-topics --list --zookeeper localhost:2181

//發(fā)送消息客戶端
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 

//消費(fèi)消息
kafka-console-consumer --bootstrap-server localhost:9092 --topic test --from-beginning

參考文章:https://blog.csdn.net/gongzhiyao3739124/article/details/79688813

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容