kafka筆記

kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Activity

Stream)和運營數(shù)據(jù)處理管道(Pipeline)的基礎(chǔ),具有高水平擴展和高吞吐量。

應(yīng)用領(lǐng)域:已被多家不同類型的公司作為多種類型的數(shù)據(jù)管道和消息系統(tǒng)使用。如:

淘寶,支付寶,百度,twitter等

目前越來越多的開源分布式處理系統(tǒng)如Apache

flume、Apache

Storm、Spark,elasticsearch都支持與Kafka集成。

ActiveMQ

RabbitMQ

Kafka

所屬社區(qū)/公司

Apache

Mozilla

Public LicenseApache/LinkedIn

開發(fā)語言

Java

Erlang

Java

支持的協(xié)議

OpenWire、STOMP、

REST、XMPP、AMQPAMQP

仿AMQP

事物

支持

不支持

不支持

集群

支持

支持

支持

負載均衡

支持

支持

支持

動態(tài)擴容

不支持

不支持

支持(zk)

ActiveMQ還是支持JMS的一種消息中間件

Kafka的動態(tài)擴容目前是通過zookeeper來完成的

阿里巴巴的metaq,rocketmq都有kafka的影子

AMQP協(xié)議

消費者(Consumer):從消息隊列中請求消息的客戶端應(yīng)用程序;

生產(chǎn)者(Producer):向broker發(fā)布消息的客戶端應(yīng)用程序;

AMQP服務(wù)器端(broker):用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列;

Kafka客戶端支持當前大部分主流語言,包括:

C、C++、Erlang、Java、.net、perl、PHP、Python、Ruby、Go、Javascript。

可以使用以上任何一種語言和kafka服務(wù)器進行通信(即編寫自己的consumer和producer程序)

kafka的架構(gòu)

主題(Topic):一個主題類似新聞中的體育、娛樂、教育等分類概念,在實際工程中通常一個業(yè)務(wù)一個主題;

分區(qū)(Partition):一個topic中的消息數(shù)據(jù)按照多個分區(qū)組織,分區(qū)是kafka消息隊列組織的最小單位,一個分區(qū)可以看做是一個FIFO的隊列;

備份(Replication):為了保證分布式可靠性,kafka0.8開始對每個分區(qū)的數(shù)據(jù)進行備份(不同Broker上),防止其中一個Broker宕機造成分區(qū)數(shù)據(jù)不可用

zookeeper:一個提供分布式狀態(tài)管理、分布式配置管理、分布式鎖服務(wù)等的集群

Kafka編程實例

消費者編程模型

分區(qū)消費模型

一個分區(qū) 一個消費者實例

分區(qū)消費偽代碼描述

Main()

分區(qū)模型消費的分區(qū)數(shù)是多少 即? 獲取分區(qū)的size

針對每個分區(qū)創(chuàng)建一個線程或進程 – 消費者實例

For

index = 0 to size

Create

thread(or process) consumer(index)

第index個線程(進程) –每個消費者實例的工作

Consumer(index)

創(chuàng)建到kafka

broker的連接,KafkaClient(host,port)

指定消費參數(shù)構(gòu)建consumer,SimpleConsumer(topic,partitions)

設(shè)置消費offset:consumer.seek(offset,0)

While(true)

{

消費指定topic第index個分區(qū)的數(shù)據(jù)

處理

}

記錄當前消息offset(可選)

提交當前offset(可選)(kafka集群默認做兩個操作Java客戶端)

組消費模型

組消費可以復(fù)制消費,kafka集群的一條消息會同時發(fā)送給各個組消費者,每組都會拿到一個全量的數(shù)據(jù)。

組消費偽代碼:

Main()

設(shè)置需要創(chuàng)建的流數(shù)N

---每個consumer組里有多少個consumer實例

For

index = 0 to N

Create

thread comsumer(index)

第index個線程(進程) –每個消費者實例的工作

Consumer(index)

創(chuàng)建到kafka

broker的連接,KafkaClient(host,port)

指定消費參數(shù)構(gòu)建consumer,SimpleConsumer(topic,partitions)

設(shè)置從頭消費還是從最新的消費(smallest或largest)offset

While(true)

{

消費指定topic第index個流的數(shù)據(jù)

處理

}

(offset自動提交給zookeeper)

Consumer分配算法:

假設(shè)一個組消費者包含兩個消費者實例,某一topic擁有4個分區(qū),則如何進行consumer分配?

針對kafka集群下的某一topic的所有分區(qū)進行排序,part0.part1,part2,part3

針對客戶端實例進行排序consumer1

consumer2

用partition數(shù)量除以consumer數(shù)量N

將I

* N to? (i+1) * N傳遞給消費者consumer(i)

將當前分配關(guān)系注冊到集群

For

each topic T that Ci subscribes to

let

PT be all partitions producing topic T

let

CG be all consumers in the same group as Ci that consume topic T

sort

PT (so partitions on the same broker are clustered together)

sort

CG

let

i be the index position of Ci in CG and let N = size(PT)/size(CG)

assign

partitions from i*N to (i+1)*N - 1 to consumer Ci

remove

current entries owned by Ci from the partition owner registry

add

newly assigned partitions to the partition owner registry

(we

may need to re-try this until the original partition owner releases

its

ownership)

消費模型對比

分區(qū)消費模型更加靈活,但需要自己處理各種異常情況

需要自己管理offset以實現(xiàn)消息傳遞的其他語義—至少一次(kafka)發(fā)送至少一次,不會丟;

至多一次,只會發(fā)送一次,可能丟消息,可能會重復(fù)

準確一次(保存偏移量):

組消費者模型簡單但不靈活:

不需要自己處理異常情況,不需要自己管理offset

只能實現(xiàn)kafka默認的最少一次消息傳遞語義

Maven-assembly-plugin

Jar-with-dependencies

