Kafka設(shè)計(jì)解析(二)- Kafka High Availability (上)

Kafka設(shè)計(jì)解析(二)- Kafka High Availability (上)

原創(chuàng)文章,轉(zhuǎn)載請(qǐng)務(wù)必將下面這段話置于文章開頭處。(已授權(quán)InfoQ中文站發(fā)布
本文轉(zhuǎn)發(fā)自技術(shù)世界原文鏈接

Kafka從0.8開始提供High Availability機(jī)制。本文從Data Replication和Leader Election兩方面介紹了Kafka的HA機(jī)制。

Kafka為何需要High Available

為何需要Replication

在Kafka在0.8以前的版本中,是沒(méi)有Replication的,一旦某一個(gè)Broker宕機(jī),則其上所有的Partition數(shù)據(jù)都不可被消費(fèi),這與Kafka數(shù)據(jù)持久性及Delivery Guarantee的設(shè)計(jì)目標(biāo)相悖。同時(shí)Producer都不能再將數(shù)據(jù)存于這些Partition中。

  • 如果Producer使用同步模式則Producer會(huì)在嘗試重新發(fā)送message.send.max.retries(默認(rèn)值為3)次后拋出Exception,用戶可以選擇停止發(fā)送后續(xù)數(shù)據(jù)也可選擇繼續(xù)選擇發(fā)送。而前者會(huì)造成數(shù)據(jù)的阻塞,后者會(huì)造成本應(yīng)發(fā)往該Broker的數(shù)據(jù)的丟失。
數(shù)據(jù)發(fā)送拋出異常后
如果停止發(fā)送后續(xù)數(shù)據(jù):會(huì)造成數(shù)據(jù)阻塞
如果繼續(xù)發(fā)送數(shù)據(jù):后續(xù)發(fā)送的數(shù)據(jù)丟失,因?yàn)榇薆roker已經(jīng)出問(wèn)題
  • 如果Producer使用異步模式,則Producer會(huì)嘗試重新發(fā)送message.send.max.retries(默認(rèn)值為3)次后記錄該異常并繼續(xù)發(fā)送后續(xù)數(shù)據(jù),這會(huì)造成數(shù)據(jù)丟失并且用戶只能通過(guò)日志發(fā)現(xiàn)該問(wèn)題。
數(shù)據(jù)發(fā)送拋出異常、記錄異常后繼續(xù)發(fā)送數(shù)據(jù)
造成數(shù)據(jù)丟失(效果等同于同步異常結(jié)果二)

由此可見,在沒(méi)有Replication的情況下,一旦某機(jī)器宕機(jī)或者某個(gè)Broker停止工作則會(huì)造成整個(gè)系統(tǒng)的可用性降低。隨著集群規(guī)模的增加,整個(gè)集群中出現(xiàn)該類異常的幾率大大增加,因此對(duì)于生產(chǎn)系統(tǒng)而言Replication機(jī)制的引入非常重要。

為何需要Leader Election

(本文所述Leader Election主要指Replica之間的Leader Election)
  引入Replication之后,同一個(gè)Partition可能會(huì)有多個(gè)Replication,而這時(shí)需要在這些Replication之間選出一個(gè)Leader,Producer和Consumer只與這個(gè)Leader交互,其它Replica作為Follower從Leader中復(fù)制數(shù)據(jù)。
  因?yàn)樾枰WC同一個(gè)Partition的多個(gè)Replica之間的數(shù)據(jù)一致性(其中一個(gè)宕機(jī)后其它Replica必須要能繼續(xù)服務(wù)并且即不能造成數(shù)據(jù)重復(fù)也不能造成數(shù)據(jù)丟失)。如果沒(méi)有一個(gè)Leader,所有Replica都可同時(shí)讀/寫數(shù)據(jù),那就需要保證多個(gè)Replica之間互相(N×N條通路)同步數(shù)據(jù),數(shù)據(jù)的一致性和有序性非常難保證,大大增加了Replication實(shí)現(xiàn)的復(fù)雜性,同時(shí)也增加了出現(xiàn)異常的幾率。而引入Leader后,只有Leader負(fù)責(zé)數(shù)據(jù)讀寫,F(xiàn)ollower只向Leader順序Fetch(取)數(shù)據(jù)(N條通路),系統(tǒng)更加簡(jiǎn)單且高效。

