kafka極簡(jiǎn)入門(四)--常用配置

回顧:kafka極簡(jiǎn)入門(三)--創(chuàng)建topic

前言

kafka針對(duì)broker, topic, producer, consumer的配置非常多,這里講一下常用的配置

1.broker相關(guān)配置
  • broker.id
    broker在kafka集群中的唯一標(biāo)識(shí),必須是一個(gè)大于等于0的整數(shù),如果不寫的話默認(rèn)從1001開始。
    建議:把它設(shè)置成與機(jī)器名具有相關(guān)性的整數(shù)。
  • port
    設(shè)置kafka的端口號(hào),默認(rèn)情況下是9092,不建議修改成1024以下的端口,因?yàn)樾枰褂胷oot權(quán)限啟動(dòng)。
  • zookeeper.connect
    設(shè)置zookeeper集群,該配置參數(shù)是一組用逗號(hào)隔開的host:port/path列表
    • host是zookeeper服務(wù)器的機(jī)器名或ip地址
    • port是zookeeper服務(wù)器的端口
    • /path是可選的zookeeper路徑,作為kafka集群的chroot環(huán)境,默認(rèn)是根路徑,如果指定的chroot路徑不存在,kafka會(huì)在啟動(dòng)的時(shí)候創(chuàng)建它。使用chroot使得zookeeper集群可以共享給其他應(yīng)用程序時(shí)而不會(huì)產(chǎn)生沖突。
  • log.dirs
    配置kafka日志片段的目錄位置,多個(gè)路徑以逗號(hào)隔開。如果配置了多個(gè)路徑,kafka會(huì)根據(jù)最少使用原則,把統(tǒng)一分區(qū)的日志保存到同一路徑下,注意的是,kafka會(huì)往擁有最少數(shù)量分區(qū)的路徑新增分區(qū),而不是往擁有最小磁盤空間的路徑新增分區(qū)。
  • auto.create.topics.enable
    配置是否開啟自動(dòng)創(chuàng)建topic,如果設(shè)置為true, kafka會(huì)在以下幾種場(chǎng)景自動(dòng)創(chuàng)建topic:
    • 當(dāng)一個(gè)producer開始往topic寫入消息時(shí)。
    • 當(dāng)一個(gè)consumer開始從topic消費(fèi)消息時(shí)。
    • 當(dāng)一個(gè)client向topic發(fā)送元數(shù)據(jù)請(qǐng)求時(shí)。
  • num.partitions
    配置創(chuàng)建主題時(shí)包含多少個(gè)分區(qū),默認(rèn)值為1,因?yàn)槲覀兡茉黾又黝}的分區(qū)數(shù),但是不能減少分區(qū)的個(gè)數(shù),所以,如果要讓一個(gè)主題的分區(qū)個(gè)數(shù)少于num.partitions需要手動(dòng)創(chuàng)建該主題而不是通過(guò)自動(dòng)創(chuàng)建主題。
  • log.retention.hours
    配置kafka保留數(shù)據(jù)的時(shí)間,默認(rèn)為168小時(shí)也就是7天,效果等同log.retention.minutes和log.retention.ms,只是單位不一樣,分別是小時(shí),分鐘,和毫秒,推薦使用log.retention.ms,粒度更加細(xì),如果三個(gè)參數(shù)都配置了則去數(shù)值最小的配置。
  • log.retention.bytes
    配置一個(gè)分區(qū)能保存最大的字節(jié)數(shù),如果超出的部分就會(huì)被刪除,同時(shí)配置了log.retention.hours/log.retention.minutes/log.retention.ms的話,任一個(gè)滿足條件都會(huì)觸發(fā)數(shù)據(jù)刪除。
  • message.max.bytes
    配置消息的大小限制,默認(rèn)為100000,也就是1M,這里的大小是指在kafka壓縮后的大小,也就是說(shuō)實(shí)際消息可以大于1M,如果消息超過(guò)這個(gè)限制,則會(huì)被kafka拒收。

