分布式消息處理機(jī)制-kafka

安裝kafka

  1. tar -zxvf
  2. 進(jìn)入到config目錄下修改server.properties
    broker.id
    listeners=PLAINTEXT://192.168.11.140:9092
    zookeeper.connect
  3. 啟動
    sh kafka-server-start.sh -daemon ../config/server.properties
    sh kafka-server-stop.sh

zookeeper上注冊的節(jié)點(diǎn)信息
cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_notification, consumers, latest_producer_id_block, config

controller – 控制節(jié)點(diǎn)
brokers – kafka集群的broker信息 。 topic
consumer ids/owners/offsets

基本操作

image.png
image.png
image.png

http://kafka.apache.org/documentation/#quickstart

kafka的實(shí)現(xiàn)細(xì)節(jié)

消息是kafka中最基本的數(shù)據(jù)單元,消息由一串字節(jié)構(gòu)成,其中主要由key和value構(gòu)成,key和value都是byte數(shù)組。key的主要作用是根據(jù)一定的策略,將消息路由到指定的分區(qū),這樣就可以保證包含同一key的消息全部寫入到同一個分區(qū)內(nèi),key可以是null。為了提高網(wǎng)絡(luò)的存儲和利用率,生產(chǎn)者會批量發(fā)送消息到kafka,并在發(fā)送之前消息進(jìn)行壓縮

topic&partitiion

  • Topic是用于存儲消息的邏輯概念,可以看做一個消息集合。每個topic可以有多個生產(chǎn)者向其推送消息,也可以有任意多個消費(fèi)者消費(fèi)其中的消息
  • 每個topic可以劃分多個分區(qū)(每個Topic至少有一個分區(qū)),同一topic下的不同分區(qū)包含的消息是不同的。每個消息在被添加到分區(qū)時,都會被分配一個offset(稱之為偏移量),他是消息在此分區(qū)中的唯一編號,kafka通過offset保證消息在分區(qū)內(nèi)的順序,offset的順序不跨分區(qū),即kafka只保證在同一分區(qū)內(nèi)的消息是有序的;
image.png
image.png
image.png

Partition是以文件的形式存儲在文件系統(tǒng)中,存儲在kafka-log目錄下,命名規(guī)則是:<topic_name>-<partition_id>

kafka的高吞吐量的因素

  1. 順序?qū)懙姆绞酱鎯?shù)據(jù);
  2. 批量發(fā)送;在異步發(fā)送模式中,kafka允許進(jìn)行批量發(fā)送,也就是先講消息緩存到內(nèi)存中,然后一次請求批量發(fā)送出去。這樣減少了磁盤頻繁IO以及網(wǎng)絡(luò)IO造成的性能瓶頸。
    batch.size 每批次發(fā)送的數(shù)據(jù)大小
    linger.ms 間隔時間
  3. 零拷貝
    消息從發(fā)送到落地保存,broker維護(hù)的消息日志本身就是文件目錄,每個文件都是二進(jìn)制保存,生產(chǎn)者和消費(fèi)者使用相同的格式來處理。在消費(fèi)者獲取消息時,服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動的通過socket發(fā)送給消費(fèi)者,雖然這個操作描述起來很簡單,但實(shí)際上經(jīng)歷了很多步驟
  • 操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存
  • 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
  • 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
  • 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出


    image.png

通過“零拷貝”技術(shù)可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作,同時也會減少上下文切換次數(shù)


image.png

日志策略

日志保留策略

  • 無論消費(fèi)者是否已經(jīng)消費(fèi)了消息,kafka都會一直保存這些消息,但并不會像數(shù)據(jù)庫那樣長期保存,為了避免磁盤被占滿,kafka會配置相應(yīng)的保留策略(retention policy),以實(shí)現(xiàn)周期性地刪除陳舊的消息

kafka有兩種“保留策略”:

  1. 根據(jù)消息保留的時間,當(dāng)消息在kafka中保存的時間超過了指定時間,就可以被刪除;
  2. 根據(jù)topic存儲的數(shù)據(jù)大小,當(dāng)topic所占的日志文件大小大于一個閾值,則可以開始刪除最舊的消息

日志壓縮策略

在很多場景中,消息的key與value的值之間的對應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會不斷被修改一樣,消費(fèi)者只關(guān)心key對應(yīng)的最新的value。我們可以開啟日志壓縮功能,kafka定期將相同key的消息進(jìn)行合并,只保留最新的value值


image.png

消息可靠性機(jī)制

