1. kafka 生產者發(fā)送消息的流程

2. Kafka 生產者發(fā)送數據的3種方式
(1) 發(fā)送并忘記(fire-and-forget)
把消息發(fā)送給服務器,但并不關心它是否正常到達。大多數情況下,消息會正常到達,因為 Kafka 是高可用的,而且生產者會自動嘗試重發(fā)。不過,使用這種方式有時候也會丟失一些消息。
package com.bonc.rdpe.kafka110.producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* @Title Producer01.java
* @Description Kafka 生產者發(fā)送消息的第一種方式:發(fā)送并忘記
* @Author YangYunhe
* @Date 2018-06-21 10:35:34
*/
public class Producer01 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("acks", "1");
props.put("retries", 3);
props.put("batch.size", 16384); // 16K
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432); // 32M
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String filePath = Producer01.class.getClassLoader().getResource("wechat_data.txt").getPath();
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while((line = br.readLine()) != null) {
// 創(chuàng)建 ProducerRecord 可以指定 topic、partition、key、value,其中 partition 和 key 是可選的
// ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
// ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
// 只管發(fā)送消息,不管是否發(fā)送成功
producer.send(record);
Thread.sleep(100);
}
producer.close();
}
}
(2) 同步發(fā)送
使用 send() 方法發(fā)送消息,它會返回一個 Future 對象,調用 get() 方法進行等待(會返回元數據或者拋出異常),
就可以知道消息是否發(fā)送成功。
package com.bonc.rdpe.kafka110.producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title Producer02.java
* @Description Kafka 生產者發(fā)送消息的第二種方式:同步發(fā)送
* @Author YangYunhe
* @Date 2018-06-21 10:38:37
*/
public class Producer02 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String filePath = Producer02.class.getClassLoader().getResource("wechat_data.txt").getPath();
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while((line = br.readLine()) != null) {
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
// 程序阻塞,直到該條消息發(fā)送成功返回元數據信息或者報錯
RecordMetadata metadata = producer.send(record).get();
StringBuilder sb = new StringBuilder();
sb.append("record [").append(line).append("] has been sent successfully!").append("\n")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(sb.toString());
Thread.sleep(100);
}
producer.close();
}
}
(3) 異步發(fā)送
大多數時候,我們并不需要等待響應——盡管 Kafka會把目標主題、分區(qū)信息和消息的偏移量發(fā)送回來,但對于發(fā)送端的應用程序來說不是必需的。
不過在遇到消息發(fā)送失敗時,我們需要拋出異常、記錄錯誤日志等,這樣的情況下可以使用異步發(fā)送消息的方式,調用 send() 方法,并指定一個回調函數,服務器在返回響應時調用該函數。
package com.bonc.rdpe.kafka110.producer;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title Producer03.java
* @Description Kafka 生產者發(fā)送消息的第三種方式:異步發(fā)送
* @Author YangYunhe
* @Date 2018-06-21 11:06:05
*/
public class Producer03 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
String filePath = Producer03.class.getClassLoader().getResource("wechat_data.txt").getPath();
BufferedReader br = new BufferedReader(new FileReader(filePath));
String line;
while((line = br.readLine()) != null) {
ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
// 如果發(fā)送消息成功,返回了 RecordMetadata
if(metadata != null) {
StringBuilder sb = new StringBuilder();
sb.append("message has been sent successfully! ")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(sb.toString());
}
// 如果消息發(fā)送失敗,拋出異常
if(e != null) {
e.printStackTrace();
}
}
});
Thread.sleep(100);
}
producer.close();
}
}
3. 多線程生產者
在數據量比較大同時對發(fā)送消息的順序沒有嚴格要求時,可以使用多線程的方式發(fā)送數據,實現(xiàn)多線程生產者有兩種方式:1. 實例化一個 KafkaProducer 對象運行多個線程共享該對象發(fā)送消息;2. 實例化多個 KafkaProducer 對象。
由于 Kafka Producer 是線程安全的,所以多個線程共享一個 Kafka Producer 對象在性能上要好很多。
(1) 線程類實現(xiàn)
package com.bonc.rdpe.kafka110.thread;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @Title KafkaProducerThread.java
* @Description 多線程生產者的線程類實現(xiàn)
* @Author YangYunhe
* @Date 2018-06-25 13:54:38
*/
public class KafkaProducerThread implements Runnable {
private KafkaProducer<String, String> producer;
private ProducerRecord<String, String> record;
public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
this.producer = producer;
this.record = record;
}
@Override
public void run() {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null) {
System.out.println("exception occurs when sending message: " + exception);
}
if(metadata != null) {
StringBuilder result = new StringBuilder();
result.append("message[" + record.value() + "] has been sent successfully! ")
.append("send to partition ").append(metadata.partition())
.append(", offset = ").append(metadata.offset());
System.out.println(result.toString());
}
}
});
}
}
(2) 發(fā)送消息的具體實現(xiàn)
package com.bonc.rdpe.kafka110.producer;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import com.bonc.rdpe.kafka110.thread.KafkaProducerThread;
/**
* @Title MultiProducer.java
* @Description 多線程生產者的測試代碼
* @Author YangYunhe
* @Date 2018-06-25 14:30:58
*/
public class MultiProducer {
private static final int THREADS_NUMS = 10;
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(THREADS_NUMS);
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record;
try {
for(int i = 0; i < 100; i++) {
record = new ProducerRecord<>("dev3-yangyunhe-topic001", "hello" + i);
executor.submit(new KafkaProducerThread(producer, record));
Thread.sleep(1000);
}
}catch (Exception e) {
System.out.println("exception occurs when sending message: " + e);
}finally {
producer.close();
executor.shutdown();
}
}
}
(3) 運行結果:
message[hello0] has been sent successfully! send to partition 1, offset = 705
message[hello1] has been sent successfully! send to partition 0, offset = 705
message[hello2] has been sent successfully! send to partition 2, offset = 704
message[hello3] has been sent successfully! send to partition 1, offset = 706
message[hello4] has been sent successfully! send to partition 0, offset = 706
......
4. Kafka Producer 常用配置(kafka-1.1.0)
(1) acks
- 類型:string
- 默認值:1
- 可設置值:[all, -1, 0, 1]
- 重要性:高
- 說明:
- 0:生產者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現(xiàn)了問題,導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發(fā)送消息,從而達到很高的吞吐量。
- 1:只要集群的首領節(jié)點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達首領節(jié)點(比如首領節(jié)點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發(fā)消息。不過,如果一個沒有收到消息的節(jié)點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務器的響應(通過調用 Future 對象的 get() 方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發(fā)送中消息數量的限制(比如,生產者在收到服務器響應之前可以發(fā)送多少個消息)。
- all:只有當所有參與復制的節(jié)點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發(fā)生崩潰,整個集群仍然可以運行。不過,它的延遲比 acks=1 時更高,因為我們要等待不只一個服務器節(jié)點接收消息。
- -1:作用與"all"是一樣的。
(2) buffer.memory
- 類型:long
- 默認值:33554432(32M)
- 可設置值:[0,...]
- 重要性:高
- 說明:該參數用來設置生產者內存緩沖區(qū)的大小,生產者用它緩沖要發(fā)送到服務器的消息。如果應用程序發(fā)送消息的速度超過發(fā)送到服務器的速度,會導致生產者空間不足。
這個時候,send()方法調用要么被阻塞,要么拋出異常,取決于如何設置 max.block.ms (類型:long,默認值:60000(1分鐘),可設置值:[0,...],重要性:中等)參數。表示在拋出異常之前可以阻塞的時間。
(3) compression.type
- 類型:string
- 默認值:none
- 可設置值:[none, gzip, snappy, lz4]
- 重要性:高
- 說明:該參數可以指定消息被發(fā)送給 broker 之前使用哪一種壓縮算法進行壓縮。snappy 壓縮算法由 Google 發(fā)明,它占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。
(4) retries
- 類型:int
- 默認值:0
- 可設置值:[0,...,2147483647]
- 重要性:高
- 說明:生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到首領)。在這種情況下,retries 參數的值決定了生產者可以重發(fā)消息的次數,如果達到這個次數,生產者會放棄重試并返回錯誤。默認情況下,生產者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms(類型:long,默認值:100, 可設置值:[0,...],重要性:低) 參數來改變這個時間間隔。
建議在設置重試次數和重試時間間隔之前,先測試一下恢復一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出首領需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如"消息太大"錯誤)。一般情況下,因為生產者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤或重試次數超出上限的情況。
(5) batch.size
- 類型:int
- 默認值:16384(16K)
- 可設置值:[0,...]
- 重要性:中等
- 說明:當有多個消息需要被發(fā)送到同一個分區(qū)時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節(jié)數計算(而不是消息個數)。當批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產者并不一定都會等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個消息的批次也有可能被發(fā)送。所以就算把批次大小設置得很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產者需要更頻繁地發(fā)送消息,會增加一些額外的開銷。
(6) linger.ms
- 類型:long
- 默認值:0
- 可設置值:[0,...]
- 重要性:中等
- 說明:該參數指定了生產者在發(fā)送批次之前等待更多消息加入批次的時間。KafkaProducer 會在批次填滿或 linger.ms 達到上限時把批次發(fā)送出去。默認情況下,只要有可用的線程,生產者就會把消息發(fā)送出去,就算批次里只有一個消息。把 linger.ms 設置成比 0 大的數,讓生產者在發(fā)送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發(fā)送更多的消息,每個消息的開銷就變小了)。
(7) max.in.flight.requests.per.connection
- 類型:int
- 默認值:5
- 可設置值:[1,...]
- 重要性:低
- 說明:該參數指定了生產者在收到服務器響應之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。
把它設為 1 可以保證消息是按照發(fā)送的順序寫入服務器的,即使發(fā)生了重試。
(8) max.request.size
- 類型:int
- 默認值:1048576
- 可設置值:[0,...]
- 重要性:中等
- 說明:該參數用于控制生產者發(fā)送的請求大小。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB,或者生產者可以在單個請求里發(fā)送一個批次,該批次包含了 1000 個消息,每個消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes(類型:int,默認值:1000012,大約0.95M,可設置值:[0,...],重要性:高)),所以兩邊的配置最好可以匹配,避免生產者發(fā)送的消息被 broker 拒絕。
(9) receive.buffer.bytes 和 send.buffer.bytes
receive.buffer.bytes
- 類型:int
- 默認值:32768(32K)
- 可設置值:[-1,...]
- 重要性:中等
send.buffer.bytes
- 類型:int
- 默認值:131072(128K)
- 可設置值:[-1,...]
- 重要性:中等
說明:這兩個參數分別指定了 TCP socket 接收和發(fā)送數據包的緩沖區(qū)大小。如果它們被設為 -1,就使用操作系統(tǒng)的默認值。如果生產者或消費者與 broker 處于不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。
(10) client.id
- 類型:string
- 默認值:""
- 可設置值:任意字符串
- 重要性:中等
- 說明:該參數可以是任意的字符串,服務器會用它來識別消息的來源。
(11) request.timeout.ms
- 類型:int
- 默認值:30000
- 可設置值:[0,...]
- 重要性:中等
- 說明:該參數指定了生產者在發(fā)送數據時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發(fā)送數據,要么返回一個錯誤(拋出異?;驁?zhí)行回調)。[metadata.fetch.timeout.ms] and [timeout.ms] have been removed. They were initially deprecated in Kafka 0.9.0.0.
(12) max.block.ms
- 類型:long
- 默認值:60000
- 可設置值:[0,...]
- 重要性:中等
- 說明:該參數指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數據時生產者的阻塞時間。當生產者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。
(13) connections.max.idle.ms
- 類型:long
- 默認值:540000
- 可設置值:[0,...]
- 重要性:中等
- 說明:關閉空閑連接的等待時間,檢測到空閑的連接后,默認等待9分鐘才會關閉這個連接。
(14) metadata.max.age.ms
- 類型:long
- 默認值:300000
- 可設置值:[0,...]
- 重要性:低
- 說明:更新元數據的時間間隔,在等待該參數配置的時間后,即使 producer 沒有發(fā)現(xiàn)任何 partition 或 leader 的變化,也會強制刷新元數據。
(15) reconnect.backoff.ms
- 類型:long
- 默認值:50
- 可設置值:[0,...]
- 重要性:低
- 說明:嘗試重新連接 broker 的時間間隔。
(16) reconnect.backoff.max.ms
- 類型:long
- 默認值:1000
- 可設置值:[0,...]
- 重要性:低
- 說明:如果重新連接的時間累積到達該參數的配置時間還沒有連接到 broker,那么宣告連接失敗。