kafka 是一個(gè)分布式消息中間件,支持多分區(qū),多副本,多訂閱者的,基于zookeeper協(xié)調(diào)的分布式消息系統(tǒng)
特點(diǎn)
1 . 高吞吐量,低延遲: 每秒處理幾十萬數(shù)據(jù),延遲最低只有幾毫秒
2 . 可擴(kuò)展性:kafka集群支持熱擴(kuò)展
3 . 持久性,可靠性: 消息被持久化道本地磁盤,并且支持?jǐn)?shù)據(jù)備份
4 .容錯(cuò)性: 允許集群中節(jié)點(diǎn)失?。ㄈ舾北緮?shù)位n,允許n-1個(gè)節(jié)點(diǎn)失?。?br>
5 . 高并發(fā): 支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
主要應(yīng)用于大數(shù)據(jù)實(shí)時(shí)處理領(lǐng)域
1 架構(gòu)

producer: 消息生產(chǎn)者,向kafka集群發(fā)送消息的客戶端
consumer: 消息消費(fèi)者,向kafka集群取消息的客戶端
consumer group:消費(fèi)者組,由多個(gè)消費(fèi)者組成。 消費(fèi)者組內(nèi)的每個(gè)消費(fèi)者消費(fèi)不同partiton的數(shù)據(jù),一個(gè)partition只能由一個(gè)組內(nèi)的消費(fèi)者消費(fèi),消費(fèi)者組之間互不影響 消費(fèi)者組在邏輯上就是i一個(gè)訂閱者
broker: 一臺(tái)kafka服務(wù)器就是一個(gè)broker。 一個(gè)集群由多個(gè)broker組成, 一個(gè)broker可以容納多個(gè)topic
topic: Kafka將消息分門別類,每一類的消息稱之為一個(gè)主題(Topic)
partition: 一個(gè)topic可以分不到多個(gè)broker上, 此時(shí)每個(gè)broker 上的topic 就是對(duì)應(yīng)的一個(gè)partition(分區(qū)), 每個(gè)partition是一個(gè)有序的隊(duì)列
replica: 副本, 為了防止集群中的某個(gè)broker故障,導(dǎo)致對(duì)應(yīng)的partition的數(shù)據(jù)丟失 而提供的副本機(jī)制,一個(gè)topic 每一個(gè)partition可以有多個(gè)副本,即一個(gè)leader 對(duì)應(yīng)若干個(gè)follower
leader : 每個(gè)partition 的副本對(duì)應(yīng)的主副本,生產(chǎn)者和消費(fèi)者通信的對(duì)象都是leader
follower: 每個(gè)partition 的副本對(duì)應(yīng)的從副本, 實(shí)時(shí)同步leader數(shù)據(jù),當(dāng)leader發(fā)生故障時(shí),某個(gè)follower就會(huì)成為新的leader
offset: 偏移量, 每條消息都有自己的偏移量,是消息數(shù)據(jù)在對(duì)應(yīng)partition中的唯一標(biāo)識(shí), 也是該消息的索引號(hào)。每個(gè)consumer都會(huì)保存自己消費(fèi)到的offset+1,consumer 的消費(fèi)的offset 保存在broker集群中專屬的topic中(_consumer_offsets); (0.10.x 版本之前保存在zookeeper中), 在kafka 中提交的offset 都是指的下一條待消費(fèi)的數(shù)據(jù), 即已消費(fèi)的offset+1
-
message : 消息, 簡(jiǎn)單來說kafka 中的每個(gè)message 由一對(duì)key-value 組成, 消息結(jié)構(gòu)如下:
message
2. 生產(chǎn)者(producer)
關(guān)鍵參數(shù):
| 參數(shù)名 | 描述 |
|---|---|
| bootstrap.servers | 生產(chǎn)者連接集群所需的broker地址清單,可以是一個(gè)或者多個(gè),用逗號(hào)隔開 |
| key.serializer和value.serializer | 指定key和value的序列化類型 |
| buffer.memory | RecordAccumlator 緩沖區(qū)的總大小,默認(rèn)32M |
| batch.size | 緩沖區(qū)一批次數(shù)據(jù)的最大值,默認(rèn)16k,適當(dāng)增加該值,可以提高吞吐量 |
| linger.ms | 如果數(shù)據(jù)未達(dá)到batch.size, 在設(shè)置的linger.ms 設(shè)置的等待時(shí)間到來后,就會(huì)發(fā)送數(shù)據(jù),默認(rèn)是0ms, 表示沒有延遲,一般設(shè)置為5~10ms |
| ack | broker 接收生產(chǎn)數(shù)據(jù)后的應(yīng)答機(jī)制。 0: 生產(chǎn)者只管發(fā)送,不等待broker的應(yīng)答;1: 生產(chǎn)者發(fā)送完數(shù)據(jù),只等待leader 節(jié)點(diǎn)的應(yīng)答;-1(all): 生產(chǎn)者發(fā)送完數(shù)據(jù),等待leader節(jié)點(diǎn)和ISR隊(duì)列中的所有節(jié)點(diǎn)同步完后應(yīng)答 |
| max.in.flight.requests.per.connection | 指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息(發(fā)送請(qǐng)求的緩存,一個(gè)batch 一個(gè)request)。它的值越高,就會(huì)占用越多的內(nèi)存,不過也會(huì)提升吞吐量。把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。默認(rèn)是5 |
| retries | 消息發(fā)送失敗后的重發(fā)次數(shù),默認(rèn)是int的最大值 2147483647 |
| retry.backoff.ms | 兩次重試之間的時(shí)間間隔,默認(rèn)是100ms |
| enable.idempotence | 是否開啟冪等性,默認(rèn)是true |
| compression.type | 生產(chǎn)者發(fā)送數(shù)據(jù)的壓縮方式,默認(rèn)是none,不壓縮,支持的壓縮方式:gzip,snappy,lz4,zstd |
2.1 發(fā)送流程

