我花了一周讀了Kafka Producer的源碼

talk is easy,show me the code,先來看一段創(chuàng)建producer的代碼

public class KafkaProducerDemo {

  public static void main(String[] args) {

    KafkaProducer<String,String> producer = createProducer();

    //指定topic,key,value
    ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");

    //異步發(fā)送
    producer.send(record);
    producer.close();

    System.out.println("發(fā)送完成");

  }

  public static KafkaProducer<String,String> createProducer() {
    Properties props = new Properties();

    //bootstrap.servers 必須設置
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.131:9092");

    // key.serializer   必須設置
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    // value.serializer  必須設置
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    //client.id
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "client-0");

    //retries
    props.put(ProducerConfig.RETRIES_CONFIG, 3);

    //acks
    props.put(ProducerConfig.ACKS_CONFIG, "all");

    //max.in.flight.requests.per.connection
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    
    //linger.ms
    props.put(ProducerConfig.LINGER_MS_CONFIG, 100);

    //batch.size
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 10240);

    //buffer.memory
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10240);

    return new KafkaProducer<>(props);
  }
}

生產(chǎn)者的API使用還是比較簡單,創(chuàng)建一個ProducerRecord對象(這個對象包含目標主題和要發(fā)送的內容,當然還可以指定鍵以及分區(qū)),然后調用send方法就把消息發(fā)送出去了。在發(fā)送ProducerRecord對象時,生產(chǎn)者要先把鍵和值對象序列化成字節(jié)數(shù)組,這樣才能在網(wǎng)絡上進行傳輸。
在深入源碼之前,我先給出一張源碼分析圖給大家(其實應該在結尾的時候給出來),這樣看著圖再看源碼跟容易些

流程圖

簡要說明:

  1. new KafkaProducer()后創(chuàng)建一個后臺線程KafkaThread(實際運行線程是Sender,KafkaThread是對Sender的封裝)掃描RecordAccumulator中是否有消息

  2. 調用KafkaProducer.send()發(fā)送消息,實際是將消息保存到RecordAccumulator中,實際上就是保存到一個Map中(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>),這條消息會被記錄到同一個記錄批次(相同主題相同分區(qū)算同一個批次)里面,這個批次的所有消息會被發(fā)送到相同的主題和分區(qū)上

  3. 后臺的獨立線程掃描到RecordAccumulator中有消息后,會將消息發(fā)送到kafka集群中(不是一有消息就發(fā)送,而是要看消息是否ready)

  4. 如果發(fā)送成功(消息成功寫入kafka),就返回一個RecordMetaData對象,它包換了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。

  5. 如果寫入失敗,就會返回一個錯誤,生產(chǎn)者在收到錯誤之后會嘗試重新發(fā)送消息(如果允許的話,此時會將消息在保存到RecordAccumulator中),幾次之后如果還是失敗就返回錯誤消息

源碼分析

后臺線程的創(chuàng)建

KafkaClient client = new NetworkClient(...);
this.sender = new Sender(.,client,...);
String ioThreadName = "kafka-producer-network-thread" + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();

上面的代碼就是構造KafkaProducer時核心邏輯,它會構造一個KafkaClient負責和broker通信,同時構造一個Sender并啟動一個異步線程,這個線程會被命名為:kafka-producer-network-thread|${clientId},如果你在創(chuàng)建producer的時候指定client.id的值為myclient,那么線程名稱就是kafka-producer-network-thread|myclient

發(fā)送消息(緩存消息)

KafkaProducer<String,String> producer = createProducer();

//指定topic,key,value
ProducerRecord<String,String> record = new ProducerRecord<>("test1","newkey1","newvalue1");

//異步發(fā)送,可以設置回調函數(shù)
producer.send(record);
//同步發(fā)送
//producer.send(record).get();

發(fā)送消息有同步發(fā)送以及異步發(fā)送兩種方式,我們一般不使用同步發(fā)送,畢竟太過于耗時,使用異步發(fā)送的時候可以指定回調函數(shù),當消息發(fā)送完成的時候(成功或者失敗)會通過回調通知生產(chǎn)者。

發(fā)送消息實際上是將消息緩存起來,核心代碼如下:

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, 
  serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);

RecordAccumulator的核心數(shù)據(jù)結構是ConcurrentMap<TopicPartition, Deque<ProducerBatch>>,會將相同主題相同Partition的數(shù)據(jù)放到一個Deque(雙向隊列)中,這也是我們之前提到的同一個記錄批次里面的消息會發(fā)送到同一個主題和分區(qū)的意思。append()方法的核心源碼如下:

//從batchs(ConcurrentMap<TopicPartition, Deque<ProducerBatch>>)中
//根據(jù)主題分區(qū)獲取對應的隊列,如果沒有則new ArrayDeque<>返回
Deque<ProducerBatch> dq = getOrCreateDeque(tp);

//計算同一個記錄批次占用空間大小,batchSize根據(jù)batch.size參數(shù)決定
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(
    maxUsableMagic, compression, key, value, headers));

//為同一個topic,partition分配buffer,如果同一個記錄批次的內存不足,
//那么會阻塞maxTimeToBlock(max.block.ms參數(shù))這么長時間
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
  //創(chuàng)建MemoryRecordBuilder,通過buffer初始化appendStream(DataOutputStream)屬性
  MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
  ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());

  //將key,value寫入到MemoryRecordsBuilder中的appendStream(DataOutputStream)中
  batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());

  //將需要發(fā)送的消息放入到隊列中
  dq.addLast(batch);
}

發(fā)送消息到Kafka

上面已經(jīng)將消息存儲RecordAccumulator中去了,現(xiàn)在看看怎么發(fā)送消息。上面我們提到了創(chuàng)建KafkaProducer的時候會啟動一個異步線程去從RecordAccumulator中取得消息然后發(fā)送到Kafka,發(fā)送消息的核心代碼是Sender.java,它實現(xiàn)了Runnable接口并在后臺一直運行處理發(fā)送請求并將消息發(fā)送到合適的節(jié)點,直到KafkaProducer被關閉

/**
* The background thread that handles the sending of produce requests to the Kafka cluster. This thread makes metadata
* requests to renew its view of the cluster and then sends produce requests to the appropriate nodes.
*/
public class Sender implements Runnable {

  public void run() {

    // 一直運行直到kafkaProducer.close()方法被調用
    while (running) {
       run(time.milliseconds());
    }
    
    //從日志上看是開始處理KafkaProducer被關閉后的邏輯
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    //當非強制關閉的時候,可能還仍然有請求并且accumulator中還仍然存在數(shù)據(jù),此時我們需要將請求處理完成
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
       run(time.milliseconds());
    }
    if (forceClose) {
        //如果是強制關閉,且還有未發(fā)送完畢的消息,則取消發(fā)送并拋出一個異常new KafkaException("Producer is closed forcefully.")
        this.accumulator.abortIncompleteBatches();
    }
    ...
  }
}

KafkaProducer的關閉方法有2個,close()以及close(long timeout,TimeUnit timUnit),其中timeout參數(shù)的意思是等待生產(chǎn)者完成任何待處理請求的最長時間,第一種方式的timeout為Long.MAX_VALUE毫秒,如果采用第二種方式關閉,當timeout=0的時候則表示強制關閉,直接關閉Sender(設置running=false)。

run(long)方法中我們先跳過對transactionManager的處理,查看發(fā)送消息的主要流程如下:

//將記錄批次轉移到每個節(jié)點的生產(chǎn)請求列表中
long pollTimeout = sendProducerData(now);

//輪詢進行消息發(fā)送
client.poll(pollTimeout, now);

首先查看sendProducerData()方法,它的核心邏輯在sendProduceRequest()方法(處于Sender.java)中

for (ProducerBatch batch : batches) {
    TopicPartition tp = batch.topicPartition;

    //將ProducerBatch中MemoryRecordsBuilder轉換為MemoryRecords(發(fā)送的數(shù)據(jù)就在這里面)
    MemoryRecords records = batch.records();
    produceRecordsByPartition.put(tp, records);
}

ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
        produceRecordsByPartition, transactionalId);

//消息發(fā)送完成時的回調
RequestCompletionHandler callback = new RequestCompletionHandler() {
    public void onComplete(ClientResponse response) {
        //處理響應消息
        handleProduceResponse(response, recordsByPartition, time.milliseconds());
    }
};

//根據(jù)參數(shù)構造ClientRequest,此時需要發(fā)送的消息在requestBuilder中
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
        requestTimeoutMs, callback);

//將clientRequest轉換成Send對象(Send.java,包含了需要發(fā)送數(shù)據(jù)的buffer),
//給KafkaChannel設置該對象,記住這里還沒有發(fā)送數(shù)據(jù)
client.send(clientRequest, now);

上面的client.send()方法最終會定位到NetworkClient.doSend()方法,所有的請求(無論是producer發(fā)送消息的請求還是獲取metadata的請求)都是通過該方法設置對應的Send對象。所支持的請求在ApiKeys.java中都有定義,這里面可以看到每個請求的request以及response對應的數(shù)據(jù)結構。

