從編程角度而言,生產(chǎn)者就是負(fù)責(zé)向Kafka發(fā)送消息的應(yīng)用程序。本文使用java語言做詳細(xì)介紹。
一個正常的生產(chǎn)邏輯需要以下幾個步驟:
- 配置生產(chǎn)者客戶端參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例。
- 構(gòu)建待發(fā)送的消息。
- 發(fā)送消息。
- 關(guān)閉生產(chǎn)者實(shí)例。
客戶端開發(fā)案例
本文先提供簡單的生產(chǎn)者客戶端程序,然后做具體的改進(jìn)和分析。
<--導(dǎo)入依賴-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
public class Producer {
//鏈接地址
public static final String brokerList = "192.168.0.18:9092";
//主題
public static final String topic = "topic-demo";
//配置參數(shù)
public static Properties initConfig(){
Properties properties = new Properties();
properties.put("bootstrap.servers",brokerList);
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer" );
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("client.id","producer.client.id.demo");
return properties;
}
public static void main(String[] args) throws InterruptedException {
//構(gòu)建發(fā)送主體
KafkaProducer<String, String> producer = new KafkaProducer<>(initConfig());
//構(gòu)建消息體
ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello world");
try {
//發(fā)送消息
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
構(gòu)建的消息對象ProducerRecord并不是單純意義上的消息,它包含了多個屬性,原本需要發(fā)送的業(yè)務(wù)相關(guān)的消息體只是其中的一個value屬性,比如“hello world”,ProducerRecord的源碼如下:
public class ProducerRecord<K, V> {
private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;
//省略構(gòu)造方法和getter、setter方法
其中topic和partition字段分別代表消息要發(fā)往的主題和分區(qū)號。headers字段是消息的頭部,Kafka0.11x版本才引入這個屬性,它大多用來設(shè)定一些與應(yīng)用相關(guān)的信息,也可以不設(shè)置。key是用來指定消息的鍵,它不僅是消息的附加信息,還可以用來計(jì)算分區(qū)號進(jìn)而可以讓消息發(fā)往特定的分區(qū)。消息以主題為單位進(jìn)行歸類,而這個key可以讓消息再進(jìn)行二次歸類,同一個key的消息會被劃分到同一個分區(qū)中(事實(shí)上不總是這樣,后面會解釋)。有key的消息還可以支持日志壓縮功能(以后講壓縮)。value是消息體,一般不為空,如果為空則表示特定的消息——墓碑消息。temestamp是指消息的時間戳,它有CreateTime和LogAppendTime兩種類型,前者標(biāo)識消息創(chuàng)建的時間,后者表示消息追加到日志文件的時間。
必要的參數(shù)配置
參考initConfig方法,在創(chuàng)建真正的生產(chǎn)者實(shí)例前需要配置相應(yīng)的參數(shù),比如需要鏈接的kafka集群地址。通常有3個參數(shù)是必填的。
- bootstrap.server:該參數(shù)用來指定生產(chǎn)者客戶端連接Kafka集群所需的broker地址清單,具體的內(nèi)容格式是host1:port1,host2:port2,可以設(shè)置一個或者多個地址,中間以逗號隔開,此參數(shù)的默認(rèn)值為“”。注意這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的borker里查找到其他broker的信息。不過建議至少設(shè)置兩個以上的borker地址信息,當(dāng)其中一個宕機(jī)時,生產(chǎn)者仍然可以鏈接到集群上。
- key.serializer和value.serializer :broker端接收的消息必須以字節(jié)數(shù)組(byte[])的形式存在。在代碼中使用的
KafkaProducer<String, String>和ProducerRecord<String, String>中的泛型<String,String>對應(yīng)的就是小溪中key和value的類型,生產(chǎn)者客戶端使用這種方式可以讓代碼具有更好的可讀性,不過在發(fā)往broker之前需要將消息中對應(yīng)的key和value做相應(yīng)的序列化操作來轉(zhuǎn)換成字節(jié)數(shù)組。key.serializer和value.serializer這兩個參數(shù)分別用來指定key和value的序列化器,這兩個參數(shù)無默認(rèn)值。后面講如何自定義序列化器。
方法里還設(shè)置了一個參數(shù)client.id,這個參數(shù)用來設(shè)定KafkaProducer對應(yīng)的客戶端id,默認(rèn)值為“”。如果客戶端不設(shè)置,則KafkaProducer會自動生成一個非空字符串,內(nèi)容形式如“producer-1”,即字符串“producer-”與數(shù)字的拼接。
KafkaProducer中的參數(shù)眾多,遠(yuǎn)非實(shí)例方法中的那樣只有4個。一般而言,開發(fā)人員無法記住所有的參數(shù)名,只能有個大概的印象。在實(shí)際使用過程中,諸如key.serializer之類的字符串經(jīng)常由于認(rèn)為因素而書寫錯誤。為此我們可以使用ProducerConfig類來做一定程度上的預(yù)防措施,每個參數(shù)在這個類上都有對應(yīng)的名字。如下圖所示:

我們將initConfig方法做如下修改:
public static Properties initConfig(){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer" );
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
return properties;
}
注意到上面的代碼中key和value對應(yīng)的序列化器名字也容易寫錯,這里通過java的技巧來做進(jìn)一步修改:
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer是線程安全的,可以再多個線程中共享單個實(shí)例,也可以將KafkaProducer實(shí)例進(jìn)行池化。
KafkaProducer中有多個構(gòu)造函數(shù),假如在創(chuàng)建實(shí)例過程中沒有指定key.serializer和value.serializer這兩個參數(shù)的話,實(shí)例就得這么構(gòu)建:
KafkaProducer<String, String> producer
= new KafkaProducer<>(initConfig(),new StringSerializer(),new StringSerializer());
小編不會這么做,一般都是在initConfig函數(shù)里指定所有的參數(shù)。
消息的發(fā)送(同步、異步、回調(diào))
ProducerRecord是消息的載體。topic屬性和value屬性是必填的,其余屬性是選填的,其構(gòu)造方法也有很多:
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
this(topic, partition, timestamp, key, value, (Iterable)null);
}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
this(topic, partition, (Long)null, key, value, headers);
}
public ProducerRecord(String topic, Integer partition, K key, V value) {
this(topic, partition, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, K key, V value) {
this(topic, (Integer)null, (Long)null, key, value, (Iterable)null);
}
public ProducerRecord(String topic, V value) {
this(topic, (Integer)null, (Long)null, (Object)null, value, (Iterable)null);
}
實(shí)際應(yīng)用開發(fā)過程中,創(chuàng)建ProducerRecord對象是一個非常頻繁的動作。創(chuàng)建完消息體后,就可以開始發(fā)送消息了。消息的發(fā)送主要有三種模式
- 發(fā)后即忘(fire-and-forget)
- 同步(sync)
- 異步(async)
案例中的發(fā)送方式就是發(fā)后即忘,它只管往kafka中發(fā)送消息而不關(guān)心消息是否正確送達(dá)。在大多數(shù)情況下,這種發(fā)送方式?jīng)]什么問題,不過在某些時候(比如發(fā)生不可重試異常)會造成消息丟失。這種發(fā)送方式性能最高,可靠性也最差。
send方法返回的并非是void類型,而是Future<RecordMetadata>類型,send()方法有兩個重載方法:
Future<RecordMetadata> send(ProducerRecord<K, V> var1);
Future<RecordMetadata> send(ProducerRecord<K, V> var1, Callback var2);
要實(shí)現(xiàn)同步可以利用返回的Future對象實(shí)現(xiàn),如下所示:
try {
producer.send(record).get();//實(shí)現(xiàn)同步發(fā)送
} catch (InterruptedException |ExecutionException e) {
e.printStackTrace();
}
實(shí)際上,send方法本身就是異步的,send()方法返回的Future對象可以使調(diào)用方稍后獲得發(fā)送的結(jié)果。案例中,send方法之后直接鏈?zhǔn)秸{(diào)用了get()方法來阻塞等待Kafka的響應(yīng),知道消息發(fā)送成功或發(fā)生異常。如果發(fā)生異常,那么就需要捕獲異常并交由邏輯處理層。
也可在調(diào)用send方法之后不直接調(diào)用get方法,比如下面的一種實(shí)現(xiàn)同步的方式:
Future<RecordMetadata> retu = producer.send(record);//send方法本身是異步的
RecordMetadata recordMetadata = null;
try {
recordMetadata = retu.get();//實(shí)現(xiàn)同步發(fā)送
System.out.println("同步發(fā)送成功到:"+recordMetadata.topic());
} catch (InterruptedException |ExecutionException e) {
e.printStackTrace();
}
這樣可以獲取一個RecordMetadata對象,它包含了消息的一些元數(shù)據(jù)信息,比如當(dāng)前消息的主題、分區(qū)號、分區(qū)中的偏移量、時間戳等。如果你需要這些信息,則可以使用這個方式。如果不需要直接使用producer.send(record).get();方式更省事。
Future表示一個任務(wù)的聲明周期,并提供了響應(yīng)的方法來判斷任務(wù)是否已經(jīng)完成或取消,以及獲取任務(wù)的結(jié)果和取消任務(wù)等。也可以使用retu.get(3, TimeUnit.SECONDS);方式來實(shí)現(xiàn)超時阻塞。
KafkaProducer一般會發(fā)生兩種類型的異常:可重試異常和不可重試異常。常見的可重試異常包括:NetworkException、LerderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException等。比如,NetworkException表示網(wǎng)絡(luò)異常,這個有可能是由于網(wǎng)絡(luò)瞬時故障而導(dǎo)致的異常,可以通過重試解決;又比如LerderNotAvailableException表示分區(qū)的leader副本不可用,這個異常通常發(fā)生在leader副本下線而新的leader副本選舉完成之前,重試之后可以重新恢復(fù)。不可重試的異常,如RecordTooLargeException,暗示了所發(fā)送的消息太大,對此不會進(jìn)行任何重試,直接拋出異常。
對于可重試的異常,如果配置了retries參數(shù),那么只要在規(guī)定的重試次數(shù)內(nèi)自行恢復(fù)了,就不會拋出異常。retries的默認(rèn)值為0,配置也很簡單:
properties.put(ProducerConfig.RETRIES_CONFIG,10);
如果重試10次后還沒有恢復(fù),那么仍然會拋出異常,進(jìn)而發(fā)送的外層邏輯處理就要處理這些異常了。
同步方式可靠性高,要么消息發(fā)送成功,要么發(fā)生異常,如果發(fā)生異常則可以捕獲并進(jìn)行相應(yīng)的處理,不會造成消息丟失。不過性能確實(shí)差很多,需要阻塞等待一條消息發(fā)送完成后再繼續(xù)發(fā)送下一條消息。
來了解一下異步發(fā)送方式,一般是在send方法里指定一個Callback回調(diào)函數(shù),Kafka在返回響應(yīng)時調(diào)用該函數(shù)來實(shí)現(xiàn)異步發(fā)送確認(rèn)。Kafka有響應(yīng)時就會回調(diào),要么發(fā)送成功,要么拋出異常。如:
//異步發(fā)送
public static void sendAsyn(ProducerRecord<String, String> record,KafkaProducer<String, String> producer){
//異步發(fā)送
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e!=null){
e.printStackTrace();
}else{
System.out.println("異步發(fā)送成功"+recordMetadata.topic()+":"+recordMetadata.partition());
}
}
});
}
onComplete方法的兩個參數(shù)是互斥的,消息發(fā)送成功時,meta不為null而exception為null;而消息發(fā)送異常時,metadata為null而exception不為null。
對于同一個分區(qū)而言,如果消息record1于record2之前發(fā)送,系統(tǒng)可以保證對應(yīng)的callback1在callback2之前調(diào)用,也就是說回調(diào)函數(shù)的調(diào)用也是可以保證分區(qū)有序。
最后producer.close();方法會阻塞等待之前所有的發(fā)送請求完成后再關(guān)閉KafkaProducer,同時,還提供了一個帶超時的close方法,這個很少用。
序列化
生產(chǎn)者需要用序列化器把對象轉(zhuǎn)換成字節(jié)數(shù)組才能發(fā)給kafka。消費(fèi)者必須用反序列器把從kafka收到的字節(jié)數(shù)組轉(zhuǎn)換成相應(yīng)的對象。上文講的序列化器StringSerializer實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口,此外還有
ByteArray、ByteBuffer、Bytes、Double、Integer、Long等序列化器,都實(shí)現(xiàn)了Serializer接口,該接口有3個方法:
void configure(Map<String, ?> var1, boolean var2);
byte[] serialize(String var1, T var2);
void close();
configure用來配置當(dāng)前類,serialize方法用來執(zhí)行序列化操作。而close方法用來關(guān)閉當(dāng)前的序列化器,一般情況下close是個空方法,如果實(shí)現(xiàn)了此方法,必須保證此方法的冪等性,因?yàn)镵afkaProducer可能會調(diào)用多次該方法。
我們先來看一下StringSerializer的源碼,從而引出自定義序列化器的編寫
public class StringSerializer implements Serializer<String> {
private String encoding = "UTF8";
public StringSerializer() {
}
public void configure(Map<String, ?> configs, boolean isKey) {
String propertyName = isKey ? "key.serializer.encoding" : "value.serializer.encoding";
Object encodingValue = configs.get(propertyName);
if (encodingValue == null) {
encodingValue = configs.get("serializer.encoding");
}
if (encodingValue instanceof String) {
this.encoding = (String)encodingValue;
}
}
public byte[] serialize(String topic, String data) {
try {
return data == null ? null : data.getBytes(this.encoding);
} catch (UnsupportedEncodingException var4) {
throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding " + this.encoding);
}
}
public void close() {
}
}
首先是configure()方法,是在創(chuàng)建KafkaProducer實(shí)例的時候調(diào)用的,主要用來確定以編碼類型,不過一般客戶端對于key.serializer.encoding和value.serializer.encodeing這幾個參數(shù)是不會設(shè)置的,默認(rèn)為UTF-8。serialize()方法非常直觀,就是將String類型轉(zhuǎn)換為byte[]類型。
如果Kafka客戶端提供的幾種序列化器都無法滿足你,則可以使用Avro/JSON/Thrift/ProtoBuf和Protostuff等通用的序列化工具來實(shí)現(xiàn),或者使用自定義類型的序列化器來實(shí)現(xiàn)。下面看如何自定義:
首先創(chuàng)建一個業(yè)務(wù)類:
public class User {
private String name;
private int age = -1;
public String getName() {
return name;
}
public User setName(String name) {
this.name = name;
return this;
}
public int getAge() {
return age;
}
public User setAge(int age) {
this.age = age;return this;
}
}
定義序列化器
package serializer;
import bean.User;
import org.apache.kafka.common.serialization.Serializer;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;
//自定義序列化器
public class UserSerializer implements Serializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public byte[] serialize(String s, User user) {
if (user == null) {
return null;
}
byte[] name;
int age = user.getAge();
try {
if (user.getName() != null) {
name = user.getName().getBytes("UTF-8");
} else {
name = new byte[0];
}
//數(shù)組總共的長度
ByteBuffer byteBuffer = ByteBuffer.allocate(4+4+name.length);
//name字節(jié)數(shù)
byteBuffer.putInt(name.length);
//放name字節(jié)數(shù)組
byteBuffer.put(name);
//放age,age本身就是int類型的
byteBuffer.putInt(age);
return byteBuffer.array();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return new byte[0];
}
@Override
public void close() {
}
}
關(guān)于ByteBuffer怎么用,可以參考筆者的《「高并發(fā)通信框架Netty4 源碼解讀(四)」NIO緩沖區(qū)之字節(jié)緩沖區(qū)ByteBuffer詳解》
定義消費(fèi)端的反序列化器
public class UserDeserializer implements Deserializer<User> {
@Override
public void configure(Map<String, ?> map, boolean b) {
}
@Override
public User deserialize(String s, byte[] bytes) {
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
int nameLength = byteBuffer.getInt();
byte name[] = new byte[nameLength];
byteBuffer.get(name,0,nameLength);
int age = byteBuffer.getInt();
return new User().setAge(age).setName(new String(name));
}
@Override
public void close() {
}
}
更改序列化器:
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, UserSerializer.class.getName());
分區(qū)器
消息在發(fā)送過程中,有可能需要經(jīng)過攔截器、序列化器和分區(qū)器的一系列作用之后才能被真正發(fā)往broker。攔截器不是必須的,后面講。序列化器是必須的,消息經(jīng)過序列化后就需要確定它發(fā)往的分區(qū),如果ProducerRecord消息中指定了partition字段那么就不需要分區(qū)器的作用了,因?yàn)閜artition代表的就是所要發(fā)往的分區(qū)號。
如果消息中沒有指定partition字段,那么就需要依賴分區(qū)器,根據(jù)key這個字段來計(jì)算partition的值。分區(qū)器的作用就是為消息分配分區(qū)。
Kafka中提供的默認(rèn)分區(qū)器是DefaultPartitioner,它實(shí)現(xiàn)了Partitioner這個接口,定義的方法如下:
public interface Partitioner extends Configurable, Closeable {
int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);
void close();
}
其中,parition方法用來計(jì)算分區(qū)號,返回值為int類型。partition方法中的參數(shù)分別表示主題、鍵、序列化后的鍵、值、序列化后的值,以及集群的元數(shù)據(jù)信息,通過這些信息可以實(shí)現(xiàn)豐富的分區(qū)器。
在默認(rèn)的分區(qū)器DefaultPartitioner實(shí)現(xiàn)中,如果key不為null,那么默認(rèn)的分區(qū)器會對key進(jìn)行哈希(采用MurmurHash2算法,具備高運(yùn)算性能及低碰撞率),最終根據(jù)得到的哈希值來計(jì)算分區(qū)號,擁有相同的key的消息會被寫入同一個分區(qū),如果key為null,那么消息將會以輪詢的方式發(fā)往主題內(nèi)的各個可用分區(qū)內(nèi),在不改變主題分區(qū)的情況下,key與分區(qū)之間的映射可以保持不變。不過,一旦主題中增加了新的分區(qū),映射就破壞了。
指定分區(qū)器的方式:
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class.getName());//指定分區(qū)器,配合key使用
攔截器
生產(chǎn)者可以用來在消息發(fā)送前做一些準(zhǔn)備工作,比如按照某個規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化要求,比如統(tǒng)計(jì)類工作。
生產(chǎn)者攔截器的使用很簡單,主要自定義實(shí)現(xiàn)ProducerInterceptor接口
public interface ProducerInterceptor<K, V> extends Configurable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);
void onAcknowledgement(RecordMetadata var1, Exception var2);
void close();
}
KafkaProducer在將消息序列化和計(jì)算分區(qū)之前會調(diào)用攔截器的onSend方法來對消息進(jìn)行相應(yīng)的定制化操作。在消息被應(yīng)答之前或消息發(fā)送失敗時調(diào)用生產(chǎn)者攔截器的onAcknowledgement方法,優(yōu)先于用戶設(shè)定的Callback之前執(zhí)行。這個方法運(yùn)行在producer的IO線程中,所以這個方法的實(shí)現(xiàn)越簡單越好,否則影響消息的發(fā)送速度。
自定義攔截器實(shí)現(xiàn):
public class ProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String ,String> {
private volatile long sendSuccess = 0;
private volatile long sendFailure = 0;
@Override
public ProducerRecord onSend(ProducerRecord<String ,String> producerRecord) {
//消息發(fā)送前,進(jìn)行修改操作
String modifieldValue = "prefix-"+producerRecord.value();
return new ProducerRecord<String ,String>(producerRecord.topic(),
producerRecord.partition(),
producerRecord.timestamp(),
producerRecord.key(),
modifieldValue,
producerRecord.headers());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if(e == null){
sendSuccess++;
System.out.println("發(fā)送成功的消息:"+sendSuccess);
}else{
sendFailure++;
System.out.println("發(fā)送失敗的消息:"+sendFailure);
}
}
@Override
public void close() {
System.out.println("發(fā)送成功率:"+(double)sendSuccess/(sendSuccess+sendFailure));
}
@Override
public void configure(Map<String, ?> map) {
}
}
我們在onSend方法上修改了內(nèi)容,發(fā)送內(nèi)容前加上了prefix-前綴,onAcknowledgement用來統(tǒng)計(jì)發(fā)送成功與失敗的消息數(shù)。
攔截器的配置:
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class.getName()+","+ ProducerInterceptor2.class.getName());
多個攔截器用逗號分隔,攔截器的調(diào)用順序會按配置順序調(diào)用。
小總結(jié):生產(chǎn)者調(diào)用順序 :攔截器->序列化器->分區(qū)器
原理剖析
客戶端整體架構(gòu):

