Kafka ——如何保證消息不會(huì)丟失

前言

Kafka 提供了數(shù)據(jù)高可靠的特性,
但是如果使用不當(dāng),
你可能無法享受到這一特性,
今天我們就來看看如何正確的使用Kafka 保證數(shù)據(jù)的不會(huì)丟失吧!

生產(chǎn)者的正確的消息發(fā)送方式

Kafka為生產(chǎn)者生產(chǎn)消息提供了一個(gè) send(msg) 方法,
另有一個(gè)重載的方法send(msg, callback),

  • send(msg)
    該方法可以將一條消息發(fā)送出去,
    但是對(duì)發(fā)送出去的消息沒有掌控能力,
    無法得知其最后是不是到達(dá)了Kafka,
    所以這是一種不可靠的發(fā)送方式,
    但是也因?yàn)榭蛻舳酥恍枰?fù)責(zé)發(fā)送,
    所以具有較好的性能。

    Future<RecordMetadata> future = producer.send(record)
    

    上面的示例代碼也可以看到,send返回的是一個(gè) Future,
    也就是說其實(shí)你是可以 Future.get()獲取返回值的,
    但這種同步的方式,基本上可以說是不會(huì)用到。

  • send(msg, callback)
    該方法可以將一條消息發(fā)送出去,
    并且可以從callback回調(diào)中得到該條消息的發(fā)送結(jié)果,
    并且callback是異步回調(diào),
    所以在兼具性能的情況下,
    也對(duì)消息具有比較好的掌控。

     ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
     producer.send(myRecord,
               new Callback() {
                   public void onCompletion(RecordMetadata metadata, Exception e) {
                       if(e != null) {
                          e.printStackTrace();
                       } else {
                          System.out.println("The offset of the record we just sent is: " + metadata.offset());
                       }
                   }
               });
    
  • 綜上,如果要使數(shù)據(jù)不丟失,
    首先你就的使用 send(msg, callback)來發(fā)送消息,
    絕大多數(shù)情況下,我也建議你這么做。

生產(chǎn)者的配置

當(dāng)我們通過 send(msg, callback) 是不是就意味著消息一定不丟失了呢?
答案明顯是:不是的
我們接著上面,
send(msg, callback)里面 callback返回的成功,
到底是不是真的確保消息萬無一失了?
其實(shí)這個(gè)返回的成功也是可以在生產(chǎn)者配置的:

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
//*******重點(diǎn)*****************
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
 producer.close();

這段代碼是生產(chǎn)者發(fā)送消息的一個(gè)例子,
其中沒使用callback主要是這里callback不是重點(diǎn),
我們的重點(diǎn)是props.put("acks", "all");
這個(gè)acks配置屬性就是我們callback成功的具體含義:

  • acks=0
    acks = 0如果設(shè)置為零,那么生產(chǎn)者將完全不會(huì)管服務(wù)器是否收到消息。
    該記錄將立即添加到套接字緩沖區(qū)中并視為已發(fā)送。
    并且重試配置不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障)。
    返回值的偏移量將始終等于 -1。
    該方式具有最大的吞吐量,
    一般建議直接配合 send(msg)使用。

  • acks=1
    當(dāng)leader接受到消息就會(huì)直接給客戶端返回成功,
    一般情況下這種模式都能很好的保證數(shù)據(jù)的不丟失,
    只有在laeder接受到數(shù)據(jù),
    然后還沒來得及同步到follower,
    就掛掉了才會(huì)導(dǎo)致數(shù)據(jù)的丟失,
    這種概率還是比較小的。
    這也是默認(rèn)的選擇方式,
    兼具較好的吞吐和較高的可靠性

  • acks=all 或者 acks=-1
    當(dāng)leader接受到消息,并同步到了一定數(shù)量的follower,
    才向生產(chǎn)者發(fā)生成功的消息,
    同步到的follower數(shù)量由 broker 端的 min.insync.replicas 決定
    除非一些不可抗力因素,
    這種方式基本可以確保數(shù)據(jù)的完全不丟失。

Broker 端的配置

其實(shí)到這里,生產(chǎn)者端基本已經(jīng)做好了數(shù)據(jù)不丟失的大部分準(zhǔn)備,
但是有些東西是要配合 Broker 端一起,
才能達(dá)到預(yù)期的不丟失數(shù)據(jù)的,
比如我們上面說到的

  • min.insync.replicas 配置
    我們上面知道了,
    當(dāng) 生產(chǎn)者 acks = -1 的時(shí)候,
    寫入的副本數(shù)就必須 >= min.insync.replicas 數(shù),
    當(dāng)達(dá)不到這個(gè)要求的時(shí)候,
    生產(chǎn)者端會(huì)收到一個(gè)either NotEnoughReplicas or NotEnoughReplicasAfterAppend的異常。
    所以我們這個(gè)參數(shù)必須不能大于 replication.factor 副本數(shù)。
    否則生產(chǎn)者將無法寫入任何數(shù)據(jù),
    一般建議 replication.factor 數(shù)要大于 min.insync.replicas,
    比如3個(gè)機(jī)器的集群,設(shè)置 replication.factor = 3,
    那么設(shè)置 min.insync.replicas = 2 就可以了,
    這樣既保證了數(shù)據(jù)寫入的時(shí)候有一個(gè)副本的冗余,
    也能保證在一些情況下,
    某臺(tái)Broker宕機(jī)導(dǎo)致數(shù)據(jù)無法達(dá)到3個(gè)副本時(shí),
    依然可以正常寫入數(shù)據(jù)。

  • unclean.leader.election.enable
    這里 Broker 端還有一個(gè)重要的配置就是 unclean.leader.election.enable = false
    這個(gè)配置代表著一些數(shù)據(jù)落后比較多的 follower,
    是否能在leader宕機(jī)后被選舉成新的 leader
    如果你設(shè)置成 true,
    很明顯,如果這樣的follower成為新leader,
    就會(huì)造成最新的一部分?jǐn)?shù)據(jù)丟失掉,

重試 retries

上面已經(jīng)基本完成了不丟數(shù)據(jù)的方方面面了,
但是有些東西不是我們能控制的,
比如 網(wǎng)絡(luò)抖動(dòng) 等不可抗拒的因素,
這時(shí)候重試次數(shù)就很關(guān)鍵了,
配置合適的retries重試次數(shù),
和 合適的retry.backoff.ms重試間隔時(shí)間,
將為我們的數(shù)據(jù)發(fā)送提供更高的穩(wěn)定性,
當(dāng)然如果實(shí)在發(fā)送不成功,怎么辦呢?
一般我們也可以把發(fā)送不成功的數(shù)據(jù)保存在一個(gè)日志文件,
如果數(shù)據(jù)很重要,那就發(fā)送警告信息,
人工干預(yù)一下。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容