上面只是設置了發(fā)送消息所需要準備的內容,現(xiàn)在進入到發(fā)送消息的主流程,發(fā)送消息的核心代碼在Selector.java的pollSelectionKeys()方法中,代碼如下:

/* if channel is ready write to any sockets that have space in their buffer and for which we have data */
if (channel.ready() && key.isWritable()) {
  //底層實際調用的是java8 GatheringByteChannel的write方法
  channel.write();
}

就這樣,我們的消息就發(fā)送到了broker中了,發(fā)送流程分析完畢,這個是完美的情況,但是總會有發(fā)送失敗的時候(消息過大或者沒有可用的leader),那么發(fā)送失敗后重發(fā)又是在哪里完成的呢?還記得上面的回調函數(shù)嗎?沒錯,就是在回調函數(shù)這里設置的,先來看下回調函數(shù)源碼

private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
  RequestHeader requestHeader = response.requestHeader();

  if (response.wasDisconnected()) {
    //如果是網(wǎng)絡斷開則構造Errors.NETWORK_EXCEPTION的響應
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now, 0L);

  } else if (response.versionMismatch() != null) {

   //如果是版本不匹配,則構造Errors.UNSUPPORTED_VERSION的響應
    for (ProducerBatch batch : batches.values())
        completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now, 0L);

  } else {
    
    if (response.hasResponse()) {
        //如果存在response就返回正常的response
           ...
        }
    } else {

        //如果acks=0,那么則構造Errors.NONE的響應,因為這種情況只需要發(fā)送不需要響應結果
        for (ProducerBatch batch : batches.values()) {
            completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NONE), correlationId, now, 0L);
        }
    }
  }
}

而在completeBatch方法中我們主要關注失敗的邏輯處理,核心源碼如下:

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
  Errors error = response.error;

  //如果發(fā)送的消息太大,需要重新進行分割發(fā)送
  if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 &&
        (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {

    this.accumulator.splitAndReenqueue(batch);
    this.accumulator.deallocate(batch);
    this.sensors.recordBatchSplit();

  } else if (error != Errors.NONE) {

    //發(fā)生了錯誤,如果此時可以retry(retry次數(shù)未達到限制以及產(chǎn)生異常是RetriableException)
    if (canRetry(batch, response)) {
        if (transactionManager == null) {
            //把需要重試的消息放入隊列中,等待重試,實際就是調用deque.addFirst(batch)
            reenqueueBatch(batch, now);
        } 
    } 
}

Producer發(fā)送消息的流程已經(jīng)分析完畢,現(xiàn)在回過頭去看流程圖會更加清晰。

更多關于Kafka協(xié)議的涉及可以參考這個鏈接

分區(qū)算法

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
    //如果key為null,則使用Round Robin算法
    int nextValue = nextValue(topic);
    List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
    if (availablePartitions.size() > 0) {
        int part = Utils.toPositive(nextValue) % availablePartitions.size();
        return availablePartitions.get(part).partition();
    } else {
        // no partitions are available, give a non-available partition
        return Utils.toPositive(nextValue) % numPartitions;
    }
} else {
    // 根據(jù)key進行散列
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}

Kafka中對于分區(qū)的算法有兩種情況

  1. 如果鍵值為null,并且使用了默認的分區(qū)器,那么記錄鍵隨機地發(fā)送到主題內各個可用的分區(qū)上。分區(qū)器使用輪詢(Round Robin)算法鍵消息均衡地分布到各個分區(qū)上。
  2. 如果鍵不為空,并且使用了默認的分區(qū)器,那么Kafka會對鍵進行散列(使用Kafka自己的散列算法,即使升級Java版本,散列值也不會發(fā)生變化),然后根據(jù)散列值把消息映射到特定的分區(qū)上。同一個鍵總是被映射到同一個分區(qū)上(如果分區(qū)數(shù)量發(fā)生了變化則不能保證),映射的時候會使用主題所有的分區(qū),而不僅僅是可用分區(qū),所以如果寫入數(shù)據(jù)分區(qū)是不可用的,那么就會發(fā)生錯誤,當然這種情況很少發(fā)生。

如果你想要實現(xiàn)自定義分區(qū),那么只需要實現(xiàn)Partitioner接口即可。

生產(chǎn)者的配置參數(shù)

