kafka2:性能優(yōu)化

參考

kafka 技術(shù)分享
如何確定Kafka的分區(qū)數(shù),key和consumer線程數(shù),以及不消費(fèi)問(wèn)題解決
kafka性能參數(shù)和壓力測(cè)試揭秘
kafka producer線程與吞吐量

1.partition數(shù)量配置

partition數(shù)量由topic的并發(fā)決定,并發(fā)少則1個(gè)分區(qū)就可以,并發(fā)越高,分區(qū)數(shù)越多,可以提高吞吐量。

創(chuàng)建topic時(shí)指定topic數(shù)量
bin/kafka-topics.sh --create --zookeeper 10.25.58.35:2181 --replication-factor 3 --partitions 3 --topic test8

2.日志保留策略設(shè)置

當(dāng)kafka broker的被寫入海量消息后,會(huì)生成很多數(shù)據(jù)文件,占用大量磁盤空間,kafka默認(rèn)是保留7天,建議根據(jù)磁盤情況配置,避免磁盤撐爆。

log.retention.hours=72

段文件配置1GB,有利于快速回收磁盤空間,重啟kafka加載也會(huì)加快(如果文件過(guò)小,則文件數(shù)量比較多,kafka啟動(dòng)時(shí)是單線程掃描目錄(log.dir)下所有數(shù)據(jù)文件)

log.segment.bytes=1073741824

3.文件刷盤策略

為了大幅度提高producer寫入吞吐量,需要定期批量寫文件。建議配置:

每當(dāng)producer寫入10000條消息時(shí),刷數(shù)據(jù)到磁盤
log.flush.interval.messages=10000

每間隔1秒鐘時(shí)間,刷數(shù)據(jù)到磁盤
log.flush.interval.ms=1000

4.網(wǎng)絡(luò)和io操作線程配置優(yōu)化

一般num.network.threads主要處理網(wǎng)絡(luò)io,讀寫緩沖區(qū)數(shù)據(jù),基本沒(méi)有io等待,配置線程數(shù)量為cpu核數(shù)加1.

broker處理消息的最大線程數(shù)
num.network.threads=xxx

num.io.threads主要進(jìn)行磁盤io操作,高峰期可能有些io等待,因此配置需要大些。配置線程數(shù)量為cpu核數(shù)2倍,最大不超過(guò)3倍.

broker處理磁盤IO的線程數(shù)
num.io.threads=xxx

加入隊(duì)列的最大請(qǐng)求數(shù),超過(guò)該值,network thread阻塞

queued.max.requests=5000

server使用的send buffer大小。

socket.send.buffer.bytes=1024000

server使用的recive buffer大小。

socket.receive.buffer.bytes=1024000

5.異步提交(kafka.javaapi.producer)

采用同步:1000條8s;
采用異步:100條或3s異步寫入,速度提升為1w條2s(ProducerConfig)

request.required.acks=0  
producer.type=async     
##在異步模式下,一個(gè)batch發(fā)送的消息數(shù)量。producer會(huì)等待直到要發(fā)送的消息數(shù)量達(dá)到這個(gè)值,之后才會(huì)發(fā)送。但如果消息數(shù)量不夠,達(dá)到queue.buffer.max.ms時(shí)也會(huì)直接發(fā)送。       
batch.num.messages=100  
##默認(rèn)值:200,當(dāng)使用異步模式時(shí),緩沖數(shù)據(jù)的最大時(shí)間。例如設(shè)為100的話,會(huì)每隔100毫秒把所有的消息批量發(fā)送。這會(huì)提高吞吐量,但是會(huì)增加消息的到達(dá)延時(shí)
queue.buffering.max.ms=100  
##默認(rèn)值:5000,在異步模式下,producer端允許buffer的最大消息數(shù)量,如果producer無(wú)法盡快將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積,如果消息的條數(shù)達(dá)到此配置值,將會(huì)導(dǎo)致producer端阻塞或者消息被拋棄。
queue.buffering.max.messages=1000 ##發(fā)送隊(duì)列緩沖長(zhǎng)度
##默認(rèn)值:10000,當(dāng)消息在producer端沉積的條數(shù)達(dá)到 queue.buffering.max.meesages 時(shí),阻塞一定時(shí)間后,隊(duì)列仍然沒(méi)有enqueue(producer仍然沒(méi)有發(fā)送出任何消息)。此時(shí)producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制阻塞的時(shí)間,如果值為-1(默認(rèn)值)則 無(wú)阻塞超時(shí)限制,消息不會(huì)被拋棄;如果值為0 則立即清空隊(duì)列,消息被拋棄。
queue.enqueue.timeout.ms=100     
compression.codec=gzip