Kafka參數(shù)調(diào)優(yōu)

FetchSize從服務(wù)器獲取單包大小 –每次從服務(wù)器端獲取的tcp大小

bufferSize

kafka客戶端緩沖區(qū)大小 –一次最多獲取多少數(shù)據(jù)再返回給用戶 一個bufersize由多個fetchsize組成 異國傳輸帶寬不一,盡可能設(shè)置大客戶端緩沖區(qū)

group.id分組消費時分組名--分組消費時指定不同的id可以實現(xiàn)復(fù)制消費的目的

生產(chǎn)者編程模型

同步生產(chǎn)模型

Kafka客戶端發(fā)送一條消息后處于阻塞狀態(tài),等待kafka服務(wù)器的確認消息,如果沒有等到確認消息(等了一段時間),則生產(chǎn)線程sleep一段時間后,繼續(xù)重新發(fā)送,直到到達最大發(fā)送次數(shù),直接結(jié)束發(fā)送。

Kafka至少一次 否則程序會中止

異步生產(chǎn)模型

Kafka客戶端發(fā)送一條消息,直接將該消息發(fā)送至客戶端的緩沖隊列中,直到達到客戶端的緩沖隊列消息數(shù)目達到預(yù)先配置的數(shù)目或者kafka消息隊列的消息累計時間到了預(yù)先配置的時間,kafka生產(chǎn)者客戶端的 消息隊列打包一次性發(fā)送給kafka服務(wù)器,kafka客戶端消息隊列也是存在kafka集群,并不是kafka

Server.? Kafka集群代碼包括kafka

Server和kafka

Client,維持一個隊列。

偽代碼描述

Main()

創(chuàng)建到kafka

broker的連接,KafkaClient(host,port)

選擇或者自定義生產(chǎn)者負載均衡算法partitioner

設(shè)置生產(chǎn)者參數(shù)

根據(jù)負責均衡算法(默認hash輪詢? 隨機) 和 設(shè)置得生產(chǎn)者參數(shù)(緩沖隊列長度累計時間等等)構(gòu)造producer對象

While

true

getMessage從上游獲得一條消息

按照kafka要求的消息格式構(gòu)造(kafka自定義的)kafka消息

根據(jù)分區(qū)算法得到分區(qū)

發(fā)送消息

處理異常

兩種生產(chǎn)模型對比

同步:

低消息丟失率

高消息重復(fù)率(等待時間較長,由于網(wǎng)絡(luò)原因,確認長時間未收到,導(dǎo)致多次重發(fā))

高延遲

異步生產(chǎn)模型:(每秒一個分區(qū)50萬記錄)

低延遲

高發(fā)送性能

高消息丟失率(無確認機制,發(fā)送端隊列滿時,無法傳送消息。 隊列發(fā)送kafka服務(wù)器期間,可能造成整個消息隊列的丟失。)

生產(chǎn)者編程

req_acks:發(fā)送失敗次數(shù)

ack_timeout:未接到確認,認為發(fā)送失敗的時間

async:是否異步發(fā)送

batch_send_every_n:異步發(fā)送時,累計最大消息數(shù)

batch_send_every_t:異步發(fā)送時,累計最大時間

同步需要設(shè)置ack參數(shù)--默認為同步傳輸? 不指定發(fā)送類型的情況下

在發(fā)送消息時需要制定topic

key(可能根據(jù)key進行partition)以及value

即便自己的分區(qū)算法未使用到仍然需要制定key不能將其設(shè)置為null或者“”“”

Proprs.put(“serializer.class”,”kafka.serializer.StringEncoder”)

//默認encoder

–字節(jié)序列化

Proprs.put(“partitioner.class”,”kafka.proceducer.partition.SimplePartitioner”)

//自己實現(xiàn)的分區(qū)算法? 默認的是針對key進行hash

//

kafka.proceducer.partition.SimplePartitioner為自己實現(xiàn)的實現(xiàn)了Partitioner的接口

實現(xiàn)了partition方法,其參數(shù)為key和分區(qū)數(shù)(當前topic的分區(qū)數(shù),系統(tǒng)調(diào)用時,自動傳參)key來自于EventKey(傳遞的Message中的key)

Proprs.put(“request.required.acks”,”1”)

//

0絕不等確認,1:leader的一個副本收到這一消息,并發(fā)回確認-1:leader的所有副本收到這一消息,并發(fā)回確認

KeyedMessage

data = new KeyedMessage(eventTopic, eventKey,

eventvalue)

//

eventKey必須有(即便自己的分區(qū)算法用不到這個key,也不能為null或者””),否則自己的分區(qū)算法根本得不到調(diào)用,

異步不需要Proprs.put(“request.required.acks”,”1”)設(shè)置了也沒有用? 需要制定類型為異步

Props.put(“producer.type”,”async”)

// 1:async? ? 2:sync

Java客戶端參數(shù)調(diào)優(yōu)

Message.send.max.retries發(fā)送失敗重試次數(shù)? 同步

Retry.backoff.ms:未接到確認,認為發(fā)送失敗的時間? 同步

Producer.type:同步發(fā)送或者異步發(fā)送

Batch.num.messages:異步發(fā)送時,累計最大消息數(shù)

Queue.buffering.max.ms:異步發(fā)送時,累計最大時間

Kafka消息組織原理

磁盤重認識

根據(jù)數(shù)據(jù)的局部性原理kafka

預(yù)讀或者提前讀

讀取當前字節(jié) 讀取下一塊數(shù)據(jù)

合并寫

多個邏輯上的寫操作 合并成一個大的物理寫操作

順序讀寫 不需要尋道時間

很少旋轉(zhuǎn)時間

一般而言 順序?qū)懕?隨機寫? 相差很多(這是kafka設(shè)計的關(guān)鍵考慮)速度相差萬倍 線性每秒300M隨機寫每秒50k

