producer
bootstrap.servers
kafka server地址,設(shè)置一個(gè)節(jié)點(diǎn)雖然仍然可以拿到集群信息但是避免當(dāng)前節(jié)點(diǎn)掛掉,建議設(shè)置多個(gè).
建議: 當(dāng)設(shè)置一個(gè)host時(shí)只要這個(gè)host不掛是不影響使用的,但是為了高可用建議把所有server都配置上.
batch.size
批量發(fā)送到服務(wù)端的字節(jié)數(shù)大小,批量發(fā)送可以提高整體的吞吐量,但是如果設(shè)置過(guò)大會(huì)導(dǎo)致內(nèi)存浪費(fèi).
根據(jù)應(yīng)用實(shí)際情況設(shè)置,如果要求高吞吐量建議開啟.默認(rèn)關(guān)閉.
acks
server端返回ack的個(gè)數(shù).
- =0那么不需要任何確認(rèn),添加到socket buffer中后就當(dāng)作成功處理,這時(shí)
retries設(shè)置就會(huì)無(wú)效了因?yàn)槟J(rèn)就發(fā)送成功不存在重試情況; - =1那么只需要該partition的master確認(rèn)后才算發(fā)送成功.存在的問(wèn)題是當(dāng)寫入master后并未同步到ISR中的follower那么該條消息會(huì)丟失;
- =ALL || =-1需要ISR中的所有節(jié)點(diǎn)都確認(rèn)收到才算發(fā)送成功.安全級(jí)別最高;
默認(rèn)值=1master確認(rèn)后才算發(fā)送成功,滿足大部分需求,如果消息量不大或者對(duì)消息準(zhǔn)確性要求高建議=ALL
linger.ms
發(fā)送消息的延遲時(shí)間,比如設(shè)置5ms那么消息會(huì)逗留5ms來(lái)等待更多消息批量發(fā)送,和上面batch.size組合使用,當(dāng)滿足batch.size與linger.ms條件之一就會(huì)發(fā)送,默認(rèn)值是0,未設(shè)置的情況下消息直接發(fā)送出去了.
默認(rèn)值=0,直接發(fā)送.根據(jù)應(yīng)用實(shí)際情況調(diào)節(jié).收益類似batch.size.
client.id
應(yīng)用標(biāo)識(shí),消息溯源時(shí)可以使用,設(shè)置后server端的log中會(huì)同時(shí)記錄,host/ip/clientid.
建議添加該配置.
send.buffer.bytes
設(shè)置tcp send buffer,如果設(shè)置為-1則使用操作系統(tǒng)默認(rèn)值.建議跟隨系統(tǒng)默認(rèn)值,錯(cuò)誤設(shè)置會(huì)導(dǎo)致TCP協(xié)議層出現(xiàn)問(wèn)題.
默認(rèn)128K,原理不清楚不建議調(diào)整.
receive.buffer.bytes
設(shè)置tcp receive buffer,如果設(shè)置為-1則使用操作系統(tǒng)默認(rèn)值.建議跟隨系統(tǒng)默認(rèn)值,錯(cuò)誤設(shè)置會(huì)導(dǎo)致TCP協(xié)議層出現(xiàn)問(wèn)題.
默認(rèn)32K,原理不清楚不建議調(diào)整.
max.request.size
單次請(qǐng)求最大字節(jié)數(shù)的限制,影響batch.size配置,另外這個(gè)參數(shù)服務(wù)端也有自己的配置,存在服務(wù)端與客戶端不同的情況.
默認(rèn)1Mb, 根據(jù)應(yīng)用實(shí)際情況調(diào)整.如果開啟批量可能需要調(diào)整此參數(shù).
reconnect.backoff.ms
連接在重試之前等待時(shí)間,避免短時(shí)間頻繁的連接服務(wù)端.
reconnect.backoff.max.ms
重試連接的最大等待時(shí)間,和上一個(gè)參數(shù)相關(guān)聯(lián),如果設(shè)置了這個(gè)參數(shù),那么當(dāng)連接失敗后時(shí)間會(huì)成指數(shù)增加避免connection storm,比如第一次失敗,重連等待1s,又失敗了重連等待3s(只是舉例,具體值不是這個(gè)).
max.block.ms
當(dāng)緩沖區(qū)滿了或者metadata獲取不到時(shí),調(diào)用KafkaProducer.send()與KafkaProducer.partitionsFor()阻塞的最大時(shí)間.
默認(rèn)50ms,不建議調(diào)整.
buffer.memory
設(shè)置緩沖區(qū)大小,如果消息產(chǎn)生速度大于發(fā)送到broker的速度,緩沖區(qū)用量就會(huì)增加如果滿了就會(huì)阻塞max.block.ms設(shè)置的時(shí)間,阻塞后緩沖區(qū)仍然是滿的將會(huì)拋異常.
默認(rèn)32Mb,可以通過(guò)線上JMX MBean觀察,如果消息量比較小一般用不了這么大.
retry.backoff.ms
設(shè)置請(qǐng)求失敗重試延后時(shí)間,同樣避免短時(shí)內(nèi)大量重試的問(wèn)題.
默認(rèn)100ms,不建議修改
compression.type
壓縮類型,默認(rèn)沒有壓縮.提供none, gzip, snappy, lz4壓縮算法.
默認(rèn)無(wú)壓縮,消息量大建議壓縮.節(jié)省空間,節(jié)省網(wǎng)絡(luò)傳輸(帶寬是稀缺資源).留心壓縮帶來(lái)的CPU消耗.
metrics.sample.window.ms
指標(biāo)計(jì)算的時(shí)間窗口
metrics.num.samples
計(jì)算指標(biāo)抽樣的數(shù)量
metrics.recording.level
記錄指標(biāo)的級(jí)別
metric.reporters
設(shè)置reporter類型,需要實(shí)現(xiàn)org.apache.kafka.common.metrics.MetricsReporter,如果有事件發(fā)生則會(huì)觸發(fā)對(duì)應(yīng)的調(diào)用.
max.in.flight.requests.per.connection
一個(gè)connection上未ack的請(qǐng)求數(shù),如果這個(gè)值大于1,重試時(shí)就會(huì)導(dǎo)致順序是亂的.
retries
當(dāng)發(fā)送錯(cuò)誤時(shí)重試次數(shù),max.in.flight.requests.per.connection與這個(gè)參數(shù)有關(guān)系,如果max.in.flight.requests.per.connection大于1則存在亂序的可能.舉例:兩批消息,第一批發(fā)送失敗,第二批發(fā)送成功,第一批重試.那么第二批反而在前面.
key.serializer
序列化方式,需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口.
value.serializer
序列化方式,需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口.
connections.max.idle.ms
空閑多久之后會(huì)關(guān)閉這個(gè)connection.
partitioner.class
需要實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner這個(gè)接口,作topic partition的路由規(guī)則.
request.timeout.ms
針對(duì)請(qǐng)求的響應(yīng)時(shí)間超時(shí)時(shí)間設(shè)置,如果超過(guò)了這個(gè)時(shí)間則進(jìn)行重試,如果重試次數(shù)到了上限則失敗.該參數(shù)的值應(yīng)該大于broker端的配置replica.lag.time.max.ms, 避免因?yàn)閿?shù)據(jù)備份導(dǎo)致的重試.
interceptor.classes
攔截器配置,需要實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor這個(gè)接口,默認(rèn)無(wú).
enable.idempotence
開啟冪等,新功能0.11版本引入的.如果設(shè)置為true則保證每條消息只被寫入一次.并且要求max.in.flight.requests.per.connection<=5,retries>0,acks=all.如果這些參數(shù)沒有明確設(shè)置那么將自適應(yīng),如果配置了但是參數(shù)之間不兼容則會(huì)拋ConfigException異常. 冪等只對(duì)單partition生效.
transaction.timeout.ms
transaction coordinator等待事務(wù)狀態(tài)更新的超時(shí)時(shí)間,如果改值比broker中的transaction.max.timeout.ms大,那么請(qǐng)求將發(fā)生InvalidTransactionTimeout錯(cuò)誤.
transactional.id
事務(wù)ID,需要enable.idempotence=true,如果未設(shè)置則不開啟事務(wù)功能
使用producerkafka時(shí)建議都添加callback,即便什么都不做也要打印行日志.可以參考:org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
Consumer
group.id
標(biāo)識(shí)當(dāng)前消費(fèi)者屬于哪個(gè)消費(fèi)組
建議配置,管理offset的必要參數(shù)
max.poll.records
調(diào)用一次poll()返回的記錄數(shù)量
建議設(shè)置,和其他參數(shù)有關(guān)聯(lián),如果單次拉取條數(shù)過(guò)多導(dǎo)致消費(fèi)時(shí)間過(guò)長(zhǎng)未commit,可能會(huì)導(dǎo)致該consumer被判定為down掉了,從而觸發(fā)rebalance后其他consumer會(huì)重復(fù)消費(fèi).
max.poll.interval.ms
調(diào)用poll()方法的最大間隔時(shí)間,如果在間隔時(shí)間內(nèi)沒有調(diào)用poll()那么這個(gè)consumer被判定為失敗,觸發(fā)rebalance.
session.timeout.ms
該參數(shù)用來(lái)判定consumer是否失效,consumer會(huì)周期性的發(fā)送心跳到broker,如果在這個(gè)配置的周期內(nèi)并未發(fā)送心跳那么判定該consumer失效,觸發(fā)rebalance.
這個(gè)參數(shù)配置一定要在broker兩個(gè)配置數(shù)值之間group.min.session.timeout.ms與group.max.session.timeout.ms.
heartbeat.interval.ms
兩次心跳的間隔時(shí)間,心跳的作用有兩方面:
- 確保kafka知道當(dāng)前的consumer是存活的
- 當(dāng)consumer group發(fā)生變化時(shí)(新增或者減少),能確保觸發(fā)rebalance
建議配置的值要小于session.timeout.ms/3
bootstrap.servers
kafka server地址,設(shè)置一個(gè)節(jié)點(diǎn)雖然仍然可以拿到集群信息但是避免當(dāng)前節(jié)點(diǎn)掛掉,建議設(shè)置多個(gè).
建議: 當(dāng)設(shè)置一個(gè)host時(shí)只要這個(gè)host不掛是不影響使用的,但是為了高可用建議把所有server都配置上.
enable.auto.commit
如果設(shè)置成true,那么將異步周期性的提交offset
盡量設(shè)置成false,由自動(dòng)變手動(dòng)更加能控制消息的消費(fèi)情況.
auto.commit.interval.ms
自動(dòng)提交的周期時(shí)間. 開啟enable.auto.commit后有效
partition.assignment.strategy
partition的分配關(guān)系,也就是確定partition與consumer之間關(guān)系的方式
默認(rèn)org.apache.kafka.clients.consumer.RangeAssignor
auto.offset.reset
offset重置的規(guī)則,如果新的topic kafka沒有對(duì)應(yīng)的offset信息,或者當(dāng)前的offset無(wú)效了(歷史數(shù)據(jù)被刪除),那么需要指定新的offset是什么,提供幾種類型:
- earliest: 隊(duì)列中能找到的最早的offset
- latest: 加入時(shí)最新的offset
- none: 找不到就報(bào)錯(cuò)
- anything else: 直接報(bào)錯(cuò)
默認(rèn)latest
fetch.min.bytes
fetch一次返回的最小字節(jié)數(shù),如果不夠這個(gè)字節(jié)數(shù)就等待.
默認(rèn)值為1字節(jié).調(diào)大這個(gè)參數(shù)可以增加server端的吞吐量
fetch.max.bytes
fetch一次返回的最大字節(jié)數(shù),如果第一條消息的大小超過(guò)了這個(gè)限制仍然會(huì)繼續(xù)拉取保證consumer的正常運(yùn)行.因此并不是一個(gè)絕對(duì)的配置,消息的大小還需要受到broker的message.max.bytes限制,以及topic的max.message.bytes的限制.
默認(rèn)值是50Mb,
fetch.max.wait.ms
如果沒達(dá)到fetch.min.bytes配置的值,fetch請(qǐng)求阻塞的最長(zhǎng)時(shí)間
默認(rèn)500ms
metadata.max.age.ms
周期性的拉取metadata,即便服務(wù)端沒發(fā)生變化
默認(rèn)3分鐘
max.partition.fetch.bytes
與fetch.max.bytes類似,只不過(guò)這個(gè)是限制單partition
默認(rèn)值1Mb
send.buffer.bytes/receive.buffer.bytes/client.id/reconnect.backoff.ms/reconnect.backoff.max.ms/retry.backoff.ms/metrics.sample.window.ms/metrics.num.samples/metrics.recording.level/metric.reporters
同producer
check.crcs
使用crc32的方式校驗(yàn)消息是否準(zhǔn)確,避免磁盤等其他原因?qū)е碌南㈠e(cuò)誤,該功能有一定的性能損失,追求極致的性能需要關(guān)閉
默認(rèn)true
key.deserializer/value.deserializer
同producer
request.timeout.ms
同producer
default.api.timeout.ms
指定consumer所有操作的超時(shí)時(shí)間
interceptor.classes
consumer攔截器,需要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor .
默認(rèn)為空
exclude.internal.topics
是否排除kafka內(nèi)部隊(duì)列,比如offset隊(duì)列.
默認(rèn)值true
internal.leave.group.on.close
當(dāng)consumer關(guān)閉時(shí)是否離開group, 如果設(shè)置為false,則不離開組直到時(shí)間超過(guò)session.timeout.ms導(dǎo)致rebalance.
isolation.level
消息的隔離級(jí)別,類似mysql, 如果設(shè)置的是read_uncommitted那么調(diào)用consumer.poll()將讀取到所有消息,如果設(shè)置了read_committed那么調(diào)用consumer.poll()將只能讀取到已經(jīng)commit的消息.
使用kafkaconsumer時(shí)建議commit offset的操作都修改成手動(dòng)提交,來(lái)控制消息消費(fèi)的情況.