Kafka ZooKeeper Structure

Layout of Kafka metadata in ZooKeeper

Start server-0, open zkCli, and run ls /

[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification, 
controller_epoch, consumers, latest_producer_id_block, config]

cluster

Stores the cluster id and version information. A broker reads /cluster/id at startup; if the node does not exist, the broker generates it.

[zk: localhost:2181(CONNECTED) 14] get /cluster/id
{"version":"1","id":"cw-I6v4cSt2lsNio4v-rUQ"}

controller (ephemeral node)

int (broker id of the controller): stores information about the Kafka broker that currently hosts the central controller.
Among the brokers in a Kafka cluster, one is elected controller leader and manages the state of partitions and replicas across the whole cluster. When the leader replica of a partition fails, the controller elects a new leader replica for that partition. When it detects a change in an ISR list, the controller notifies every broker in the cluster to update its MetadataCache. When partitions are added to a topic, the controller also manages the partition reassignment.
Every broker creates a KafkaController object at startup, but only one leader can serve the cluster. The KafkaController on each node tries to create an ephemeral node at a designated ZooKeeper path; only the one that creates the node first becomes the leader, and all the others are followers. When the leader fails, all followers are notified and compete again to create the node at that path, which elects a new leader.

Schema:
{
    "version": version number, 1 by default,
    "brokerid": unique id of the broker within the Kafka cluster,
    "timestamp": timestamp of the most recent controller change
}
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":0,"timestamp":"1559195868940"}
After shutting down broker 0:
[zk: localhost:2181(CONNECTED) 9] get /controller
{"version":1,"brokerid":1,"timestamp":"1559232245731"}
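
The election described above can be sketched with an in-memory stand-in for ZooKeeper. This is only an illustration: FakeZk, try_create_controller, session_expired, and elect are hypothetical names, not Kafka or ZooKeeper APIs, and the sketch assumes the brokers attempt the creation in list order.

```python
class FakeZk:
    """In-memory stand-in for the single /controller ephemeral znode."""

    def __init__(self):
        self.controller = None  # broker id that owns /controller, or None

    def try_create_controller(self, broker_id):
        # Creation succeeds only if the node does not exist yet.
        if self.controller is None:
            self.controller = broker_id
            return True
        return False

    def session_expired(self, broker_id):
        # An ephemeral node disappears when its owner's session ends.
        if self.controller == broker_id:
            self.controller = None


def elect(zk, live_brokers):
    """Every live broker races to create the node; the first one wins."""
    for broker_id in live_brokers:
        zk.try_create_controller(broker_id)
    return zk.controller


zk = FakeZk()
first = elect(zk, [0, 1, 2])   # broker 0 gets there first
zk.session_expired(0)          # broker 0 is shut down
second = elect(zk, [1, 2])     # the remaining brokers race again
print(first, second)
```

With this ordering the controller moves from broker 0 to broker 1, matching the two get /controller results shown above.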

brokers [ids, topics, seqid]

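ids: /brokers/ids/[0...N]
Each broker's configuration file must specify a numeric id that is unique across the cluster. This node is an ephemeral znode.
Schema:
{
    "jmx_port": JMX port,
    "timestamp": timestamp of the broker's startup,
    "host": hostname or IP address,
    "version": version number, 1 by default,
    "port": the broker's server port, set by the port parameter in server.properties
}
The sample below is a version 4 registration, which also carries the listener_security_protocol_map and endpoints fields.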
[zk: localhost:2181(CONNECTED) 16] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 17] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
"endpoints":["PLAINTEXT://192.168.1.103:9092"],
"jmx_port":-1,"host":"192.168.1.103",
"timestamp":"1559232289265","port":9092,"version":4}
seqid: /brokers/seqid
Checked at broker startup and created if missing. Persistent node.
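
Returning to the /brokers/ids/0 registration shown above, a client-side tool might read the listener addresses out of that JSON roughly as follows. This is a minimal sketch that uses only the sample payload from this article; the function name listeners is hypothetical, and the parsing assumes every endpoint has the form NAME://host:port.

```python
import json

# The value returned by `get /brokers/ids/0` in the session above.
registration = json.loads(
    '{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},'
    '"endpoints":["PLAINTEXT://192.168.1.103:9092"],'
    '"jmx_port":-1,"host":"192.168.1.103",'
    '"timestamp":"1559232289265","port":9092,"version":4}'
)


def listeners(reg):
    """Return (listener name, security protocol, host, port) per endpoint."""
    protocols = reg["listener_security_protocol_map"]
    result = []
    for endpoint in reg["endpoints"]:
        name, _, address = endpoint.partition("://")
        host, _, port = address.rpartition(":")
        result.append((name, protocols[name], host, int(port)))
    return result


parsed = listeners(registration)
print(parsed)
```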
topics: /brokers/topics/[xxxx]
Topic information for the Kafka cluster.
Schema:
{
    "version": version number, currently fixed at 1,
    "partitions": {
        "partitionId": [
            list of brokerIds that hold a replica of this partition
        ],
        "partitionId": [
            list of brokerIds that hold a replica of this partition
        ],
        .......
    }
}
[zk: localhost:2181(CONNECTED) 25] ls /brokers/topics
[test1]
[zk: localhost:2181(CONNECTED) 27] get /brokers/topics/test1
{"version":1,"partitions":{"2":[2,1],"1":[1,0],"3":[0,1],"0":[0,2]}}
/brokers/topics/[topic]/partitions/[0...N]/state, where [0...N] is the partition index
Schema:
{
    "controller_epoch": number of controller elections that have taken place in the cluster,
    "leader": brokerId of the elected leader of this partition,
    "version": version number, 1 by default,
    "leader_epoch": number of leader elections for this partition,
    "isr": [list of brokerIds in the in-sync replica set]
}
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":14,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}
[zk: localhost:2181(CONNECTED) 11] get /controller_epoch
14
After shutting down broker 0, broker 2 is elected leader and the ISR contains only broker 2:
[zk: localhost:2181(CONNECTED) 13] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":14,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}
[zk: localhost:2181(CONNECTED) 20] get /controller_epoch
15
After restarting broker 0, the controller epoch is unchanged and the leader epoch of partition 0 increases to 2:
[zk: localhost:2181(CONNECTED) 31] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":15,"leader":0,"version":1,"leader_epoch":2,"isr":[2,0]}
[zk: localhost:2181(CONNECTED) 20] get /controller_epoch
15
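
Comparing the topic node with a partition's state node shows which replicas have fallen out of the ISR. The sketch below uses the payloads captured above while broker 0 was down. It assumes the lists under "partitions" in the topic node are the replicas assigned to each partition; out_of_sync is a hypothetical helper name.

```python
import json

# /brokers/topics/test1 and /brokers/topics/test1/partitions/0/state,
# copied from the session above (state taken while broker 0 was down).
assignment = json.loads(
    '{"version":1,"partitions":{"2":[2,1],"1":[1,0],"3":[0,1],"0":[0,2]}}'
)
state = json.loads(
    '{"controller_epoch":14,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}'
)


def out_of_sync(assignment, partition, state):
    """Replicas listed for the partition that are missing from its ISR."""
    assigned = assignment["partitions"][str(partition)]
    return [broker for broker in assigned if broker not in state["isr"]]


missing = out_of_sync(assignment, 0, state)
leader_in_isr = state["leader"] in state["isr"]
print(missing, leader_in_isr)
```

Here missing contains broker 0 only, which is the broker that was shut down.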

controller_epoch (persistent node, controller epoch)

This value is a number. It is 1 when the first broker in the cluster starts for the first time. Whenever the broker that hosts the central controller changes or goes down, a new controller is elected, and each controller change increments controller_epoch by 1.

get /controller_epoch
7

isr_change_notification

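In Kafka, data synchronization between the leader and its followers follows an "eventual consistency" principle: replication can lag, but the data is guaranteed to become consistent eventually. ISR is short for "in-sync replicas", the replicas that have caught up with the leader, and it is the basis for choosing a new leader. The isr_change_notification node is used to handle changes to the ISR set.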

log_dir_event_notification

consumer registration

Each consumer has a unique id (the consumerId can be set in the configuration file or generated by the system). This id identifies the consumer.

consumerId generation rule:
 
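   import java.net.InetAddress;
   import java.util.UUID;

   // Use the configured consumerId if present; otherwise generate one.
   String consumerUuid;
   if (config.consumerId != null && !config.consumerId.isEmpty()) {
       consumerUuid = config.consumerId;
   } else {
       UUID uuid = UUID.randomUUID();
       // hostname-currentTimeMillis-first 8 hex digits of the UUID's high bits
       consumerUuid = String.format("%s-%d-%s",
            InetAddress.getLocalHost().getHostName(), System.currentTimeMillis(),
            Long.toHexString(uuid.getMostSignificantBits()).substring(0, 8));
   }
   String consumerIdString = config.groupId + "_" + consumerUuid;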
 
Schema:
{
    "version": version number, 1 by default,
    "subscription": { // list of subscribed topics
        "topic name": number of consumer threads for this topic in the consumer
    },
    "pattern": "static",
    "timestamp": "timestamp of the consumer's startup"
}
  
Example:
{
    "version":1,
    "subscription":{
        "replicatedtopic":1
    },
    "pattern":"white_list",
    "timestamp":"1452134230082"
}

consumer owner

/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId index

When a consumer starts, it triggers the following operations:

a) First it performs "Consumer id registration".

b) It then registers a watch under the "Consumer id registration" node to listen for other consumers in the same group leaving or joining. Any change to the list of nodes under this znode path triggers a rebalance among the consumers in the group (for example, when one consumer fails, the others take over its partitions).

c) It registers a watch under the "Broker id registration" node to monitor whether brokers are alive. If the broker list changes, the consumers in all groups rebalance.
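
The effect of step b) can be sketched as a function that recomputes partition ownership whenever the group's membership changes. The round-robin split used here is an assumption chosen for brevity, not necessarily the assignment strategy Kafka applies, and assign, owner_path, and the consumer ids g1_c1 and g1_c2 are hypothetical.

```python
def assign(partitions, consumers):
    """Hypothetical round-robin split of partitions over sorted consumers."""
    members = sorted(consumers)
    owners = {}
    for index, partition in enumerate(sorted(partitions)):
        owners[partition] = members[index % len(members)]
    return owners


def owner_path(group, topic, partition):
    # Path layout taken from the consumer owner section above.
    return f"/consumers/{group}/owners/{topic}/{partition}"


partitions = [0, 1, 2, 3]
before = assign(partitions, ["g1_c1", "g1_c2"])
after = assign(partitions, ["g1_c1"])  # g1_c2 left the group

for partition, consumer in after.items():
    print(owner_path("g1", "test1", partition), "->", consumer)
```

Once g1_c2 disappears from the membership list, every owner node for the topic points at g1_c1.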

consumer offset

/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
Tracks the largest offset that each consumer has consumed so far in the partition.
This znode is persistent. The offset is keyed by group_id, so when one consumer in a consumer group fails and a rebalance is triggered, the other consumers can continue consuming.
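
Because the path contains the group id rather than the consumer id, any member of the group can pick up where a failed member stopped. A minimal sketch, with a dictionary standing in for ZooKeeper; commit and resume_from are hypothetical names, and starting from 0 when no offset is stored is an assumption made only for this example.

```python
store = {}  # stands in for ZooKeeper: znode path -> committed offset


def offset_path(group, topic, partition):
    # Path layout taken from the consumer offset section above.
    return f"/consumers/{group}/offsets/{topic}/{partition}"


def commit(group, topic, partition, offset):
    store[offset_path(group, topic, partition)] = offset


def resume_from(group, topic, partition):
    # Assumption for this sketch: start at 0 when nothing is stored.
    return store.get(offset_path(group, topic, partition), 0)


commit("g1", "test1", 0, 42)             # consumer A commits, then fails
resumed = resume_from("g1", "test1", 0)  # consumer B in the same group takes over
other_group = resume_from("g2", "test1", 0)
print(resumed, other_group)
```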

latest_producer_id_block

At startup a broker pre-allocates a block of producer ids (PIDs). A block holds 1000 PIDs, for example 0~999.

[zk: localhost:2181(CONNECTED) 32] get /latest_producer_id_block
{"version":1,"broker":1,"block_start":"8000","block_end":"8999"}
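
The block arithmetic can be sketched as follows. The block size of 1000 comes from the text above; that a new block starts right after the previous block_end is an assumption of this sketch, and next_block is a hypothetical helper rather than Kafka code.

```python
import json

BLOCK_SIZE = 1000  # from the text: 1000 PIDs are allocated at a time


def next_block(latest_json, broker_id):
    """Build the next block, assuming blocks are contiguous."""
    if latest_json is None:
        start = 0  # no block has been handed out yet
    else:
        start = int(json.loads(latest_json)["block_end"]) + 1
    return {
        "version": 1,
        "broker": broker_id,
        "block_start": str(start),
        "block_end": str(start + BLOCK_SIZE - 1),
    }


# Value of /latest_producer_id_block from the session above.
latest = '{"version":1,"broker":1,"block_start":"8000","block_end":"8999"}'
first = next_block(None, 0)
following = next_block(latest, 2)
print(json.dumps(following, separators=(",", ":")))
```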

config

Kafka configuration information

[zk: localhost:2181(CONNECTED) 0] ls /config
[changes, clients, brokers, topics, users]

/config/topics stores the customized, per-topic configuration