Kafka特性:gree

Kafka消息的寫入原理

一般的將數(shù)據(jù)從文件傳到套接字的路徑;

Os將數(shù)據(jù)從磁盤讀取到內(nèi)核空間中的頁緩存

應(yīng)用將數(shù)據(jù)從內(nèi)核空間讀到用戶空間的緩存中

應(yīng)用將數(shù)據(jù)寫回到內(nèi)存空間的套接字緩存中

Os將數(shù)據(jù)從套接字緩存寫到網(wǎng)卡緩存中,以便數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出

以上效率很低,經(jīng)歷了四次拷貝(磁盤空間到內(nèi)核空間,內(nèi)核空間到用戶空間,用戶空間到套接字空間,套接字空間到網(wǎng)卡空間;)和兩次系統(tǒng)調(diào)用(磁盤到內(nèi)核空間;套接字到網(wǎng)卡空間)。

使用sendFile(linux系統(tǒng)調(diào)用)FileChannel.transferTo

api,兩次拷貝可以被避免:允許os將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上(保留1/4)。優(yōu)化后,只需要最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中。

數(shù)據(jù)寫入和讀出的Byte

Zero Copy

生產(chǎn):

網(wǎng)絡(luò)------------------------

pagecache(生產(chǎn)一次消費多次)---------------------------磁盤

消費:

磁盤-------------------------網(wǎng)絡(luò)

以上兩種都沒有使用用戶空間

Kafka設(shè)計目標是生產(chǎn)一次消費多次? 生產(chǎn)的時候?qū)懭雙agecache消費者可以從pagecache中直接讀取? 不需要再磁盤讀?。樞蜃x雖然快但還是比不上主存的讀?。?/p>

Pagecache滿了或者存儲的時間到了,寫入進磁盤可以增加整體開發(fā)的性能

以上設(shè)計 就是數(shù)據(jù)寫入和讀出的零字節(jié)拷貝Byte

Zero? Copy(不用拷貝到用戶空間,所以稱為零字節(jié)拷貝)

基于的原理:

順序?qū)懕入S機寫快很多

數(shù)據(jù)的零字節(jié)拷貝

不用拷貝到用戶空間中

Isr:1,0

處于同步中的broker

不在同步中broker:

假如當前有兩個broker

broker0與broker1假設(shè)broker0掛了則broker0為不在同步中

假設(shè)slave的消費進度比leader的消費進度 快很多? 超過了某個閾值 這個時候也是不在同步中slave中的數(shù)據(jù)不是最新的數(shù)據(jù) 假如leader掛了 數(shù)據(jù)不是最新的? 不會選擇該slave作為新的leader會默認選擇正在同步中的broker為leader即數(shù)據(jù)達到了最新 數(shù)據(jù)一致性可以保證

Kafka消息文件存儲

每個分區(qū)都是以index

log文件進行存儲的

Kafka消息的刪除原理

從最久的日志段

開始刪除? (按日志段為單位進行刪除),然后逐步向前推進,知道某個日志段不滿足條件為止,刪除條件:

滿足給定條件:(kafka/config/server.properties) 一般默認保留7天

Predicate配置項:log.retention.{ms,minutes,hours}和log.retention.bytes指定

駐留時間 每條消息存放在kafka里的最長時間

消息的

最大字節(jié)數(shù)(某個分區(qū)最大存儲空間)? 某一分區(qū)非常大,

不能是當前激活日志段(盡管日期過了7天,但是僅有的當前日志段為當前激活日志段,不會被刪除。)kafka保證topic當前有一個日志段

大小不能小于日志段的大小(log.segment.bytes配置,這種情況只存在于當前激活日志段,因為是一個日志段寫滿后 才會繼續(xù)寫入后續(xù)的日志段)

要刪除的是否是所有日志段,如果是,直接調(diào)用roll方法進行切分,kafka至少保留一個日志段

Scheduler:

刪除log.retention.check.interval.ms指定間隔

刷盤log.flush.scheduler.interval.ms

記錄checkpoint

log.flush.offset.checkpoint.interval.ms

壓縮(如果有)一直運行(log.cleaner.enable指定是否開啟)

Kafka消息檢索原理– 消費時快速定位

Kafka消息的segment

file的組成和物理結(jié)構(gòu)

--(.log

file)

分區(qū)文件=

log文件+

index文件

Segment

file? =? Seq[Message]

Message

= 8 byte Offset (當前partition的第幾條消息)+

4 byte message size(Meaasge大小)

+ 4 byte CRC32(CRC32校驗)+

1 byte “magic” (本次發(fā)布kafka服務(wù)程序協(xié)議版本號)

+ 1 byte “attributes”(獨立版本,或標識壓縮類型或編碼類型)+

4byte key length(key的長度,當key為-1時,K

byte key字段不填)+

K byte key(Key可選)+

value bytes length(實際消息數(shù)據(jù))+

payload (實際的消息)

Partition

file存儲方式

Segment

file結(jié)構(gòu)

為加快kafka檢索、消費以及生產(chǎn)速度,以log文件保留消息,并通過index文件保存,首先根據(jù)索引的offset(是當前分區(qū)的第ofset個消息),根據(jù)分段log文件的消息條數(shù)范圍,進行二分查找,確定當前offset消息所在的分段log。然后根據(jù)index的索引(二分索引)(index的key,value中的key指的是當前分段的第key個消息,value為當前分段的第key個消息對應(yīng)于當前分段的字節(jié)偏移量),進行二分查找,如果正好匹配,則直接讀取對應(yīng)字節(jié)偏移量的消息,否則,在相近的二分查找所找到的索引出的字節(jié)偏移量后續(xù)進行順序讀知道找到知道消息個數(shù)偏移量的消息。

