參考
kafka 技術(shù)分享
如何確定Kafka的分區(qū)數(shù),key和consumer線程數(shù),以及不消費(fèi)問(wèn)題解決
kafka性能參數(shù)和壓力測(cè)試揭秘
kafka producer線程與吞吐量
1.partition數(shù)量配置
partition數(shù)量由topic的并發(fā)決定,并發(fā)少則1個(gè)分區(qū)就可以,并發(fā)越高,分區(qū)數(shù)越多,可以提高吞吐量。
創(chuàng)建topic時(shí)指定topic數(shù)量
bin/kafka-topics.sh --create --zookeeper 10.25.58.35:2181 --replication-factor 3 --partitions 3 --topic test8
2.日志保留策略設(shè)置
當(dāng)kafka broker的被寫入海量消息后,會(huì)生成很多數(shù)據(jù)文件,占用大量磁盤空間,kafka默認(rèn)是保留7天,建議根據(jù)磁盤情況配置,避免磁盤撐爆。
log.retention.hours=72
段文件配置1GB,有利于快速回收磁盤空間,重啟kafka加載也會(huì)加快(如果文件過(guò)小,則文件數(shù)量比較多,kafka啟動(dòng)時(shí)是單線程掃描目錄(log.dir)下所有數(shù)據(jù)文件)
log.segment.bytes=1073741824
3.文件刷盤策略
為了大幅度提高producer寫入吞吐量,需要定期批量寫文件。建議配置:
每當(dāng)producer寫入10000條消息時(shí),刷數(shù)據(jù)到磁盤
log.flush.interval.messages=10000
每間隔1秒鐘時(shí)間,刷數(shù)據(jù)到磁盤
log.flush.interval.ms=1000
4.網(wǎng)絡(luò)和io操作線程配置優(yōu)化
一般num.network.threads主要處理網(wǎng)絡(luò)io,讀寫緩沖區(qū)數(shù)據(jù),基本沒(méi)有io等待,配置線程數(shù)量為cpu核數(shù)加1.
broker處理消息的最大線程數(shù)
num.network.threads=xxx
num.io.threads主要進(jìn)行磁盤io操作,高峰期可能有些io等待,因此配置需要大些。配置線程數(shù)量為cpu核數(shù)2倍,最大不超過(guò)3倍.
broker處理磁盤IO的線程數(shù)
num.io.threads=xxx
加入隊(duì)列的最大請(qǐng)求數(shù),超過(guò)該值,network thread阻塞
queued.max.requests=5000
server使用的send buffer大小。
socket.send.buffer.bytes=1024000
server使用的recive buffer大小。
socket.receive.buffer.bytes=1024000
5.異步提交(kafka.javaapi.producer)
采用同步:1000條8s;
采用異步:100條或3s異步寫入,速度提升為1w條2s(ProducerConfig)
request.required.acks=0
producer.type=async
##在異步模式下,一個(gè)batch發(fā)送的消息數(shù)量。producer會(huì)等待直到要發(fā)送的消息數(shù)量達(dá)到這個(gè)值,之后才會(huì)發(fā)送。但如果消息數(shù)量不夠,達(dá)到queue.buffer.max.ms時(shí)也會(huì)直接發(fā)送。
batch.num.messages=100
##默認(rèn)值:200,當(dāng)使用異步模式時(shí),緩沖數(shù)據(jù)的最大時(shí)間。例如設(shè)為100的話,會(huì)每隔100毫秒把所有的消息批量發(fā)送。這會(huì)提高吞吐量,但是會(huì)增加消息的到達(dá)延時(shí)
queue.buffering.max.ms=100
##默認(rèn)值:5000,在異步模式下,producer端允許buffer的最大消息數(shù)量,如果producer無(wú)法盡快將消息發(fā)送給broker,從而導(dǎo)致消息在producer端大量沉積,如果消息的條數(shù)達(dá)到此配置值,將會(huì)導(dǎo)致producer端阻塞或者消息被拋棄。
queue.buffering.max.messages=1000 ##發(fā)送隊(duì)列緩沖長(zhǎng)度
##默認(rèn)值:10000,當(dāng)消息在producer端沉積的條數(shù)達(dá)到 queue.buffering.max.meesages 時(shí),阻塞一定時(shí)間后,隊(duì)列仍然沒(méi)有enqueue(producer仍然沒(méi)有發(fā)送出任何消息)。此時(shí)producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制阻塞的時(shí)間,如果值為-1(默認(rèn)值)則 無(wú)阻塞超時(shí)限制,消息不會(huì)被拋棄;如果值為0 則立即清空隊(duì)列,消息被拋棄。
queue.enqueue.timeout.ms=100
compression.codec=gzip
6.producer版本
參考
what's the difference between kafka.javaapi.* and org.apache.kafka.*?
Kafka new producer not behaving consistently
使用新producer發(fā)送少量消息時(shí)丟失
新producer:org.apache.kafka.clients.producer(KafkaProducer.java)
老producer:kafka.javaapi.producer(Producer.scala)
- 查閱資料后,原因?yàn)槭褂胮roducer時(shí)必須調(diào)用producer.close(),且在發(fā)送后Thread.sleep適當(dāng)時(shí)間,則不會(huì)丟失數(shù)據(jù)。否則會(huì)造成資源泄露,導(dǎo)致數(shù)據(jù)丟失。
- 當(dāng)使用多個(gè)producer進(jìn)行發(fā)送時(shí)(使用apache線程池),當(dāng)同時(shí)有多個(gè)producer并發(fā)發(fā)送時(shí),依然會(huì)造成數(shù)據(jù)丟失。sleep后有好轉(zhuǎn),但仍然丟失。
- 使用老producer,且compression.codec不為snappy時(shí),不會(huì)造成數(shù)據(jù)丟失。使用線程池也不會(huì)丟失。
7.性能測(cè)試
kafka 10 性能測(cè)試
kafka自帶的性能測(cè)試工具,位于bin/kafka-producer-perf-test.sh。
8.生產(chǎn)端發(fā)送堵塞
- 調(diào)整producer緩沖區(qū)大小 queue.buffering.max.messages
- 增加通道數(shù)量:多建幾個(gè)producer,使用連接池管理producer
producer使用線程池
- buffer.memory設(shè)置的緩存是針對(duì)每個(gè)producerThread
針對(duì)每個(gè)producerThread,不應(yīng)設(shè)置高,以免影響內(nèi)存 - 線程池中線程數(shù)量如何設(shè)置?
監(jiān)視剩余線程數(shù)據(jù),進(jìn)行動(dòng)態(tài)調(diào)整,并針對(duì)可能出現(xiàn)的峰值預(yù)留一定的線程。 - 使用tryAcquire()還是acquire()??阻塞或放棄消息??
使用apache的線程池即可,設(shè)置阻塞時(shí)的等待時(shí)間,超過(guò)后則拋出異常。 - 是否對(duì)線程池容量進(jìn)行動(dòng)態(tài)調(diào)整?
使用apache的線程池即可。 - 線程池最大線程數(shù)100,啟用50個(gè)thread同時(shí)發(fā)送日志,報(bào)錯(cuò):
kafka.common.QueueFullException: Event queue is full of unsent messages, could not send event: KeyedMessage(test12,null,null,........
報(bào)錯(cuò)原因?yàn)樯a(chǎn)速度大于發(fā)送速度(網(wǎng)絡(luò)傳輸?shù)葲Q定),可設(shè)置繼續(xù)等待時(shí)間,超過(guò)此時(shí)間后丟棄消息;或設(shè)置一直阻塞,排隊(duì)等待消息發(fā)送完畢(會(huì)造成線程死鎖)。