為保證數(shù)據(jù)一致性,使系統(tǒng)更加簡(jiǎn)單高效,需要選擇Leader Election

Kafka HA設(shè)計(jì)解析

如何將所有Replica均勻分布到整個(gè)集群

為了更好的做負(fù)載均衡,Kafka盡量將所有的Partition均勻分配到整個(gè)集群上。一個(gè)典型的部署方式是一個(gè)Topic的Partition數(shù)量大于Broker的數(shù)量。同時(shí)為了提高Kafka的容錯(cuò)能力,也需要將同一個(gè)Partition的Replica盡量分散到不同的機(jī)器。實(shí)際上,如果所有的Replica都在同一個(gè)Broker上,那一旦該Broker宕機(jī),該P(yáng)artition的所有Replica都無(wú)法工作,也就達(dá)不到HA的效果。同時(shí),如果某個(gè)Broker宕機(jī)了,需要保證它上面的負(fù)載可以被均勻的分配到其它幸存的所有Broker上。
  Kafka分配Replica的算法如下:

  1. 將所有Broker(假設(shè)共n個(gè)Broker)和待分配的Partition排序

  2. 將第i個(gè)Partition分配到第(i mod n)個(gè)Broker上

  3. 將第i個(gè)Partition的第j個(gè)Replica分配到第((i + j) mod n)個(gè)Broker上

    Key Hash算法
    

Data Replication

Kafka的Data Replication需要解決如下問(wèn)題:

  • 怎樣Propagate(傳送/發(fā)布)消息
  • 在向Producer發(fā)送ACK(確認(rèn)字符)前需要保證有多少個(gè)Replica已經(jīng)收到該消息
  • 怎樣處理某個(gè)Replica不工作的情況
  • 怎樣處理Failed Replica恢復(fù)回來(lái)的情況

Propagate消息

? Producer在發(fā)布消息到某個(gè)Partition時(shí),先通過(guò)Zookeeper找到該P(yáng)artition的Leader,然后無(wú)論該Topic的Replication Factor(因子)為多少(也即該P(yáng)artition有多少個(gè)Replica),Producer只將該消息發(fā)送到該P(yáng)artition的Leader。Leader會(huì)將該消息寫入其本地Log。每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK。一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW(offset低于HW的消息)并且向Producer發(fā)送ACK。

1 Producer發(fā)布消息,通過(guò)Zookeeper找到Partition的Leader
2 Producer將消息發(fā)送給Leader,并記錄Log
3 其他的Follower從Leader復(fù)制數(shù)據(jù)
4 Follower收到數(shù)據(jù),先記錄Log,隨后向Leader發(fā)送ACK,之后繼續(xù)復(fù)制
5 Leader收到ACK,則標(biāo)記已提交,新增HW(標(biāo)記備份OK),再告訴Producer消息發(fā)送成功

? 為了提高性能,每個(gè)Follower在接收到數(shù)據(jù)后就立馬向Leader發(fā)送ACK,而非等到數(shù)據(jù)寫入Log中。因此,對(duì)于已經(jīng)commit的消息,Kafka只能保證它被存于多個(gè)Replica的內(nèi)存中,而不能保證它們被持久化到磁盤中,也就不能完全保證異常發(fā)生后該條消息一定能被Consumer消費(fèi)。但考慮到這種場(chǎng)景非常少見,可以認(rèn)為這種方式在性能和數(shù)據(jù)持久化上做了一個(gè)比較好的平衡。在將來(lái)的版本中,Kafka會(huì)考慮提供更高的持久性。
Consumer讀消息也是從Leader讀取,只有被commit過(guò)的消息(offset低于HW的消息)才會(huì)暴露給Consumer。
? Kafka Replication的數(shù)據(jù)流如下圖所示

ACK前需要保證有多少個(gè)備份

? 和大部分分布式系統(tǒng)一樣,Kafka處理失敗需要明確定義一個(gè)Broker是否“活著”。對(duì)于Kafka而言,Kafka存活包含兩個(gè)條件,一是它必須維護(hù)與Zookeeper的session(這個(gè)通過(guò)Zookeeper的Heartbeat機(jī)制來(lái)實(shí)現(xiàn))。二是Follower必須能夠及時(shí)將Leader的消息復(fù)制過(guò)來(lái),不能“落后太多”。