2.producer相關(guān)配置
  • bootstrap.servers
    配置broker的地址,格式為host:port,如果多個(gè)則以逗號(hào)隔開,不需要配置所有的broker,producer會(huì)從給定的broker查詢其他broker的信息,不過(guò)建議至少填寫兩個(gè),以防在一個(gè)宕機(jī)的情況還能從另外一個(gè)去獲取broker的信息。
  • acks
    acks指定了必須要有多少個(gè)分區(qū)副本收到消息,producer才會(huì)認(rèn)為消息寫入是成功的。
    • acks=0 producer發(fā)送消息不等待任何來(lái)自服務(wù)器的響應(yīng),所以會(huì)出現(xiàn)消息丟失而producer不感知的情況,該模式下可獲取最大的吞吐量。
    • acks=1 只要集群的leader節(jié)點(diǎn)收到消息,producer就會(huì)收到一個(gè)服務(wù)器的成功響應(yīng),如果消息無(wú)法達(dá)到leader節(jié)點(diǎn),那么producer就會(huì)獲取到一個(gè)錯(cuò)誤響應(yīng),這時(shí)候?yàn)榱吮苊庀⒌膩G失,producer可以選擇重發(fā),不過(guò)如果一個(gè)沒(méi)有收到消息的節(jié)點(diǎn)成為新的leader,那么消息還是會(huì)丟失。
    • acks=all 只有當(dāng)leader節(jié)點(diǎn)和follower節(jié)點(diǎn)都收到消息時(shí),producer才會(huì)收到成功的響應(yīng),這是一個(gè)避免消息丟失最安全的做法,不過(guò)這種模式吞吐量最低
  • client.id
    可以是任意字符串,標(biāo)識(shí)消息的來(lái)源
  • max.in.flight.requests.per.connection
    配置producer在收到服務(wù)器響應(yīng)前可以發(fā)送的消息個(gè)數(shù),值越高,吞吐量就會(huì)越高,不過(guò)相應(yīng)的占用的內(nèi)存也會(huì)越多,設(shè)置為1可以保證消息可以按照發(fā)送的順序?qū)懭敕?wù),即便發(fā)生了重試。
  • max.request.size
    配置producer單次發(fā)送的所有消息的總的大小限制,例如設(shè)置為1M,單個(gè)消息大小為1K,那么單次可以發(fā)的上限是1000個(gè),最好跟message.max.bytes配置匹配,避免發(fā)送到broker的消息被拒絕。
  • retries
    該參數(shù)決定了producer可以重發(fā)消息的次數(shù),producer從broker收到的錯(cuò)誤可能是臨時(shí)性的,例如分區(qū)找不到首領(lǐng),這種情況下,producer在進(jìn)行retries次重試后就會(huì)放棄重試并且返回錯(cuò)誤,默認(rèn)情況下,重試的時(shí)間間隔為100ms,可以通過(guò)retry.backoff.ms參數(shù)配置,建議在設(shè)置重試間隔之前最好測(cè)試一下恢復(fù)一個(gè)崩潰的節(jié)點(diǎn)要多長(zhǎng)時(shí)間,重試的間隔最好比恢復(fù)時(shí)間要長(zhǎng)。
  • batch.size
    當(dāng)多個(gè)消息往同一個(gè)分區(qū)發(fā)送時(shí),producer會(huì)把這些消息放到同一個(gè)分區(qū),該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按字節(jié)數(shù)計(jì)算,當(dāng)批次填滿時(shí)消息就會(huì)被發(fā)送出去,不過(guò)producer不一定等批次被填滿才會(huì)發(fā)送,甚至只有一個(gè)消息也會(huì)被發(fā)送,所以就算把該值設(shè)置得很大也不會(huì)造成延遲,只不過(guò)會(huì)占用內(nèi)存,但是如果設(shè)置太小的話,producer會(huì)很頻繁的發(fā)送,增加一些額外的開銷。
  • linger.ms
    指定producer發(fā)送批次之前要等待更多消息加入批次的時(shí)間,producer會(huì)在批次填滿或者longer.ms到達(dá)上限時(shí)把批次發(fā)送出去,默認(rèn)情況下,只要有可用的線程,就算批次只有一個(gè)消息,producer也會(huì)把消息發(fā)送出去。把linger.ms設(shè)置成比0大的數(shù),讓producer在發(fā)送批次之前多等待一會(huì),可以使得更多的消息可以加入到批次,雖然增加了延遲,但是同時(shí)也增加了吞吐量。

