Kafka

kafka原理簡介

Kafka是由LinkedIn開發(fā)的一個(gè)分布式的消息系統(tǒng),使用Scala編寫,它以可水平擴(kuò)展和高吞吐率而被廣泛使用。
目前越來越多的開源分布式處理系統(tǒng)如Cloudera、Apache Storm、Spark 都支持與Kafka集成。

kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者就是Producer,消息接受者就是Consumer,將中間的存儲(chǔ)陣列稱作broker(代理),也稱作一個(gè)kafka實(shí)例。
然后三者都通過ZooKeeper進(jìn)行協(xié)調(diào)。

也即:

  1. 啟動(dòng)zookeeper的server
  2. 啟動(dòng)kafka的server
  3. Producer生產(chǎn)數(shù)據(jù),然后通過zookeeper找到broker,再將數(shù)據(jù)push到broker進(jìn)行保存
  4. Consumer通過zookeeper找到broker,然后再主動(dòng)pull數(shù)據(jù)

kafka存儲(chǔ)是基于硬盤存儲(chǔ)的,然而卻有著快速的讀寫效率,一個(gè) 67200rpm STAT RAID5 的陣列,線性讀寫速度是 300MB/sec,如果是隨機(jī)讀寫,速度則是 50K/sec。
雖然內(nèi)存讀取速度明顯快于硬盤讀寫速度,但是kafka卻通過線性讀寫的方式實(shí)現(xiàn)快速地讀寫。

kafka結(jié)構(gòu)圖

kafka各部分介紹

Producer | Topic | Partition

學(xué)習(xí)kafka一定要理解好Topic。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個(gè)或多個(gè)Partition,每個(gè)Partition在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè)Partition的所有消息和索引文件。
每個(gè)日志文件都是log entrie序列,每個(gè)log entrie代表一條消息,每條消息都有一個(gè)當(dāng)前Partition下唯一的64字節(jié)offset,它指明了這條消息的起始位置。
Producer發(fā)送消息到broker時(shí),會(huì)根據(jù)paritition機(jī)制選擇將其存儲(chǔ)到哪一個(gè)Partition。如果一個(gè)Topic對(duì)應(yīng)一個(gè)文件,那這個(gè)文件所在機(jī)器的IO,將會(huì)成為這個(gè)Topic的性能瓶頸,有了Partition后,不同消息可以并行寫入不同broker的不同partition里,極大提高了吞吐率。


解析topic

每條消息在partition中的位置稱為offset(偏移量),類型為long型數(shù)字。消息即使被消費(fèi)了,也不會(huì)被立即刪除,而是根據(jù)broker里的設(shè)置,保存一定時(shí)間后再清除,比如log文件設(shè)置存儲(chǔ)兩天,則兩天后,不管消息是否被消費(fèi),都清除。
Kafka保證一個(gè)Partition內(nèi)的消息的有序性。

Broker

broker也即中間的存儲(chǔ)隊(duì)列。我們將消息的發(fā)布(publish)暫時(shí)稱作 producer,將消息的訂閱(subscribe)表述為consumer,將中間的存儲(chǔ)陣列稱作 broker(代理)。

Consumer

每個(gè)consumer屬于一個(gè)consumer group。在kafka中,一個(gè)partition的消息只會(huì)被group中的一個(gè)consumer消費(fèi);可以認(rèn)為一個(gè)group就是一個(gè)“訂閱者”。一個(gè)Topic中的每個(gè)partition只會(huì)被一個(gè)“訂閱者”中的一個(gè)consumer消費(fèi)。
發(fā)送到Topic的消息,只會(huì)被訂閱此Topic的每個(gè)group中的一個(gè)consumer消費(fèi)。Kafka 消費(fèi)端采用pull模式從broker拉消息

pull模式的優(yōu)劣勢

pull模式的優(yōu)勢:消費(fèi)端自主控制消息的量,避免網(wǎng)絡(luò)擁塞,因?yàn)橄⒌膐ffset控制在消息端,還能簡化broker的設(shè)計(jì)。服務(wù)端無狀態(tài),設(shè)計(jì)簡單,不容易出錯(cuò)。
pull模式的缺點(diǎn):不能以最快速度傳遞消息。

Zookeeper

kafka集群幾乎不需要維護(hù)任何Consumer和Producer的信息。這些信息由Zookeeper保存。

保證送達(dá)(delivery guarantee)

At most once 消息可能會(huì)丟,但絕不會(huì)重復(fù)傳輸
At least once 消息絕不會(huì)丟,但可能會(huì)重復(fù)傳輸
Exactly once 每條消息肯定會(huì)被傳輸一次且僅傳輸一次

數(shù)據(jù)處理與commit的順序,在很大程度上決定了消息從broker到consumer的delivery guarantee semantic。

at most once

如果讀到消息就提交,則是at most once(至多一次),因?yàn)榧词固幚硎。驗(yàn)橄⒁烟峤?,offset已指向下一個(gè),處理失敗的消息也不會(huì)再處理了。

at least once

如果處理完成功后再提交,則是at least once(至少一次),消息必須處理成功。如果消息處理完,但commit時(shí)出錯(cuò),這會(huì)導(dǎo)致重復(fù)消費(fèi)消息,因此要求消息處理者要保證冪等。