通過(guò)HeartBeat機(jī)制和Follower復(fù)制消息時(shí)間為準(zhǔn)則來(lái)判斷Broker是否活著

Leader會(huì)跟蹤與其保持同步的Replica列表,該列表稱為ISR(即in-sync Replica)。如果一個(gè)Follower宕機(jī),或者落后太多,Leader將把它從ISR中移除。這里所描述的“落后太多”指Follower復(fù)制的消息落后于Leader后的條數(shù)超過(guò)預(yù)定值(該值可在$KAFKA_HOME/config/server.properties中通過(guò)replica.lag.max.messages配置,其默認(rèn)值是4000)或者Follower超過(guò)一定時(shí)間(該值可在$KAFKA_HOME/config/server.properties中通過(guò)replica.lag.time.max.ms來(lái)配置,其默認(rèn)值是10000)未向Leader發(fā)送fetch請(qǐng)求。。

ISR:一個(gè)記錄Replica的列表,及時(shí)去除異常的Follower(Replica),作用防止數(shù)據(jù)丟失

Kafka的復(fù)制機(jī)制既不是完全的同步復(fù)制,也不是單純的異步復(fù)制。事實(shí)上,同步復(fù)制要求所有能工作的Follower都復(fù)制完,這條消息才會(huì)被認(rèn)為commit,這種復(fù)制方式極大的影響了吞吐率(高吞吐率是Kafka非常重要的一個(gè)特性)。而異步復(fù)制方式下,F(xiàn)ollower異步的從Leader復(fù)制數(shù)據(jù),數(shù)據(jù)只要被Leader寫入log就被認(rèn)為已經(jīng)commit,這種情況下如果Follower都復(fù)制完都落后于Leader,而如果Leader突然宕機(jī),則會(huì)丟失數(shù)據(jù)。而Kafka的這種使用ISR的方式則很好的均衡了確保數(shù)據(jù)不丟失以及吞吐率。Follower可以批量的從Leader復(fù)制數(shù)據(jù),這樣極大的提高復(fù)制性能(批量寫磁盤),極大減少了Follower與Leader的差距。

同步復(fù)制和異步復(fù)制的優(yōu)缺點(diǎn),保證Kafka的高吞吐率和可靠性

需要說(shuō)明的是,Kafka只解決fail/recover,不處理“Byzantine”(“拜占庭”)問(wèn)題。一條消息只有都被ISR里的所有Follower從Leader復(fù)制過(guò)去才會(huì)被認(rèn)為已提交。這樣就避免了部分?jǐn)?shù)據(jù)被寫進(jìn)了Leader,還沒(méi)來(lái)得及被任何Follower復(fù)制就宕機(jī)了,而造成數(shù)據(jù)丟失(Consumer無(wú)法消費(fèi)這些數(shù)據(jù))。而對(duì)于Producer而言,它可以選擇是否等待消息commit,這可以通過(guò)request.required.acks來(lái)設(shè)置。這種機(jī)制確保了只要ISR有一個(gè)或以上的Follower,一條被commit的消息就不會(huì)丟失。

Leader Election算法

