kafka配置


producer


bootstrap.servers

kafka server地址,設(shè)置一個(gè)節(jié)點(diǎn)雖然仍然可以拿到集群信息但是避免當(dāng)前節(jié)點(diǎn)掛掉,建議設(shè)置多個(gè).

建議: 當(dāng)設(shè)置一個(gè)host時(shí)只要這個(gè)host不掛是不影響使用的,但是為了高可用建議把所有server都配置上.

batch.size

批量發(fā)送到服務(wù)端的字節(jié)數(shù)大小,批量發(fā)送可以提高整體的吞吐量,但是如果設(shè)置過(guò)大會(huì)導(dǎo)致內(nèi)存浪費(fèi).

根據(jù)應(yīng)用實(shí)際情況設(shè)置,如果要求高吞吐量建議開啟.默認(rèn)關(guān)閉.

acks

server端返回ack的個(gè)數(shù).

  • =0那么不需要任何確認(rèn),添加到socket buffer中后就當(dāng)作成功處理,這時(shí)retries設(shè)置就會(huì)無(wú)效了因?yàn)槟J(rèn)就發(fā)送成功不存在重試情況;
  • =1那么只需要該partition的master確認(rèn)后才算發(fā)送成功.存在的問(wèn)題是當(dāng)寫入master后并未同步到ISR中的follower那么該條消息會(huì)丟失;
  • =ALL || =-1需要ISR中的所有節(jié)點(diǎn)都確認(rèn)收到才算發(fā)送成功.安全級(jí)別最高;

默認(rèn)值=1master確認(rèn)后才算發(fā)送成功,滿足大部分需求,如果消息量不大或者對(duì)消息準(zhǔn)確性要求高建議=ALL

linger.ms

發(fā)送消息的延遲時(shí)間,比如設(shè)置5ms那么消息會(huì)逗留5ms來(lái)等待更多消息批量發(fā)送,和上面batch.size組合使用,當(dāng)滿足batch.sizelinger.ms條件之一就會(huì)發(fā)送,默認(rèn)值是0,未設(shè)置的情況下消息直接發(fā)送出去了.

默認(rèn)值=0,直接發(fā)送.根據(jù)應(yīng)用實(shí)際情況調(diào)節(jié).收益類似batch.size.

client.id

應(yīng)用標(biāo)識(shí),消息溯源時(shí)可以使用,設(shè)置后server端的log中會(huì)同時(shí)記錄,host/ip/clientid.

建議添加該配置.

send.buffer.bytes

設(shè)置tcp send buffer,如果設(shè)置為-1則使用操作系統(tǒng)默認(rèn)值.建議跟隨系統(tǒng)默認(rèn)值,錯(cuò)誤設(shè)置會(huì)導(dǎo)致TCP協(xié)議層出現(xiàn)問(wèn)題.

默認(rèn)128K,原理不清楚不建議調(diào)整.

receive.buffer.bytes

設(shè)置tcp receive buffer,如果設(shè)置為-1則使用操作系統(tǒng)默認(rèn)值.建議跟隨系統(tǒng)默認(rèn)值,錯(cuò)誤設(shè)置會(huì)導(dǎo)致TCP協(xié)議層出現(xiàn)問(wèn)題.

默認(rèn)32K,原理不清楚不建議調(diào)整.

max.request.size

單次請(qǐng)求最大字節(jié)數(shù)的限制,影響batch.size配置,另外這個(gè)參數(shù)服務(wù)端也有自己的配置,存在服務(wù)端與客戶端不同的情況.

默認(rèn)1Mb, 根據(jù)應(yīng)用實(shí)際情況調(diào)整.如果開啟批量可能需要調(diào)整此參數(shù).

reconnect.backoff.ms

連接在重試之前等待時(shí)間,避免短時(shí)間頻繁的連接服務(wù)端.

reconnect.backoff.max.ms

