kafka-第二章-生產(chǎn)者

學(xué)習(xí)大綱


學(xué)習(xí)大綱

一、kafka java客戶端數(shù)據(jù)生產(chǎn)流程解析

java客戶端數(shù)據(jù)生產(chǎn)流程

一、發(fā)送類型

1、同步發(fā)送
  • 通過send()發(fā)送完消息后返回一個(gè)Future對(duì)象,然后調(diào)用Future對(duì)象的get()方法等待kafka響應(yīng)
  • 如果kafka正常響應(yīng),返回一個(gè)RecordMetadata對(duì)象,該對(duì)象存儲(chǔ)消息的偏移量
  • 如果kafka發(fā)生錯(cuò)誤,無法正常響應(yīng),就會(huì)拋出異常,我們便可以進(jìn)行異常處理
    producer.send(record).get();
2、異步發(fā)送

異步發(fā)送通過callback來監(jiān)聽回調(diào)結(jié)果

//發(fā)送消息
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_TEST, obj);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
      @Override
      public void onFailure(Throwable throwable) {
          //發(fā)送失敗的處理
          log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息失?。? + throwable.getMessage());
      }
      @Override
      public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
           //成功的處理
          log.info(TOPIC_TEST + " - 生產(chǎn)者 發(fā)送消息成功:" + stringObjectSendResult.toString());
     }
});

二、序列化器

消息要到網(wǎng)絡(luò)上進(jìn)行傳輸,必須進(jìn)行序列化,而序列化器的作用就是如此。
Kafka 提供了默認(rèn)的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer),還有整型
( IntegerSerializer)和字節(jié)數(shù)組(BytesSerializer)序列化器等等,這些序列化器都實(shí)現(xiàn)了接口
( org.apache.kafka.common.serialization.Serializer)基本上能夠滿足大部分場景的需求。


序列化器

二、分區(qū)器

  • 本身kafka有自己的分區(qū)策略的,如果未指定,就會(huì)使用默認(rèn)的分區(qū)策略
  • Kafka根據(jù)傳遞消息的key來進(jìn)行分區(qū)的分配,即hash(key) % numPartitions。如果Key相同的話,那么就會(huì)分配到統(tǒng)一分區(qū)。
    源代碼org.apache.kafka.clients.producer.internals.DefaultPartitioner分析
public class DefaultPartitioner implements Partitioner {
    private final StickyPartitionCache stickyPartitionCache = new StickyPartitionCache();

    public DefaultPartitioner() {
    }