3.Consumer相關(guān)配置
  • fetch.min.bytes
    配置Consumer從broker獲取記錄的最小字節(jié)數(shù),broker在收到Consumer的數(shù)據(jù)請(qǐng)求時(shí),如果可用的數(shù)據(jù)量小于該配置,那么broker會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給Consumer。
  • fetch.max.wait.ms
    配置broker的等待時(shí)間,默認(rèn)為500ms,如果沒(méi)有足夠的數(shù)據(jù)流入,導(dǎo)致不滿足fetch.mis.bytes,最終會(huì)導(dǎo)致500ms的延遲。如果fetch.mis.bytes配置為1M,fetch.max.wait.ms配置為500ms,那么最終broker要么返回1M的數(shù)據(jù),要么等待500ms后返回所有可用的數(shù)據(jù),取決于哪個(gè)條件先得到滿足。
  • max.partition.fetch.bytes
    配置broker從每個(gè)分區(qū)返回給Consumer的最大字節(jié)數(shù),默認(rèn)為1MB,也就是說(shuō),KafkaConsumer.poll()方法從每個(gè)分區(qū)里面返回的記錄最多不超過(guò)該值,加入有10個(gè)分區(qū)5個(gè)消費(fèi)者,則每個(gè)消費(fèi)者需要2MB的內(nèi)存來(lái)接收消息,需要注意的是,如果該值設(shè)置過(guò)大,導(dǎo)致消費(fèi)者處理的業(yè)務(wù)的時(shí)間過(guò)長(zhǎng),會(huì)有回話超時(shí)的風(fēng)險(xiǎn)。
  • session.timeout.ms
    配置了Consumer被認(rèn)定為死亡前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)3秒,如果服務(wù)器在超過(guò)該值時(shí)間沒(méi)有收到Consumer的心跳,就會(huì)認(rèn)定Consumer死亡,會(huì)觸發(fā)再均衡,把死亡Consumer的分區(qū)分配給其他Consumer,這個(gè)配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms設(shè)置了poll()方法向協(xié)調(diào)器發(fā)送心跳的頻率。建議heartbeat.interval.ms的值為session.timeout.ms的三分之一。
  • enable.auto.commit
    指定了Consumer是否開啟自動(dòng)提交偏移量,默認(rèn)為true??梢园阉O(shè)置為false,由程序自己控制何時(shí)提交偏移量來(lái)避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失的情況。
  • auto.offset.reset
    指定在Consumer讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下的策略(因?yàn)橄M(fèi)者長(zhǎng)時(shí)間失效,包含偏移量的記錄已經(jīng)過(guò)時(shí)并刪除),默認(rèn)為latest,表示會(huì)從最新的記錄開始讀取(在消費(fèi)者啟動(dòng)之后生成的記錄,會(huì)出現(xiàn)漏消費(fèi)歷史記錄的情況),另外一個(gè)配置是earliest,表示從最早的消息開始消費(fèi)(會(huì)出現(xiàn)重復(fù)消費(fèi)的情況)
  • partition.assignment.strategy
    分區(qū)分配給Consumer的策略,有兩種:
    • Range
      把topic的若干個(gè)連續(xù)的分區(qū)分配給Consumer,假設(shè)有Consumer1和Consumer2,分別訂閱了topic1和topic2,每個(gè)topic都有3個(gè)分區(qū),那么Consumer1可能分配到topic1和topic2的分區(qū)0和分區(qū)1,Consumer2分配到topic1和topic2的分區(qū)2,這種策略會(huì)導(dǎo)致當(dāng)分區(qū)數(shù)量(針對(duì)單個(gè)topic,上面例子是3個(gè)分區(qū))無(wú)法被消費(fèi)者數(shù)量(上面例子是2個(gè)消費(fèi)者)整除時(shí),就會(huì)出現(xiàn)分區(qū)分布不均勻的情況。


      Range.png
    • RoundRobin
      該策略會(huì)把所有的分區(qū)逐個(gè)分配給Consumer,還是上面的例子,如果按這種策略分配那么Consumer1最終分到的是topic1的分區(qū)0,分區(qū)2和topic2的分區(qū)1,Consumer2最終分到的是topic1的分區(qū)1和topic2的分區(qū)0和分區(qū)2。

      RoundRobin.png

默認(rèn)使用org.apache.kafka.clients.consumer.RangeAssignor,這個(gè)類實(shí)現(xiàn)了Range策略,RoundRabin的實(shí)現(xiàn)類為org.apache.kafka.clients.consumer.RoundRobinAssignor,我們還可以自定義策略。

  • max.poll.records
    指定單次調(diào)用poll()方法返回的記錄數(shù)量。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Design 1. Motivation 我們?cè)O(shè)計(jì)Kafka用來(lái)作為統(tǒng)一的平臺(tái)來(lái)處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)源...
    BlackManba_24閱讀 1,645評(píng)論 0 8
  • 一、入門1、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,684評(píng)論 0 9
  • MQ(消息隊(duì)列)是跨進(jìn)程通信的方式之一,可理解為異步rpc,上游系統(tǒng)對(duì)調(diào)用結(jié)果的態(tài)度往往是重要不緊急。使用消息隊(duì)列...
    allin8116閱讀 634評(píng)論 0 0
  • 項(xiàng)目4月份發(fā)版,直接需要整合kafka,今天開始學(xué),一個(gè)月內(nèi)完成kafka的部分。 資料來(lái)源:http://www...
    MisterCH閱讀 1,831評(píng)論 0 4
  • Kafka的基本概念 BrokerKafka集群中包含多個(gè)服務(wù)器,其中每個(gè)服務(wù)器稱為一個(gè)broker。有一點(diǎn)需要注...
    frmark閱讀 436評(píng)論 0 0

友情鏈接更多精彩內(nèi)容