查找第368776條消息(二分查找根據(jù)消息偏移量確定分段文件、二分查找根據(jù)分段索引確定相對于當前分段的字節(jié)偏移量)和第368774條消息(二分查找根據(jù)消息偏移量確定分段文件、二分查找+順序讀取? 根據(jù)分段索引確定相對于當前分段的字節(jié)偏移量)

以讀取offset=368776的message為例,需要通過下面2個步驟查找:

第一步查找segment

file;

以上圖為例,其中00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0.第二個文件00000000000000368769.index的消息量起始偏移量為368770

= 368769 +

1。只要根據(jù)offset二分查找文件列表,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index|log

第二步通過segment

file查找message;

算出368776-368770=6,取00000000000000368769.index文件第三項(6,1407),得出從00000000000000368769.log文件頭偏移1407字節(jié)讀取一條消息即可

Kafka消息的index

file的組成和物理結(jié)構(gòu)(.index

file)

Kafka集群維護

增加一個topic,針對某個topic進行擴展分區(qū),

實時獲取kafka集群信息

Topic工具

列出集群當前所有可用的topic

Kafka-topic.sh

–list –zookeeper zookeeper_address

查看集群特定的topic

Kafka-topic.sh

–describe –zookeeper zookeeper_address –topic topic_name

Ctrl

+ R –

創(chuàng)建topic

Kafka_topic.sh

–create –zookeeper zookeeper_address –replication-factor 1

–partitions 1 –topic topic_name

Kafka-topics.sh

–zookeeper zookeeper_address –alter –topic topic_name

–partitions 4修改partitions數(shù)量

Kafka集群leader平衡機制

Leader下線 上線時不平衡

每個partition的所有replicas叫做“assigned

replicas”,“assigned

replicas”中第一個replicas叫做“preferred

replica”,剛創(chuàng)建的topic一般“preferred

replica”是leader。

集群leader手動平衡:

Kafka-preferred-replica-election.sh

–zookeeper zookeeper_address

自動平衡設(shè)置:

Auto.leader.balance.enable=true

Kafka集群分區(qū)日志遷移

主要有兩種情況:

某個topic全部遷移一個機器移動到另外一個機器

某個topic的某個分區(qū)移動

遷移topic數(shù)據(jù)到其他broker,以下四步:

寫json文件文件格式:cat

topics-to-move.json

{“topics”:[{“topic”:”foo1”},{“topic”,”foo2”}],”version”:1}記錄遷移的topic列表

使用–generate生成遷移計劃kafka-reassign-partitions.sh

–zookeeper zookeeper_address –topics-to-move-json-file

topics-to-move.json –broker-list “5,6” –generate僅僅是生成計劃 沒有執(zhí)行數(shù)據(jù)遷移(會生成一個新的分區(qū)json文件可以寫入expand-cluster-reassignment.json命名隨意)

使用-execute執(zhí)行計劃kafka-reassign-partitions.sh

–zookeeper zookeeper_address –reassignment-json-file

expand-cluster-reassignment.json –execute執(zhí)行前最好保存當前分配情況,以防出錯回滾

使用-verify驗證是否已經(jīng)遷移完成kafka-reassign-partitions.sh

–zookeeper zookeeper_address –reassignment-json-file

expand-cluster-reassignment.json

–verify遷移某個topic的某些特定partition數(shù)據(jù)到其他broker,步驟同上,但json文件如下:cat

custom-reassignment.json

{“partitions”:[{“topic”:”foo1”,”partition”:0,”replicas”:[5,6]},{

“topic”:”foo2”,”partition”:1,”replicas”:[2,3]}]}

?可以指定到topic分區(qū)編號?

集群分區(qū)日志遷移和leader平衡機制在實際的kafka集群運維中非常重要

kafka-reassign-partitions.sh工具會復(fù)制磁盤上的日志文件,只有當完全復(fù)制完成,才會刪除遷移前磁盤的日志文件。執(zhí)行分區(qū)日志遷移需要注意:

kafka-reassign-partitions.sh粒度只能到broker(機器),不能到broker的目錄(如果broker上面配置了多個目錄,是按照磁盤上已駐留的分區(qū)數(shù)來均勻分配的),如果topic之間的數(shù)據(jù)或topic中partition之間的數(shù)據(jù)本身就不均勻,很有可能造成磁盤數(shù)據(jù)的不均勻。

對于分區(qū)數(shù)據(jù)較多的分區(qū)遷移數(shù)據(jù)會花大量時間(經(jīng)過網(wǎng)絡(luò)傳輸),topic數(shù)據(jù)較少或磁盤有效數(shù)據(jù)較少時進行數(shù)據(jù)遷移

進行分區(qū)遷移時,最好先保留一個分區(qū)在原來的磁盤上,這樣不會影響正常的消費和生產(chǎn),如果目的是將分區(qū)5(broker1,5)遷移到broker2,3.可以先將5遷移到2和1【1作為軸,沒有變,確切是將5遷移到2】;再將2和1遷移到2和3【首先使用leader平衡工具,將2和1中的作為分區(qū)的leader,將數(shù)據(jù)從1遷移到broker3】.而不是一次將1和5遷移到2和3.

Kafka集群監(jiān)控

Kafka

Offset Monitor –監(jiān)控當前的kafka集群有哪些機器存活哪些topic隊列生產(chǎn)者消費者積壓了多少數(shù)據(jù)? 監(jiān)控一個集群

Kafka

Manager? Yahoo開源的監(jiān)控工具? 監(jiān)控多個集群

(另有JMX)

Kafka

Offset Monitor:

當前存活的broker集合

當前活動topic集合

消費者組列表

當前consumer按組消費的offset

lag數(shù)(當前topic當前分區(qū)目前有多少消息積壓而沒有及時消費)

部署kafka

offset Monitor github的一個KafkaOffsetMonitor的jar包

Java

–cp (class package )KafkaOffsetMonitor-assembly-0.2.0.jar