在消息發(fā)送過程中,涉及到了兩個(gè)線程--main線程和sender線程,在main線程中流程如下:
1 . 將消息數(shù)據(jù)發(fā)送給Interceptors 預(yù)處理( 可選),然后通過Serializer 進(jìn)行序列化處理
2 . 序列化之后的數(shù)據(jù)通過分區(qū)選擇器, 將消息發(fā)送給對(duì)應(yīng)的雙端隊(duì)列(RecordAccumulator, 默認(rèn)是32M)
sender 流程如下:
1 . 從隊(duì)列(RecordAccumulator)中拉取消息, 有兩種拉取策略
- .batch.size:只有隊(duì)列中積累的數(shù)據(jù)量達(dá)到batch.size大小后,sender才會(huì)發(fā)送數(shù)據(jù),默認(rèn)是16K
- . linger.ms:如果數(shù)據(jù)沒達(dá)到數(shù)據(jù)量batch.size, sender會(huì)根據(jù)linger.ms設(shè)置的時(shí)間發(fā)送數(shù)據(jù),默認(rèn)是0ms, 即無延遲發(fā)送
2 . 當(dāng)拉取到數(shù)據(jù)后,就會(huì)將數(shù)據(jù)通過selector 發(fā)送給對(duì)應(yīng)分區(qū)的leder 副本的broker
3 . broker接收到數(shù)據(jù)后,通過發(fā)送ack應(yīng)答,表明數(shù)據(jù)發(fā)送成功。 ack 應(yīng)答機(jī)制有如下的三種:
- .0: 生產(chǎn)者只管發(fā)送,不等待broker的應(yīng)答
- .1: 生產(chǎn)者發(fā)送完數(shù)據(jù),只等待leader 節(jié)點(diǎn)的應(yīng)答
- .-1(all): 生產(chǎn)者發(fā)送完數(shù)據(jù),等待leader節(jié)點(diǎn)和ISR隊(duì)列中的所有節(jié)點(diǎn)同步完后應(yīng)答
4 . 如果生產(chǎn)者沒收到ack應(yīng)答,就將重試發(fā)送
2.2 partition 分區(qū)
2.2.1 分區(qū)的作用
- .便于合理使用存儲(chǔ)資源,每個(gè)partition在一個(gè)broker上存儲(chǔ),切割海量數(shù)據(jù)到不同的的broker上, 合理的控制分區(qū)任務(wù),可以實(shí)現(xiàn)負(fù)載均衡
- . 提高并行度,生產(chǎn)者可以以分區(qū)為單位發(fā)送數(shù)據(jù),消費(fèi)者可以以分區(qū)為單位進(jìn)行消費(fèi)
2.2.2 分區(qū)策略
- 默認(rèn)的分區(qū)策略:
1 . 如果生產(chǎn)者指定了發(fā)送的分區(qū),則按照指定的分區(qū)發(fā)送
2 .如果沒指定分區(qū),則按照消息中的key的hash值對(duì)設(shè)置的分區(qū)數(shù)取模
3 . 如果分區(qū)和key 都沒指定,則選擇粘性分區(qū)(sticky partition,隨機(jī)選擇分區(qū)并不變),直到對(duì)應(yīng)的recordAccumulator的batch.size滿了或者linger.ms 時(shí)間到了,再隨機(jī)選擇其他分區(qū)
- 自定義分區(qū)器:
2.3 數(shù)據(jù)可靠保證
2.3.1 Ack應(yīng)答機(jī)制
Ack 的應(yīng)答機(jī)制確保數(shù)據(jù)不會(huì)丟失,Ack 應(yīng)答機(jī)制有如下的三種, 可靠程度依次提高
0 : 生產(chǎn)者只管發(fā)送,不等待broker的應(yīng)答(不推薦)
缺點(diǎn): 無法保證broker 是否收到了數(shù)據(jù)
1 : 生產(chǎn)者發(fā)送完數(shù)據(jù),只等待leader 節(jié)點(diǎn)的應(yīng)答
缺點(diǎn): 有可能leader 節(jié)點(diǎn)ack 應(yīng)答后,還沒同步就掛了,導(dǎo)致后續(xù)新選舉的leader 節(jié)點(diǎn)丟失了該條消息
-1(all): 生產(chǎn)者發(fā)送完數(shù)據(jù),等待leader節(jié)點(diǎn)和ISR隊(duì)列中的所有follower節(jié)點(diǎn)同步完后應(yīng)答
ISR隊(duì)列(in-sync replica set): 和leader 保持同步的follower和leader節(jié)點(diǎn)的集合(leader:1,isr:1,2,3)
例如: 主題為test的topic 創(chuàng)建了一個(gè)分區(qū),總共有兩個(gè)副本,其中l(wèi)eader 副本對(duì)應(yīng)的broker id=2, follower副本對(duì)應(yīng)的broker id=1,此時(shí)isr隊(duì)列中的集合就是(2,1)
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --topic test --describe
Topic: test TopicId: MPY45pufQ_m-bW7h_cSULQ PartitionCount: 1 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
如果follewer 節(jié)點(diǎn)長(zhǎng)時(shí)間沒有發(fā)送通信請(qǐng)求或者同步數(shù)據(jù),則將被踢出ISR 隊(duì)列, 該時(shí)間閾值由參數(shù)replica.lag.time.max.ms 設(shè)定,默認(rèn)是30s
總結(jié) 三種應(yīng)答機(jī)制的可靠性依次提高,但是在特殊情況下,-1的應(yīng)答機(jī)制也沒辦法完全保證數(shù)據(jù)不丟失。例如: 分區(qū)副本只有一個(gè),或者isr 中的節(jié)點(diǎn)數(shù)只有一個(gè), 此時(shí)相當(dāng)于ack=1, 仍然有丟失數(shù)據(jù)的風(fēng)險(xiǎn)
確保數(shù)據(jù)不丟失的條件: ack=-1 + 分區(qū)副本>1 + isr隊(duì)列中節(jié)點(diǎn)數(shù)>1
2.3.2 數(shù)據(jù)冪等性
procuder 不論向broker 發(fā)送多少次重復(fù)的數(shù)據(jù), Broker 端都只會(huì)持久化一條數(shù)據(jù),保證數(shù)據(jù)不重復(fù)(去重)
去重標(biāo)準(zhǔn):具有<PID,Partition,SeqNumber> 相同主鍵的消息提交時(shí),Broker只會(huì)持持久化一條,其中PID 是每次producer分配的一個(gè)新的id,Patition 分區(qū)號(hào),Sequence Number 單調(diào)自增

