[kafka系列]之producer端消息發(fā)送

本小節(jié)我們來討論Kafka生產(chǎn)者是如何發(fā)送消息到Kafka的, Kafka項目有一個生產(chǎn)者客戶端,我們可以通過這個客戶端的API來發(fā)送消息。生產(chǎn)者客戶端是用Java寫的,但Kafka寫消息的協(xié)議是支持多語言的,其它語言的api可見這個wiki


概要

通過本文,你可以了解到以下內(nèi)容:

  • kafka producer端的整體結(jié)構(gòu),相關(guān)參數(shù)配置,以及性能優(yōu)化;
  • 分區(qū)器,攔截器的擴展;
  • 消息序列化擴展;
  • 分區(qū)器,攔截器,序列化的執(zhí)行順序;

開始

很多做業(yè)務(wù)的同學(xué)都知道,在我們系統(tǒng)中發(fā)送一條消息給kafka 集群,我們只需要簡單的調(diào)一下已經(jīng)封裝好的接口,下面是來于我實際項目中的接口方法:

kafkaProducer.produce(String topic,Object msg)

每次要發(fā)消息,我就是這么簡單的調(diào)用一下就能確保消息能被consumer端正常的消費,但是kafka producer做了哪些工作我卻渾然不知,今天我就跟大家說到底說道這個里面到底有哪些不為人知的操作;

  • 引出第一個問題,kafka消息的發(fā)送是一個什么樣的過程? 這個過程中做了哪些操作?

借助于kafka官網(wǎng)上的API,首先給大家來一張producer端的消息流轉(zhuǎn)圖:

消息發(fā)送.png

接下來,結(jié)合一段代碼,給大家簡單說下流程:

    Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("compresstion.type","snappy");
        props.put("partitioner.class", "org.apache.kafka.clients.producer.internals.DefaultPartitioner");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < MAZ_RETRY_SIZE; i++) {
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
        }
producer.close();

流程如下:

  • 首先需要指定kafka producer端的配置;
  1. zk的地址和端口;
  2. producer端ack應(yīng)答機制,本demo中ack設(shè)置為all,表示生產(chǎn)者會等待所有副本成功寫入該消息,這種方式是最安全的,能夠保證消息不丟失,但是延遲也是最大的;
  3. retries設(shè)置標示消息發(fā)送失敗,生產(chǎn)者可以自動重試,但是此刻設(shè)置為0標示不重試;這個參數(shù)需要結(jié)合retry.backoff.ms(重試等待間隔)來使用,建議總的重試時間比集群重新選舉群首的時間長,這樣可以避免生產(chǎn)者過早結(jié)束重試導(dǎo)致失敗;
  4. batch.size參數(shù)標示生產(chǎn)者為每個分區(qū)維護了一個未發(fā)送記錄的緩沖區(qū),這個緩沖區(qū)的大小由batch.size配置指定,配置的很大可能會導(dǎo)致更多的批處理,也需要更多的內(nèi)存(但是對于每個活動分區(qū),我們通常都有一個這樣的緩沖區(qū)),默認是16384Bytes;
  5. linger.ms 指定生產(chǎn)者在發(fā)送批量消息前等待的時間,當設(shè)置此參數(shù)后,即便沒有達到批量消息的指定大小,到達時間后生產(chǎn)者也會發(fā)送批量消息到broker.默認情況下,生產(chǎn)者的發(fā)送消息線程只要空閑了就會發(fā)送消息,即便只有一條消息.設(shè)置這個參數(shù)后,發(fā)送線程會等待一定的時間,這樣可以批量發(fā)送消息增加吞吐量,但同時也會增加延遲;
  6. buffer.memory控制生產(chǎn)者可用于緩沖的內(nèi)存總量;消息的發(fā)送速度超過了它們可以傳輸?shù)椒?wù)器的速度,那么這個緩沖空間將被耗盡.
    當緩沖區(qū)空間耗盡時,額外的發(fā)送調(diào)用將阻塞.阻止時間的閾值由max.block.ms確定,在此之后它將引發(fā)TimeoutException.這個緩存是針對每個producerThread,不應(yīng)設(shè)置高以免影響內(nèi)存;