    public void configure(Map<String, ?> configs) {
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //判斷當(dāng)前消息是否為null
        if (keyBytes == null) {
            return this.stickyPartitionCache.partition(topic, cluster);
        } else {
            //通過cluster集群和topic主機(jī)獲取分區(qū)列表partitions
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            //獲取分區(qū)的大小
            int numPartitions = partitions.size();
           //通過獲取當(dāng)前消息與分區(qū)大小進(jìn)行取模來得到分區(qū)
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    public void close() {
    }

    public void onNewBatch(String topic, Cluster cluster, int prevPartition) {
        this.stickyPartitionCache.nextPartition(topic, cluster, prevPartition);
    }
}

org.apache.kafka.common.utils

public static int toPositive(int number) {
        return number & 2147483647;
}

public static int murmur2(byte[] data) {
        int length = data.length;
        int seed = -1756908916;
        int m = 1540483477;
        int r = true;
        int h = seed ^ length;
        int length4 = length / 4;

        for(int i = 0; i < length4; ++i) {
            int i4 = i * 4;
            int k = (data[i4 + 0] & 255) + ((data[i4 + 1] & 255) << 8) + ((data[i4 + 2] & 255) << 16) + ((data[i4 + 3] & 255) << 24);
            k *= 1540483477;
            k ^= k >>> 24;
            k *= 1540483477;
            h *= 1540483477;
            h ^= k;
        }

        switch(length % 4) {
        case 3:
            h ^= (data[(length & -4) + 2] & 255) << 16;
        case 2:
            h ^= (data[(length & -4) + 1] & 255) << 8;
        case 1:
            h ^= data[length & -4] & 255;
            h *= 1540483477;
        default:
            h ^= h >>> 13;
            h *= 1540483477;
            h ^= h >>> 15;
            return h;
        }
}

三、攔截器

Producer攔截器(interceptor)是個(gè)相當(dāng)新的功能,它和consumer端interceptor是在Kafka 0.10版本被引入的,主要用于實(shí)現(xiàn)clients端的定制化控制邏輯。生產(chǎn)者攔截器可以用在消息發(fā)送前做一些準(zhǔn)備工作。若要寫自定義攔截器則需實(shí)現(xiàn)org.apache.kafka.clients.producer.internals.ProducerInterceptors
使用場景

  • 1、按照某個(gè)規(guī)則過濾掉不符合要求的消息
  • 2、修改消息的內(nèi)容
  • 3、統(tǒng)計(jì)類需求
自定義攔截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;

public class ProducerInterceptorPrefix implements ProducerInterceptor<String, String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    public ProducerInterceptorPrefix(List<ProducerInterceptor<String, String>> producerInterceptors) {
        super(producerInterceptors);
    }

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        //該方法封裝進(jìn)kafkaProducer.send方法中,即它運(yùn)行在用戶主線程中。producer確保消息被序列化以及計(jì)算分區(qū)前調(diào)用該方法。我們可以在該方法中對(duì)消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會(huì)影響目標(biāo)分區(qū)的計(jì)算。
        //攔截?cái)?shù)據(jù),給數(shù)據(jù)加上默認(rèn)前綴
        String modifiedValue = "prefix1-" + record.value();
        return new ProducerRecord<>(record.value(), record.partition(), record.timestamp(), record.key(), modifiedValue, record.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        //該方法會(huì)從RecordAccumulator成功發(fā)送到kafka broker之后,或者在發(fā)送過程中失敗時(shí)調(diào)用。并且通常是在producer回調(diào)邏輯觸發(fā)之前。該方法運(yùn)行在producer的IO線程中,因此不要在該方法中放入重要的邏輯,否則會(huì)拖慢producer的消息發(fā)送效率。
        if (exception == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
         //關(guān)閉interceptor,清理一些資源。
        //輸出統(tǒng)計(jì)數(shù)目
        System.out.println("當(dāng)前發(fā)送成功總計(jì): " + sendSuccess + " 條,發(fā)送失敗總計(jì): " + sendFailure + " 條");
    }

   @Override
    public void configure(Map<String, ?> map) {

    }
}
添加攔截器
List<String> interceptors = new ArrayList<>();
interceptors.add("com.haijia.kafka.kafka.ProducerInterceptorPrefix");
interceptors.add("com.haijia.kafka.kafka.ProducerInterceptorPrefix2");
properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptors);

總結(jié):
interceptor可能運(yùn)行在多個(gè)線程中,因此在具體的實(shí)現(xiàn)時(shí)用戶需要自行確保線程安全。另外,若指定了多個(gè)interceptor,則producer將按照順序調(diào)用他們,并僅僅是捕獲每個(gè)interceptor可能拋出的異常記錄到錯(cuò)誤日志中而非再向上傳遞。

四、發(fā)送原理剖析

發(fā)送原理圖

消息發(fā)送的過程中,涉及到兩個(gè)線程協(xié)同工作,主線程首先將業(yè)務(wù)數(shù)據(jù)封裝成ProducerRecord對(duì)象,之后調(diào)用send()方法將消息放入RecordAccumulator(消息收集器,也可以理解為主線程與Sender線程直接的緩沖區(qū))中暫存,Sender線程負(fù)責(zé)將消息信息構(gòu)成請(qǐng)求,并最終執(zhí)行網(wǎng)絡(luò)I/O的線程,它從RecordAccumulator中取出消息并批量發(fā)送出去,需要注意的是,KafkaProducer是線程安全的,多個(gè)線程間可以共享使用同一個(gè)KafkaProducer對(duì)象。具體可查看org.apache.kafka.clients.producer.KafkaProducer源碼

    @Override
    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions先經(jīng)過攔截器處理
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }
   ...

五、部分參數(shù)介紹

retries

生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))。在這種情況下,如果達(dá)到了retires 設(shè)置的次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待100ms,可以通過retry.backoff.ms 參數(shù)來修改這個(gè)時(shí)間間隔。

batch.size

當(dāng)有多個(gè)消息要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算,而不是消息個(gè)數(shù)。當(dāng)批次被填滿,批次里的所有消息會(huì)被發(fā)送出去。不過生產(chǎn)者并不一定都會(huì)等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個(gè)消息的批次也可能被發(fā)送。所以就算把batch.size設(shè)置的很大,也不會(huì)造成延遲,只會(huì)占用更多的內(nèi)存而已,如果設(shè)置的太小,生產(chǎn)者會(huì)因?yàn)轭l繁發(fā)送消息而增加一些額外的開銷。

max.request.size

該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小,它可以指定能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請(qǐng)求里所有消息的總大小。broker對(duì)可接收的消息最大值也有自己的限制(message.max.size),所以兩邊的配置最好匹配,避免生產(chǎn)者發(fā)送的消息被broker拒絕。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容