冪等性只能保證在單分區(qū)單會(huì)話內(nèi)的不重復(fù)
冪等性的開啟通過參數(shù):enable.idempoyence= true 設(shè)置
2.3.3 生產(chǎn)者事務(wù)
開啟事務(wù)必須要開啟冪等性

######## 待補(bǔ)充################
2.3.4 數(shù)據(jù)有序
在kafka 1.x版本之前 ,為了保證數(shù)據(jù)的單分區(qū)有序,條件如下:max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)
kafka 1.x及以后版本為了保證單分區(qū)有序,條件如下:
- 未開啟冪等性
max.in.flight.requests.per.connection=1 - 開啟冪等性
max.in.flight.requests.per.connection<5 即可
原因說明, 在kafka1.x以后,kafka broker 端會(huì)緩存producer 發(fā)送的最近5個(gè)request 的元數(shù)據(jù), 同時(shí)在冪等性的前提下, 通過Sequence Number對(duì)請(qǐng)求進(jìn)行排序,此時(shí)就保證了最近5個(gè)request的數(shù)據(jù)有序
2.3.3 數(shù)據(jù)傳遞語義:
- 至少一次(At Least Once):
保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)
ack=-1 + 分區(qū)副本>1 + isr隊(duì)列中節(jié)點(diǎn)數(shù)>1
- 最多一次 (At Most Once):
保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失
冪等性
- 精確一次 (Exactly Once)
確保數(shù)據(jù)不丟失,也不重復(fù)
冪等性 + ack=-1 + 分區(qū)副本>1 + isr隊(duì)列中節(jié)點(diǎn)數(shù)>1
4 Broker
重要參數(shù):
| 參數(shù)名 | 描述 |
|---|---|
| replica.lag.time.max.ms | ISR中,follower 由于長(zhǎng)時(shí)間未與leader 通信而導(dǎo)致被踢出isr的時(shí)間閾值,默認(rèn)30s |
| auto.leader.rebalance.enable | 自動(dòng)leader partiiton平衡,默認(rèn)是trrue,建議關(guān)閉 |
| leader.imbalance.per.broker.percentage | 默認(rèn)是10%,每個(gè)broker 允許的不平衡的leader的比例,超過這個(gè)值,會(huì)觸發(fā)leader自動(dòng)平衡 |
| leader.imbalance.check.interval.seconds | 默認(rèn)300s,檢查leader負(fù)載平衡的時(shí)間間隔 |
| log.segment.bytes | kafka中切割為每一塊數(shù)據(jù)文件的大小,默認(rèn)1g |
| log.index.interval,bytes | 默認(rèn)4k,每當(dāng)寫入4kb大小的數(shù)據(jù)后,就往index文件記錄索引(稀疏索引的數(shù)據(jù)大小閾值) |
| log.retention.hours | kafka 數(shù)據(jù)保存時(shí)間。默認(rèn)7天 |
| log.retention.minutes | kafka 數(shù)據(jù)保存時(shí)間,分鐘級(jí)別,默認(rèn)關(guān)閉 |
| log.retention.ms | kafka 數(shù)據(jù)保存時(shí)間,毫秒級(jí)別,默認(rèn)關(guān)閉 |
| log.retention.check.interval.ms | 檢查數(shù)據(jù)是否超過保存時(shí)間的間隔,默認(rèn)5min |
| log.rentention.bytes | 默認(rèn)-1,表示無窮大,超時(shí)設(shè)置的所有日志的總大小后,刪除最早的segment |
| log.cleanup.policy | 數(shù)據(jù)文件刪除策略,默認(rèn)是delete,如果是compact,表示啟用壓縮 |
| num.io.threads | 默認(rèn)8,負(fù)責(zé)寫磁盤的線程數(shù),參數(shù)值占總核數(shù)的50% |
| num.replica.fetchers | 默認(rèn)1, 副本拉取線程數(shù),參數(shù)值占總核數(shù)的1/3 |
| num.network.threads | 默認(rèn)3,數(shù)據(jù)傳輸線程數(shù),參數(shù)值占總核數(shù)的2/3 |
| log.flush.interval.messages | 強(qiáng)制頁緩存刷寫到磁盤的條數(shù),默認(rèn)是log的最大值,一般不建議修改,系統(tǒng)自己管理 |
| log.flush.interva.ms | 沒隔多久刷寫數(shù)據(jù)到磁盤,默認(rèn)是null,不建議修改 |
4.1 工作流程
4.1.1 zookeeper 存儲(chǔ)的kafka信息
啟動(dòng)zookeeper 終端客戶端:
[root@iZuf6g3hri8hvnuqng6id7Z apache-zookeeper-3.5.7-bin]# ./bin/zkCli.sh
通過ls 命令 查看kafka 相關(guān)信息
[zk: localhost:2181(CONNECTED) 0] ls /kafka
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification]