重試連接的最大等待時(shí)間,和上一個(gè)參數(shù)相關(guān)聯(lián),如果設(shè)置了這個(gè)參數(shù),那么當(dāng)連接失敗后時(shí)間會(huì)成指數(shù)增加避免connection storm,比如第一次失敗,重連等待1s,又失敗了重連等待3s(只是舉例,具體值不是這個(gè)).

max.block.ms

當(dāng)緩沖區(qū)滿了或者metadata獲取不到時(shí),調(diào)用KafkaProducer.send()KafkaProducer.partitionsFor()阻塞的最大時(shí)間.

默認(rèn)50ms,不建議調(diào)整.

buffer.memory

設(shè)置緩沖區(qū)大小,如果消息產(chǎn)生速度大于發(fā)送到broker的速度,緩沖區(qū)用量就會(huì)增加如果滿了就會(huì)阻塞max.block.ms設(shè)置的時(shí)間,阻塞后緩沖區(qū)仍然是滿的將會(huì)拋異常.

默認(rèn)32Mb,可以通過(guò)線上JMX MBean觀察,如果消息量比較小一般用不了這么大.

retry.backoff.ms

設(shè)置請(qǐng)求失敗重試延后時(shí)間,同樣避免短時(shí)內(nèi)大量重試的問(wèn)題.

默認(rèn)100ms,不建議修改

compression.type

壓縮類型,默認(rèn)沒有壓縮.提供none, gzip, snappy, lz4壓縮算法.

默認(rèn)無(wú)壓縮,消息量大建議壓縮.節(jié)省空間,節(jié)省網(wǎng)絡(luò)傳輸(帶寬是稀缺資源).留心壓縮帶來(lái)的CPU消耗.

metrics.sample.window.ms

指標(biāo)計(jì)算的時(shí)間窗口

metrics.num.samples

計(jì)算指標(biāo)抽樣的數(shù)量

metrics.recording.level

記錄指標(biāo)的級(jí)別

metric.reporters

設(shè)置reporter類型,需要實(shí)現(xiàn)org.apache.kafka.common.metrics.MetricsReporter,如果有事件發(fā)生則會(huì)觸發(fā)對(duì)應(yīng)的調(diào)用.

max.in.flight.requests.per.connection

一個(gè)connection上未ack的請(qǐng)求數(shù),如果這個(gè)值大于1,重試時(shí)就會(huì)導(dǎo)致順序是亂的.

retries

當(dāng)發(fā)送錯(cuò)誤時(shí)重試次數(shù),max.in.flight.requests.per.connection與這個(gè)參數(shù)有關(guān)系,如果max.in.flight.requests.per.connection大于1則存在亂序的可能.舉例:兩批消息,第一批發(fā)送失敗,第二批發(fā)送成功,第一批重試.那么第二批反而在前面.

key.serializer

序列化方式,需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口.

value.serializer

序列化方式,需要實(shí)現(xiàn)org.apache.kafka.common.serialization.Serializer接口.

connections.max.idle.ms

空閑多久之后會(huì)關(guān)閉這個(gè)connection.

partitioner.class

需要實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner這個(gè)接口,作topic partition的路由規(guī)則.

request.timeout.ms

針對(duì)請(qǐng)求的響應(yīng)時(shí)間超時(shí)時(shí)間設(shè)置,如果超過(guò)了這個(gè)時(shí)間則進(jìn)行重試,如果重試次數(shù)到了上限則失敗.該參數(shù)的值應(yīng)該大于broker端的配置replica.lag.time.max.ms, 避免因?yàn)閿?shù)據(jù)備份導(dǎo)致的重試.

interceptor.classes

攔截器配置,需要實(shí)現(xiàn)org.apache.kafka.clients.producer.ProducerInterceptor這個(gè)接口,默認(rèn)無(wú).

enable.idempotence

開啟冪等,新功能0.11版本引入的.如果設(shè)置為true則保證每條消息只被寫入一次.并且要求max.in.flight.requests.per.connection<=5,retries>0,acks=all.如果這些參數(shù)沒有明確設(shè)置那么將自適應(yīng),如果配置了但是參數(shù)之間不兼容則會(huì)拋ConfigException異常. 冪等只對(duì)單partition生效.