分析了KafkaProducer的源碼之后,我們會發(fā)現(xiàn)很多參數(shù)是貫穿在整個消息發(fā)送流程,下面列出了一些KafkaProducer中用到的配置參數(shù)。

  1. acks
    acks參數(shù)指定了必須要有多少個分區(qū)副本收到該消息,producer才會認為消息寫入是成功的。有以下三個選項

    • acks=0,生產(chǎn)者不需要等待服務器的響應,也就是說如果其中出現(xiàn)了問題,導致服務器沒有收到消息,生產(chǎn)者就無從得知,消息也就丟失了,當時由于不需要等待響應,所以可以以網(wǎng)絡能夠支持的最大速度發(fā)送消息,從而達到很高的吞吐量。

    • acks=1, 只需要集群的leader收到消息,生產(chǎn)者就會收到一個來自服務器的成功響應。如果消息無法到達leader,生產(chǎn)者會收到一個錯誤響應,此時producer會重發(fā)消息。不過如果一個沒有收到消息的節(jié)點稱為leader,消息還是會丟失。

    • acks=all,當所有參與復制的節(jié)點全部收到消息的時候,生產(chǎn)者才會收到一個來自服務器的成功響應,最安全不過延遲比較高。

  2. buffer.memory

設置生產(chǎn)者內存緩沖區(qū)的大小,如果應用程序發(fā)送消息的速度超過生產(chǎn)者發(fā)送到服務器的速度,那么就會導致生產(chǎn)者空間不足,此時send()方法要么被阻塞,要么拋出異常。取決于如何設置max.block.ms,表示在拋出異常之前可以阻塞一段時間。

  1. retries

發(fā)送消息到服務器收到的錯誤可能是可以臨時的錯誤(比如找不到leader),這種情況下根據(jù)該參數(shù)決定生產(chǎn)者重發(fā)消息的次數(shù)。注意:此時要根據(jù)重試次數(shù)以及是否是RetriableException來決定是否重試。

  1. batch.size

當有多個消息需要被發(fā)送到同一個分區(qū)的時候,生產(chǎn)者會把他們放到同一個批次里面(Deque),該參數(shù)指定了一個批次可以使用的內存大小,按照字節(jié)數(shù)計算,當批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產(chǎn)者并不一定會等到批次被填滿才發(fā)送,半滿甚至只包含一個消息的批次也有可能被發(fā)送。

  1. linger.ms

指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時間。KafkaProducer會在批次填滿或linger.ms達到上限時把批次發(fā)送出去。把linger.ms設置成比0大的數(shù),讓生產(chǎn)者在發(fā)送批次之前等待一會兒,使更多的消息加入到這個批次,雖然這樣會增加延遲,當時也會提升吞吐量。

  1. max.block.ms

指定了在調用send()方法或者partitionsFor()方法獲取元數(shù)據(jù)時生產(chǎn)者的阻塞時間。當生產(chǎn)者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數(shù)據(jù)時,這些方法就會阻塞。在阻塞時間達到max.block.ms時,就會拋出new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");

  1. client.id

任意字符串,用來標識消息來源,我們的后臺線程就會根據(jù)它來起名兒,線程名稱是kafka-producer-network-thread|{client.id}

  1. max.in.flight.requests.per.connection

該參數(shù)指定了生產(chǎn)者在收到服務器響應之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。把它設為1可以保證消息是按照發(fā)送的順序寫入服務器的,即便發(fā)生了重試。

  1. timeout.ms、request.timeout.ms和metadata.fetch.timeout.ms

request.timeout.ms指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時等待服務器返回響應的時間,metadata.fetch.timeout.ms指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標分區(qū)的leader)時等待服務器返回響應的時間。如果等待響應超時,那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個錯誤。timeout.ms指定了broker等待同步副本返回消息確認的時間,與asks的配置相匹配——如果在指定時間內沒有收到同步副本的確認,那么broker就會返回一個錯誤。

  1. max.request.size

該參數(shù)用于控制生產(chǎn)者發(fā)送的請求大小。broker對可接收的消息最大值也有自己的限制(message.max.bytes),所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被broker拒絕。

  1. receive.buffer.bytes和send.buffer.bytes

這兩個參數(shù)分別制定了TCP socket接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小(和broker通信還是通過socket)。如果他們被設置為-1,就使用操作系統(tǒng)的默認值。如果生產(chǎn)者或消費者與broker處于不同的數(shù)據(jù)中心,那么可以適當增大這些值,因為跨數(shù)據(jù)中心的網(wǎng)絡一般都有比較高的延遲和比較低的帶寬。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容