4.2分區(qū)副本
4.2.1 副本基本信息
- 副本作用: 提高數(shù)據(jù)的可靠性
- 默認(rèn)一個(gè)副本,生產(chǎn)環(huán)境一般配置為2 個(gè),保證數(shù)據(jù)可靠性, 副本太多會(huì)增加磁盤存儲(chǔ)空間,增加網(wǎng)絡(luò)數(shù)據(jù)傳輸,降低效率
- kafka 中副本分為: leader和follower, 生產(chǎn)者將數(shù)據(jù)發(fā)送給leader, follower 和leader 進(jìn)行同步數(shù)據(jù)
- kafka 分區(qū)所有的副本統(tǒng)稱為AR(Assigned Replicas)
AR= ISR + OSR
ISR: 表示和leader 保持同步的Follower集合,如果follower長(zhǎng)時(shí)間沒有和leader 通信或者同步數(shù)據(jù),則該follower將被踢出ISR, 該時(shí)間戳由: replica.lag.time.max.ms參數(shù)設(shè)定,默認(rèn)30s。 leader發(fā)生故障后,會(huì)從ISR中選舉新的leader
OSR: 表示和follower 與leader同步時(shí),延時(shí)過多的副本集合
4.2.2 副本leader選舉流程
kafka 集群中每個(gè)broker都有對(duì)應(yīng)的controller, 其中有一個(gè)controller 會(huì)被選舉為controller leader( 通過查看zk的/controller 節(jié)點(diǎn),可以知道選舉出來的leader 節(jié)點(diǎn)), 負(fù)責(zé)管理集群broker的上下線和 所有topic的分區(qū)副本leader選舉和分配
controller的信息同步依賴于zookeeper
- broker 啟動(dòng)后就會(huì)在zookeeper中注冊(cè)
- controller leader 的選舉機(jī)制是,先注冊(cè)先成為,同時(shí)由選舉出來的leader 通過zookeeper 去監(jiān)聽所有broker節(jié)點(diǎn)變化
- controller leader同時(shí)負(fù)責(zé)topic的分區(qū)副本leader的選擇,選舉策略如下:
在ISR中的存活為前提,按照AR中排在前的為優(yōu)先,例如:AR[1,0,2], ISR[1,0,2],那么leader 就會(huì)按照[1,0,2]的順序輪訓(xùn)
- controller leader 將選舉的信息上傳到zk 中
-
其他broker 的controller 去zk 中同步呢信息
broker運(yùn)行流程
測(cè)試:
kafka 集群集群中有三個(gè)broker ,id分別為1,2,3, 此時(shí)創(chuàng)建主題名為test1, 分區(qū)數(shù)3,副本數(shù)3的topic,查看此時(shí)topic的詳細(xì)信息如下:
test1分區(qū)0,1,2的leader 節(jié)點(diǎn)id為2,3,1
AR 分別為[2,3,4],[3,1,2],[1,2,3], ISR分別為[2,3,1],[3,1,2],[1,2,3]
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --create --topic test1 --partitions 3 --replication-factor 3
Created topic test1.
# 查看topic詳細(xì)信息
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1
Topic: test1 TopicId: pfPkdWsKTyCCsoektoTayQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: test1 Partition: 1 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
下線節(jié)點(diǎn)broker 3,此時(shí)分區(qū)1 的leader 節(jié)點(diǎn)掛了,需要重新選舉leader,安裝選舉規(guī)則,應(yīng)該選舉 brokerid為1 的節(jié)點(diǎn)作為leader
[root@iZuf6g3hri8hvnuqng6id7Z bin]# ./kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --describe --topic test1
Topic: test1 TopicId: pfPkdWsKTyCCsoektoTayQ PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,3,1 Isr: 2,1
Topic: test1 Partition: 1 Leader: 1 Replicas: 3,1,2 Isr: 1,2
Topic: test1 Partition: 2 Leader: 1 Replicas: 1,2,3 Isr: 1,2
4.2.3 broker 節(jié)點(diǎn)故障處理
概念理解:
LEO(Log End Offset):每個(gè)腹部的最后一個(gè)offset,其實(shí)就是最新的offset +1
HW (High WaterMark): 所有副本中最小的LEO
1 . topic 分區(qū)的follower 對(duì)應(yīng)的一個(gè)節(jié)點(diǎn)掛了

