kafka高性能:
1.高效使用磁盤
2.零拷貝
3.批處理和壓縮
4.partition
5.ISR
1.1 順序寫磁盤,性能高于隨機寫
零拷貝(沒有用戶態(tài)參與的拷貝,只是在內核態(tài)進行的拷貝):
1.傳統(tǒng)模式下數(shù)據從文件傳輸?shù)骄W絡需要四次拷貝(2次系統(tǒng),2次cpu),4次上下文切換,2次系統(tǒng)調用
File.read(fileDesc,buf,len)
Socket.send(socket,but,len)
2.kafka中實現(xiàn)
通過NIO的transferTo/transferFrom調用系統(tǒng)級別的sendFile函數(shù)實現(xiàn)零拷貝,共2次數(shù)據拷貝和2次上下文切換,1次系統(tǒng)函數(shù)調用,消除了cpu數(shù)據拷貝
數(shù)據拷貝:
1.文件->系統(tǒng)態(tài)的read buff
2.文件描述符的拷貝descriptor
上下文切換:
用戶態(tài)->內核態(tài)->用戶態(tài)(next cycle)
批處理和壓縮
producer和consumer均支持批量處理數(shù)據,從而減少網絡開銷
producer可將數(shù)據壓縮后發(fā)送給broker,從而減少網絡傳輸代價,目前支持snappy,gzip和LZ4壓縮
partition:
1.通過partition實現(xiàn)了并行處理和水平擴展
2.partition是kafka并行處理的最小單元
3.不同kafka處于不同的broker中,充分利用多機資源
4.同一broker上的不同partition可處于不同的directory(磁盤目錄),如果節(jié)點上有多個disk driver(磁盤驅動),可將不同的driver對應不同的
directory,從而使kafka利用多disk driver的磁盤優(yōu)勢
partition中的數(shù)據只有在被broker commit了才能被consumer消費,只有所有isr中的replica都ack了leader,broker才能commit這條消息
isr實現(xiàn)了可用性和一致性的動態(tài)平衡:
replica.lag.time.max.ms=10000,超過10S,follower沒有想leader fetch數(shù)據(或fetch數(shù)據后,還沒有和leader保持同步),則會從isr中移除
Isr可容忍更多的節(jié)點失敗:
majority quorum如果要容忍f個節(jié)點失敗,則需要2f+1個節(jié)點
isr要容忍f個節(jié)點失敗,則需要f+1個節(jié)點
啟動:
./kafka-server-start.sh ../config/server2.properties &
./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 3 --topic topic1
創(chuàng)建消息:
./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 3 --topic topic1
./kafka-console-consumer.sh --bootstrap-service localhost:9091,localhost:9092,localhost:9093 --from-beginning --topic topic1 --new-consumer
./kafka-topics.sh --describe --zookeeper localhost:2181,localhost:2182,localhost:2183 --topic topic1
刪除topic
./kafka-topics.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --delete --topic topic1
#生產消息
./kafka-console-producer.sh --broker-list 127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093 --topic toefl.preclass.correct.rate.topic.local
./kafka-console-consumer.sh --zookeeper localhost:2181/kafka --from-beginning --topic topic1 --group group1
#展示所有的topic
./kafka-topics.sh --list --zookeeper localhost:2181
./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic test2
#指定test2從test1中消費消息
./kafka-replay-log-producer.sh --broker-list localhost:9092 --zookeeper localhost:2181 --inputtopic test1 --outputtopic test2
同步和異步比較
Sync producer:同步
低延遲
低吞吐率
不會丟失數(shù)據
Async producer:異步
高延遲
高吞吐率
可能丟失數(shù)據
consumer只能讀取到被commmit的數(shù)據
isr的常見配置如下
Server配置:
replica.lag.time.max.ms=10000
replica.lag.max.message=4000
topic配置:
min.insync.replicas=1
Producer配置:
request.required.acks=0
0:不需要partition返回ack
1:需要partition返回ack
-1:isr中所有的follower返回ack,leader才會commit
如果處理Replica全部數(shù)據宕機
a.等待ISR中任一Replica恢復,并選舉為Leader
1.等待時間叫較長,降低可用性
2.或isr中的所有數(shù)據都無法恢復或者數(shù)據丟失,則該partition將用不可用
b.選擇第一個恢復的replica為新的leader,無論他是否在isr中
1.并為包含之前commit過的數(shù)據,因此會造成數(shù)據丟失
2.可用性較高
kafka中使用zookeeper
1.zk是什么?
.naming service
.配置管理
.leader election
.服務發(fā)現(xiàn)
.同步
.group service
.barrier
.分布式隊列
.兩階段提交
.zookeeper集群包含一個leader和多個follower
.所有follower提供read服務
.所有寫操作都會被forward到leader
.client和service通過nio通信
.全局串型化所有write操作
.保證同一客戶端的指令被fifo執(zhí)行
.保證消息通知的fifo
zab 廣播協(xié)議:
.leader將所有更新(稱為proposal),順序發(fā)送給follow
.當leader收到半數(shù)以上的的follow對此proposal的ack時,即向所有的follow發(fā)送commit消息,并在本地commit消息
.follow收到proposal即將commit寫入磁盤,寫入成功返回ack給leader
.每個proposal都有一個全局唯一單調遞增的proposal id,即zxid
恢復模式:
進入恢復模式:當leader宕機或者丟失大多數(shù)的follow時
結束恢復模式:當leader被選舉出來且大多數(shù)的follow與leader完成狀態(tài)同步后,恢復模式結束,從而進入廣播模式
恢復模式的意義:
1.發(fā)現(xiàn)集群中最大的zxid
2.建立新的epoch,從而保證新的leader不能再commit新的proposal
3.集群中大部分節(jié)點都commit過前一個leader commit過的消息,而新的leader被大多數(shù)節(jié)點支持,所以之前的leader commit的proposal不會丟失,至少被一個節(jié)點所保存
4.新leader與大多數(shù)節(jié)點通信,從而保證大部分節(jié)點具有新的數(shù)據
zxid:高32位,epoch 每一個leader的任期內,epoch不變,選舉出新的leader后,epoch會增加
低32位 counter
zookeeper一致性的保證:
順序一致性:從一個客戶端發(fā)出的更新操作會按發(fā)送順序,順序執(zhí)行
原子性 :更新操作要不成功,要不失敗,沒有中間狀態(tài)
單一系統(tǒng)鏡像:一個客戶端只能看到同一個view,無論連接到哪臺服務器
可靠性:
1.一旦一個更新被應用,該更新將被持久化,直到有客戶端更新該結果
2.如果一個客戶端得到更新成功的狀態(tài)碼,該更新一定已經生效
3.任何一個被客戶端通過讀或者更新看到的結果,將不會回滾,即使從失敗中恢復
實時性:保證客戶端可以在一定時間內看到最新的視圖
kafka中將topic存在在/config/topics路徑下
get /config/topics/topic1
kafka的broker集群中l(wèi)eader(controller)的路徑:
get /controller 可以看到當前的controller是哪個broker
broker的信息存放在/brokers/ids路徑下面
get /brokers/ids/1 可以查看broker為1的具體信息
基于zk進行l(wèi)eader election
公平模式:
1.創(chuàng)建Leader父節(jié)點,如/chroot,并將其設置為persist
2.各客戶端通過在/chroot下創(chuàng)建leader,如/chroot/leader來競爭leader,該節(jié)點被設置為ephemeral_sequential
3.客戶端通過getChildren方法獲取/chroot下所有的字節(jié)點,如果其注冊的節(jié)點id在所有節(jié)點中最小,則當前客戶端競選leader成功
4.否則,在前一節(jié)點上注冊watch事件,一旦前者被刪除,則它收到通知,返回step 3(并不能認為自己成為新節(jié)點,有可能前面的節(jié)點只是宕機)
5.leader節(jié)點通過刪除自己放棄leadership
kafka基于Controller的leader election
1.整個集群中選舉出一個broker作為controller
2.controller為所有的topic的所有partition指定leader及follower
優(yōu)點:
.緩解herd effect問題
.減輕zk負擔
.controller與leader和follower間通過rpc通信,高效且實時
缺點:
.引入controller增加了復雜度
.需要考慮controller的failover
High level consumer rebalance:
1.high level consumer啟動時將其id注冊到consumer group下,在zk上的路徑為/consumers/[consumer group]/ids/[consumer id]
2.在consumers/[consumer group]/ids上注冊watch
3.在/brokers/ids上注冊watch
4.如果consumer通過topic filter創(chuàng)建消息流,則它會同時在/brokers/topics上創(chuàng)建watch
5.強制自己在consumer group內啟動rebalance流程
consumer rebalance算法:
1.將目標topic下的所有partition排序,存于pt
2.將consumer group下的所有consumer排序,存于cg,第i個consumer標記為ci
3.n=size(pt)/size(cg) 向上取整
4.解除ci對原來分配的partition的消費權,(i從0開始)
5.將第in到n(i+1)n-1個partition分配給ci
用命令啟動消費者:
java -jar Demokafka-0.8.2.2.jar localhost:2181/kafka topic1 group1 consumer1
使用low level consumer(simple consumer)的主要原因是,用戶希望比consumer group更好的控制數(shù)據的消費,如:
1.同一條消息重復讀多次,方便replay
2.只消費某一個topic的部分partition
3.管理事務,從而確保每條消息被處理一次(exacatly one)
與high level consumer相比較,low level consumer要求用戶做大量的額外工作
1.在應用程序中跟蹤處理offset,并決定嚇一條消費哪一條消息
2.獲知每個partition的leader
3.處理leader的變化
4.處理多consumer的協(xié)作
high level consumer:
配置
1.自動管理offset
auto.commit.enable=true。
auto.commit.interval.ms=60*1000. //offset提交的間隔
2.手工管理offset
ConsumerConnector.commitOffset()
3.offset存儲
offset.storage=zookeeper //默認是存儲在zookeeper上,也可以把offset保存在kafka中,配置:offset.storage=kafka
dual.commit.enable=true。 //設置為true,則會同時把offset寫在kafka和zk上
partition對應一個物理文件,每個partiotion中會有很多個segment,每個segment中會有很多的message,kafka會一次行刪除整個segment的文件
log compaction:
1.一直保持消費log head的consumer可按順序消費所有信息,并且offset連續(xù)
2.任何offset從0開始的讀操作都至少可以讀到每個key的最后一條消息
3.每條消息的offset保持不變,offset是消息的永久標示符
4.消費本身的順序不會被改變
new api:
producer:
1.增加回調接口
2.重構partitioner接口
統(tǒng)一high level api 和low level api
1.從kafka.consumer和kafka.javaapi到java.clients.consumer
2.subscribe動態(tài)rebalance vs. assign手動分配
3.將offset存在與zk和kafka之外
4.ConsumerRebalancelisten
5.控制消費位置
6.消費流程控制
subscribe:實現(xiàn)high level api的接口
1.自動rebalance
2.自動分配partition給consumer
3.使用subscribe接口,并可注冊consumerRebalanceListener
a.public void onPartitionsRevoked(Collection<TopicPartition> partitions) //partition原先被一個consumer消費,rebalance后,不被該consumer消費
b.public void onPartitionsAssigned(Collection<TopicPartition> partitions) //重新分配給該consumer消費的partition
assign:實現(xiàn)low level api接口
同一個group下的不同consumer可以指定消費相同的partition
1.使用assign
2.指定消費目標partition
1.0版本的offset管理
a.自動commit
b.手動commit
手動commit全部offset
手動commit部分offset
支持同步或異步commit,并直接commit回調
對特定partition進行commit:
stream api:
重新分配partition
1.可以針對一個topic進行reassign
2.也可以針對某幾個partition進行數(shù)據遷移
計劃plan
./kafka-reassign-partitions.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --topics-to-move-json-file /usr/local/kafka/bin/tmp/topics-to-move.json --broker-list "3,4,5" --generate
執(zhí)行 execute
./kafka-reassign-partitions.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --reassignment-json-file /usr/local/kafka/bin/tmp/resign-plan.json --execute
驗證
./kafka-reassign-partitions.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --verify --reassignment-json-file /usr/local/kafka/bin/tmp/resign-plan.json
preferred replica leader election:
1.在zk上創(chuàng)建/admin/preferred_replica_election節(jié)點,并存入需要調整preferred replica partition信息
2.controller 一直watch該節(jié)點,一旦該節(jié)點被創(chuàng)建,controller會收到通知,并獲取該內容
3.controll讀取preferred replica ,如果發(fā)現(xiàn)該replica并非leader并且不再isr中,controll會向該replica發(fā)送leaderAndRequest,使該replica成為leader,如果當前replica并非leader,并且不再isr中,controller為了保證數(shù)據不丟失,并不會將其設置為leader
./kafka-preferred-replica-election.sh --zookeeper localhost:2181,localhost:2182,localhost:2183 --path-to-json-file /usr/local/kafka/bin/tmp/prefer.json
prefer.json內容:
{"partitions":[{"topic":"topic1","partition":1}]} #設置topic1的partition 1的prefer leader
如何處理replica crash
1.leader crash后,isr中任何replica都可以成為leader
2.如果所有的isr都crash,則isr中第一個恢復的replica成為leader或第一個recover的replica成為leader
3.unclean.leader.election.enable