? 上文說(shuō)明了Kafka是如何做Replication的,另外一個(gè)很重要的問(wèn)題是當(dāng)Leader宕機(jī)了,怎樣在Follower中選舉出新的Leader。因?yàn)镕ollower可能落后許多或者crash了,所以必須確保選擇“最新”的Follower作為新的Leader。一個(gè)基本的原則就是,如果Leader不在了,新的Leader必須擁有原來(lái)的Leader commit過(guò)的所有消息。這就需要作一個(gè)折衷,如果Leader在標(biāo)明一條消息被commit前等待更多的Follower確認(rèn),那在它宕機(jī)之后就有更多的Follower可以作為新的Leader,但這也會(huì)造成吞吐率的下降。
  一種非常常用的Leader Election的方式是“Majority Vote”(“少數(shù)服從多數(shù)”),但Kafka并未采用這種方式。這種模式下,如果我們有2f+1個(gè)Replica(包含Leader和Follower),那在commit之前必須保證有f+1個(gè)Replica復(fù)制完消息,為了保證正確選出新的Leader,fail的Replica不能超過(guò)f個(gè)。因?yàn)樵谑O碌娜我鈌+1個(gè)Replica里,至少有一個(gè)Replica包含有最新的所有消息。這種方式有個(gè)很大的優(yōu)勢(shì),系統(tǒng)的latency(延遲)只取決于最快的幾個(gè)Broker,而非最慢那個(gè)。Majority Vote也有一些劣勢(shì),為了保證Leader Election的正常進(jìn)行,它所能容忍的fail的follower個(gè)數(shù)比較少。如果要容忍1個(gè)follower掛掉,必須要有3個(gè)以上的Replica,如果要容忍2個(gè)Follower掛掉,必須要有5個(gè)以上的Replica。也就是說(shuō),在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)程度,必須要有大量的Replica,而大量的Replica又會(huì)在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降。這就是這種算法更多用在Zookeeper這種共享集群配置的系統(tǒng)中而很少在需要存儲(chǔ)大量數(shù)據(jù)的系統(tǒng)中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的數(shù)據(jù)存儲(chǔ)并沒(méi)有使用這種方式。
  實(shí)際上,Leader Election算法非常多,比如Zookeeper的Zab, RaftViewstamped Replication。而Kafka所使用的Leader Election算法更像微軟的PacificA算法。

Majority Vote:需要較多的Replica,會(huì)導(dǎo)致性能下降,但系統(tǒng)的延遲取決于最快的幾個(gè)Broker

Kafka在Zookeeper中動(dòng)態(tài)維護(hù)了一個(gè)ISR(in-sync replicas),這個(gè)ISR里的所有Replica都跟上了leader,只有ISR里的成員才有被選為L(zhǎng)eader的可能。在這種模式下,對(duì)于f+1個(gè)Replica,一個(gè)Partition能在保證不丟失已經(jīng)commit的消息的前提下容忍f個(gè)Replica的失敗。在大多數(shù)使用場(chǎng)景中,這種模式是非常有利的。事實(shí)上,為了容忍f個(gè)Replica的失敗,Majority Vote和ISR在commit前需要等待的Replica數(shù)量是一樣的,但是ISR需要的總的Replica的個(gè)數(shù)幾乎是Majority Vote的一半。

ISR:需要的Replica個(gè)數(shù)是Majority Vote的一半,但系統(tǒng)的延遲取決于列表里面的Replica對(duì)應(yīng)的Broker,有可能是最慢的Broker

雖然Majority Vote與ISR相比有不需等待最慢的Broker這一優(yōu)勢(shì),但是Kafka作者認(rèn)為Kafka可以通過(guò)Producer選擇是否被commit阻塞來(lái)改善這一問(wèn)題,并且節(jié)省下來(lái)的Replica和磁盤使得ISR模式仍然值得。

如何處理所有Replica都不工作

上文提到,在ISR中至少有一個(gè)follower時(shí),Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某個(gè)Partition的所有Replica都宕機(jī)了,就無(wú)法保證數(shù)據(jù)不丟失了。這種情況下有兩種可行的方案:

  • 等待ISR中的任一個(gè)Replica“活”過(guò)來(lái),并且選它作為L(zhǎng)eader
  • 選擇第一個(gè)“活”過(guò)來(lái)的Replica(不一定是ISR中的)作為L(zhǎng)eader

這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的折衷。如果一定要等待ISR中的Replica“活”過(guò)來(lái),那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中的所有Replica都無(wú)法“活”過(guò)來(lái)了,或者數(shù)據(jù)都丟失了,這個(gè)Partition將永遠(yuǎn)不可用。選擇第一個(gè)“活”過(guò)來(lái)的Replica作為L(zhǎng)eader,而這個(gè)Replica不是ISR中的Replica,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會(huì)成為L(zhǎng)eader而作為consumer的數(shù)據(jù)源(前文有說(shuō)明,所有讀寫都由Leader完成)。Kafka0.8.*使用了第二種方式。根據(jù)Kafka的文檔,在以后的版本中,Kafka支持用戶通過(guò)配置選擇這兩種方式中的一種,從而根據(jù)不同的使用場(chǎng)景選擇高可用性還是強(qiáng)一致性。