2 . topic 分區(qū)的leader 節(jié)點(diǎn)掛了

4.3 文件存儲(chǔ)
4.3.1 文件存儲(chǔ)機(jī)制
每個(gè)topic 對(duì)應(yīng)的每個(gè)分區(qū)(partition)都對(duì)應(yīng)一個(gè)log 文件,該log文件中的存儲(chǔ)的就是kafka的生產(chǎn)的數(shù)據(jù),producer生產(chǎn)的數(shù)據(jù)會(huì)不斷的追加到文件末尾, 為了防止log 文件過大,導(dǎo)致數(shù)據(jù)查詢效率低下,kafka采用分片和索引機(jī)制。
每個(gè)partition分為多個(gè)segment,每個(gè)segment 文件包括:".index"文件,".log"文件, ".timeindex"等文件, 這些文件位于一個(gè)文件夾下,文件夾命名規(guī)則:topic名+分區(qū)序號(hào)

例如 : 查看test-0 分區(qū)的數(shù)據(jù)文件內(nèi)容
[root@iZuf6g3hri8hvnuqng6id7Z test-0]# ls
00000000000000000000.index 00000000000000000000.timeindex leader-epoch-checkpoint
00000000000000000000.log 00000000000000000001.snapshot partition.metadata


4.3.2 文件刪除機(jī)制
kafka 中的默認(rèn)數(shù)據(jù)保存時(shí)間為7天, 可以通過如下參數(shù)修改:
- log.retention.hours : 最低優(yōu)先級(jí),默認(rèn)7天
- log.retention.minutes: 優(yōu)先級(jí)次之 ,分鐘
- log.retention.ms: 最高優(yōu)先級(jí),毫秒
- log.retention.check.interval.ms : 負(fù)責(zé)設(shè)置檢查周期,默認(rèn)5min
kafka 中的數(shù)據(jù)文件清理策略有 delete和compact
- delete: 將過期數(shù)據(jù)刪除
設(shè)置參數(shù): log.cleanup.policy=delete
基于時(shí)間:默認(rèn)打開,以segment 中所有記錄中的最大時(shí)間戳作為該文件的時(shí)間戳, 即當(dāng)一個(gè)segment中的部分?jǐn)?shù)據(jù)超期了, 此時(shí)會(huì)等待該segment所有數(shù)據(jù)超期后,再刪除
基于數(shù)據(jù)大?。耗J(rèn)關(guān)閉,超過設(shè)置的所有日志的總大小,刪除最早的segment。log.retention.byte=-1 (表示無窮大, 即關(guān)閉基于數(shù)據(jù)大小) - compact: 日志壓縮
對(duì)于相同key的不同value值, 只保留最后一個(gè)版本
設(shè)置參數(shù):log.cleanup.policy=comapct
數(shù)據(jù)壓縮
4.4 高效讀寫數(shù)據(jù)
1 . kafka本身是分布式的集群,采用分區(qū)策略,并行度高
2 . 數(shù)據(jù)存儲(chǔ)采用稀疏索引,可以快速定位要消費(fèi)的數(shù)據(jù)
3 . 順序?qū)懘疟P: kafka 的數(shù)據(jù)寫入過程是文件末尾追加的方式
- 頁緩存 + 零拷貝的方式

