前言
Kafka 提供了數(shù)據(jù)高可靠的特性,
但是如果使用不當(dāng),
你可能無法享受到這一特性,
今天我們就來看看如何正確的使用Kafka 保證數(shù)據(jù)的不會(huì)丟失吧!
生產(chǎn)者的正確的消息發(fā)送方式
Kafka為生產(chǎn)者生產(chǎn)消息提供了一個(gè) send(msg) 方法,
另有一個(gè)重載的方法send(msg, callback),
-
send(msg)
該方法可以將一條消息發(fā)送出去,
但是對(duì)發(fā)送出去的消息沒有掌控能力,
無法得知其最后是不是到達(dá)了Kafka,
所以這是一種不可靠的發(fā)送方式,
但是也因?yàn)榭蛻舳酥恍枰?fù)責(zé)發(fā)送,
所以具有較好的性能。Future<RecordMetadata> future = producer.send(record)上面的示例代碼也可以看到,
send返回的是一個(gè)Future,
也就是說其實(shí)你是可以Future.get()獲取返回值的,
但這種同步的方式,基本上可以說是不會(huì)用到。 -
send(msg, callback)
該方法可以將一條消息發(fā)送出去,
并且可以從callback回調(diào)中得到該條消息的發(fā)送結(jié)果,
并且callback是異步回調(diào),
所以在兼具性能的情況下,
也對(duì)消息具有比較好的掌控。ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value); producer.send(myRecord, new Callback() { public void onCompletion(RecordMetadata metadata, Exception e) { if(e != null) { e.printStackTrace(); } else { System.out.println("The offset of the record we just sent is: " + metadata.offset()); } } }); 綜上,如果要使數(shù)據(jù)不丟失,
首先你就的使用send(msg, callback)來發(fā)送消息,
絕大多數(shù)情況下,我也建議你這么做。
生產(chǎn)者的配置
當(dāng)我們通過 send(msg, callback) 是不是就意味著消息一定不丟失了呢?
答案明顯是:不是的
我們接著上面,
send(msg, callback)里面 callback返回的成功,
到底是不是真的確保消息萬無一失了?
其實(shí)這個(gè)返回的成功也是可以在生產(chǎn)者配置的:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
//*******重點(diǎn)*****************
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
這段代碼是生產(chǎn)者發(fā)送消息的一個(gè)例子,
其中沒使用callback主要是這里callback不是重點(diǎn),
我們的重點(diǎn)是props.put("acks", "all");
這個(gè)acks配置屬性就是我們callback成功的具體含義:
acks=0
acks = 0如果設(shè)置為零,那么生產(chǎn)者將完全不會(huì)管服務(wù)器是否收到消息。
該記錄將立即添加到套接字緩沖區(qū)中并視為已發(fā)送。
并且重試配置不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障)。
返回值的偏移量將始終等于 -1。
該方式具有最大的吞吐量,
一般建議直接配合send(msg)使用。acks=1
當(dāng)leader接受到消息就會(huì)直接給客戶端返回成功,
一般情況下這種模式都能很好的保證數(shù)據(jù)的不丟失,
只有在laeder接受到數(shù)據(jù),
然后還沒來得及同步到follower,
就掛掉了才會(huì)導(dǎo)致數(shù)據(jù)的丟失,
這種概率還是比較小的。
這也是默認(rèn)的選擇方式,
兼具較好的吞吐和較高的可靠性acks=all 或者 acks=-1
當(dāng)leader接受到消息,并同步到了一定數(shù)量的follower,
才向生產(chǎn)者發(fā)生成功的消息,
同步到的follower數(shù)量由 broker 端的min.insync.replicas決定
除非一些不可抗力因素,
這種方式基本可以確保數(shù)據(jù)的完全不丟失。
Broker 端的配置
其實(shí)到這里,生產(chǎn)者端基本已經(jīng)做好了數(shù)據(jù)不丟失的大部分準(zhǔn)備,
但是有些東西是要配合 Broker 端一起,
才能達(dá)到預(yù)期的不丟失數(shù)據(jù)的,
比如我們上面說到的
min.insync.replicas配置
我們上面知道了,
當(dāng) 生產(chǎn)者acks = -1的時(shí)候,
寫入的副本數(shù)就必須 >=min.insync.replicas數(shù),
當(dāng)達(dá)不到這個(gè)要求的時(shí)候,
生產(chǎn)者端會(huì)收到一個(gè)either NotEnoughReplicas or NotEnoughReplicasAfterAppend的異常。
所以我們這個(gè)參數(shù)必須不能大于replication.factor副本數(shù)。
否則生產(chǎn)者將無法寫入任何數(shù)據(jù),
一般建議replication.factor數(shù)要大于min.insync.replicas,
比如3個(gè)機(jī)器的集群,設(shè)置replication.factor= 3,
那么設(shè)置min.insync.replicas= 2 就可以了,
這樣既保證了數(shù)據(jù)寫入的時(shí)候有一個(gè)副本的冗余,
也能保證在一些情況下,
某臺(tái)Broker宕機(jī)導(dǎo)致數(shù)據(jù)無法達(dá)到3個(gè)副本時(shí),
依然可以正常寫入數(shù)據(jù)。unclean.leader.election.enable
這里 Broker 端還有一個(gè)重要的配置就是unclean.leader.election.enable = false
這個(gè)配置代表著一些數(shù)據(jù)落后比較多的 follower,
是否能在leader宕機(jī)后被選舉成新的 leader
如果你設(shè)置成 true,
很明顯,如果這樣的follower成為新leader,
就會(huì)造成最新的一部分?jǐn)?shù)據(jù)丟失掉,
重試 retries
上面已經(jīng)基本完成了不丟數(shù)據(jù)的方方面面了,
但是有些東西不是我們能控制的,
比如 網(wǎng)絡(luò)抖動(dòng) 等不可抗拒的因素,
這時(shí)候重試次數(shù)就很關(guān)鍵了,
配置合適的retries重試次數(shù),
和 合適的retry.backoff.ms重試間隔時(shí)間,
將為我們的數(shù)據(jù)發(fā)送提供更高的穩(wěn)定性,
當(dāng)然如果實(shí)在發(fā)送不成功,怎么辦呢?
一般我們也可以把發(fā)送不成功的數(shù)據(jù)保存在一個(gè)日志文件,
如果數(shù)據(jù)很重要,那就發(fā)送警告信息,
人工干預(yù)一下。