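- As we know, the storage of every Kafka Topic is logically split into several Partitions, and each Partition can be given its own replicas (Replica);
- This design introduces a few concepts:
- Partition: the smallest logical unit in which Kafka stores messages; physically, Partitions are spread across different Broker machines;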
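- Replica: each Partition can have copies of itself. The copy that serves reads and writes is called the Leader, and the other copies are called Replicas (strictly speaking, Kafka counts the leader as one of the partition's replicas and calls the others followers). For disaster recovery, a Replica should not sit on the same physical Broker as its Leader;
- Ack: when a client produces a message, it can choose the policy by which the Kafka server acknowledges it:
3.1 No ack at all: the client sends with the highest throughput, but it cannot tell whether the message actually arrived;
3.2 Only the Partition leader acks: throughput is second best, and the client knows the Leader has received the message. However, if the leader dies before any Replica has copied the message, the message is lost and consumers will never see it;
3.3 All Replicas ack (strictly speaking this is not what happens, see ISR below): throughput is the worst, because the Replicas first have to fetch the message from the Leader;
- ISR: In Sync Replicas, a subset of all the Replicas. A Partition may have many replicas, and for 3.3, waiting until every replica has fetched the message before acking would make sending very slow. Kafka therefore compromises: only the replicas in the ISR need to have received the message. The replicas in the ISR are expected to stay in sync with the leader at all times;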
- Since there is a Leader role as well as multiple replicas, a leader has to be elected. Below we look at the election strategies used in different situations;
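The three ack policies and the role of the ISR can be captured in a small model. This is a hypothetical sketch, not Kafka code. `AckModelSketch`, `Acks` and `producerMayProceed` are names invented here. The three cases are assumed to correspond to the producer setting `acks=0`, `acks=1` and `acks=all`, and the ISR is assumed to include the leader.

```scala
object AckModelSketch {
  // Hypothetical model of the ack policies 3.1-3.3 (not Kafka source code).
  sealed trait Acks
  case object NoAck extends Acks     // 3.1, producer config acks=0
  case object LeaderAck extends Acks // 3.2, acks=1
  case object IsrAck extends Acks    // 3.3, acks=all (or -1)

  /**
   * Whether the producer may treat a message as sent.
   * @param leader     broker id of the partition leader
   * @param isr        current in-sync replicas, assumed to include the leader
   * @param hasMessage broker ids that have appended the message so far
   */
  def producerMayProceed(acks: Acks, leader: Int, isr: Set[Int], hasMessage: Set[Int]): Boolean =
    acks match {
      case NoAck     => true                        // the producer does not wait for any response
      case LeaderAck => hasMessage.contains(leader) // the leader's own write is enough
      case IsrAck    => isr.subsetOf(hasMessage)    // every ISR member needs it; replicas outside the ISR are ignored
    }
}
```

Consider replicas {1, 2, 3}, leader 1 and ISR {1, 2}. `IsrAck` is satisfied once brokers 1 and 2 have the message, even if the lagging replica 3 has not fetched it yet. That is the compromise described in the ISR item.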
PartitionLeaderSelector
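- File: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
- This is a trait, and every leader-election strategy class implements it. It declares the method below. The method returns the newly selected LeaderAndIsr together with the list of replicas that should receive the LeaderAndIsrRequest: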
```scala
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition

trait PartitionLeaderSelector {
  /**
   * @param topicAndPartition The topic and partition whose leader needs to be elected
   * @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
   * @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
   * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
   * the LeaderAndIsrRequest.
   */
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
```
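The self-contained sketch below shows how the two halves of the returned tuple are meant to be used. `TopicAndPartition` and `LeaderAndIsr` are simplified stand-ins, and their field names are assumed to mirror Kafka's. `leaderAndIsrRequests` is a hypothetical helper that fans the result out to the receiving replicas. This is roughly what the controller does with the second tuple element, but it is not Kafka's controller code.

```scala
object SelectorContractSketch {
  // Simplified stand-ins for Kafka's TopicAndPartition and LeaderAndIsr (field names assumed).
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  // Same shape as the trait in PartitionLeaderSelector.scala.
  trait PartitionLeaderSelector {
    def selectLeader(topicAndPartition: TopicAndPartition,
                     currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
  }

  /** Hypothetical caller: runs a selector and builds one request per receiving broker. */
  def leaderAndIsrRequests(selector: PartitionLeaderSelector,
                           tp: TopicAndPartition,
                           current: LeaderAndIsr): Map[Int, (TopicAndPartition, LeaderAndIsr)] = {
    val (newLeaderAndIsr, receivers) = selector.selectLeader(tp, current)
    // Only the brokers named in the second tuple element learn about the new leader and ISR.
    receivers.map(brokerId => (brokerId, (tp, newLeaderAndIsr))).toMap
  }
}
```

Each concrete strategy below differs only in how it computes the new leader, the new ISR and the receiver list. The calling side stays the same.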
OfflinePartitionLeaderSelector
- File: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
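- Used to elect a leader for Partitions in the Offline state, i.e. partitions whose leader is no longer alive, for example after the broker hosting the leader has crashed;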
- Rules, as given in the source comment:
Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
- If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live isr as the new isr.
- Else, if unclean leader election for the topic is disabled, it throws a NoReplicaOnlineException.
- Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
- If no broker in the assigned replica list is alive, it throws a NoReplicaOnlineException
Replicas to receive LeaderAndIsr request = live assigned replicas
Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
- The same rules drawn as a diagram:

[Figure: PartitionLeaderSelector.png, diagram of the OfflinePartitionLeaderSelector rules]
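The four rules can be sketched as a pure function. This is a simplified model, not the Kafka source. Inputs that the real selector reads from `controllerContext` are passed in as plain parameters. `uncleanElectionEnabled` stands in for the topic's `unclean.leader.election.enable` setting. The leader-epoch and ZooKeeper-version increments are assumptions about what a new election writes.

```scala
object OfflineSelectorSketch {
  // Simplified stand-ins, not Kafka's classes.
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
  final class NoReplicaOnlineException(msg: String) extends RuntimeException(msg)

  /**
   * @param assigned               assigned replicas, in assignment order
   * @param liveBrokers            ids of brokers currently alive
   * @param uncleanElectionEnabled stand-in for unclean.leader.election.enable
   * @return new leader/ISR plus the replicas that receive the LeaderAndIsr request
   */
  def selectLeader(current: LeaderAndIsr, assigned: Seq[Int], liveBrokers: Set[Int],
                   uncleanElectionEnabled: Boolean): (LeaderAndIsr, Seq[Int]) = {
    val liveAssigned = assigned.filter(liveBrokers.contains)
    val liveIsr = current.isr.filter(liveBrokers.contains)
    val (newLeader, newIsr) =
      if (liveIsr.nonEmpty) (liveIsr.head, liveIsr) // rule 1: clean election from the live ISR
      else if (!uncleanElectionEnabled)             // rule 2
        throw new NoReplicaOnlineException(s"no live replica in ISR ${current.isr}")
      else liveAssigned.headOption match {
        case Some(leader) => (leader, List(leader)) // rule 3: unclean election, ISR shrinks to the new leader
        case None =>                                // rule 4
          throw new NoReplicaOnlineException(s"no live replica in assigned replicas $assigned")
      }
    // Receivers = live assigned replicas.
    (LeaderAndIsr(newLeader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1), liveAssigned)
  }
}
```

Taking a live ISR member keeps the election clean, because every ISR member holds every acknowledged message. Rule 3 trades that guarantee for availability. An out-of-sync replica becomes leader with an ISR containing only itself, and any messages it had not yet fetched are lost.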
ReassignedPartitionLeaderSelector
- File: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
- Used to select the leader after a Partition's replicas have been reassigned;
- Rules:
- New leader = a live in-sync reassigned replica
- New isr = current isr
- Replicas to receive LeaderAndIsr request = reassigned replicas
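These three rules map onto a short function. As before, this is a simplified sketch with stand-in types, not Kafka's implementation. `reassigned` is assumed to be the new replica list in assignment order, and the first replica that is both alive and already in the ISR is taken as leader.

```scala
object ReassignedSelectorSketch {
  // Simplified stand-ins, not Kafka's classes.
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)
  final class NoReplicaOnlineException(msg: String) extends RuntimeException(msg)

  def selectLeader(current: LeaderAndIsr, reassigned: Seq[Int],
                   liveBrokers: Set[Int]): (LeaderAndIsr, Seq[Int]) =
    // New leader = a live reassigned replica that is already in sync (in the ISR).
    reassigned.find(r => liveBrokers.contains(r) && current.isr.contains(r)) match {
      case Some(leader) =>
        // New isr = current isr; receivers = all reassigned replicas.
        (LeaderAndIsr(leader, current.leaderEpoch + 1, current.isr, current.zkVersion + 1), reassigned)
      case None =>
        throw new NoReplicaOnlineException(s"no live in-sync replica among reassigned replicas $reassigned")
    }
}
```

Requiring the new leader to already be in the ISR means leadership only moves to a replica that has caught up. The ISR itself is left untouched at this step.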
ControlledShutdownLeaderSelector
- File: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
- Used to select a new leader when a broker goes through a controlled shutdown (ControlledShutdown);
- Rules:
- New leader = replica in isr that's not being shutdown;
- New isr = current isr - shutdown replica;
- Replicas to receive LeaderAndIsr request = live assigned replicas
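A sketch of these rules with stand-in types follows. It is not the Kafka source. Here `liveBrokers` is assumed to include the brokers that are shutting down, since they are still running. The failure case throws a plain `IllegalStateException` as a placeholder for whatever exception Kafka actually uses.

```scala
object ControlledShutdownSelectorSketch {
  // Simplified stand-in, not Kafka's class.
  final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

  /**
   * @param liveBrokers  brokers still running, including those shutting down
   * @param shuttingDown brokers undergoing controlled shutdown
   */
  def selectLeader(current: LeaderAndIsr, assigned: Seq[Int], liveBrokers: Set[Int],
                   shuttingDown: Set[Int]): (LeaderAndIsr, Seq[Int]) = {
    val liveAssigned = assigned.filter(liveBrokers.contains)
    // New isr = current isr - shutdown replica
    val newIsr = current.isr.filterNot(shuttingDown.contains)
    // New leader = first live assigned replica that is still in the new ISR
    liveAssigned.find(newIsr.contains) match {
      case Some(leader) =>
        (LeaderAndIsr(leader, current.leaderEpoch + 1, newIsr, current.zkVersion + 1), liveAssigned)
      case None =>
        throw new IllegalStateException(s"no ISR replica outside $shuttingDown can take over")
    }
  }
}
```

The broker being shut down is dropped from the ISR before the new leader is picked. Leadership therefore moves to a replica that stays up, while the departing broker still receives the request as one of the live assigned replicas.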
NoOpLeaderSelector
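- File: core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
- It does not actually elect anything. It logs a warning and returns the current LeaderAndIsr together with the partition's current replica assignment;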
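```scala
class NoOpLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
  def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
    warn("I should never have been asked to perform leader election, returning the current LeaderAndIsr and replica assignment.")
    // Keep the current leader/ISR; the receivers are all replicas currently assigned to the partition
    (currentLeaderAndIsr, controllerContext.partitionReplicaAssignment(topicAndPartition))
  }
}
```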