5 消費(fèi)者(consumer)
kafka 中的消費(fèi)者采用pull(拉)模式
重要參數(shù)
| 參數(shù)名 | 描述 |
|---|---|
| bootstrap.servers | 生產(chǎn)者連接集群所需的broker地址清單,可以是一個(gè)或者多個(gè),用逗號(hào)隔開 |
| key.deserializer和value.deserializer | 指定key和value的反序列化類型 |
| gruop.id | 消費(fèi)者所屬的消費(fèi)者組id |
| enable.auto.commit | 默認(rèn)為true,自動(dòng)周期性的提交offset |
| auto.commit.interval.ms | 自動(dòng)提交offset的時(shí)間間隔 |
| auto.offset.reset | 初始化時(shí)偏移量的設(shè)置策略,默認(rèn)latest。1. earliest:自動(dòng)重置偏移量到最早的偏移量;2. latest:自動(dòng)重置偏移量到最新的; 3. none:如果原來的消費(fèi)者組偏移量捕存在,則向消費(fèi)者拋出異常 |
| offsets.topic.num.partitions | _consumer_offsets的分區(qū)數(shù),默認(rèn)50 ,不建議修改 |
| heartbeat.interval.ms | kafka 消費(fèi)者和coordinator之間的心跳時(shí)間,默認(rèn)3s |
| session.timeout.ms | kafka和coodinator之間的連接超時(shí)時(shí)間,默認(rèn)45s,超過該時(shí)間,該消費(fèi)者被移除消費(fèi)者組,消費(fèi)者組會(huì)進(jìn)行再平衡 |
| max.poll.interval.ms | 消費(fèi)者處理消息的最大時(shí)長(zhǎng),默認(rèn)5min,超過該時(shí)間,該消費(fèi)者被移除消費(fèi)者組,消費(fèi)者組會(huì)進(jìn)行再平衡 |
| fetch.mxax.bytes | 默認(rèn)50M,消費(fèi)者向broker一次拉取的最大字節(jié)數(shù),如果服務(wù)器端一批次的數(shù)據(jù)大于該值,仍可以拉取數(shù)據(jù),一批次的大小受到message.max.bytes(broker config)和 max.message.bytes(topic config)影響 |
| max.poll.records | 一次拉取數(shù)據(jù)的最大條數(shù),默認(rèn)500 |
| partition.assignment.strategy | 消費(fèi)者分區(qū)分配策略,包括了 range,roundRobin,sticky,cooperativeSticky |
5.1 總體消費(fèi)流程
1 . 消費(fèi)者按照partition 的offest 按順序依次讀取里面的數(shù)據(jù)
2 . 一個(gè)消費(fèi)者可以消費(fèi)多個(gè)主題的多個(gè)分區(qū)的數(shù)據(jù)
3 . 在消費(fèi)者組中, 每個(gè)分區(qū)只能由該消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi),防止重復(fù)消費(fèi)
4 . 每個(gè)消費(fèi)者的消費(fèi)offset 由消費(fèi)者提交到系統(tǒng)的主題中保存