可用性和一致性的權(quán)衡,個(gè)人建議在第一種方案中設(shè)置時(shí)間,超過(guò)固定時(shí)間沒(méi)有活過(guò)來(lái)的Peplica,則采用第二種方案

如何選舉Leader

最簡(jiǎn)單最直觀的方案是,所有Follower都在Zookeeper上設(shè)置一個(gè)Watch,一旦Leader宕機(jī),其對(duì)應(yīng)的ephemeral znode(暫時(shí)zookeeper節(jié)點(diǎn))會(huì)自動(dòng)刪除,此時(shí)所有Follower都嘗試創(chuàng)建該節(jié)點(diǎn),而創(chuàng)建成功者(Zookeeper保證只有一個(gè)能創(chuàng)建成功)即是新的Leader,其它Replica即為Follower。

Leader宕機(jī)--->刪除ephemeral znode--->Follower競(jìng)爭(zhēng)創(chuàng)建此節(jié)點(diǎn)

但是該方法會(huì)有3個(gè)問(wèn)題:

  • split-brain 這是由Zookeeper的特性引起的,雖然Zookeeper能保證所有Watch按順序觸發(fā),但并不能保證同一時(shí)刻所有Replica“看”到的狀態(tài)是一樣的,這就可能造成不同Replica的響應(yīng)不一致
  • herd effect 如果宕機(jī)的那個(gè)Broker上的Partition比較多,會(huì)造成多個(gè)Watch被觸發(fā),造成集群內(nèi)大量的調(diào)整
  • Zookeeper負(fù)載過(guò)重 每個(gè)Replica都要為此在Zookeeper上注冊(cè)一個(gè)Watch,當(dāng)集群規(guī)模增加到幾千個(gè)Partition時(shí)Zookeeper負(fù)載會(huì)過(guò)重。

Kafka 0.8.*的Leader Election方案解決了上述問(wèn)題,它在所有broker中選出一個(gè)controller,所有Partition的Leader選舉都由controller決定。controller會(huì)將Leader的改變直接通過(guò)RPC的方式(比Zookeeper Queue的方式更高效)通知需為此作出響應(yīng)的Broker。同時(shí)controller也負(fù)責(zé)增刪Topic以及Replica的重新分配。

HA相關(guān)Zookeeper結(jié)構(gòu)

(本節(jié)所示Zookeeper結(jié)構(gòu)中,實(shí)線框代表路徑名是固定的,而虛線框代表路徑名與業(yè)務(wù)相關(guān))
  admin (該目錄下znode只有在有相關(guān)操作時(shí)才會(huì)存在,操作結(jié)束時(shí)會(huì)將其刪除)

1 preferred_replica_election

/admin/preferred_replica_election數(shù)據(jù)結(jié)構(gòu)

Schema:
{
      "fields":[
         {
            "name":"version",
            "type":"int",
            "doc":"version id"
         },
         {
            "name":"partitions",
            "type":{
               "type":"array",
               "items":{
                  "fields":[
                     {
                        "name":"topic",
                        "type":"string",
                        "doc":"topic of the partition for which preferred replica election should be triggered"
                     },
                     {
                        "name":"partition",
                        "type":"int",
                        "doc":"the partition for which preferred replica election should be triggered"
                     }
                  ],
               }
               "doc":"an array of partitions for which preferred replica election should be triggered"
            }
         }
      ]
   }
    
Example:     
{
     "version": 1,
     "partitions":
        [
           {
               "topic": "topic1",
               "partition": 8         
           },
           {
               "topic": "topic2",
               "partition": 16        
           }
        ]            
}

2 reassign_partitions

/admin/reassign_partitions用于將一些Partition分配到不同的broker集合上。對(duì)于每個(gè)待重新分配的Partition,Kafka會(huì)在該znode上存儲(chǔ)其所有的Replica和相應(yīng)的Broker id。該znode由管理進(jìn)程創(chuàng)建并且一旦重新分配成功它將會(huì)被自動(dòng)移除。其數(shù)據(jù)結(jié)構(gòu)如下