com.quantifind.kafka.offsetapp.OffsetGetterWeb –zk zk-01,zk-02

–refresh 5.minutes –retain 1.day &? (刷新幾次 駐留時間)引用gogle的api必須翻墻? 一次即可

Kafka

Manager由雅虎開源:(安裝要求能上網(wǎng)還要求能翻墻)

管理幾個不同的集群

檢查集群狀態(tài)(topics,brokers,副本的分布,分區(qū)的分布)

選擇副本進行查看

基于集群的當前狀態(tài)產(chǎn)生分區(qū)分配

重新分配分區(qū)

安裝sbt

scala-sbt配置環(huán)境變量

安裝kafka-manager下載 編譯

部署 修改配置文件

下載打包好的kafka-manager解壓后修改配置

Kafka服務(wù)器模型和leader選舉機制

Hadoop

netty? redis? mina服務(wù)器 常使用的reactor模型leader會使用zookeeper選舉kafka不是使用zookeeper選舉因為會存在一系列的問題

Kafka核心源碼分析

設(shè)計和技巧

Kafka消費者源碼

分區(qū)消費模式源碼:

分區(qū)消費模式

服務(wù)器端源碼:

通過findLeader創(chuàng)建PartitionMetadata從kafka服務(wù)器端獲取消息,需要分區(qū)、leader的元數(shù)據(jù),通過findLeader服務(wù)器端提供的函數(shù),接收一個host、port,獲取對應(yīng)的分區(qū)。其分區(qū)、備份數(shù)據(jù),構(gòu)造一個PartitionMetadata。

通過PartitionMetadata創(chuàng)建一個SimpleConsumer,分區(qū)消費模式使用的是SimpleConsumer

SimpleConsumer創(chuàng)建完成之后,可以進行消費,構(gòu)建FetchRequest對象--包括所要獲取的topic、partition、offset等組成的數(shù)據(jù)結(jié)構(gòu),發(fā)送給服務(wù)器,服務(wù)器根據(jù)所要了解的這些key,獲取返回的值,發(fā)送這些結(jié)果給client。

發(fā)送FetchRequest,如果沒成功,可能是我們請求的那個broker掛了等原因,回到第一步創(chuàng)建MetaData,如果發(fā)送成功,服務(wù)器端回復(fù)一個響應(yīng)

接收服務(wù)器端發(fā)送的FetchResponse響應(yīng),(message、offset)

迭代的處理消息(message,offset)

再去構(gòu)造一個FetchRequest再進行消息的獲取

服務(wù)器端發(fā)送的消息是以消息組或消息塊的形式發(fā)送給我們,發(fā)送一個的消息后,會在塊內(nèi)進行迭代,迭代完成后,再構(gòu)建FetchRequest。一次發(fā)送一個塊,能夠使用盡量少的交互獲取盡量多的消息。

分區(qū)消費模式直接由客戶端(任何高級語言編寫)使用Kafka提供的協(xié)議向服務(wù)器發(fā)送RPC請求獲取數(shù)據(jù),服務(wù)器接收到客戶端的RPC請求后,將數(shù)據(jù)構(gòu)造成RPC響應(yīng),返回給客戶端,客戶端解析相應(yīng)的RPC響應(yīng)獲取數(shù)據(jù)。

Kafka支持的協(xié)議:

獲取消息的FetchRequest和FetchResponse發(fā)送請求 發(fā)回響應(yīng)

獲取offset的OffsetRequest和OffsetResponse單獨發(fā)送一個請求返回某一個offse

提交offset的OffsetCommitRequest和OffsetCommitResponse

low-level的api中,提交某個offset,發(fā)送OffsetCommitRequest獲取OffsetCommitResponse

獲取Metadata的Metadata

Request和Metadata

Response metadata(topic的某個分區(qū),leader是誰,還有多少分區(qū),當前服務(wù)器還有多少broker)

生產(chǎn)消息的producerRequest和ProducerResponse生產(chǎn)消息時,服務(wù)器端會構(gòu)造一個producerRequest,發(fā)送給broker,broker回復(fù)一個producerResponse,

組消費模式源碼:

通過ConsumerConfig對象創(chuàng)建配置,通過ConsumerConfig傳遞配置參數(shù),(zk地址,fetch取數(shù)據(jù)創(chuàng)建線程,buffer參數(shù))

根據(jù)配置ConsumerConfig中的相關(guān)配置創(chuàng)建ConsumerConnector(實際上創(chuàng)建的是zkConsumerConnector)

通過ConsumerConnector,創(chuàng)建一個KafkaStream流,kafkaStream流封裝了流式消息 ,類似于java的輸入輸出流

基于KafkaStream創(chuàng)建一個消費者迭代器ConsumerIterator獲取消息進行消息的處理

如果服務(wù)器無新的消息就會進行阻塞,知道有新的消息進入處理消息。Iterator

next hasNext

通過配置創(chuàng)建ConsumerConfig

基于配置建立ZookeeperConsumerConnector連接,創(chuàng)建ZookeeperConsumerConnector對象

一下分為兩方面,第一:

創(chuàng)建ConsumerFetchManager獲取消息通過ConsumerFetchManager

ConsumerFetchManager創(chuàng)建ConsumerFetchThread可以創(chuàng)建多個線程可配置

ConsumerFetchThread發(fā)送FetchRequest獲取消息

服務(wù)器端 將返回的消息填充至FetchDataChannel隊列 將隊列封裝成FetchResponse響應(yīng)? 獲取FetchResponse填充數(shù)據(jù)到KafkaStream獲取流式數(shù)據(jù),在流式數(shù)據(jù)上獲取迭代器,通過迭代器獲取消息

仍然通過發(fā)送FetchRequest

RPC請求獲取消息,由服務(wù)器端幫忙處理,不需要客戶端的操作? 分區(qū)消費模式需要我們自己發(fā)送FetchRequest