5.2 消費(fèi)者組
Consumer Group :由多個(gè)consumer 組成, 組內(nèi)成員擁有共同的groupId
- 消費(fèi)者組內(nèi)的每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),一個(gè)分區(qū)只能由一個(gè)組內(nèi)的消費(fèi)者消費(fèi)
- 消費(fèi)者組之間互不影響
- 如果消費(fèi)者組中的消費(fèi)者數(shù)量多于主題分區(qū)的數(shù)量,則會(huì)有一部分的消費(fèi)者處于閑置狀態(tài),不消費(fèi)任何數(shù)據(jù)
5.2.1 消費(fèi)者組初始化流程
概念:
coordinator: broker 中的組件,負(fù)責(zé)輔助實(shí)現(xiàn)消費(fèi)者組的初始化和分區(qū)的分配
coordinator節(jié)點(diǎn)選擇= groupid的hashcode %50
注:50 指的是系統(tǒng)主題_consumer_offsets的分區(qū)數(shù)量
例如: groupId 的hashcode=1,1%50=1, 此時(shí)間_consumer_offsets主題的1號(hào)分區(qū)所在的broker 節(jié)點(diǎn)的coordinator就是這個(gè)消費(fèi)者組的老大,此時(shí)初始化流程如下:
- 消費(fèi)者的每個(gè)消費(fèi)者發(fā)送joinGroup請(qǐng)求給coordinator
- coordinator選出一個(gè)consumer 作為leader
- coordinator把要消費(fèi)的topic情況發(fā)送給leader消費(fèi)者
- leader 消費(fèi)者指定消費(fèi)方案并發(fā)送給coordinator
- coordinator再分發(fā)消費(fèi)方案給各個(gè)消費(fèi)者
- 每個(gè)消費(fèi)者和coordinator 保持心跳(默認(rèn)3s), 一旦超時(shí)(session.timeout.ms=45s),該消費(fèi)者就會(huì)從消費(fèi)者組中移除,并觸發(fā)再平衡;或者消費(fèi)者的處理消息時(shí)間過長(zhǎng)(max.poll.interval.ms=5min),也會(huì)觸發(fā)再平衡

kafka 有專門的分區(qū)策略來支持在消費(fèi)者組中對(duì)消費(fèi)分區(qū)的分配
主要策略有 Range,RoundRobin,Sticky,CooperativeSticky(3.0 版本新增), 通過配置參數(shù): partition.assignment.strategy來設(shè)置,默認(rèn)是Range 策略
-
Range 策略
Range 策略是針對(duì)于每個(gè)topic 而言的
1 首先對(duì)同一個(gè)topic 中的分區(qū)按照序號(hào)排序,并對(duì)消費(fèi)者組中的消費(fèi)者按照client.id字典排序
2 通過 partition 數(shù)/ consumer 數(shù) 得到商n 和余數(shù)m,則每個(gè)消費(fèi)者至少分到n個(gè)分區(qū),然后前m 個(gè)消費(fèi)者多分一個(gè)分區(qū)例如: 現(xiàn)在有7個(gè)分區(qū)(0,1,2,3,4,5,6), 消費(fèi)者組中有三個(gè)消費(fèi)者(c0,c1,c2), 7/3=2---1, 那么c0就會(huì)多消費(fèi)一個(gè)分。此時(shí)c0 消費(fèi)0,1,2 分區(qū);c1消費(fèi)3,4分區(qū) ;c2 消費(fèi)5,6分區(qū)。
如果8個(gè)分區(qū)的話,8/3=2----2, c0,c1就會(huì)多消費(fèi)一個(gè)
range 分區(qū)
注意: 如果只是針對(duì)一個(gè)topic,c0多消費(fèi)一個(gè)分區(qū)影響不大,但是如果有N個(gè)topic, 那么c0就將多消費(fèi)每一個(gè)topic的分區(qū),容易產(chǎn)生數(shù)據(jù)傾斜
2 . RoundRobin策略