生產(chǎn)者如果每發(fā)送一條消息都直接通過網(wǎng)絡(luò)發(fā)送到服務(wù)端,勢必會造成過多 的網(wǎng)絡(luò)請求。如果我們能夠?qū)⒍鄺l消息按照分區(qū)進行分組,并采用批量的方式一次發(fā)送一個消息集,并且對消息集進行壓縮,就可以減少網(wǎng)絡(luò)傳輸?shù)膸?,進一步提高數(shù)據(jù)的傳輸效率。

  1. key.serializervalue.serializer指定了如何將key和value序列化成二進制碼流的方式,也就是上圖中的序列化方式;
  2. compresstion.type:默認情況下消息是不壓縮的,這個參數(shù)可以指定使用消息壓縮,參數(shù)可以取值為snappy、gzip或者lz4;
  • 接下來,我們需要創(chuàng)建一個ProducerRecord,這個對象需要包含消息的topic和值value,可以選擇性指定一個鍵值key或者分區(qū)partition
  • 發(fā)送消息時,生產(chǎn)者會根據(jù)配置的key.serializervalue.serializer對鍵值和值序列化成字節(jié)數(shù)組,然后發(fā)送到分配器partitioner
  • 如果我們指定了分區(qū),那么分配器返回該分區(qū)即可;否則,分配器將會基于鍵值來選擇一個分區(qū)并返回。
  • 選擇完分區(qū)后,生產(chǎn)者知道了消息所屬的主題和分區(qū),它將這條記錄添加到相同主題和分區(qū)的批量消息中,另一個線程負責(zé)發(fā)送這些批量消息到對應(yīng)的Kafka broker。
  • broker接收到消息后,如果成功寫入則返回一個包含消息的主題、分區(qū)及位移的RecordMetadata對象,否則返回異常.
  • 生產(chǎn)者接收到結(jié)果后,對于異常可能會進行重試,根據(jù)參數(shù)reties的配置決定.

kafka發(fā)送端文件存儲原理

我們普遍認為一旦涉及到磁盤的訪問,數(shù)據(jù)的讀寫就會變得很慢,其實不然,操作系統(tǒng)已經(jīng)針對磁盤的訪問速率做了很大的優(yōu)化;比如,預(yù)讀會提前將一個比較大的磁盤讀入內(nèi)存,后寫會把很多小的邏輯寫操作合并起來組合成一個大的物理寫操作;并且,操作系統(tǒng)還會將主內(nèi)存剩余的所有空間都用作磁盤緩存,所有的磁盤讀寫都會經(jīng)過統(tǒng)一的磁盤緩存,綜上所述,如果針對磁盤的順序讀寫,某些情況它可能比隨機的內(nèi)存訪問都要快。

文件寫入的邏輯無外乎一下這兩種,但kafka選擇了第一種,也就是a圖的邏輯:

image.png

b圖是首先在內(nèi)存中保存盡可能多的數(shù)據(jù),并在需要時將這些數(shù)據(jù)刷新進磁盤;
a圖是所有數(shù)據(jù)立即寫入磁盤,但不進行刷新數(shù)據(jù)的調(diào)用,數(shù)據(jù)首先會被傳輸?shù)酱疟P緩存,操作系統(tǒng)隨后會將數(shù)據(jù)定期自動刷新到磁盤。

發(fā)送端優(yōu)化

新的API中,生產(chǎn)者要發(fā)送消息,并不是直接發(fā)送給服務(wù)器,而是在客戶端先把消息放入一個緩沖隊列中,然后由一個消息發(fā)送線程從隊列中拉取消息,以批鹽的方式發(fā)送消息給服務(wù)端。 Kafka的記錄收集器RecordAccumulator 負責(zé)緩存生產(chǎn)者客戶端產(chǎn)生的消息,發(fā)送線程( Sender)負責(zé)讀取記錄收集器的批量消息, 通過網(wǎng)絡(luò)發(fā)送給服務(wù)端。

開篇我們便列出了kafka 發(fā)送端的流程圖,消息發(fā)送之初,首先會為消息指定一個分區(qū)(發(fā)送消息時未指定分區(qū)的情況下),對于沒有鍵的消息,通過計數(shù)器自增輪詢的方式依次將消息分配到不同的分區(qū)上;對于有鍵的消息,對鍵計算散列值,然后和主題的分區(qū)數(shù)進行取模得到分區(qū)編號,具體的客戶端代碼實現(xiàn):

public int partition(ProducerRecord<byte[], byte[]> record, Cluster cluster) {
       //獲取集群中所有的分區(qū)
        List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());
        int numPartitions = partitions.size();
        //如果指定分區(qū)
        if (record.partition() != null) {
            // they have given us a partition, use it
            if (record.partition() < 0 || record.partition() >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + record.partition()
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return record.partition();
      //  如果沒有key,則負載均衡的分布
        } else if (record.key() == null) {
            int nextValue = counter.getAndIncrement();
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(record.topic());
            if (availablePartitions.size() > 0) {
                int part = Utils.abs(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.abs(nextValue) % numPartitions;
            }
        } else {
            // 如果有key,則對key 進行hash取模運算
            return Utils.abs(Utils.murmur2(record.key())) % numPartitions;
        }
    }

}