Schema:
{
      "fields":[
         {
            "name":"version",
            "type":"int",
            "doc":"version id"
         },
         {
            "name":"partitions",
            "type":{
               "type":"array",
               "items":{
                  "fields":[
                     {
                        "name":"topic",
                        "type":"string",
                        "doc":"topic of the partition to be reassigned"
                     },
                     {
                        "name":"partition",
                        "type":"int",
                        "doc":"the partition to be reassigned"
                     },
                     {
                        "name":"replicas",
                        "type":"array",
                        "items":"int",
                        "doc":"a list of replica ids"
                     }
                  ],
               }
               "doc":"an array of partitions to be reassigned to new replicas"
            }
         }
      ]
   }

   Example:
   {
     "version": 1,
     "partitions":
        [
           {
               "topic": "topic3",
               "partition": 1,
               "replicas": [1, 2, 3]
           }
        ]            
   }

3 delete_topics

/admin/delete_topics數(shù)據(jù)結(jié)構(gòu)

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "topics",
       "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"}
      } ]
}
 
Example:
{
  "version": 1,
  "topics": ["topic4", "topic5"]
}

brokers

broker(即/brokers/ids/[brokerId])存儲(chǔ)“活著”的Broker信息。數(shù)據(jù)結(jié)構(gòu)如下

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "host", "type": "string", "doc": "ip address or host name of the broker"},
      {"name": "port", "type": "int", "doc": "port of the broker"},
      {"name": "jmx_port", "type": "int", "doc": "port for jmx"}
    ]
}
 
Example:
{
    "jmx_port":-1,
    "host":"node1",
    "version":1,
    "port":9092
}

4 topic注冊(cè)信息

topic注冊(cè)信息(/brokers/topics/[topic]),存儲(chǔ)該Topic的所有Partition的所有Replica所在的Broker id,第一個(gè)Replica即為Preferred Replica,對(duì)一個(gè)給定的Partition,它在同一個(gè)Broker上最多只有一個(gè)Replica,因此Broker id可作為Replica id。數(shù)據(jù)結(jié)構(gòu)如下

Schema:
{ "fields" :
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "partitions",
       "type": {"type": "map",
                "values": {"type": "array", "items": "int", "doc": "a list of replica ids"},
                "doc": "a map from partition id to replica list"},
      }
    ]
}
Example:
{
    "version":1,
    "partitions":
        {"12":[6],
        "8":[2],
        "4":[6],
        "11":[5],
        "9":[3],
        "5":[7],
        "10":[4],
        "6":[8],
        "1":[3],
        "0":[2],
        "2":[4],
        "7":[1],
        "3":[5]}
}

5 partition state

partition state(/brokers/topics/[topic]/partitions/[partitionId]/state) 結(jié)構(gòu)如下

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "isr",
       "type": {"type": "array",
                "items": "int",
                "doc": "an array of the id of replicas in isr"}
      },
      {"name": "leader", "type": "int", "doc": "id of the leader replica"},
      {"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"},
      {"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"}
    ]
}
 
Example:
{
    "controller_epoch":29,
    "leader":2,
    "version":1,
    "leader_epoch":48,
    "isr":[2]
}

6 controller

/controller -> int (broker id of the controller)存儲(chǔ)當(dāng)前controller的信息

Schema:
{ "fields":
    [ {"name": "version", "type": "int", "doc": "version id"},
      {"name": "brokerid", "type": "int", "doc": "broker id of the controller"}
    ]
}
Example:
{
    "version":1,
  "brokerid":8
}

/controller_epoch -> int (epoch)直接以整數(shù)形式存儲(chǔ)controller epoch,而非像其它znode一樣以JSON字符串形式存儲(chǔ)。