- 將所有訂閱的topic和partition 組成topicAndPartition 列表,并按hashcode 進(jìn)行排序,最后以輪訓(xùn)的方式分配給消費(fèi)者
3 . Sticky策略
可以理解為分區(qū)的分配結(jié)果帶有粘性,即在執(zhí)行一次新的分配之前,會(huì)考慮上一次的分配結(jié)果,盡量減少調(diào)整改動(dòng),節(jié)省開銷
Sticky策略是在kafka 0.11.x版本之后引入的,首先會(huì)盡量均勻的分配分區(qū),類似圖range策略,在消費(fèi)者組中的某一消費(fèi)者出現(xiàn)問題時(shí),會(huì)盡量其他消費(fèi)者的原有分區(qū)不變
例如 0,1,2,3,4,5,6 分區(qū),被一消費(fèi)者組的三個(gè)消費(fèi)者c0,c1,c2 消費(fèi), 初始化分配時(shí)按照均勻分配的原則,將所有分區(qū)隨機(jī)均勻分配給消費(fèi)者, 假如c0 分配到0,2,3;c1 分配到1,4 ;c2 分配到5,6;此時(shí)c0 掛了,觸發(fā)再分配策略,此時(shí)將c0分分區(qū)分配給c1,c2, 結(jié)果;c1 分配到的分區(qū)為1,2,4;c2 分配到0,3,5,6
5.2.2 消費(fèi)者組詳細(xì)消費(fèi)流程

5.3 offset 偏移量
5.3.1 默認(rèn)維護(hù)位置
從kafka0.9版本之后,consumer默認(rèn)將消費(fèi)的offset 存儲(chǔ)在kafka 內(nèi)置的主題_consumer_offsets中
_consumer_offsets里采用了key和value 的形式存儲(chǔ)數(shù)據(jù),key=group.id+topic+partition.id, value 就是對(duì)應(yīng)的offset,每隔一段時(shí)間,kafka 都會(huì)對(duì)這個(gè)內(nèi)置的topic 進(jìn)行compact 壓縮處理,是的每個(gè)key的value值都保留最新的數(shù)據(jù)

5.3.2 自動(dòng)提交offset

5.3.3 手動(dòng)提交offset
手動(dòng)提交offset 首先需要配置參數(shù) enable.auto.commit=false

5.3.4 指定offset 消費(fèi)
根據(jù)配置參數(shù)可以設(shè)定offset 的初始偏移量
auto.offset.reset= earliest | latest | none(默認(rèn)是latest)
- earlist: 自動(dòng)將offset 設(shè)置為最早的偏移量, --from-beginning
- latest: 自動(dòng)將偏移量重置為最新的偏移量
- none: 如果未找到消費(fèi)者組的先前的偏移量,則向消費(fèi)者拋出異常
5.3.5 漏消費(fèi)和重復(fù)消費(fèi)


6 壓力測(cè)試
kafka 自帶壓力測(cè)試腳本
- 生產(chǎn)者壓測(cè)腳本: kafka-producer-perf-test-sh
- 消費(fèi)者壓測(cè)腳本: kafka-consumer-pref-test-sh
6.1 生產(chǎn)者壓測(cè)
例:
- 創(chuàng)建名為test的主題,設(shè)置分區(qū)3個(gè),副本3個(gè)
kafka-topic.sh --bootstrap-server 127.0.0.1:9092 --create --replication-factor 3 --partitions 3 --topic test
- 開始測(cè)試:
kafka-producer-pref-test.sh --topic test --record-size 1024 --num-records 1000000 --throughput 100000 --producer-props bootstrap-server=127.0.0.1:9092 batch.size=16384 linger.ms=0
參數(shù)說明:
- record-size : 一條消息的信息量大小,單位字節(jié)
- num-records: 總共發(fā)送的消息的數(shù)量
- throughput:每秒發(fā)送的消息數(shù)量, 設(shè)置為-1 表示不進(jìn)行限制
- producer-props 生產(chǎn)者參數(shù)的相關(guān)信息
6.2 消費(fèi)者壓測(cè)
例:
kafka- consumer-pref-test.sh --bootstrap-server 127.0.0.1:9092 --topic test --messages 1000000 --consumer.config config/consumber.propertier
參數(shù)說明
- messages : 總共消費(fèi)的數(shù)據(jù)數(shù)目
- consumer.config: 消費(fèi)者的配置文件,可以修改配置文件中的參數(shù)來做壓力測(cè)試,查看不同參數(shù)對(duì)壓測(cè)的影響