消息可靠性
生產(chǎn)者發(fā)送消息到broker,有三種確認(rèn)方式(request.required.acks)

  • acks = 0:producer不會等待broker(leader)發(fā)送ack。因?yàn)榘l(fā)送消息網(wǎng)絡(luò)超時或broker crash(1.Partition的Leader還沒有commit消息 2. Leader與Follower數(shù)據(jù)不同步),既有可能丟失也可能會重發(fā)。
  • acks = 1:當(dāng)leader接收到消息之后發(fā)送ack,丟會重發(fā),丟的概率很小。
  • acks = -1:當(dāng)所有的follower都同步消息成功后發(fā)送ack,丟失消息可能性比較低。

消息存儲可靠性

  • 每一條消息被發(fā)送到 broker 中,會根據(jù) partition 規(guī)則選擇被存儲到哪一個partition。如果 partition 規(guī)則設(shè)置的合理,所有消息可以均勻分布到不同的 partition 里,這樣就實(shí)現(xiàn)了水平擴(kuò)展。
  • 在創(chuàng)建topic時可以指定這個topic對應(yīng)的 partition 的數(shù)據(jù)。在發(fā)送一條消息時,可以指定這條消息的key, producer 根據(jù)這個 key 和 partition機(jī)制來判斷這個消息發(fā)送到哪個 partition。
  • kafka的高可靠性的保障來自于另一個叫副本(replication)策略,通過設(shè)置副本的相關(guān)參數(shù),可以使kafka在性能和可靠性之間做不同的切換。

高可靠性的副本

sh kafka-topics.sh --create --zookeeper 192.168.11.140:2181 
--replication-factor 2  --partitions 3 --topic sixsix 
  • --replication-factor 表示的副本數(shù)

副本機(jī)制
ISR(副本同步隊(duì)列)
維護(hù)的是有資格的 follower 節(jié)點(diǎn)

  1. 副本的所有節(jié)點(diǎn)都必須要和 zookeeper 保持連接狀態(tài)
  2. 副本的最后一條消息的 offset 和 leader 副本的最后一條消息的 offset 之間的差值不能超過指定的閾值,這個閾值是可以設(shè)置的(replica.lag.max.messages)

HW&LEO
關(guān)于 follower 副本同步的過程中,還有兩個關(guān)鍵的概念,HW(HighWatermark)和 LEO(Log End Offset) 這兩個參數(shù)跟 ISR 集合緊密關(guān)聯(lián)。

  • HW標(biāo)記了一個特殊的 offset,當(dāng)消費(fèi)者處理消息的時候,只能拉取到 HW 之前的消息,HW之后的消息對消費(fèi)者來說是不可見的。也就是說,取 partition 對應(yīng) ISR中最小的 LEO 作為 HW,consumer 最多只能消費(fèi)到 HW所在的位置。每個 replica 都有 HW,leader 和 follower 各自維護(hù)更新自己的 HW 的狀態(tài)。對于leader新寫入的消息,consumer不能立刻消費(fèi),leader 會等待該消息被所有ISR中的 replicas 同步更新 HW ,此時消息才能被 consumer 消費(fèi)。 這樣就保證了如果leader副本損壞,該消息仍然可以從新選舉的leader 中獲取
  • LEO是所有副本都會有的一個offset 標(biāo)記,它指向追加到當(dāng)前副本的最后一個消息的 offset,當(dāng)生產(chǎn)者向 leader 副本追加消息的時候, leader 副本的LEO 標(biāo)記就會遞增; 當(dāng)follower 副本成功從leader副本拉取消息并更新到本地的時候,follower的副本的LEO就會增加

查看kafka數(shù)據(jù)文件內(nèi)容

在使用kafka的過程中有時候需要我們查看產(chǎn)生的消息的信息,這些都被記錄在kafka的log文件中。由于log 文件的特殊格式,需要通過kafka提供的工具來查看

./bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
--files /tmp/kafka-logs*/000**.log  --print-data-log{查看消息內(nèi)容}

文件存儲機(jī)制

存儲機(jī)制
在kafka文件存儲中,同一個topic下有多個不同的partition,每個partition為一個目錄,partition的名稱規(guī)則為:topic名稱 + 有序序號,第一個序號從0開始,最大的序號為partition數(shù)量減1,partition是實(shí)際上物理上的概念,而topic是邏輯上的概念
partition還可以細(xì)分為segment,這個segment是什么呢?假設(shè)kafka以partition為最小存儲單位,那么我們可以想象當(dāng)kafka的producer不斷發(fā)送消息,必然會引起partition文件的無限擴(kuò)張,這樣對于消息文件的維護(hù)以及被消費(fèi)的消息的清理帶來非常大的挑戰(zhàn),所以kafka以segment為單位又把partition進(jìn)行細(xì)分。每個partition相當(dāng)于一個巨型文件被平均分配到多個大小相等的segment數(shù)據(jù)文件中(每個segment文件中的消息不一定相等)。這種特性方便已經(jīng)被消費(fèi)的清理,提高磁盤的利用率
segment file組成:由2大部分組成,分別為index file 和data file,此2個文件一一對應(yīng),成對出現(xiàn),后綴“.index"和“.log”分別表示segment索引文件、數(shù)據(jù)文件。
segment文件命名規(guī)則:partition全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset。數(shù)值最大為64位long大小,19位數(shù)字字符長度,沒有數(shù)字用0填充

