
前面我們已經(jīng)講述了生產(chǎn)者 http://www.itdecent.cn/p/40afc57471da 消費者 http://www.itdecent.cn/p/44e9e058ec5b 那么今天我們看下主題與分區(qū)。
主題,分區(qū)的作用
主題是對消息進行了歸類,分區(qū)是對消息進行了二次歸類,分區(qū)的劃分不僅為 Kafka 提供了可伸縮性、水平擴展的功能,還通過多副本機制來為 Kafka 提供數(shù)據(jù)冗余以提高數(shù)據(jù)可靠性。
主題作用
對于主題消息隊列都是一樣的kafka也不例外,管理主要包括主題創(chuàng)建,修改,查詢,刪除等操作。
思考??
如果我們像一個沒有創(chuàng)建的主題中發(fā)送一條消息會怎么樣呢?
首先如果我們像沒有創(chuàng)建的主題中發(fā)送一條消息,broker首先會去查看自動創(chuàng)建主題選項是否開啟 auto .cr eate.topics .enable是否位true。當生產(chǎn)者向一個尚未創(chuàng)建的主題發(fā)送消息時,會自動創(chuàng)建一個分區(qū)數(shù)為 num partitions(默認值為1)、副本因子為 default.repl cation.factor (默認值為1)的主題?;蛘弋斎我庖粋€客戶端向未知主題發(fā)送元數(shù)據(jù)請求時,都會按照配置參數(shù) num.partitions default.replicatio .factor值來創(chuàng)建一個相應的主題。
看起來自動創(chuàng)建主題的方式很方便,很好用。但是這種方式大大增加維護成
主題、分區(qū)、副本和Log 之間的關系
我們可以使用命令 創(chuàng)建一個分區(qū)數(shù)為4,副本因子為2的主題。命令如下:
bin/kafka-top cs .sh - zookeeper localhost: 2181/kafka --create --topic top create --partitions 4 --replication-factor 2
創(chuàng)建完腳本后的日志文件展示為:
[root@node1 kafka 2.11] # ls -al /tmp/kafka-logs/ I grep topic- create
drwxr xr x 2 root root 4096 Sep 8 15: 54 topic-create-0
drwxr-xr-x 2 root root 4096 Sep 8 15: 54 topic-create-1
[root@node2 kafka 2.12]# ls -al /tmp/kafka-logs/ lgrep topic- create
drwxr-xr-x 2 root root 4096 Sep 8 15: 49 topic-create-1
drwxr-xr- x 2 root root 4096 Sep 8 15 : 49 topic create-2
drwxr-xr- x 2 root root 4096 Sep 8 15: 49 topic- create-3
[root@node3 kafka 2.13] # ls - al /tmp/kafka-logs/ lgrep topic-create
drwxr-xr- x 2 root root 4096 Sep 8 07 : 54 topic-create-0
drwxr-xr- x 2 root root 4096 Sep 8 07 : 54 topic-create-2
drwxr- xr- x 2 root root 4096 Sep 8 07 : 54 topic-create-3
可以看到 node 1節(jié)點中創(chuàng)建了2個文件夾 topic-create-0和topic-create-1,對應主題topic-create的2個分區(qū)編號為0和1的分區(qū),命名方式可以概括為 topic-partition的結(jié)構(gòu)。
三個broker節(jié)點一共創(chuàng)建了8個文件夾 ,這個數(shù)字 實質(zhì)上是分區(qū)數(shù) 與副本因子 的乘積。每個副本才真正對應了一個命名形式如topic-partition的文件夾。
主題分區(qū)副本分配策略
主題、分區(qū)、副本和 Log (日志)的關系如圖1所示,主題和分區(qū)都是提供給上層用戶的抽象,而在副本層面或更加確切地說是 Log 面才有實際物理上的存在。同一個分區(qū)中的多個副本必須分布在不同的 broker 中,這樣才能提供有效的數(shù)據(jù)冗余。對于例中的分區(qū)數(shù)為4,副本因子為2,broker 數(shù)為3情況下會按照2,3,3 的分區(qū)副本個數(shù)分配給各個 broker 是最優(yōu)的選擇。再比如在分區(qū)數(shù)為3,副本因子3 ,并且 broker 數(shù)同樣為3的情況下,分配3,3,3的分區(qū)副本個數(shù)給各個broker是最優(yōu)的選擇,也就是每個 broker 中都擁有所有分區(qū)的一個個副本。