組消費模式也會自動提交offset

另一方面:

創(chuàng)建Scheduler,定式調(diào)度的任務(wù),調(diào)度的頻率可通過參數(shù)配置

定期調(diào)用autoCommit

Offset向Zk提交offet把當前用戶消費過的offset更新在zk對應(yīng)的節(jié)點并返回 下一次獲取的時候讀取zk的配置即可

兩種消費模式服務(wù)器端源碼對比:

分區(qū)消費模式:

指定消費topic、partition、Offset通過向服務(wù)器發(fā)送RPC請求進行消費

需要自己提交offset發(fā)送offset

commit Request提交offset;將offset寫到zk對應(yīng)的目錄

需要自己處理各種錯誤,如leader切換錯誤

自己處理消費者負載均衡策略在FetchRequest中指定需要消費的topic和對應(yīng)分區(qū)需要用戶、客戶端自己處理負載均衡

組消費模式:

通過向服務(wù)器發(fā)送RPC請求完成的 和分區(qū)消費模式一樣

組消費模式由kafka客戶端處理各種錯誤(如leader切換錯誤),然后將消息放入隊列再封裝為迭代器(隊列為FetchedDataChunk對象),客戶端只需要在迭代器上迭代取出消息。

由kafka服務(wù)器端周期性的通過scheduler提交當前消費的offset,無需客戶端負責。分區(qū)消費模式則需要自己來管理和提交

Kafka服務(wù)器端處理消費者負載均衡策略

監(jiān)控工具Kafka

Offset Monitor和KafkaManager均是基于組消費模式

盡可能使用組消費模式,除非需要:

自己管理offset,(為了實現(xiàn)消息投遞的其他語義 至多一次? 準確一次)

自己處理各種錯誤(根據(jù)業(yè)務(wù)需求)? 分區(qū)消費模式和low-level

api

Kafka生產(chǎn)者源碼介紹:

同步發(fā)送模式源碼:

創(chuàng)建producer對象 傳遞一些配置? 如brokerlist

ack等等

創(chuàng)建發(fā)送者線程

根據(jù)brokerId和partition選擇SyncPool發(fā)送消息在服務(wù)器端維護一個線程池 從線程池中選擇一個線程

選擇好一個發(fā)送線程后 構(gòu)造ProducerRequest請求 發(fā)送實質(zhì)的消息

按照用戶執(zhí)行的序列化函數(shù)序列化消息

用戶傳遞的消息可能是字符串或其他的形式,需要將其序列化為二進制

序列化函數(shù)可為用戶定義? 默認兩種:字符串序列化為字符數(shù)組

對象序列化為字節(jié)數(shù)組

發(fā)送ProducerRequest請求? 檢查發(fā)送是否出錯(所選擇broker

leader切換)

如果無錯誤接受ProducerResponse響應(yīng)

按照client配置發(fā)送ack信息

如果有錯誤

是否超過最大出錯次數(shù) 如果是發(fā)送出錯? 如果沒有繼續(xù)重試

異步發(fā)送源碼:

Producer異步發(fā)送至客戶端段的一個隊列中,隊列中的消息沒有達到一定數(shù)量或超過一定時間,producer消息一直存放在該隊列中,達到一定數(shù)量后或者積累時間超過配置,

會啟動一個發(fā)送線程,發(fā)送線程會通過輪詢讀取消息,并將這些消息批量發(fā)送至Kafka

Cluster

在發(fā)送消息之前會在Producer

Pool中根據(jù)partition尋找一個SyncProducer

創(chuàng)建producer

創(chuàng)建發(fā)送者線程(同樣是從創(chuàng)建的同步發(fā)送池中獲取的同步發(fā)送線程)

消息放入隊列

消息數(shù)達到或者時間達到?

是的話

根據(jù)brokerId和Partition選擇SyncPool(kafka消息都是通過leader進行讀寫的)

按照用戶執(zhí)行的序列化函數(shù)序列化消息

構(gòu)造ProducerRequest請求RPC請求 同步會一條一條發(fā)送消息和ack異步是批量發(fā)送數(shù)據(jù)給broker

發(fā)送ProducerRequest請求

接收ProducerReponse響應(yīng)

發(fā)送成功 –繼續(xù)接收producer發(fā)送的消息

不管是同步還是異步都是取出一個同步發(fā)送線程,最后都是同步過程,只是異步的時候會先將消息發(fā)送至隊列里,當消息數(shù)達到一定數(shù)量和時間達到配置時發(fā)送。所以同步發(fā)送和異步發(fā)送后半部分都是一致的。

同步發(fā)送和異步發(fā)送最后都是通過構(gòu)造RPC請求進行發(fā)送的,

同步發(fā)送服務(wù)器端會向客戶端發(fā)送一個確認,異步發(fā)送服務(wù)器端不會發(fā)送ACK。

兩種模式服務(wù)器端源碼介紹:

同步發(fā)送模式具有幾下特點:

同步的向服務(wù)器發(fā)送RPC請求進行生產(chǎn),(所謂同步就是每條消息一個請求一個ack)

發(fā)送錯誤可以重試

可以向客戶端發(fā)送ack(可以配置不發(fā)送)

異步:

最終也是通過向服務(wù)器發(fā)送RPC請求完成的(和同步發(fā)送模式一樣),異步發(fā)送的是消息段批量發(fā)送

異步發(fā)送模式先將一定量消息放入隊列中,待達到一定數(shù)量后再一起發(fā)送。

異步發(fā)送模式不支持發(fā)送ack,但是Client可以調(diào)用回調(diào)函數(shù)獲取發(fā)送結(jié)果。

性能高用異步,準確性用同步。

Kafka

Server Reactor設(shè)計模式

大量的連接數(shù) 高并發(fā)Reactor模型基于Java

NIO進行設(shè)計,是對Linux

epoll模型進行改造,