Exactly once

業(yè)務(wù)需要做事務(wù),保證 Exactly Once 語義
這里業(yè)務(wù)場景被區(qū)分為兩個(gè):

  1. 冪等操作
  2. 業(yè)務(wù)代碼需要自身添加事務(wù)操作

冪等操作

所謂冪等操作就是重復(fù)執(zhí)行不會(huì)產(chǎn)生問題,如果是這種場景下,你不需要額外做任何工作。但如果你的應(yīng)用場景是不允許數(shù)據(jù)被重復(fù)執(zhí)行的,那只能通過業(yè)務(wù)自身的邏輯代碼來解決了。

事務(wù)保證

Kafka能支持分布式事務(wù),保證微服務(wù)事務(wù)的完整性,關(guān)鍵是將偏移量和你要保存的狀態(tài)通過事務(wù)保存到數(shù)據(jù)庫,失敗恢復(fù)時(shí)從這個(gè)偏移量開始從卡夫卡中重新讀取,保證了消息和你的業(yè)務(wù)狀態(tài)數(shù)據(jù)的一致性

有兩種常用的方法在Kafka之上來獲得恰好一次的語義:
1.將偏移量存儲(chǔ)在與派生狀態(tài)相同的DB中,并在事務(wù)中更新兩者。重新啟動(dòng)時(shí),從DB讀取當(dāng)前偏移量,然后從偏移位置開始讀取卡夫卡。
2.以冪等的方式將狀態(tài)更新和偏移量一起寫入。例如,如果您的派生狀態(tài)是一個(gè)key和一個(gè)跟蹤出現(xiàn)次數(shù)的計(jì)數(shù)器,則將偏移量與計(jì)數(shù)值一起存儲(chǔ),并忽略任何偏移量<=當(dāng)前存儲(chǔ)值的任何更新。

Kafka 高可用

為了保證高可用,主要引入了Replication和Leader Election。
一個(gè)Kafka集群包括多個(gè)broker,這些broker在高可用環(huán)境下,可以對(duì)不同topic互為leader和follow。

Leader Election算法

常用的選擇leader方法是少數(shù)服從多數(shù)(Majority Vote),它的優(yōu)勢是為了保證Leader Election能夠進(jìn)行,所容忍的fail的follower個(gè)數(shù)較少,否則就需要更多的Replica,因?yàn)樾枰蟛糠謋ollower都同步有數(shù)據(jù)后,才能提交,這樣的同步會(huì)加大latency(延遲),不適用于需要存儲(chǔ)大量數(shù)據(jù)的系統(tǒng)使用。

Kafka使用了更像微軟PacificA算法。

Kafka在ZooKeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR,ISR里所有Replica都跟上了leader,只有ISR里的成員才能被選為Leader,在這種模式下,對(duì)于f+1個(gè)Replica,一個(gè)Partition能在保證不丟失已經(jīng)commit的消息的前提下容忍f個(gè)Replica的失敗。

Majority Vote相比ISR有不需等待最慢Broker這一優(yōu)勢,因?yàn)镮SR需要等待所有機(jī)器都與Leader完成同步,而Majority Vote只需要最快的f+1個(gè)機(jī)器完成同步即可,但Majority Vote需要更多的機(jī)器(2f+1)才能容忍f個(gè)機(jī)器失敗。

如果所有ISR都不work了,需要在可用性和一致性當(dāng)中做出一個(gè)選擇。
如果一定要等待ISR中的Replica“活”過來,那不可用時(shí)間會(huì)較長。也可選擇第一個(gè)“活”過來的Replica作為Leader,由于這個(gè)Replica可能不是ISR中的Replica,因此它不保證已包含了所有已commit的消息,因此一致性會(huì)受影響。

如果所有replica都在zookeeper上注冊(cè)觀察者,會(huì)產(chǎn)生三個(gè)問題
Split-brain (腦裂),雖然ZooKeeper能保證所有Watch順序觸發(fā),但不能保證同一時(shí)刻所有Replica看到的狀態(tài)是一樣的,會(huì)造成Replica的響應(yīng)順序不一致。
Herd effect(羊群效應(yīng)) 如果宕機(jī)的那個(gè)Broker上的Partition比較多,會(huì)造成多個(gè)Watch被觸發(fā),造成集群內(nèi)大量的調(diào)整。
ZooKeeper負(fù)載過重,每個(gè)Replica都來注冊(cè)一上Watch,集群規(guī)模增加到幾千個(gè)Partition時(shí),ZooKeeper負(fù)載會(huì)過重
Kafka為了避免這樣,在所有broker中選出一個(gè)controller,所有Partition的Leader選舉都由controller決定,controller會(huì)將Leader的改變直接通過RPC通知需要為此作出響應(yīng)的Broker,同時(shí)controller也負(fù)責(zé)增刪Topic以及Replica的重新分配。

通信

Kafka的網(wǎng)絡(luò)通信基于Java NIO開發(fā),采用Reactor模式
1個(gè)Acceptor負(fù)責(zé)接受客戶請(qǐng)求,N個(gè)Processor負(fù)責(zé)讀寫數(shù)據(jù),M個(gè)Handler處理業(yè)務(wù)邏輯。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容