主題其他操作
主題創(chuàng)建
bin/kafka-top cs .sh - zookeeper localhost: 2181/kafka --create --topic top create --partitions 4 --replication-factor 2
主題查詢
通過list指令可以查看當前所有可用的主題,示例如下:
[root@nodel kafka_2 .11-2. 0. 0] # bin/kafka-topics.sh --zookeeper localhost: 2181/
kafka -lis七
consumer offsets
topic-create
topic-demo
topic-config
主題刪除
第一步, 刪除ZooKeeper中的節(jié)點/con巨g/七opics幾opic-delete。
[zk: localhost:2181/kafka (CONNECTED) 7] rm -rf /config/topics/topic-delete
第二步, 刪除ZooKeeper中的節(jié)點/brokers/topics/topic-delete及其子節(jié)點。
[zk: localhost:2181/kafka (CONNECTED) 8] delete /brokers/topics/topic-delete
第三步, 刪除集群中所有與主題topic-delete有關的文件。
#集群中的各個broker節(jié)點中執(zhí)行rm -rf /tmp/kafka吐ogs/topic-delete*命令來刪除與主題
topic-delete有關的文件
[root@nodel kafka_2.ll-2.0.0]# rm -rf /tmp/kafka-logs/topic-delete*
[root@node2 kafka_2.ll-2.0.0]# rm -rf /tmp/kafk /topic-delete*
[root@node3 kafka_2.11-2.0.0]# rm -rf /tmp/kafka-logs/topic-delete*
分區(qū)作用
分區(qū)主要包含了優(yōu)先副本的選舉、 分區(qū)重分配、 復制限流、修改副本因子等功能。
優(yōu)先副本選舉
分區(qū)使用多副本機制來提升可靠性, 但只有l(wèi)eader副本對外提供讀寫服務, 而follower副本只負責在內(nèi)部進行消息的同步。如果一個分區(qū)的leader副本不可用, 那么就意味著整個分區(qū)變得不可用, 此時就需要Kafka從剩余的follower副本中挑選 一個新的leader副本來繼續(xù)對外提供服務。
在創(chuàng)建主題的時候, 該主題的分區(qū)及副本會盡可能均勻地分布到Kafka集群的各個broker節(jié)點上,對應的leader副本的分配也比較均勻。比如我們使用kafka-topics.sh腳本創(chuàng)建一個分區(qū)數(shù)為3、 副本因子為3的主題topic-partitions, 創(chuàng)建之后的分布信息如下:
[root@nodel kafka_2.ll-2.0.0)# bin/kafka-topics.sh --zookeeper localhost:2181/
kafka --describe --topic topic-partitions
Topic:topic-partitions PartitionCount: 3 ReplicationFactor: 3 Con figs:
Topic: topic-par七itions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic-partitions Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
可以看到 leader副本均勻分布在 brokerld為0、1、2的broker節(jié)點之中。針對同一個分區(qū)而言,同一個 broker節(jié)點中不可能出現(xiàn)它的多個副本,即Kafka集群的一個 broker中最多只能有它的一個副本,我們 可以將leader副本所在的broker節(jié)點叫作分區(qū)的leader節(jié)點,而 follower副本所在的broker節(jié)點叫作分區(qū)的follower節(jié)點。
隨著時間的更替,Kafka集群的broker節(jié)點不可避免地會遇到宅機或崩潰的問題, 當分區(qū)的leader節(jié)點發(fā)生故障時, 其中 一個follower節(jié)點就會成為新的leader節(jié)點,這樣就會導致集群的負載不均衡,從而影響整體的健壯性和穩(wěn)定性。 當原來的leader節(jié)點恢復之后重新加入集群時,它只能成為 一個新的follower節(jié)點而不再對外提供服務。 后面章節(jié)后講到副本選舉原理
我們將brokerld為2的節(jié)點重啟, 那么主題 topic-partitions新的分布信息如下:
[root@nodel kafka_2.ll-2.0.0]# bin/kafka-topics.sh --zookeeper localhos:2181/
kafka --describe --topic topic-partitions
Topic: topic-partitions PartitionCouht: 3 ReplicationFactor: 3 Con figs:
Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1, 2, 0 Isr: 1, 0, 2
Topic: 七opic-partitions Partition: 1 Leader: 0 Replicas: 2, 0, 1 Isr: 0, 1, 2
Topic: 七opic-partitions Partition: 2 Leader: 0 Replicas: 0, 1, 2 Isr: 0, 1, 2
可以看到原本 分區(qū) l的leader節(jié)點為2, 現(xiàn)在變成了0, 如此一 來原本均衡的負載變成了失衡:節(jié)點0 的負載最高,而節(jié)點1的負載最低。
kafka是怎樣防止重新選舉后的的負載不均衡 ??
為了能夠有效地治理負載失衡的情況,Kafka引入了優(yōu)先副本(preferred replica)的概念。所謂的優(yōu)先副本 是指在 AR 集合列表中的第一個副本 。 比如上面 主題 topic-partitions中 分區(qū) 0的AR集合列表(Replicas)為[1,2, 0], 那么分區(qū)0 的優(yōu)先副本即為1。理想情況下, 優(yōu)先副本就是該分區(qū)的leader副本, 所以也可以稱之為 preferred leader。Kafka要確保所有主題的優(yōu)先副本在Kafka集群中均勻分布, 這樣就保證了所有分區(qū)的leader均衡 分布。 如果leader 分布過于集中,就會造成集群負載不均衡。
所謂的優(yōu)先副本的選舉 是指通過一定的方式促使優(yōu)先副本 選舉為 leader副本, 以此來促進集群的負載均衡, 這 一行為也可以稱為 “分區(qū)平衡 ” 。
注意 分區(qū)平衡,并不意味著Kafka集群的負載均衡, 因為還要考慮集群中的分區(qū)分配是否均衡。 更進一步, 每個分區(qū)的leader副本的負載也是各不相同的,有些leader副本的負載很高,比如需要承載TPS為30000 的負荷, 而有些leader副本只需承載個位數(shù)的負荷 。 也就是說,就算集群中的分區(qū) 分配均衡、leader 分配均衡,也并不能確保整個集群的負載就是均衡的, 還需要其他一 些硬性的指標來做進一步的衡量, 這個后面章節(jié)會講到。
在Kafka中可以提供分區(qū)自動平衡的功能, 與此對應的broker端參數(shù)是auto.leader.rebalance.enable, 此參數(shù)的默認值為true, 即默認情況下此功能是開啟的。 如果開啟分區(qū)自動平衡的功能, 則Kafka的控制器會啟動一個定時任務, 這個定時任務會輪詢所有的broker節(jié)點, 計算每個broker節(jié)點的分區(qū)不平衡率(broker中的不平衡率=非優(yōu)先副本的leader個數(shù)/分區(qū)總數(shù))是否超過leader.imbalance.per.broker.percentage參數(shù)配置的比值,默認值為10%, 如果超過設定的比值則會自動執(zhí)行優(yōu)先副本的選舉動作以求分區(qū)平衡。 執(zhí)行周期由參數(shù)leader.imbalance.check.interval.seconds控制, 默認值為300秒, 即
5分鐘。
不過在生產(chǎn)環(huán)境中不建議將autua.leader.rebalance.enable 設置為默認的true, 因為這 可能引起負面的性能問題, 也有可能引起客戶端 一 定時間的阻塞。 因為執(zhí)行的時間無法自主掌控, 如果在關鍵時期(比如電商大促波峰期)執(zhí)行關鍵任務的關卡上執(zhí)行優(yōu)先副本的自動選舉操作, 勢必會有業(yè)務阻塞、 頻繁超時之類的風險。
思考?? 分區(qū)數(shù)越多是否越好
leader 副本的轉(zhuǎn)移也是一項高成本的工作, 如果要執(zhí)行的分區(qū)數(shù)很多, 那么必然會對客戶端造成一定的影響。如果集群中包含大量的分區(qū), 那么上面的這種使用方式有可能會失效。在優(yōu)先副本的選舉過程中, 具體的元數(shù)據(jù)信息會被存入ZooKeeper 的 /admin/preferred_replica_election節(jié)點, 如果這些數(shù)據(jù)超過了ZooKeeper節(jié)點所允許的大小, 那么選舉就會失敗。 默認情況下ZooKeeper所允許的節(jié)點數(shù)據(jù)大小為1MB。
分區(qū)重分配
當集群中的一個節(jié)點突然宅機下線時,如果節(jié)點上的分區(qū)是單副本的, 那么這些分區(qū)就變得不可用了,在節(jié)點恢復前,相應的數(shù)據(jù)也就處于丟失狀態(tài);如果節(jié)點上的分區(qū)是多副本的,那么位于這個節(jié)點上的leader副本的角色會轉(zhuǎn)交到集群的其他 follower副本中。 總而言之, 這個節(jié)點上的分區(qū)副本都已經(jīng)處于功能失效的狀態(tài),Kafka并不會將這些失效的分區(qū)副本自動地遷移到集群中剩余的可用broker節(jié)點上,如果放任不管, 則不僅會影響整個集群的均衡負載,還會影響整體服務的可用性和可靠性。
當要對集群中的一個節(jié)點進行有計劃的下線操作時, 為了保證分區(qū)及副本的合理分配, 我們也希望通過某種方式能夠?qū)⒃摴?jié)點上的分區(qū)副本遷移到其他的可用節(jié)點上。當集群中新增broker節(jié)點時, 只有新創(chuàng)建的主題分區(qū)才有可能被分配到這個節(jié)點上, 而之前的主題分區(qū)并不會自動分配到新加入的節(jié)點中, 因為在它們被創(chuàng)建時還沒有這個新節(jié)點, 這樣新節(jié)點的負載和原先節(jié)點的負載之間嚴重不均衡。
kafka提供的重分區(qū)策略主要是在集群擴容和broker節(jié)點失效的情況下對分區(qū)進行遷移。
kakfa主要是通過腳本實現(xiàn)該方案,方案主要分為3步驟:
第一步:首先創(chuàng)建需要一個包含主題清單的JSON文件, 其次根據(jù)主題清單和broker節(jié)點清單生成一份重分配方案, 最后根據(jù)這份方案執(zhí)行具體的重分配動作。
[root@nodel kafka_2.ll-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/
kafka --describe --topic topic-demo
Topic:topic-reassign Parti七ionCount:4 ReplicationFactor: 2 Con figs:
Topic: topic-demo Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0, 2
Topic: topic-demo Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: topic-demo Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: topic-demo Partition: 3 Leader: 0 Replicas: 0,1 Isr: 0,1
我們可以觀察到主題topic-reassign在3個節(jié)點中都有相應的分區(qū)副本分布。由于某種原因,我們想要下線brokerld為1的broker節(jié)點, 在此之前, 我們要做的就是將其上的分區(qū)副本遷移出去。使用 kafka-reassign-partitions.sh腳本的第一步就是要創(chuàng)建一 個JSON文件如下:
{
"topics":[
{
"topic":"topic-demo"
}
],
"version":1
}
第二步就是根據(jù)這個JSON文件和指定所要分配的broker節(jié)點列表來生成 一 份候選的重分配方案, 具體內(nèi)容參考如下:
[root@nodel kafka 2. 11-2.0.0]# bin/kafka-reassign-partitions.sh --zookeeper
localhost:2181/kafka --generate --topics一 to-move-json-file reassign. json
--broker-list 0,2
Current partition replica assignment
{"version": 1, "partitions": [ {"topic": "topic-demo", "partition":2,"replicas
": [2, 1], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 1, "repli
cas": [ 1, 0], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 3, "re
plicas": [ 0, 1], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "parti七ion":0,
"replicas": [ 0, 2], "log_dirs": ["any", "any"]}]}
Proposed partition reassignment configuration
{"version":l,"parti七ions": [ {"七opic":"topic-demo","parti七ion":2,"replicas
": [2, 0], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 1, "repli
cas": [0, 2], "log_dirs": ["any", "any"]}, {"七opic":"topic-demo","partition":3,"re
plicas": [ 0, 2], "log_dirs": ["any", "any"] } , {"topic": "topic-demo", "partition": 0,
"replicas": [2, 0, "log_di工 s": ["any", "any"]}]}
第三步執(zhí)行具體的重分配動作, 詳細參考如下:
[root@nodel kafka_2 .11-2. 0. 0] # bin/kafka-reassign-parti tions. sh --zookeeper
localhost:2181/kafk a --execute --reassigrunent-json-file project.json
修改副本因子
修改副本因子的使用場景也很多, 比如在創(chuàng)建主題時填寫了錯誤的副本因子數(shù)而需要修改, 再比如運行一段時間之后想要通過增加副本因子數(shù)來提高容錯性和可靠性。修改副本因子也可以按照分區(qū)重分配的方式實現(xiàn)。
思考?? 分區(qū)數(shù)越多吞吐量就越高嗎
拋開硬件資源的影響, 消息寫入的吞吐量還會受到消息大小 、 消息壓縮方式、 消息發(fā)送方式(同步/異步)、 消息確認類型(acks)、 副本因子等參數(shù)的影響, 消息消費的吞吐量還會受到應用邏輯處理速度的影響的情況下。
實驗證明分區(qū)數(shù)為1時吞吐量最低, 隨著分區(qū)數(shù)的增長, 相應的吞吐量也跟著上漲。 一旦分區(qū)數(shù)超過了某個闕值之后, 整體的吞吐量是不升反降的。
分區(qū)可以保證有時序嗎
可以,一般我們在做業(yè)務的時候可以根據(jù)消息的key進行分區(qū),這樣相同的key就會被分到同一個分區(qū),然后消費者從同一個分區(qū)消費就可以保證消息的有序性。
該文章是我讀了深入理解kafka核心設計與實踐原理的感受和總結(jié)希望可以幫到大家