image.png
image.png

查找方式

以上圖為例,讀取offset=170218的消息,首先查找segment文件,其中00000000000000000000.index為最開始的文件,第二個文件為00000000000000170410.index(起始偏移為170410+1=170411),而第三個文件為00000000000000239430.index(起始偏移為239430+1=239431),所以這個offset=170418就落到了第二個文件之中。其他后續(xù)文件可以依次類推,以其實(shí)偏移量命名并排列這些文件,然后根據(jù)二分查找法就可以快速定位到具體文件位置,其次根據(jù)00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置進(jìn)行讀取。

消息確認(rèn)的幾種方式

自動提交

image.png

手動提交
手動異步提交
consumer. commitASync() //手動異步ack
手動同步提交
consumer. commitSync() //手動異步ack

指定消費(fèi)某個分區(qū)的消息

image.png

消息的消費(fèi)原理

之前kafka存在的一個非常大的性能隱患就是利用ZK來記錄各個Consumer Group的消費(fèi)進(jìn)度(offset)。當(dāng)然JVM Client 幫我們自動做了這些事情,但是Consumer需要和ZK頻繁交互,則利用ZK Client API對ZK頻繁寫入是一個低效的操作,并且從水平擴(kuò)展性上來講也存在問題,所以ZK抖一抖,集群吞吐量就跟著一起抖,嚴(yán)重的時候簡直抖的停不下來。
新版的kafka已推薦將consumer的文藝信息保存在Kfaka內(nèi)部的topic,即_consumer_offsets_topic.通過以下操作來看看_consumer_offsets_topic是怎么存儲消費(fèi)進(jìn)度的,_consumer_offset_topic 默認(rèn)有50個分區(qū)

  1. 計(jì)算 consumer group對應(yīng)的hash值


    image.png

2.獲得 consumer group 的位移信息

bin/kafka-simple-consumer-shell.sh 
--topic __consumer_offsets --partition 15 
-broker-list 192.168.11.140:9092,192.168.11.141:9092,192.168.11.138:9092   
--formatter kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter

kafka的分區(qū)分配策略

在kafka中每個topic一般會有很多個partitions。為了提高消息的消費(fèi)速度,我們可能會啟動多個consumer去消費(fèi);同時,kafka存在consumer group的概念,也就是group.id一樣的consumer,這些consumer屬于一個consumer group,組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題的所有分區(qū),當(dāng)然每一個分區(qū)只能由同一個消費(fèi)組內(nèi)的consumer來消費(fèi),那么同一個consumer group里面的consumer是怎么去分配該消費(fèi)那個分區(qū)里的數(shù)據(jù),這個就涉及到了kafka內(nèi)部分區(qū)分配策略(Partition Assignment Strategy)
在 Kafka內(nèi)部存在兩種默認(rèn)的分區(qū)分配策略:Range(默認(rèn))和 RoundRobin。通過:partition.assignment.strategy指定

consumer rebalance

當(dāng)以下事件發(fā)生時,Kafka將會進(jìn)行一次分區(qū)分配:

  1. 同一個consumer group內(nèi)新增了消費(fèi)者
  2. 消費(fèi)者離開當(dāng)前所屬的consumer group,包括shuts down 或 crashes
  3. 訂閱的主題新增分區(qū)(分區(qū)數(shù)量發(fā)生變化)
  4. 消費(fèi)者主動取消對某個topic的訂閱
  5. 也就是說,把分區(qū)的所有權(quán)從一個消費(fèi)者移動到另外一個消費(fèi)者上,這個是kafka consumer 的 rebalance機(jī)制,如何 rebalance就涉及到前面說的分區(qū)分配策略

兩種分區(qū)策略
Range 策略(默認(rèn))
0 ,1 ,2 ,3 ,4,5,6,7,8,9
c0 [0,3] c1 [4,6] c2 [7,9]
10(partition num/3(consumer num) =3
roundrobin 策略
0 ,1 ,2 ,3 ,4,5,6,7,8,9
c0,c1,c2
c0 [0,3,6,9]
c1 [1,4,7]
c2 [2,5,8]
kafka 的key 為null, 是隨機(jī){一個Metadata的同步周期內(nèi),默認(rèn)是10分鐘}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容