整個生產(chǎn)者客戶端由兩個線程協(xié)調(diào)運(yùn)行,分別是主線程和Sender線程(發(fā)送線程)。在主線程中由KafkaProducer創(chuàng)建消息,然后通過可能的攔截器、序列化器和分區(qū)器的作用之后緩存到消息累加器(RecordAccumulator,也成為消息收集器)中。Sender線程負(fù)責(zé)從RecordAccumulator中獲取消息并將其發(fā)送到kafka中。
RecordAccumulator主要用來緩存消息以便Sender線程可以批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸?shù)馁Y源消耗以提升性能。RecordAccumulator緩存的大小可以通過客戶端參數(shù)buffer.memory配置,默認(rèn)值為33554432B,即32MB。
//設(shè)置緩沖區(qū)大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"33554432");
如果生產(chǎn)者發(fā)送消息的速度超過發(fā)送服務(wù)器的速度,則會導(dǎo)致生產(chǎn)者空間不足,這個時候KafkaProducer的send()方法調(diào)用要么被阻塞,要么拋異常,這個取決于參數(shù)max.block.ms的配置,此參數(shù)的默認(rèn)值為60000,即60秒。
//設(shè)置阻塞異常的
properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG,"5000");//3s
主線程中發(fā)送過來的消息都會被追加到RecordAccumulator的某個雙端隊(duì)列中,在RecordAccumulator內(nèi)部為每個分區(qū)都維護(hù)了一個雙端隊(duì)列,隊(duì)列中的內(nèi)容就是ProducerBatch,即Deque<ProducerBatch>。消息寫入緩存時,追加到雙端隊(duì)列尾部;Sender讀取消息時,從雙端隊(duì)列的頭部讀取。注意ProducerBatch不是ProducerRecord,ProducerBatch中可以包含一個至多個ProducerRecord。通俗的說,ProducerRecord是生產(chǎn)者中創(chuàng)建的消息,而ProducerBatch是指一個消息批次,ProducerRecord會包含在ProducerBatch中,這樣可以使字節(jié)的使用更加緊湊。與此同時,將較小的ProducerRecord拼湊成一個較大的ProducerBatch,也可以減少網(wǎng)絡(luò)請求次數(shù)以提升整體的吞吐量。如果生產(chǎn)者客戶端需要向很多分區(qū)發(fā)送消息,則可以將buffer.memory參數(shù)適當(dāng)調(diào)大以增加整體的吞吐量。
消息在網(wǎng)絡(luò)上都是以字節(jié)(Byte)的形式傳輸?shù)模诎l(fā)送之前需要創(chuàng)建一塊內(nèi)存區(qū)域來保存對應(yīng)的消息。在Kafka生產(chǎn)者客戶端中,通過java.io.ButeBuffer實(shí)現(xiàn)消息內(nèi)存的創(chuàng)建和釋放。不過頻繁的創(chuàng)建和釋放是比較耗資源的,在RecordAccumulator的內(nèi)部還有一個BufferPool,它主要實(shí)現(xiàn)ByteBuffer的復(fù)用,以實(shí)現(xiàn)緩存的高效利用。不過,ByteBuffer只針對特定大小的ByteBuffer進(jìn)行管理,而其他大小的ByteBuffer不會緩存進(jìn)BufferPool,這個特定的大小由batch.size參數(shù)來指定,默認(rèn)為16384B,即16KB,我們可以適當(dāng)調(diào)大batch.size參數(shù)以便多緩存一些消息。
ProducerBatch的大小和batch.size參數(shù)也有密切的關(guān)系。當(dāng)一條消息(ProducerRecord)流入到RecordAccumulator后時,會先尋找與消息分區(qū)對應(yīng)的雙端隊(duì)列,再從這個雙端隊(duì)列尾部獲取一個ProducerBatch并查看是否還可以寫入ProducerRecord,如果可以則寫入,如果不可以則需要創(chuàng)建一個新的ProducerBatch。在新建ProducerBatch時評估這條消息的大小是否超過batch.size參數(shù)的大小,如果不超過,那么就以參數(shù)的大小來創(chuàng)建ProducerBatch,這樣在使用完這段內(nèi)存區(qū)域后,可以通過BufferPool來管理進(jìn)行復(fù)用;如果超過,那么就以評估的大小來創(chuàng)建ProducerBatch,這段內(nèi)存區(qū)域不會被復(fù)用。
Sender從RecordAccumulator獲取緩存的消息后,會進(jìn)一步量原本<分區(qū),Deque<ProducerBatch>>的保存形式轉(zhuǎn)變成<Node,List<ProducerBatc>>的形式,其中,node表示kafka集群的broker節(jié)點(diǎn)。對于網(wǎng)絡(luò)連接來說,生產(chǎn)者客戶端是與具體的broker節(jié)點(diǎn)建立連接的,也就是向具體的broker節(jié)點(diǎn)發(fā)送消息,而不關(guān)心消息屬于哪一個分區(qū);而對于KafkaProducer的應(yīng)用邏輯而言,我們只關(guān)注向哪一個分區(qū)中發(fā)送消息,所以在這里需要做一個應(yīng)用邏輯層面到網(wǎng)絡(luò)IO層面的轉(zhuǎn)換。
轉(zhuǎn)變成<Node,List<ProducerBatc>>的形式后,sender進(jìn)一步封裝成<Node,Request>的形式,這樣就可以發(fā)往各個Node了,這里的Request是Kafka的各種協(xié)議請求。
請求在發(fā)往Kafka前,還會保存在InFlightRequests中,它保存對象的具體形式為Map<NodedId,Deque<Request>>,它的主要作用是緩存了已經(jīng)發(fā)出去但還沒有收到響應(yīng)的請求,NodedId是一個String類型,表示節(jié)點(diǎn)的ID編號。
與此同時InFlightRequests還提供了許多管理類的方法,并且通過參數(shù)配置還可以限制每個鏈接(也就是客戶端與Node之間的鏈接)最多緩存的請求數(shù),這個配置參數(shù)為max.in.flight.requests.per.connection,默認(rèn)值為5,即每個鏈接最多只能緩存5個未響應(yīng)的請求,超過該數(shù)值后就不能向這個連接發(fā)送更多的請求了,除非緩存中有請求收到了響應(yīng)。通過比較Deque<Request>的size與這個參數(shù)的大小來判斷對應(yīng)的Node中是否已經(jīng)堆積了很多未響應(yīng)的消息,如果真是如此,那么說明這個Node節(jié)點(diǎn)負(fù)載較大或者網(wǎng)絡(luò)連接有問題,再繼續(xù)向其發(fā)送請求會增大請求超時的可能。
元數(shù)據(jù)的更新
KafkaProducer要將消息追加到指定主題的某個分區(qū)所對應(yīng)的leader副本之前,首先需要知道主題的分區(qū)數(shù)量,然后經(jīng)過計(jì)算得出(或直接指定)目標(biāo)分區(qū),之后需要知道目標(biāo)分區(qū)的leader副本所在的broker節(jié)點(diǎn)的地址、端口號等信息才能建立連接,最終才能將消息發(fā)送到Kafka,在這過程中需要的信息都屬于元數(shù)據(jù)信息。
案例中,我們了解到bootstrap.servers參數(shù)只需要配置部分broker節(jié)點(diǎn)的地址即可, 不需要配置所有broker節(jié)點(diǎn)的地址,因?yàn)榭蛻舳俗约嚎梢园l(fā)現(xiàn)其他broker的節(jié)點(diǎn)地址,這一過程也屬于元數(shù)據(jù)相關(guān)的更新操作。與此同時,分區(qū)數(shù)量及l(fā)eader副本的分布都會動態(tài)地變化,客戶端也需要動態(tài)的捕獲這些變化。
元數(shù)據(jù)是指Kafka集群的元數(shù)據(jù),這些元數(shù)據(jù)具體記錄了集群中有哪些主題,這些主題有哪些分區(qū),每個分區(qū)的leader副本分配在哪個節(jié)點(diǎn)上,follower副本分配在哪個節(jié)點(diǎn)上,哪些副本在AR、ISR等集合中,集群中有哪些數(shù)據(jù),控制器節(jié)點(diǎn)又有哪一個等信息。
當(dāng)客戶端沒有需要使用的元數(shù)據(jù)時,比如沒有指定的主題信息,或者超過metadata.max.age.ms時間沒有更新元數(shù)據(jù)都會引起元數(shù)據(jù)的更新操作。該參數(shù)的默認(rèn)值為300000,即5分鐘 。元數(shù)據(jù)的更新操作是在客戶端的內(nèi)部進(jìn)行的,對客戶端的外部使用者不可見。
//客戶端更新kafka集群元數(shù)據(jù)的時間間隔,默認(rèn)5分鐘
properties.put(ProducerConfig.METADATA_MAX_AGE_CONFIG,300000);
重要的生產(chǎn)者參數(shù)
在KafkaProducer中,大部分參數(shù)都有合理的默認(rèn)值,一般不需要修改它們,不過了解這些參數(shù)可以讓我們更合理的使用生產(chǎn)者客戶端,其中還有一些參數(shù)涉及程序的可用性和性能。
acks
這個參數(shù) 指定分區(qū)中必須要有多少個副本收到這條消息,之后生產(chǎn)者才會認(rèn)為這條消息時成功寫入的,acks是生產(chǎn)者客戶端中一個非常重要的參數(shù),它涉及消息的可靠性和吞吐量之間的平衡。acks參數(shù)有3種類型的值(都是字符串類型):
acks = 1
默認(rèn)值就為1。生產(chǎn)者發(fā)送消息之后,只要leader副本成功寫入消息,那么它就會收到來自服務(wù)端的成功響應(yīng)。如果消息無法寫入leader副本,比如在leader副本崩潰、重新選舉新的leader副本的過程中,那么生產(chǎn)者就會收到一個錯誤的響應(yīng),為了避免消息丟失,生產(chǎn)者可以重發(fā)消息。如果消息寫入leader副本并返回成功響應(yīng)給生產(chǎn)者,且在被其他follower副本拉取之前l(fā)eader副本崩潰,那么此時消息還是會丟失,因?yàn)橹匦逻x舉的leader副本中并沒有這條對應(yīng)的消息。acks=1是消息可靠性和吞吐量之間的折中方案。acks=0
生產(chǎn)者發(fā)送消息后不需要等待任何服務(wù)端的響應(yīng)。在其他配置環(huán)境下,acks=0可達(dá)到最大吞吐量。acks=-1或acks=all
生產(chǎn)者在發(fā)送消息后,需要等到所有ISR中的所有副本都成功寫入消息之后才能夠收到來自服務(wù)端的成功響應(yīng)。在其他配置環(huán)境下相同的情況下,可以達(dá)到最強(qiáng)的可靠性。但這不意味著消息一定可靠,因?yàn)镮SR中可能只有l(wèi)eader副本,這樣就退化成了acks=1的情況,要獲得最強(qiáng)的消息可靠性要配合min.insync.replicas等參數(shù)的聯(lián)動配合。
//leader副本成功收到消息后返回響應(yīng),不管follower副本
properties.put(ProducerConfig.ACKS_CONFIG,"1");
max.request.size
這個參數(shù)用來限制生產(chǎn)者客戶端能發(fā)送消息的最大值,默認(rèn)為1048576B,即1MB。一般情況下這個默認(rèn)值就可以滿足大多數(shù)的應(yīng)用場景了。筆者不建議盲目的增大這個參數(shù)值,尤其是對Kafka整體脈絡(luò)沒有足夠把控的時候。因?yàn)檫@個參數(shù)還涉及其他一些參數(shù)的聯(lián)動,比如broker端的message.max.bytes參數(shù),如果配置錯誤會引起一些不必要的異常。
retries 和 retry.backoff.ms
retries用來配置生產(chǎn)者重試的次數(shù),默認(rèn)值為0,即在發(fā)生異常的時候不進(jìn)行任何重試動作。消息在從生產(chǎn)者發(fā)出到成功寫入服務(wù)器之前可能發(fā)生一些臨時性的異常,比如網(wǎng)絡(luò)抖動、leader副本的選舉等,這種異常往往是可以自行恢復(fù)的,生產(chǎn)者可以通過配置retries大于0的值,以此通過內(nèi)部重試來恢復(fù)而不是一味地將異常拋給生產(chǎn)者應(yīng)用程序。如果重試達(dá)到設(shè)定的次數(shù),那么生產(chǎn)者就會放棄重試返回異常。不過不是所有的異常都是可以通過重試來解決的,如消息太大,超過max.request.size參數(shù)配置的時候,這種方式就不行了。
重試還和另外一個參數(shù)retry.backoff.ms有關(guān),這個參數(shù)的默認(rèn)值為100,它用來設(shè)定兩次重試之間的時間間隔,避免無效的頻繁重試。
Kafka可以保證同一個分區(qū)中的消息是有序的。如果生產(chǎn)者按照一定順序發(fā)送消息,那么這些消息也會順訊的寫入分區(qū),進(jìn)而消費(fèi)者也可以按照順序消費(fèi)。如果將acks參數(shù)配置為非零值,并且max.in.flight.request.per.connection參數(shù)配置大于1的值,那么就會出現(xiàn)錯序的現(xiàn)象:如果第一批消息寫入失敗,而第二批消息寫入成功,那么生產(chǎn)者會重試發(fā)送第一批次的消息,此時第一批次的消息寫入成功,那么這兩批消息的順序就會發(fā)生錯序。一般而言,在需要保證消息順序的場合建議把參數(shù)max.in.flight.requests.per.connection配置為1,而不是把a(bǔ)cks配置為0,不過這樣會影響整體的吞吐。
compression.type
這個參數(shù)用來指定消息的壓縮方式,默認(rèn)為none,默認(rèn)情況下不會壓縮消息。該參數(shù)還可以配置“gzip”/"snappy"和“l(fā)z4”。對消息進(jìn)行壓縮可以極大較少網(wǎng)絡(luò)傳輸量、降低網(wǎng)絡(luò)IO,從而提高整體性能。消息壓縮是一種使用時間換空間的優(yōu)化方式。如果對時延有一定的要求,則不推薦對消息進(jìn)行壓縮。
connections.max.idle.ms
這個參數(shù)用來指定在多久之后關(guān)閉限制的連接,默認(rèn)值是540000(ms),即9分鐘。
linger.ms
這個參數(shù)用來指定生產(chǎn)者發(fā)送ProducerBatch之前等待更多消息(ProducerRecord)加入ProducerBatch的時間,默認(rèn)值為0。生產(chǎn)者客戶端會在ProducerBatch被填滿或等待時間超過linger.ms值時發(fā)送出去。增大這個參數(shù)的值會增加消息的延遲,但是同時能提升一定的吞吐量。這個linger.ms參數(shù)與TCP協(xié)議中的Nagle算法有異曲同工之妙。
receive.buffer.bytes
這個參數(shù)用來設(shè)置Socket接收消息緩沖區(qū)(SO_RECBUF)的大小,默認(rèn)值為32768(B),即32KB。如果設(shè)置為-1,則使用操作系統(tǒng)默認(rèn)值。如果Producer與Kafka處于不同的機(jī)房,則可以適當(dāng)調(diào)大這個參數(shù)。
send.buffer.bytes
這個參數(shù)用來設(shè)置Socket發(fā)送消息緩沖區(qū)(SO_RECBUF)的大小,默認(rèn)值為131072(B),即128KB。與receiver.buffer.bytes參數(shù)一樣,如果設(shè)置為-1,則使用操作系統(tǒng)的默認(rèn)值。
request.timeout.ms
這個參數(shù)用來配置Producer等待請求響應(yīng)的最長時間,默認(rèn)值為30000(ms)。請求超時之后可以選擇進(jìn)行重試。注意這個參數(shù)需要比broker端參數(shù)replica.lag.time.max.ms的值要大,這樣可以減少因客戶端重試而引起的消息重復(fù)的概率。
還有一些參數(shù)沒有提及,這些參數(shù)同樣非常重要,他們需要單獨(dú)的文章或場景來描述。以后的博文會慢慢降到,歡迎大家關(guān)注。