背景:
? ? ? 最近在用kafka做消息中間件,producer從hive中讀取消息發(fā)送到kafka,后端storm對(duì)消息分類(lèi)發(fā)送到elasticsearch建立索引。
問(wèn)題:
? ? ? hive表中總共350萬(wàn)數(shù)據(jù),當(dāng)時(shí)整個(gè)全量索引結(jié)束后發(fā)現(xiàn),最后索引條數(shù)總共310萬(wàn)左右。storm日志沒(méi)有任何錯(cuò)誤日志。
排查:
? ? ? 首先排查storm consumer的問(wèn)題,由于發(fā)現(xiàn)storm日志沒(méi)有任何異常,所以第一步基本排除建索引程序的問(wèn)題。storm 消費(fèi)kafka用的官方storm-kafka包,而且已開(kāi)啟ack,所以基本排除storm端的問(wèn)題。
? ? 現(xiàn)在懷疑kafka里的數(shù)據(jù)本身只有310萬(wàn)條數(shù)據(jù),寫(xiě)了一個(gè)程序扔到了kafka集群上探查了一下,印證了自己的想法。果然,數(shù)據(jù)只有310萬(wàn)條?,F(xiàn)在基本判斷問(wèn)題的在kafka producer上。仔細(xì)查看了下producer代碼
props.put("acks","all");
props.put("retries",3);? ? ?
? ? ?"acks" 選項(xiàng)表示kafka 的ack級(jí)別:acks=0 意味著producer永遠(yuǎn)不會(huì)等待任何一個(gè)來(lái)自broker的ack,意味著不需要任何確實(shí),發(fā)送及以為著成功。acks=1 意味著在leader replica已經(jīng)接收到數(shù)據(jù)后,producer會(huì)得到一個(gè)ack,這個(gè)選項(xiàng)對(duì)速度與安全性做一個(gè)平衡,但是不需要等其他副本確認(rèn),如果發(fā)生leader掛了,其他副本還沒(méi)來(lái)得及同步,這時(shí)就會(huì)發(fā)生數(shù)據(jù)丟失的情況。最后一種數(shù)據(jù)最安全的情況就是acks=al,l意味著在所有的ISR都接收到數(shù)據(jù)后,producer才得到一個(gè)ack。這個(gè)選項(xiàng)提供了最好的持久性,只要還有一個(gè)replica存活,那么數(shù)據(jù)就不會(huì)丟失,但是相應(yīng)的吞吐量會(huì)受到影響。本著對(duì)業(yè)務(wù)對(duì)數(shù)據(jù)可靠性的要求,我選擇了最高的可靠級(jí)別,這點(diǎn)沒(méi)毛病。
? ? "retries"選項(xiàng)大于0的值將使客戶(hù)端重新發(fā)送任何數(shù)據(jù),一旦這些數(shù)據(jù)發(fā)送失敗,會(huì)間隔一段時(shí)間重試,這個(gè)值設(shè)置的就是重試間隔時(shí)間。初步懷疑這個(gè)值太小,如果磁盤(pán)卡頓,網(wǎng)絡(luò)中斷超過(guò)三秒,是否會(huì)丟數(shù)據(jù)。所以將這個(gè)參數(shù)調(diào)大到300。
? ? ?重新打包上傳到storm集群重新跑了一回,數(shù)據(jù)還是丟了30多萬(wàn)。場(chǎng)面一度尷尬。。問(wèn)題陷入了僵局。
轉(zhuǎn)機(jī):
? ? 現(xiàn)在的問(wèn)題已經(jīng)超過(guò)了我的認(rèn)知,之前從來(lái)沒(méi)出現(xiàn)過(guò)如此嚴(yán)重的丟數(shù)據(jù)的問(wèn)題。在網(wǎng)上搜的資料大部分都看過(guò)。理論上可靠性可以通過(guò)副本解決,沒(méi)有類(lèi)似于我這個(gè)種問(wèn)題。心想著如果不行,只能更改broker 從page cache同步到硬盤(pán)的頻率了。鬼使神差下,我更改了下producer的壓縮格式,從snappy改到gzip,這次kafka中的消息,竟然只少了2000。同樣的參數(shù),只改了下壓縮格式。我又查看下了前兩次用snapp格式,kafka里的消息數(shù),發(fā)現(xiàn)了一個(gè)問(wèn)題,兩次用snappy的時(shí)候,kafka消息數(shù)竟然一模一樣。如果不是玄學(xué)的問(wèn)題,理論上如果丟消息,350萬(wàn)條,丟相同條數(shù)的信息概率簡(jiǎn)直太小了。
? 現(xiàn)在問(wèn)題似乎已經(jīng)很清晰了,gzip壓縮率要比snappy高,snappy優(yōu)勢(shì)在于壓縮速度。壓縮率高意味著單條數(shù)據(jù)要小?,F(xiàn)在基本問(wèn)題定位在單條數(shù)據(jù)大小的問(wèn)題。但是為什么producer端沒(méi)有異常日志呢。查看一下producer發(fā)送消息的源碼:“Future send(ProducerRecord var1)” producer 發(fā)送消息后會(huì)發(fā)揮一個(gè)future,這種模式是異步發(fā)送方式,當(dāng)broker返回異常信息時(shí)并不會(huì)拋出。,producer.send(producerRecord).get(),加上get(),將異步改同步,打包運(yùn)行果然發(fā)送到30萬(wàn)條左右數(shù)據(jù)時(shí)就已經(jīng)拋出異常
kafka.common.MessageSizeTooLargeException
解決:
? 至此問(wèn)題已經(jīng)定位到,下一步解決問(wèn)題,搜了下stackoverflow,參考下最高票回答:
Consumer side:fetch.message.max.bytes- this will determine the largest size of a message that can be fetched by the consumer.
Broker side:replica.fetch.max.bytes- this will allow for the replicas in the brokers to send messages within the cluster and make sure the messages are replicated correctly. If this is too small, then the message will never be replicated, and therefore, the consumer will never see the message because the message will never be committed (fully replicated).
Broker side:message.max.bytes- this is the largest size of the message that can be received by the broker from a producer.
Broker side (per topic):max.message.bytes- this is the largest size of the message the broker will allow to be appended to the topic. This size is validated pre-compression. (Defaults to broker'smessage.max.bytes.)
? ? 已完美解決問(wèn)題。