在客戶端就為消息選擇分區(qū)的目的是什么? 只有為消息選擇分區(qū),我們才能知道應(yīng)該發(fā)送到哪個節(jié)點,如果隨便找一個服務(wù)端節(jié)點,再由那個節(jié)點去決定如何將消息轉(zhuǎn)發(fā)給其他正確的節(jié)點來保存。這種方式增加了服務(wù)端的負擔,多了不必要的數(shù)據(jù)傳輸。

序列化

在上述代碼中,我們看到了kafka producer在發(fā)送消息的時候會將key和value進行序列化,上面的程序中使用的是Kafka客戶端自帶的org.apache.kafka.common.serialization.StringSerializer,除了用于String類型的序列化器之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long這幾種類型,這幾個序列化類都實現(xiàn)了org.apache.kafka.common.serialization.Serializer接口接下來,此接口有三種方法:

  • public void configure(Map<String, ?> configs, boolean isKey):用來配置當前類。
  • public byte[] serialize(String topic, T data):用來執(zhí)行序列化。
  • public void close():用來關(guān)閉當前序列化器。一般情況下這個方法都是個空方法,如果實現(xiàn)了此方法,必須確保此方法的冪等性,因為這個方法很可能會被KafkaProducer調(diào)用多次。

業(yè)界用的多的序列化框架無外乎如Avro、JSON、Thrift、ProtoBuf或者Protostuff等工具,這里我就不擴展開了,讀者如果感興趣可以搜索相關(guān)的資料,下面就以一個簡單的例子來介紹下如何自定義序列化方式.

假設(shè)我們有一個自定義的Company類:

@Data
public class Company {
    private String name;
    private String address;
}

接下來我們Company的name和address屬性進行序列化,實現(xiàn)下Serializer接口:

public class CompanySerializer implements Serializer<Customer> {
    public void configure(Map<String, ?> configs, boolean isKey) {}
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes("UTF-8");
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes("UTF-8");
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4+4+name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return new byte[0];
    }
    public void close() {}
}

使用自定義的序列化類的方式也簡單,在前面的代碼中替換下properties中的序列化類即可:

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.xxx.kafka. CompanySerializer");

分區(qū)器

在上文的demo中,我們創(chuàng)建消息的時候,必須要提供主題和消息的內(nèi)容,而消息的key是可選的(也就是我們平時工作中,發(fā)送消息時只需要指定topic和message),當不指定key時默認為null.消息的key有兩個重要的作用:

  • 提供描述消息的額外信息;
  • 用來決定消息寫入到哪個分區(qū),所有具有相同key的消息會分配到同一個分區(qū)中.

如果key為null,那么生產(chǎn)者會使用默認的分配器,該分配器使用輪詢round-robin)算法來將消息均衡到所有分區(qū).