6.producer版本

參考

what's the difference between kafka.javaapi.* and org.apache.kafka.*?
Kafka new producer not behaving consistently

使用新producer發(fā)送少量消息時(shí)丟失

新producer:org.apache.kafka.clients.producer(KafkaProducer.java)
老producer:kafka.javaapi.producer(Producer.scala)

  • 查閱資料后,原因?yàn)槭褂胮roducer時(shí)必須調(diào)用producer.close(),且在發(fā)送后Thread.sleep適當(dāng)時(shí)間,則不會(huì)丟失數(shù)據(jù)。否則會(huì)造成資源泄露,導(dǎo)致數(shù)據(jù)丟失。
  • 當(dāng)使用多個(gè)producer進(jìn)行發(fā)送時(shí)(使用apache線程池),當(dāng)同時(shí)有多個(gè)producer并發(fā)發(fā)送時(shí),依然會(huì)造成數(shù)據(jù)丟失。sleep后有好轉(zhuǎn),但仍然丟失。
  • 使用老producer,且compression.codec不為snappy時(shí),不會(huì)造成數(shù)據(jù)丟失。使用線程池也不會(huì)丟失。

7.性能測(cè)試

kafka 10 性能測(cè)試
kafka自帶的性能測(cè)試工具,位于bin/kafka-producer-perf-test.sh。

8.生產(chǎn)端發(fā)送堵塞

  • 調(diào)整producer緩沖區(qū)大小 queue.buffering.max.messages
  • 增加通道數(shù)量:多建幾個(gè)producer,使用連接池管理producer

producer使用線程池

  1. buffer.memory設(shè)置的緩存是針對(duì)每個(gè)producerThread
    針對(duì)每個(gè)producerThread,不應(yīng)設(shè)置高,以免影響內(nèi)存
  2. 線程池中線程數(shù)量如何設(shè)置?
    監(jiān)視剩余線程數(shù)據(jù),進(jìn)行動(dòng)態(tài)調(diào)整,并針對(duì)可能出現(xiàn)的峰值預(yù)留一定的線程。
  3. 使用tryAcquire()還是acquire()??阻塞或放棄消息??
    使用apache的線程池即可,設(shè)置阻塞時(shí)的等待時(shí)間,超過(guò)后則拋出異常。
  4. 是否對(duì)線程池容量進(jìn)行動(dòng)態(tài)調(diào)整?
    使用apache的線程池即可。
  5. 線程池最大線程數(shù)100,啟用50個(gè)thread同時(shí)發(fā)送日志,報(bào)錯(cuò):

kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(test12,null,null,........
報(bào)錯(cuò)原因?yàn)樯a(chǎn)速度大于發(fā)送速度(網(wǎng)絡(luò)傳輸?shù)葲Q定),可設(shè)置繼續(xù)等待時(shí)間,超過(guò)此時(shí)間后丟棄消息;或設(shè)置一直阻塞,排隊(duì)等待消息發(fā)送完畢(會(huì)造成線程死鎖)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • kafka的定義:是一個(gè)分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動(dòng)流(Act...
    時(shí)待吾閱讀 5,539評(píng)論 1 15
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,586評(píng)論 0 34
  • Design 1. Motivation 我們?cè)O(shè)計(jì)Kafka用來(lái)作為統(tǒng)一的平臺(tái)來(lái)處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)源...
    BlackManba_24閱讀 1,645評(píng)論 0 8
  • Kafka入門經(jīng)典教程-Kafka-about云開發(fā) http://www.aboutyun.com/threa...
    葡萄喃喃囈語(yǔ)閱讀 10,985評(píng)論 4 54

友情鏈接更多精彩內(nèi)容