Reactor模型:

Java

NIO:

Java

1.4引入的,NIO比較成熟,Java1.3之前使用的是原生態(tài)的打開文件、讀取文件的方式(比較慢,不方便)

New

io全新的方式讀取文件 套接字

Java

NIO:

Channels:java中的流

Buffers:緩沖區(qū) 存放二進制的緩沖區(qū) 類似于數(shù)組

Selectors:解決并發(fā)網(wǎng)絡(luò)套接字

Channel通道和Java中的Stream一樣,用于傳輸數(shù)據(jù)的數(shù)據(jù)流,數(shù)據(jù)可以從Channel讀到buffer中,也可以從buffer寫到Channel中。

Selector允許單線程處理多個Channel。使用Selector,首先得向Selector注冊Channel,然后調(diào)用它的select方法。此方法會一直阻塞到某個注冊的Channel有時間就緒。一旦這個方法返回,線程可以處理這些事件,事件的例子如新連接進來,數(shù)據(jù)接收等。

Linux

epoll模型:

Epoll是一種IO多路復(fù)用技術(shù),在Linux內(nèi)核中廣泛使用。常見的三種IO多路復(fù)用技術(shù)為select模型、poll模型和epoll模型。

Select模型:輪詢所有的套接字查看是否有時間發(fā)生(套接字需要注冊在select上,select向注冊的套接字進行輪詢,查看是否有事件發(fā)生)

缺點:套接字最大1024個主動輪詢效率低(對所有的套接字不斷地輪詢) 事件發(fā)生后需要將套接字從內(nèi)存空間拷貝到用戶空間效率低

Poll模型和select模型類似,修正了select模型最大套接字限制

Epoll模型:修改主動輪詢?yōu)楸粍虞喸?,當有事件發(fā)生時,被動接收通知。所以,epoll模型注冊套接字后,主程序可以做其他事情,當事件發(fā)生時,接收到通知后再去處理。修正了select模型的三個缺點,(第三點使用共享內(nèi)存修正)共享內(nèi)存:內(nèi)核空間和用戶空間共同使用的內(nèi)存空間。

Epoll模型為Linux系統(tǒng)作為服務(wù)器提供很大的支持。

Java

nio叫做select模型,底層使用的是epoll模型。

Java

nio叫做select模型指的是選擇哪一個Channel的意思。

Kafka

Server Reactor模型:處理大量連接? 大量套接字 不同套接字 不同的行為

Kafka

SocketServer是基于Java

NIO開發(fā)的,采用Reactor的模式(已被大量實踐證明非常高效,在Netty和Mina中廣泛使用 網(wǎng)絡(luò)開發(fā)的框架 大量使用Reactor模型處理高并發(fā)大量連接)。Kafka

Reactor的模式包括三種角色:

Acceptor:接收注冊Channel

Procesor:接收從Select傳回的Channel然后將Channel注冊到Handler

Handler:Handler處理用戶邏輯

Kafka

Reactor包含了一個Acceptor負責接受客戶端請求,N個Processor線程負責讀寫數(shù)據(jù)(為每個Connection創(chuàng)建出一個Processor去單獨處理,每個Processor中均引用獨立的Selector—Processor下也有Selector需要注冊Channel),M個handler來處理業(yè)務(wù)邏輯。在Acceptor和Processor,Processor和Handler之間都有隊列(緩沖、解耦)來緩沖請求。

Kafka

Server Reactor模型:(高并發(fā) 的核心Hbase

Storm? Hadoop? Redis服務(wù)器端

大量使用)? (大量的連接 每個連接處理大量的事件)

客戶端發(fā)起連接

KafkaSocketServer接收連接? 通過Acceptor接收連接注冊Selector創(chuàng)建N個Processor處理業(yè)務(wù) 每個Processor對應(yīng)一個客戶端連接? 為Processor注冊一個Selector

Selector注冊一個Channel,查看對應(yīng)的連接上是否有讀數(shù)據(jù)和寫數(shù)據(jù)

Acceptor注冊Selector給Processor

Processor注冊Selector給Handler

Acceptor主要職責是監(jiān)聽客戶端的連接請求,并建立和客戶端的數(shù)據(jù)傳輸通道,然后為客戶端指定一個Processor,它的工作到此結(jié)束。接著響應(yīng)下一個客戶端的連接請求:

Processor主要職責是負責從客戶端讀取數(shù)據(jù)和將響應(yīng)返回給客戶端,本身不處理業(yè)務(wù)邏輯,每個Processor都有一個Selector,用來監(jiān)聽多個客戶端,可以非阻塞地處理多個客戶端的讀寫請求,Processor將數(shù)據(jù)放入RequestChannel的RequestQueue和從ReponseQueue讀取響應(yīng)。

Handler(kafka.server.KafkaRequestHandler

kafka.server.KafkaApis)的職責從RequestChannel中的RequestQueue取出Request,處理以后再將Response添加到RequestChannel中的ResponseQueue

Kafka

Partition Leader選主機制

大數(shù)據(jù)常用的選主機制:

Leader選舉算法很多

Zab:Zookeeper使用 (選主 同步隊列? 復(fù)制鎖)

他們都是Paxos算法的變種--大數(shù)據(jù)領(lǐng)域非常著名的算法

Zab協(xié)議四個階段:

Leader

election:leader選舉

Discovery(epoch

establish):發(fā)現(xiàn) 版本建立

Synchronize(sync

with followers):從leader同步數(shù)據(jù)和狀態(tài)

Broadcast:leader廣播狀態(tài)和數(shù)據(jù)到follower

Zk和raft

posx都會首先選擇自己為leader然后進行同步協(xié)商;另一個特點:半數(shù)以上的服務(wù)器認為是leader,則同一其為leader。(少數(shù)服從多數(shù))

先選自己為leader,然后選擇一個編號大的為leader,然后半數(shù)贊同為leader,則選舉其唯leader。

