Kafka無(wú)消息丟失配置
Kafka到底會(huì)不會(huì)丟數(shù)據(jù)(data loss)? 網(wǎng)上各種說(shuō)法都有,在回答這個(gè)問(wèn)題之前, 我們要明確“責(zé)任邊界”。所謂責(zé)任邊界就是要確定消息在生產(chǎn)和消費(fèi)的完整流程中是由誰(shuí)來(lái)負(fù)責(zé),確保它不會(huì)丟失。這樣即使真的出現(xiàn)了消息丟失,也能明確是責(zé)任主體,有針對(duì)性地進(jìn)行改進(jìn)和調(diào)整。
個(gè)人認(rèn)為,關(guān)于責(zé)任的劃定,官方其實(shí)已經(jīng)給出了很明確的答案:
Once a published message is committed it will not be lost as long as one broker that replicates the partition to which this message was written remains "alive".
倘若我們完全理解這句話,那么“是否丟失消息”的問(wèn)題自可迎刃而解。這句話有兩個(gè)關(guān)鍵要點(diǎn):
committed: Kafka只對(duì)已提交的消息做出交付保證(delivery guarantee),沒(méi)有成功提交的消息Kafka不對(duì)其做出任何承諾
alive:只要有一個(gè)保存了該條消息的broker還活著(alive)就不會(huì)丟失消息
Kafka如何定義一個(gè)broker是否存活(alive)呢? 很簡(jiǎn)單,也是兩個(gè)條件:
節(jié)點(diǎn)進(jìn)程必須存活,且一直維持與zookeeper的會(huì)話
如果是follower節(jié)點(diǎn),它與leader節(jié)點(diǎn)相差的消息數(shù)不能過(guò)大,即不能遠(yuǎn)遠(yuǎn)落后于leader節(jié)點(diǎn)的進(jìn)度。如果按照Kafka的術(shù)語(yǔ)來(lái)說(shuō),就是這個(gè)follower節(jié)點(diǎn)必須是一個(gè)ISR(in-sync replica,即與leader保持同步的副本節(jié)點(diǎn))
當(dāng)然,我個(gè)人絕對(duì)相信,因?yàn)橐恍┠J(rèn)的配置和尚未發(fā)現(xiàn)的bug等原因,上面Kafka所做的保證也不一定百分之百能夠?qū)崿F(xiàn),但大多數(shù)情況下通過(guò)本文的配置是可以幫助你做到無(wú)消息丟失的。
okay,閑言少敘,直接上配置了。下面的參數(shù)配置及Best practice列表可以較好地保證數(shù)據(jù)的持久性(當(dāng)然是trade-off,犧牲了吞吐量)。我會(huì)在該列表之后對(duì)列表中的每一項(xiàng)進(jìn)行討論,有興趣的同學(xué)可以看下后面的分析。
block.on.buffer.full = true
acks = all
retries = MAX_VALUE
max.in.flight.requests.per.connection = 1
使用KafkaProducer.send(record, callback)
如果僅僅是要消息無(wú)丟失,使用帶callback的send方法;如果還要保證無(wú)亂序問(wèn)題,那么發(fā)送失敗時(shí)一定要在callback邏輯中立即關(guān)閉producer:close(0)
unclean.leader.election.enable=false
replication.factor = 3
min.insync.replicas = 2
replication.factor >?min.insync.replicas
enable.auto.commit=false
使用手動(dòng)提交位移,消息處理完成之后再提交位移
給出列表之后,我們從兩個(gè)方面來(lái)探討一下數(shù)據(jù)為什么會(huì)丟失:
1. Producer端
本文討論的是Kafka 0.9版本之后的producer——Kafka0.9正式使用java版producer替換了老版的scala producer。
新版本默認(rèn)使用異步發(fā)送機(jī)制,所以KafkaProducer.send僅僅是把這條消息放入一個(gè)緩存中(即RecordAccumulator,本質(zhì)上使用了隊(duì)列來(lái)緩存記錄),同時(shí)后臺(tái)的Sender IO線程會(huì)不斷掃描該緩存區(qū),將滿(mǎn)足條件的消息封裝到某個(gè)batch中然后發(fā)送出去。顯然,這個(gè)過(guò)程中就有一個(gè)數(shù)據(jù)丟失的窗口:若IO線程發(fā)送之前client端掛掉了,累積在accumulator中的數(shù)據(jù)的確有可能會(huì)丟失。但顯然,這不在Kafka做出保證的責(zé)任邊界內(nèi),畢竟消息沒(méi)有提交成功,尚未被Kafka接管。不過(guò)上面列表中的一些參數(shù)配置仍然可以幫助你避免這種情況下的數(shù)據(jù)丟失。
Producer的另一個(gè)問(wèn)題是消息的亂序問(wèn)題。假設(shè)客戶(hù)端代碼依次執(zhí)行下面的語(yǔ)句將兩條消息發(fā)到相同的分區(qū)
producer.send(record1);
producer.send(record2);
如果此時(shí)由于某些原因(比如瞬時(shí)的網(wǎng)絡(luò)抖動(dòng))導(dǎo)致record1沒(méi)有成功發(fā)送,同時(shí)Kafka又配置了重試機(jī)制和max.in.flight.requests.per.connection大于1(默認(rèn)值是5,本來(lái)就是大于1的),那么重試record1成功后,record1在分區(qū)中就在record2之后,從而造成消息的亂序。很多某些要求強(qiáng)順序保證的場(chǎng)景是不允許出現(xiàn)這種情況的。
鑒于producer的這兩個(gè)問(wèn)題,我們應(yīng)該如何規(guī)避呢??對(duì)于消息丟失的問(wèn)題,很容易想到的一個(gè)方案就是:既然異步發(fā)送有可能丟失數(shù)據(jù), 我改成同步發(fā)送總可以吧?比如這樣:
producer.send(record).get();
這樣當(dāng)然是可以的,但是性能會(huì)很差,不建議這樣使用。因此特意總結(jié)了一份配置列表。個(gè)人認(rèn)為該配置清單應(yīng)該能夠比較好地規(guī)避producer端數(shù)據(jù)丟失情況的發(fā)生:(特此說(shuō)明一下,軟件配置的很多決策都是trade-off,下面的配置也不例外:應(yīng)用了這些配置,你可能會(huì)發(fā)現(xiàn)你的producer/consumer 吞吐量會(huì)下降,這是正常的,因?yàn)槟銚Q取了更高的數(shù)據(jù)安全性)
block.on.buffer.full = true ?盡管該參數(shù)在0.9.0.0已經(jīng)被標(biāo)記為“deprecated”,但鑒于它的含義非常直觀,所以這里還是顯式設(shè)置它為true,使得producer將一直等待緩沖區(qū)直至其變?yōu)榭捎谩7駝t如果producer生產(chǎn)速度過(guò)快耗盡了緩沖區(qū),producer將拋出異常
acks=all ?很好理解,所有follower都響應(yīng)了才認(rèn)為消息提交成功,即"committed"
retries = MAX 無(wú)限重試,直到你意識(shí)到出現(xiàn)了問(wèn)題:)
max.in.flight.requests.per.connection = 1 限制客戶(hù)端在單個(gè)連接上能夠發(fā)送的未響應(yīng)請(qǐng)求的個(gè)數(shù)。設(shè)置此值是1表示kafka broker在響應(yīng)請(qǐng)求之前client不能再向同一個(gè)broker發(fā)送請(qǐng)求。注意:設(shè)置此參數(shù)是為了避免消息亂序
使用KafkaProducer.send(record, callback)而不是send(record)方法 ? 自定義回調(diào)邏輯處理消息發(fā)送失敗
callback邏輯中最好顯式關(guān)閉producer:close(0)?注意:設(shè)置此參數(shù)是為了避免消息亂序
unclean.leader.election.enable=false ? 關(guān)閉unclean leader選舉,即不允許非ISR中的副本被選舉為leader,以避免數(shù)據(jù)丟失
replication.factor >= 3 ? 這個(gè)完全是個(gè)人建議了,參考了Hadoop及業(yè)界通用的三備份原則
min.insync.replicas > 1 消息至少要被寫(xiě)入到這么多副本才算成功,也是提升數(shù)據(jù)持久性的一個(gè)參數(shù)。與acks配合使用
保證replication.factor >?min.insync.replicas ?如果兩者相等,當(dāng)一個(gè)副本掛掉了分區(qū)也就沒(méi)法正常工作了。通常設(shè)置replication.factor =?min.insync.replicas + 1即可
2. Consumer端
consumer端丟失消息的情形比較簡(jiǎn)單:如果在消息處理完成前就提交了offset,那么就有可能造成數(shù)據(jù)的丟失。由于Kafka consumer默認(rèn)是自動(dòng)提交位移的,所以在后臺(tái)提交位移前一定要保證消息被正常處理了,因此不建議采用很重的處理邏輯,如果處理耗時(shí)很長(zhǎng),則建議把邏輯放到另一個(gè)線程中去做。為了避免數(shù)據(jù)丟失,現(xiàn)給出兩點(diǎn)建議:
enable.auto.commit=false ?關(guān)閉自動(dòng)提交位移
在消息被完整處理之后再手動(dòng)提交位移
okay,總結(jié)一下,本文給出了Kafka關(guān)于交付保證的基本定義以及無(wú)消息丟失配置。這只是一個(gè)best practice,具體的使用還要結(jié)合各自的業(yè)務(wù)特點(diǎn)進(jìn)行展開(kāi),有針對(duì)性地進(jìn)行設(shè)置。