transaction.timeout.ms

transaction coordinator等待事務(wù)狀態(tài)更新的超時(shí)時(shí)間,如果改值比broker中的transaction.max.timeout.ms大,那么請(qǐng)求將發(fā)生InvalidTransactionTimeout錯(cuò)誤.

transactional.id

事務(wù)ID,需要enable.idempotence=true,如果未設(shè)置則不開啟事務(wù)功能

使用producerkafka時(shí)建議都添加callback,即便什么都不做也要打印行日志.可以參考:org.apache.kafka.clients.producer.internals.ErrorLoggingCallback


Consumer


group.id

標(biāo)識(shí)當(dāng)前消費(fèi)者屬于哪個(gè)消費(fèi)組

建議配置,管理offset的必要參數(shù)

max.poll.records

調(diào)用一次poll()返回的記錄數(shù)量

建議設(shè)置,和其他參數(shù)有關(guān)聯(lián),如果單次拉取條數(shù)過(guò)多導(dǎo)致消費(fèi)時(shí)間過(guò)長(zhǎng)未commit,可能會(huì)導(dǎo)致該consumer被判定為down掉了,從而觸發(fā)rebalance后其他consumer會(huì)重復(fù)消費(fèi).

max.poll.interval.ms

調(diào)用poll()方法的最大間隔時(shí)間,如果在間隔時(shí)間內(nèi)沒有調(diào)用poll()那么這個(gè)consumer被判定為失敗,觸發(fā)rebalance.

session.timeout.ms

該參數(shù)用來(lái)判定consumer是否失效,consumer會(huì)周期性的發(fā)送心跳到broker,如果在這個(gè)配置的周期內(nèi)并未發(fā)送心跳那么判定該consumer失效,觸發(fā)rebalance.

這個(gè)參數(shù)配置一定要在broker兩個(gè)配置數(shù)值之間group.min.session.timeout.msgroup.max.session.timeout.ms.

heartbeat.interval.ms

兩次心跳的間隔時(shí)間,心跳的作用有兩方面:

  1. 確保kafka知道當(dāng)前的consumer是存活的
  2. 當(dāng)consumer group發(fā)生變化時(shí)(新增或者減少),能確保觸發(fā)rebalance

建議配置的值要小于session.timeout.ms/3

bootstrap.servers

kafka server地址,設(shè)置一個(gè)節(jié)點(diǎn)雖然仍然可以拿到集群信息但是避免當(dāng)前節(jié)點(diǎn)掛掉,建議設(shè)置多個(gè).

建議: 當(dāng)設(shè)置一個(gè)host時(shí)只要這個(gè)host不掛是不影響使用的,但是為了高可用建議把所有server都配置上.

enable.auto.commit

如果設(shè)置成true,那么將異步周期性的提交offset

盡量設(shè)置成false,由自動(dòng)變手動(dòng)更加能控制消息的消費(fèi)情況.

auto.commit.interval.ms

自動(dòng)提交的周期時(shí)間. 開啟enable.auto.commit后有效

partition.assignment.strategy

partition的分配關(guān)系,也就是確定partition與consumer之間關(guān)系的方式

默認(rèn)org.apache.kafka.clients.consumer.RangeAssignor

auto.offset.reset

offset重置的規(guī)則,如果新的topic kafka沒有對(duì)應(yīng)的offset信息,或者當(dāng)前的offset無(wú)效了(歷史數(shù)據(jù)被刪除),那么需要指定新的offset是什么,提供幾種類型:

  1. earliest: 隊(duì)列中能找到的最早的offset
  2. latest: 加入時(shí)最新的offset
  3. none: 找不到就報(bào)錯(cuò)
  4. anything else: 直接報(bào)錯(cuò)

默認(rèn)latest

fetch.min.bytes

fetch一次返回的最小字節(jié)數(shù),如果不夠這個(gè)字節(jié)數(shù)就等待.