Broker failover過(guò)程簡(jiǎn)介

  1. Controller在Zookeeper注冊(cè)Watch,一旦有Broker宕機(jī)(這是用宕機(jī)代表任何讓系統(tǒng)認(rèn)為其die的情景,包括但不限于機(jī)器斷電,網(wǎng)絡(luò)不可用,GC導(dǎo)致的Stop The World,進(jìn)程crash等),其在Zookeeper對(duì)應(yīng)的znode會(huì)自動(dòng)被刪除,Zookeeper會(huì)fire Controller(激活)注冊(cè)的watch,Controller讀取最新的幸存的Broker
  2. Controller決定set_p(set partition),該集合包含了宕機(jī)的所有Broker上的所有Partition
  3. 對(duì)set_p中的每一個(gè)Partition
    3.1 從/brokers/topics/[topic]/partitions/[partition]/state讀取該P(yáng)artition當(dāng)前的ISR
      3.2 決定該P(yáng)artition的新Leader。如果當(dāng)前ISR中有至少一個(gè)Replica還幸存,則選擇其中一個(gè)作為新Leader,新的ISR則包含當(dāng)前ISR中所有幸存的Replica。否則選擇該P(yáng)artition中任意一個(gè)幸存的Replica作為新的Leader以及ISR(該場(chǎng)景下可能會(huì)有潛在的數(shù)據(jù)丟失)。如果該P(yáng)artition的所有Replica都宕機(jī)了,則將新的Leader設(shè)置為-1。
      3.3 將新的Leader,ISR和新的leader_epochcontroller_epoch寫入/brokers/topics/[topic]/partitions/[partition]/state。注意,該操作只有其version在3.1至3.3的過(guò)程中無(wú)變化時(shí)才會(huì)執(zhí)行,否則跳轉(zhuǎn)到3.1
  4. 直接通過(guò)RPC向set_p相關(guān)的Broker發(fā)送LeaderAndISRRequest命令。Controller可以在一個(gè)RPC操作中發(fā)送多個(gè)命令從而提高效率。
  5. Broker failover(Broker故障轉(zhuǎn)移)順序圖如下所示。
    broker failover sequence diagram

關(guān)鍵字

Acknowledgement:確認(rèn)字符

Broker:代理 (一臺(tái)kafka服務(wù)器就是一個(gè)broker)

ISR:in-sync replicas 同步副本

HW:high watermark 高水位標(biāo)志

offset:位移 偏差

znode:zookeeper node

split-brain:腦裂

本來(lái)一個(gè)大腦的兩半球互相配合,變成了分裂成兩個(gè)獨(dú)立的大腦,都認(rèn)為對(duì)方已死。在集群環(huán)境中,有這么幾種可能造成"Split-Brain"現(xiàn)象:

? 1、在集群環(huán)境中的節(jié)點(diǎn)間的心跳線同時(shí)斷掉后,集群系統(tǒng)所處于的一種特殊狀態(tài)。例如節(jié)點(diǎn)1和2組成一個(gè)集群,突然1和2間的心跳同時(shí)都斷了,如果此前節(jié)點(diǎn)1正在運(yùn)行應(yīng)用,心跳都斷掉后2開始去接管應(yīng)用,強(qiáng)行加載數(shù)據(jù),此時(shí)就是split-brain。

? 2、集群中節(jié)點(diǎn)因?yàn)樘幚砥髅蛘咂渌驎簳r(shí)停止響應(yīng)時(shí),其他節(jié)點(diǎn)可能誤認(rèn)為該節(jié)點(diǎn)“已死”。

后果:節(jié)點(diǎn)間爭(zhēng)奪共享磁盤(即資源)的訪問(wèn)權(quán),都對(duì)共享文件系統(tǒng)產(chǎn)生讀寫操作,從而導(dǎo)致共享磁盤文件系統(tǒng)損壞。

解決辦法:使用硬盤心跳,scsi reservation,以及最極端的power fence。

Kafka系列文章

Kafka設(shè)計(jì)解析(一)- Kafka簡(jiǎn)介及架構(gòu)介紹
Kafka設(shè)計(jì)解析(二)- Kafka High Availability (上)
Kafka設(shè)計(jì)解析(三)- Kafka High Availability (下)
Kafka設(shè)計(jì)解析(四)- Kafka Consumer設(shè)計(jì)解析
Kafka設(shè)計(jì)解析(五)- Kafka性能測(cè)試方法及Benchmark報(bào)告
Kafka設(shè)計(jì)解析(六)- Kafka高性能架構(gòu)之道
Kafka設(shè)計(jì)解析(七)- Kafka Stream

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容