如果key不為null且使用的是默認的分配器,那么生產(chǎn)者會對key進行哈希并根據(jù)結(jié)果將消息分配到特定的分區(qū).注意的是,在計算消息與分區(qū)的映射關(guān)系時,使用的是全部的分區(qū)數(shù)而不僅僅是可用的分區(qū)數(shù).這也意味著,如果某個分區(qū)不可用(雖然使用復(fù)制方案的話這極少發(fā)生),而消息剛好被分配到該分區(qū),那么將會寫入失敗.另外,如果需要增加額外的分區(qū),那么消息與分區(qū)的映射關(guān)系將會發(fā)生改變,因此盡量避免這種情況,具體的信息可以查看DefaultPartitioner中的代碼實現(xiàn):

    /**
     * Compute the partition for the given record.
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //獲取指定topic的partitions
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //key=null 
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            //可用分區(qū)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                //消息隨機分布到topic可用的partition中
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // 無分區(qū)可利用, 給定一個不可用的分區(qū)
                return Utils.toPositive(nextValue) % numPartitions;
            }
            //如果 key 不為 null,并且使用了默認的分區(qū)器,kafka 會使用自己的 hash 算法對 key 取 hash 值
        } else {//通過hash獲取partition
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

現(xiàn)在來看下如何自定義一個分配器,下面將key為Test的消息單獨放在一個分區(qū),與其他的消息進行分區(qū)隔離:

public class TestPartitioner implements Partitioner {
    public void configure(Map<String, ?> configs) {}
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if ((keyBytes == null) || (!(key instanceOf String)))
        throw new InvalidRecordException("We expect all messages to have customer name as key")
    if (((String) key).equals("Test"))
        return numPartitions; // Banana will always go to last partition
   
     // Other records will get hashed to the rest of the partitions
    return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
    }
    
    public void close() {}
 
}

使用自定義的分區(qū)器

使用很簡單,在配置文件中或者properties文件中指定分區(qū)器的類即可;

props.put("partitioner.class", "com.xxx.kafka.TestPartitioner");

攔截器

Producer攔截器是個相當新的功能.對于producer而言,interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等.同時,producer允許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈,Intercetpor的實現(xiàn)接口是org.apache.kafka.clients.producer.ProducerInterceptor,其定義的方法包括:

  • onSend(ProducerRecord):該方法封裝進KafkaProducer.send方法中,即它運行在用戶主線程中的。Producer確保在消息被序列化以計算分區(qū)前調(diào)用該方法。用戶可以在該方法中對消息做任何操作,但最好保證不要修改消息所屬的topic和分區(qū),否則會影響目標分區(qū)的計算
  • onAcknowledgement(RecordMetadata, Exception e):該方法會在消息被應(yīng)答之前或消息發(fā)送失敗時調(diào)用,并且通常都是在producer回調(diào)邏輯觸發(fā)之前。onAcknowledgement運行在producer的IO線程中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的消息發(fā)送效率
  • close:關(guān)閉interceptor,主要用于執(zhí)行一些資源清理工作,一般不作實現(xiàn);

interceptor可能被運行在多個線程中,因此在具體實現(xiàn)時用戶需要自行確保線程安全.另外倘若指定了多個interceptor,則producer將按照指定順序調(diào)用它們,并僅僅是捕獲每個interceptor可能拋出的異常記錄到錯誤日志中而非在向上傳遞.這在使用過程中要特別留意.

下面我們簡單演示一個雙interceptor組成的攔截鏈,第一個interceptor會在消息發(fā)送前將時間戳信息加到消息value的最前部;第二個interceptor會在消息發(fā)送后更新成功發(fā)送消息數(shù)或失敗發(fā)送消息數(shù).

第一個,在send方法中,我們會創(chuàng)建一個新的message,把時間戳寫入消息體的最前部.

public class TimeStampPrependerInterceptor implements ProducerInterceptor<String, String> {
    @Override
    public void configure(Map<String, ?> configs) {
 
    }
 
    @Override
    public ProducerRecord onSend(ProducerRecord msg) {
        return new ProducerRecord(
                msg(), msg(), record.timestamp(), msg(), System.currentTimeMillis() + "," + msg().toString());
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
 
    }
 
    @Override
    public void close() {
    }
}

定義第二個interceptor:CounterInterceptor,該interceptor會在消息發(fā)送后更新"發(fā)送成功消息數(shù)"和"發(fā)送失敗消息數(shù)"兩個計數(shù)器,并在producer關(guān)閉時打印這兩個計數(shù)器;

public class CounterInterceptor implements ProducerInterceptor<String, String> {
 
    private int errorCounter = 0;
    private int successCounter = 0;
 
    @Override
    public void configure(Map<String, ?> configs) {
    }
 
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }
 
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            successCounter++;
        } else {
            errorCounter++;
        }
    }
 
    @Override
    public void close() {
        // 保存結(jié)果
        System.out.println("Successful sent: " + successCounter);
        System.out.println("Failed sent: " + errorCounter);
    }
}

定義好interceptor之后,我們需要在producer中這樣指定即可,代碼如下:

List<String> interceptors = new ArrayList<>();
interceptors.add("com.xxx.kafka.TimeStampPrependerInterceptor"); // interceptor 1
interceptors.add("com.xxx.kafka.CounterInterceptor"); // interceptor 2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

// 一定要關(guān)閉producer,這樣才會調(diào)用interceptors中的close方法
producer.close();

寫了這么多,基本上將一個簡單消息從kafka producer發(fā)送時可以做的事情弄清了,但是我還是有一個疑問,分區(qū)器,攔截器,序列化他們之間有順序?
這個疑問留給大家自己去解決!!!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一天晚上,在樓下奶茶店,偶然間看到這個乖巧的小可愛,就把它定格了下來。因為最近考研的事情,真的好煩人,總是覺得時間...
    突然就想你了閱讀 204評論 0 0
  • 喜歡的女生終于發(fā)消息給我,我卻果斷拉黑 大學(xué)喜歡一個女生四年,被拒絕了4年 畢業(yè)后她去了外省,我留在本地,從此不再...
    迷茫懶惰君閱讀 667評論 0 0
  • 想有一首歌 我們在歌里 冬天入夜猩紅的天空 再來一場細碎的大雪 空靜無人的廣場 出現(xiàn)一個你 陪我看完這雪景 若真有...
    竹七君閱讀 417評論 0 0
  • 一、10月整體分析 月計劃60%完成,重要事項取得的效果比預(yù)期更好!優(yōu)化了工作上的石墨協(xié)作工作集流程,建立了家庭石...
    揚頭望月亮閱讀 300評論 0 0

友情鏈接更多精彩內(nèi)容