一般來說第二個啟動的節(jié)點就是leader

Zk是奇數(shù)的,負載不是很高,處理元數(shù)據(jù)。

Raft:

在Raft中,任何時候一個服務(wù)器可以扮演下面角色之一:

Leader:處理所有客戶端交互,日志復(fù)制等,一般只有一個Leader

Follower:類似選民,完全被動

Candidate:候選人可以被選為新領(lǐng)導(dǎo)者

Zk中每個人都可以成為leader

raft必須從候選人里選擇,候選人是由系統(tǒng)管理員配置的

啟動時集群中制定一些機器為候選人candidate,然后candidate開始向其他機器(尤其是Follower)拉票,當某一個Candidate的票數(shù)超過半數(shù),它就成為leader。

Kafka集群將元數(shù)據(jù)存儲在zk

Kafka集群依賴zk集群,所有follower都在zookeeper上設(shè)置一個watch,一旦leader宕機,其對應(yīng)的ephemeral

znode臨時節(jié)點會自動刪除,此時所有的Follower都嘗試創(chuàng)建該節(jié)點,創(chuàng)建成功者(Zookeeper保證只有一個能創(chuàng)建成功)即是新的leader,其它replica即成為follower

Split-brain(腦裂):由zookeeper特性引起,雖然zookeeper能保證所有watch順序觸發(fā),但并不能保證同一時刻所有repilca“看”到的狀態(tài)是一樣的(可能由于網(wǎng)絡(luò)原因),可能造成replica的響應(yīng)不一致 可能會產(chǎn)生多個leader

Herd

effect(羊群效應(yīng)):如果宕機的那個broker(leader)上的partition比較多,會造成多個watch(follower)被觸發(fā),造成集群內(nèi)大量的調(diào)整。

Zookeeper負載過重:每個replica都要為此在zookeeper注冊一個watch,當集群規(guī)模增加到幾千個partition時zookeeper負載會過重

主要用來處理數(shù)據(jù)量比較小的,比如HDFS的HA切換HA的選舉,用于通信量比較小 數(shù)據(jù)量比較小的選舉

像kafka大數(shù)據(jù)量的傳輸HDFS大數(shù)據(jù)量的傳輸不使用zk

raft進行選舉? 大量通信的開銷kafka和HDFS的大量數(shù)據(jù)的傳輸 很容易造成元數(shù)據(jù)通信消息的消失? 網(wǎng)絡(luò)丟包等缺點

Kafka使用自己的選主機制:

Kafka的Leader

Election解決以上問題,在所有broker中選出一個controller,所有partition的leader選舉都由controller決定。Controller會將leader的改變直接通過rpc的方式(比zookeeper

queue方式更高效)通知需要為此做出相應(yīng)的broker。

controller的選舉過程:以broker為單位使用zookeeper進行選舉而不是之前那種以partition為單位使用zookeeper進行選舉量太大了

kafka

partition leader選舉過程有controller執(zhí)行:從zk中讀取當前分區(qū)的所有ISR(in-sync

replicas)集合(心跳機制保持連接且同步數(shù)據(jù)中數(shù)據(jù)記錄相差不超過10000) 調(diào)用配置的分區(qū)選擇算法選擇分區(qū)的leader(基本都會選擇ISr中的第一個即Prefer分區(qū)作為leader) 當前包括五種選擇算法

離線處理:

Flume

-? kafka? - storm、spark

– mongodb(數(shù)據(jù)入庫)/hbase/redis

Elasticsearch

drill? impala? kylin

常用選主機制的缺點:

Kafka

Partition選主機制:

Kafka生產(chǎn)者源碼

Kafka

Server Reactor設(shè)計模型

KafkaPartition

Leader選舉機制

Kafka回顧:

Kafka業(yè)務(wù)場景

接觸耦合:

增加冗余:規(guī)避數(shù)據(jù)丟失風(fēng)險

提高可擴展性:解耦了處理過程

增大消息入隊和處理的頻率 – 額外增加處理的過程即可

Buffering:任何一個重要的系統(tǒng)需要處理不同的時間元素? 消息隊列通過緩沖層幫助任務(wù)高效執(zhí)行 寫入隊列的處理會盡可能的快速,該緩沖有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)系統(tǒng)的速度

異步通信:異步的非阻塞發(fā)送對于擴展消息系統(tǒng)來說

是一個基本的特性

應(yīng)用場景:

Push

Message:消息推送? 使用kafka作為消息系統(tǒng)的核心中間件完成消息的生產(chǎn)和消費

Website

Tracking:網(wǎng)站跟蹤

日志收集中心:kafka的push

pull適合異構(gòu)集群,適合批量提交消息,對于生產(chǎn)端來說,性能方面沒有消費。消費段;Hadoop離線分析storm實時分析

實時統(tǒng)計平臺搭建注意事項:

HA特性:分布式計算分布式存儲kafka有HA特性

核心文件配置:

啟動步驟:先啟動zookeeper再啟動kafka

Kafka

Project Process

Data

Collection ->? Data? Access? -> Stream Computing ->

Dataoutput

Flume

kafka? ? storm? ? Redis/Mysql(持久化)

Kafka

Producer:Flume

cluster ------ Sink ---------》 (producer)KafkaCluster

Kafka

Consumer:Kafka

Cluster ----------KafkaSpout? ------------》Storm

Cluster

Kafka工程準備:kafka監(jiān)控工具stormui管理界面

基礎(chǔ)環(huán)境準備:

Producer:服務(wù)器數(shù)據(jù)------

flume agent代理節(jié)點-------Sink到Kafka集群

Consumer:kafka集群-------通過kafkaSpout輸送到Storm集群----Storm集群--------通過Storm集群進行實時計算,并將結(jié)果持久化到DB庫中 (MySQl、Redis)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容