回顧:kafka極簡(jiǎn)入門(三)--創(chuàng)建topic
前言
kafka針對(duì)broker, topic, producer, consumer的配置非常多,這里講一下常用的配置
1.broker相關(guān)配置
-
broker.id
broker在kafka集群中的唯一標(biāo)識(shí),必須是一個(gè)大于等于0的整數(shù),如果不寫的話默認(rèn)從1001開始。
建議:把它設(shè)置成與機(jī)器名具有相關(guān)性的整數(shù)。 -
port
設(shè)置kafka的端口號(hào),默認(rèn)情況下是9092,不建議修改成1024以下的端口,因?yàn)樾枰褂胷oot權(quán)限啟動(dòng)。 -
zookeeper.connect
設(shè)置zookeeper集群,該配置參數(shù)是一組用逗號(hào)隔開的host:port/path列表- host是zookeeper服務(wù)器的機(jī)器名或ip地址
- port是zookeeper服務(wù)器的端口
- /path是可選的zookeeper路徑,作為kafka集群的chroot環(huán)境,默認(rèn)是根路徑,如果指定的chroot路徑不存在,kafka會(huì)在啟動(dòng)的時(shí)候創(chuàng)建它。使用chroot使得zookeeper集群可以共享給其他應(yīng)用程序時(shí)而不會(huì)產(chǎn)生沖突。
-
log.dirs
配置kafka日志片段的目錄位置,多個(gè)路徑以逗號(hào)隔開。如果配置了多個(gè)路徑,kafka會(huì)根據(jù)最少使用原則,把統(tǒng)一分區(qū)的日志保存到同一路徑下,注意的是,kafka會(huì)往擁有最少數(shù)量分區(qū)的路徑新增分區(qū),而不是往擁有最小磁盤空間的路徑新增分區(qū)。 -
auto.create.topics.enable
配置是否開啟自動(dòng)創(chuàng)建topic,如果設(shè)置為true, kafka會(huì)在以下幾種場(chǎng)景自動(dòng)創(chuàng)建topic:- 當(dāng)一個(gè)producer開始往topic寫入消息時(shí)。
- 當(dāng)一個(gè)consumer開始從topic消費(fèi)消息時(shí)。
- 當(dāng)一個(gè)client向topic發(fā)送元數(shù)據(jù)請(qǐng)求時(shí)。
-
num.partitions
配置創(chuàng)建主題時(shí)包含多少個(gè)分區(qū),默認(rèn)值為1,因?yàn)槲覀兡茉黾又黝}的分區(qū)數(shù),但是不能減少分區(qū)的個(gè)數(shù),所以,如果要讓一個(gè)主題的分區(qū)個(gè)數(shù)少于num.partitions需要手動(dòng)創(chuàng)建該主題而不是通過(guò)自動(dòng)創(chuàng)建主題。 -
log.retention.hours
配置kafka保留數(shù)據(jù)的時(shí)間,默認(rèn)為168小時(shí)也就是7天,效果等同log.retention.minutes和log.retention.ms,只是單位不一樣,分別是小時(shí),分鐘,和毫秒,推薦使用log.retention.ms,粒度更加細(xì),如果三個(gè)參數(shù)都配置了則去數(shù)值最小的配置。 -
log.retention.bytes
配置一個(gè)分區(qū)能保存最大的字節(jié)數(shù),如果超出的部分就會(huì)被刪除,同時(shí)配置了log.retention.hours/log.retention.minutes/log.retention.ms的話,任一個(gè)滿足條件都會(huì)觸發(fā)數(shù)據(jù)刪除。 -
message.max.bytes
配置消息的大小限制,默認(rèn)為100000,也就是1M,這里的大小是指在kafka壓縮后的大小,也就是說(shuō)實(shí)際消息可以大于1M,如果消息超過(guò)這個(gè)限制,則會(huì)被kafka拒收。
2.producer相關(guān)配置
-
bootstrap.servers
配置broker的地址,格式為host:port,如果多個(gè)則以逗號(hào)隔開,不需要配置所有的broker,producer會(huì)從給定的broker查詢其他broker的信息,不過(guò)建議至少填寫兩個(gè),以防在一個(gè)宕機(jī)的情況還能從另外一個(gè)去獲取broker的信息。 -
acks
acks指定了必須要有多少個(gè)分區(qū)副本收到消息,producer才會(huì)認(rèn)為消息寫入是成功的。- acks=0 producer發(fā)送消息不等待任何來(lái)自服務(wù)器的響應(yīng),所以會(huì)出現(xiàn)消息丟失而producer不感知的情況,該模式下可獲取最大的吞吐量。
- acks=1 只要集群的leader節(jié)點(diǎn)收到消息,producer就會(huì)收到一個(gè)服務(wù)器的成功響應(yīng),如果消息無(wú)法達(dá)到leader節(jié)點(diǎn),那么producer就會(huì)獲取到一個(gè)錯(cuò)誤響應(yīng),這時(shí)候?yàn)榱吮苊庀⒌膩G失,producer可以選擇重發(fā),不過(guò)如果一個(gè)沒(méi)有收到消息的節(jié)點(diǎn)成為新的leader,那么消息還是會(huì)丟失。
- acks=all 只有當(dāng)leader節(jié)點(diǎn)和follower節(jié)點(diǎn)都收到消息時(shí),producer才會(huì)收到成功的響應(yīng),這是一個(gè)避免消息丟失最安全的做法,不過(guò)這種模式吞吐量最低
-
client.id
可以是任意字符串,標(biāo)識(shí)消息的來(lái)源 -
max.in.flight.requests.per.connection
配置producer在收到服務(wù)器響應(yīng)前可以發(fā)送的消息個(gè)數(shù),值越高,吞吐量就會(huì)越高,不過(guò)相應(yīng)的占用的內(nèi)存也會(huì)越多,設(shè)置為1可以保證消息可以按照發(fā)送的順序?qū)懭敕?wù),即便發(fā)生了重試。 -
max.request.size
配置producer單次發(fā)送的所有消息的總的大小限制,例如設(shè)置為1M,單個(gè)消息大小為1K,那么單次可以發(fā)的上限是1000個(gè),最好跟message.max.bytes配置匹配,避免發(fā)送到broker的消息被拒絕。 -
retries
該參數(shù)決定了producer可以重發(fā)消息的次數(shù),producer從broker收到的錯(cuò)誤可能是臨時(shí)性的,例如分區(qū)找不到首領(lǐng),這種情況下,producer在進(jìn)行retries次重試后就會(huì)放棄重試并且返回錯(cuò)誤,默認(rèn)情況下,重試的時(shí)間間隔為100ms,可以通過(guò)retry.backoff.ms參數(shù)配置,建議在設(shè)置重試間隔之前最好測(cè)試一下恢復(fù)一個(gè)崩潰的節(jié)點(diǎn)要多長(zhǎng)時(shí)間,重試的間隔最好比恢復(fù)時(shí)間要長(zhǎng)。 -
batch.size
當(dāng)多個(gè)消息往同一個(gè)分區(qū)發(fā)送時(shí),producer會(huì)把這些消息放到同一個(gè)分區(qū),該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按字節(jié)數(shù)計(jì)算,當(dāng)批次填滿時(shí)消息就會(huì)被發(fā)送出去,不過(guò)producer不一定等批次被填滿才會(huì)發(fā)送,甚至只有一個(gè)消息也會(huì)被發(fā)送,所以就算把該值設(shè)置得很大也不會(huì)造成延遲,只不過(guò)會(huì)占用內(nèi)存,但是如果設(shè)置太小的話,producer會(huì)很頻繁的發(fā)送,增加一些額外的開銷。 -
linger.ms
指定producer發(fā)送批次之前要等待更多消息加入批次的時(shí)間,producer會(huì)在批次填滿或者longer.ms到達(dá)上限時(shí)把批次發(fā)送出去,默認(rèn)情況下,只要有可用的線程,就算批次只有一個(gè)消息,producer也會(huì)把消息發(fā)送出去。把linger.ms設(shè)置成比0大的數(shù),讓producer在發(fā)送批次之前多等待一會(huì),可以使得更多的消息可以加入到批次,雖然增加了延遲,但是同時(shí)也增加了吞吐量。
3.Consumer相關(guān)配置
-
fetch.min.bytes
配置Consumer從broker獲取記錄的最小字節(jié)數(shù),broker在收到Consumer的數(shù)據(jù)請(qǐng)求時(shí),如果可用的數(shù)據(jù)量小于該配置,那么broker會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給Consumer。 -
fetch.max.wait.ms
配置broker的等待時(shí)間,默認(rèn)為500ms,如果沒(méi)有足夠的數(shù)據(jù)流入,導(dǎo)致不滿足fetch.mis.bytes,最終會(huì)導(dǎo)致500ms的延遲。如果fetch.mis.bytes配置為1M,fetch.max.wait.ms配置為500ms,那么最終broker要么返回1M的數(shù)據(jù),要么等待500ms后返回所有可用的數(shù)據(jù),取決于哪個(gè)條件先得到滿足。 -
max.partition.fetch.bytes
配置broker從每個(gè)分區(qū)返回給Consumer的最大字節(jié)數(shù),默認(rèn)為1MB,也就是說(shuō),KafkaConsumer.poll()方法從每個(gè)分區(qū)里面返回的記錄最多不超過(guò)該值,加入有10個(gè)分區(qū)5個(gè)消費(fèi)者,則每個(gè)消費(fèi)者需要2MB的內(nèi)存來(lái)接收消息,需要注意的是,如果該值設(shè)置過(guò)大,導(dǎo)致消費(fèi)者處理的業(yè)務(wù)的時(shí)間過(guò)長(zhǎng),會(huì)有回話超時(shí)的風(fēng)險(xiǎn)。 -
session.timeout.ms
配置了Consumer被認(rèn)定為死亡前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)3秒,如果服務(wù)器在超過(guò)該值時(shí)間沒(méi)有收到Consumer的心跳,就會(huì)認(rèn)定Consumer死亡,會(huì)觸發(fā)再均衡,把死亡Consumer的分區(qū)分配給其他Consumer,這個(gè)配置要跟heartbeat.interval.ms配合使用,heartbeat.interval.ms設(shè)置了poll()方法向協(xié)調(diào)器發(fā)送心跳的頻率。建議heartbeat.interval.ms的值為session.timeout.ms的三分之一。 -
enable.auto.commit
指定了Consumer是否開啟自動(dòng)提交偏移量,默認(rèn)為true??梢园阉O(shè)置為false,由程序自己控制何時(shí)提交偏移量來(lái)避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失的情況。 -
auto.offset.reset
指定在Consumer讀取一個(gè)沒(méi)有偏移量的分區(qū)或者偏移量無(wú)效的情況下的策略(因?yàn)橄M(fèi)者長(zhǎng)時(shí)間失效,包含偏移量的記錄已經(jīng)過(guò)時(shí)并刪除),默認(rèn)為latest,表示會(huì)從最新的記錄開始讀取(在消費(fèi)者啟動(dòng)之后生成的記錄,會(huì)出現(xiàn)漏消費(fèi)歷史記錄的情況),另外一個(gè)配置是earliest,表示從最早的消息開始消費(fèi)(會(huì)出現(xiàn)重復(fù)消費(fèi)的情況) -
partition.assignment.strategy
分區(qū)分配給Consumer的策略,有兩種:-
Range
把topic的若干個(gè)連續(xù)的分區(qū)分配給Consumer,假設(shè)有Consumer1和Consumer2,分別訂閱了topic1和topic2,每個(gè)topic都有3個(gè)分區(qū),那么Consumer1可能分配到topic1和topic2的分區(qū)0和分區(qū)1,Consumer2分配到topic1和topic2的分區(qū)2,這種策略會(huì)導(dǎo)致當(dāng)分區(qū)數(shù)量(針對(duì)單個(gè)topic,上面例子是3個(gè)分區(qū))無(wú)法被消費(fèi)者數(shù)量(上面例子是2個(gè)消費(fèi)者)整除時(shí),就會(huì)出現(xiàn)分區(qū)分布不均勻的情況。
Range.png -
RoundRobin
該策略會(huì)把所有的分區(qū)逐個(gè)分配給Consumer,還是上面的例子,如果按這種策略分配那么Consumer1最終分到的是topic1的分區(qū)0,分區(qū)2和topic2的分區(qū)1,Consumer2最終分到的是topic1的分區(qū)1和topic2的分區(qū)0和分區(qū)2。
RoundRobin.png
-
默認(rèn)使用org.apache.kafka.clients.consumer.RangeAssignor,這個(gè)類實(shí)現(xiàn)了Range策略,RoundRabin的實(shí)現(xiàn)類為org.apache.kafka.clients.consumer.RoundRobinAssignor,我們還可以自定義策略。
-
max.poll.records
指定單次調(diào)用poll()方法返回的記錄數(shù)量。

