Kafka 信息zk結(jié)構(gòu)

啟動(dòng)server-0 --> zkcli --> ls /
[zk: localhost:2181(CONNECTED) 1] ls /
[cluster, controller, brokers, zookeeper, admin, isr_change_notification, log_dir_event_notification,
controller_epoch, consumers, latest_producer_id_block, config]
cluster
保存集群id和版本信息,broker 啟動(dòng)時(shí)從/cluster/id獲取,如果沒(méi)有broker生成
[zk: localhost:2181(CONNECTED) 14] get /cluster/id
{"version":"1","id":"cw-I6v4cSt2lsNio4v-rUQ"}
controller 集群臨時(shí)節(jié)點(diǎn)
int (broker id of the controller) 存儲(chǔ)center controller中央控制器所在kafka broker的信息
Kafka集群中多個(gè)broker,有一個(gè)會(huì)被選舉為controller leader,負(fù)責(zé)管理整個(gè)集群中分區(qū)和副本的狀態(tài),當(dāng)partition的leader 副本故障,由controller 負(fù)責(zé)為該partition重新選舉新的leader 副本;當(dāng)檢測(cè)到ISR列表發(fā)生變化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某個(gè)topic分區(qū)的時(shí)候也會(huì)由controller管理分區(qū)的重新分配工作。
當(dāng)broker啟動(dòng)的時(shí)候,都會(huì)創(chuàng)建KafkaController對(duì)象,但是集群中只能有一個(gè)leader對(duì)外提供服務(wù),這些每個(gè)節(jié)點(diǎn)上的KafkaController會(huì)在指定的zookeeper路徑下創(chuàng)建臨時(shí)節(jié)點(diǎn),只有第一個(gè)成功創(chuàng)建的節(jié)點(diǎn)的KafkaController才可以成為leader,其余的都是follower。當(dāng)leader故障后,所有的follower會(huì)收到通知,再次競(jìng)爭(zhēng)在該路徑下創(chuàng)建節(jié)點(diǎn)從而選舉新的leader。
Schema:
{
"version": 版本編號(hào)默認(rèn)為1,
"brokerid": kafka集群中broker唯一編號(hào),
"timestamp": kafka broker中央控制器變更時(shí)的時(shí)間戳
}
[zk: localhost:2181(CONNECTED) 17] get /controller
{"version":1,"brokerid":0,"timestamp":"1559195868940"}
關(guān)閉brokerid 0
[zk: localhost:2181(CONNECTED) 9] get /controller
{"version":1,"brokerid":1,"timestamp":"1559232245731"}
brokers [ids, topics, seqid]
ids:/brokers/ids/[0...N]
每個(gè)broker的配置文件中都需要指定一個(gè)數(shù)字類型的id(全局不可重復(fù)),此節(jié)點(diǎn)為臨時(shí)znode
Schema:
{
"jmx_port": jmx端口號(hào),
"timestamp": kafka broker初始啟動(dòng)時(shí)的時(shí)間戳,
"host": 主機(jī)名或ip地址,
"version": 版本編號(hào)默認(rèn)為1,
"port": kafka broker的服務(wù)端端口號(hào),由server.properties中參數(shù)port確定
}
[zk: localhost:2181(CONNECTED) 16] ls /brokers/ids
[0, 1, 2]
[zk: localhost:2181(CONNECTED) 17] get /brokers/ids/0
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
"endpoints":["PLAINTEXT://192.168.1.103:9092"],
"jmx_port":-1,"host":"192.168.1.103",
"timestamp":"1559232289265","port":9092,"version":4}
seqid:/brokers/seqid
broker啟動(dòng)時(shí)檢查并確保存在, 永久節(jié)點(diǎn)
topics:/brokers/topics/[xxxx]
kafka 集群中topic 信息
Schema:
{
"version": "版本編號(hào)目前固定為數(shù)字1",
"partitions": {
"partitionId編號(hào)": [
同步副本組brokerId列表
],
"partitionId編號(hào)": [
同步副本組brokerId列表
],
.......
}
}
[zk: localhost:2181(CONNECTED) 25] ls /brokers/topics
[test1]
[zk: localhost:2181(CONNECTED) 27] get /brokers/topics/test1
{"version":1,"partitions":{"2":[2,1],"1":[1,0],"3":[0,1],"0":[0,2]}}
/brokers/topics/[topic]/partitions/[0...N]/state 其中[0..N]表示partition索引號(hào)
Schema:
{
"controller_epoch": 表示kafka集群中的中央控制器選舉次數(shù),
"leader": 表示該partition選舉leader的brokerId,
"version": 版本編號(hào)默認(rèn)為1,
"leader_epoch": 該partition leader選舉次數(shù),
"isr": [同步副本組brokerId列表]
}
[zk: localhost:2181(CONNECTED) 8] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":14,"leader":0,"version":1,"leader_epoch":0,"isr":[0,2]}
[zk: localhost:2181(CONNECTED) 11] get /controller_epoch
14
關(guān)閉broker 0 選舉broker2位leader isr 只有broker2節(jié)點(diǎn)
[zk: localhost:2181(CONNECTED) 13] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":14,"leader":2,"version":1,"leader_epoch":1,"isr":[2]}
[zk: localhost:2181(CONNECTED) 20] get /controller_epoch
15
重新啟動(dòng)broker0, 集群epoch 為變化, p0的 leader epoch 增長(zhǎng)為2
[zk: localhost:2181(CONNECTED) 31] get /brokers/topics/test1/partitions/0/state
{"controller_epoch":15,"leader":0,"version":1,"leader_epoch":2,"isr":[2,0]}
[zk: localhost:2181(CONNECTED) 20] get /controller_epoch
15
controller_epoch(永久節(jié)點(diǎn),控制年代)
此值為一個(gè)數(shù)字,kafka集群中第一個(gè)broker第一次啟動(dòng)時(shí)為1,以后只要集群中center controller中央控制器所在broker變更或掛掉,就會(huì)重新選舉新的center controller,每次center controller變更c(diǎn)ontroller_epoch值就會(huì) + 1;
get /controller_epoch
7
isr_change_notification
在Kafka 中, Leader 和Follower 的數(shù)據(jù)同步遵循的是"最終一致"原則, 也就是數(shù)據(jù)同步會(huì)有延遲, 但保證最終數(shù)據(jù)的一致性.isr 是'in-sync' replicas 的縮寫(xiě),代表的是與Leader 數(shù)據(jù)已經(jīng)同步過(guò)的replica, 它會(huì)作為重選Leader 時(shí)作為判斷依據(jù),用來(lái)處理ISR集合變更的動(dòng)作.
log_dir_event_notification
consumer注冊(cè)信息
每個(gè)consumer都有一個(gè)唯一的ID(consumerId可以通過(guò)配置文件指定,也可以由系統(tǒng)生成),此id用來(lái)標(biāo)記消費(fèi)者信息
consumerId產(chǎn)生規(guī)則:
StringconsumerUuid = null;
if(config.consumerId!=null && config.consumerId){
consumerUuid = consumerId;
}else {
String uuid = UUID.randomUUID()
consumerUuid = "%s-%d-%s".format(
InetAddress.getLocalHost.getHostName, System.currentTimeMillis,
uuid.getMostSignificantBits().toHexString.substring(0,8));
}
String consumerIdString = config.groupId + "_" + consumerUuid;
Schema:
{
"version": 版本編號(hào)默認(rèn)為1,
"subscription": { //訂閱topic列表
"topic名稱": consumer中topic消費(fèi)者線程數(shù)
},
"pattern": "static",
"timestamp": "consumer啟動(dòng)時(shí)的時(shí)間戳"
}
Example:
{
"version":1,
"subscription":{
"replicatedtopic":1
},
"pattern":"white_list",
"timestamp":"1452134230082"
}
5.consumer owner
/consumers/[groupId]/owners/[topic]/[partitionId] -> consumerIdString + threadId索引編號(hào)
當(dāng)consumer啟動(dòng)時(shí),所觸發(fā)的操作:
a) 首先進(jìn)行"Consumer Id注冊(cè)";
b) 然后在"Consumer id 注冊(cè)"節(jié)點(diǎn)下注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)當(dāng)前group中其他consumer的"退出"和"加入";只要此znode path下節(jié)點(diǎn)列表變更,
都會(huì)觸發(fā)此group下consumer的負(fù)載均衡.(比如一個(gè)consumer失效,那么其他consumer接管partitions).
c) 在"Broker id 注冊(cè)"節(jié)點(diǎn)下,注冊(cè)一個(gè)watch用來(lái)監(jiān)聽(tīng)broker的存活情況;如果broker列表變更,將會(huì)觸發(fā)所有的groups下的consumer重新balance.
consumer offset
/consumers/[groupId]/offsets/[topic]/[partitionId] -> long (offset)
用來(lái)跟蹤每個(gè)consumer目前所消費(fèi)的partition中最大的offset
此znode為持久節(jié)點(diǎn),可以看出offset跟group_id有關(guān),以表明當(dāng)消費(fèi)者組(consumer group)中一個(gè)消費(fèi)者失效,
重新觸發(fā)balance,其他consumer可以繼續(xù)消費(fèi).
latest_producer_id_block
broker啟動(dòng)時(shí)提前預(yù)分配一段PID,當(dāng)前是0~999,即提前分配出1000個(gè)PID來(lái)
[zk: localhost:2181(CONNECTED) 32] get /latest_producer_id_block
{"version":1,"broker":1,"block_start":"8000","block_end":"8999"}
config
kafka配置信息
[zk: localhost:2181(CONNECTED) 0] ls /config
[changes, clients, brokers, topics, users]
/configs/topics 存放topic的定制化配置信息