默認(rèn)值為1字節(jié).調(diào)大這個(gè)參數(shù)可以增加server端的吞吐量

fetch.max.bytes

fetch一次返回的最大字節(jié)數(shù),如果第一條消息的大小超過(guò)了這個(gè)限制仍然會(huì)繼續(xù)拉取保證consumer的正常運(yùn)行.因此并不是一個(gè)絕對(duì)的配置,消息的大小還需要受到broker的message.max.bytes限制,以及topic的max.message.bytes的限制.

默認(rèn)值是50Mb,

fetch.max.wait.ms

如果沒達(dá)到fetch.min.bytes配置的值,fetch請(qǐng)求阻塞的最長(zhǎng)時(shí)間

默認(rèn)500ms

metadata.max.age.ms

周期性的拉取metadata,即便服務(wù)端沒發(fā)生變化

默認(rèn)3分鐘

max.partition.fetch.bytes

fetch.max.bytes類似,只不過(guò)這個(gè)是限制單partition

默認(rèn)值1Mb

send.buffer.bytes/receive.buffer.bytes/client.id/reconnect.backoff.ms/reconnect.backoff.max.ms/retry.backoff.ms/metrics.sample.window.ms/metrics.num.samples/metrics.recording.level/metric.reporters

同producer

check.crcs

使用crc32的方式校驗(yàn)消息是否準(zhǔn)確,避免磁盤等其他原因?qū)е碌南㈠e(cuò)誤,該功能有一定的性能損失,追求極致的性能需要關(guān)閉

默認(rèn)true

key.deserializer/value.deserializer

同producer

request.timeout.ms

同producer

default.api.timeout.ms

指定consumer所有操作的超時(shí)時(shí)間

interceptor.classes

consumer攔截器,需要實(shí)現(xiàn)org.apache.kafka.clients.consumer.ConsumerInterceptor .

默認(rèn)為空

exclude.internal.topics

是否排除kafka內(nèi)部隊(duì)列,比如offset隊(duì)列.

默認(rèn)值true

internal.leave.group.on.close

當(dāng)consumer關(guān)閉時(shí)是否離開group, 如果設(shè)置為false,則不離開組直到時(shí)間超過(guò)session.timeout.ms導(dǎo)致rebalance.

isolation.level

消息的隔離級(jí)別,類似mysql, 如果設(shè)置的是read_uncommitted那么調(diào)用consumer.poll()將讀取到所有消息,如果設(shè)置了read_committed那么調(diào)用consumer.poll()將只能讀取到已經(jīng)commit的消息.

使用kafkaconsumer時(shí)建議commit offset的操作都修改成手動(dòng)提交,來(lái)控制消息消費(fèi)的情況.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • ############################# System ####################...
    snail_knight閱讀 1,811評(píng)論 0 0
  • 1 生產(chǎn)者配置參數(shù):kafka.producer.retries 當(dāng)生產(chǎn)者發(fā)送失敗的時(shí)候重試的次數(shù),大多數(shù)情況下,...
    哈哈_dfde閱讀 11,073評(píng)論 0 4
  • 不管是把 Kafka 作為消息隊(duì)列、消息、總線還是數(shù)據(jù)存儲(chǔ)平臺(tái)來(lái)使用 ,總是需要有一個(gè)可以往 Kafka 寫入數(shù)據(jù)...
    消失er閱讀 11,487評(píng)論 1 5
  • 都說(shuō)深夜是自我的獨(dú)白,今夜喝茶喝多了的我有點(diǎn)難以入眠。 關(guān)燈后,我閉眼幾分鐘又爬起來(lái)開了床頭小燈。心里不知名的思緒...
    零蘭閱讀 507評(píng)論 3 4
  • 放假抽空和同學(xué)打了一哈上海大學(xué)生的ctf,題目質(zhì)量一般,腦洞太大,不過(guò)有兩道代碼審計(jì)題目還是不錯(cuò)的,同時(shí)自己也有好...
    yangc隨想閱讀 1,015評(píng)論 0 0

友情鏈接更多精彩內(nèi)容