Kafka 新版生產者 API

1. kafka 生產者發(fā)送消息的流程

2. Kafka 生產者發(fā)送數據的3種方式

(1) 發(fā)送并忘記(fire-and-forget)

把消息發(fā)送給服務器,但并不關心它是否正常到達。大多數情況下,消息會正常到達,因為 Kafka 是高可用的,而且生產者會自動嘗試重發(fā)。不過,使用這種方式有時候也會丟失一些消息。

package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * @Title Producer01.java 
 * @Description Kafka 生產者發(fā)送消息的第一種方式:發(fā)送并忘記
 * @Author YangYunhe
 * @Date 2018-06-21 10:35:34
 */
public class Producer01 {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("acks", "1");
        props.put("retries", 3);
        props.put("batch.size", 16384); // 16K
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432); // 32M
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        String filePath = Producer01.class.getClassLoader().getResource("wechat_data.txt").getPath();
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while((line = br.readLine()) != null) {
            // 創(chuàng)建 ProducerRecord 可以指定 topic、partition、key、value,其中 partition 和 key 是可選的
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", 0, "key", line);
            // ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", "key", line);
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
            
            // 只管發(fā)送消息,不管是否發(fā)送成功
            producer.send(record);
            Thread.sleep(100);
        }
        producer.close();
    }
}

(2) 同步發(fā)送

使用 send() 方法發(fā)送消息,它會返回一個 Future 對象,調用 get() 方法進行等待(會返回元數據或者拋出異常),
就可以知道消息是否發(fā)送成功。

package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title Producer02.java 
 * @Description Kafka 生產者發(fā)送消息的第二種方式:同步發(fā)送
 * @Author YangYunhe
 * @Date 2018-06-21 10:38:37
 */
public class Producer02 {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        String filePath = Producer02.class.getClassLoader().getResource("wechat_data.txt").getPath();
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while((line = br.readLine()) != null) {
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
            // 程序阻塞,直到該條消息發(fā)送成功返回元數據信息或者報錯
            RecordMetadata metadata = producer.send(record).get();
            StringBuilder sb = new StringBuilder();
            sb.append("record [").append(line).append("] has been sent successfully!").append("\n")
                .append("send to partition ").append(metadata.partition())
                .append(", offset = ").append(metadata.offset());
            System.out.println(sb.toString());
            Thread.sleep(100);
        }
        producer.close();
    }
}

(3) 異步發(fā)送

大多數時候,我們并不需要等待響應——盡管 Kafka會把目標主題、分區(qū)信息和消息的偏移量發(fā)送回來,但對于發(fā)送端的應用程序來說不是必需的。

不過在遇到消息發(fā)送失敗時,我們需要拋出異常、記錄錯誤日志等,這樣的情況下可以使用異步發(fā)送消息的方式,調用 send() 方法,并指定一個回調函數,服務器在返回響應時調用該函數。

package com.bonc.rdpe.kafka110.producer;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title Producer03.java 
 * @Description Kafka 生產者發(fā)送消息的第三種方式:異步發(fā)送
 * @Author YangYunhe
 * @Date 2018-06-21 11:06:05
 */
public class Producer03 {
    
    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        
        String filePath = Producer03.class.getClassLoader().getResource("wechat_data.txt").getPath();
        BufferedReader br = new BufferedReader(new FileReader(filePath));

        String line;
        while((line = br.readLine()) != null) {
            ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", line);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // 如果發(fā)送消息成功,返回了 RecordMetadata
                    if(metadata != null) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("message has been sent successfully! ")
                            .append("send to partition ").append(metadata.partition())
                            .append(", offset = ").append(metadata.offset());
                        System.out.println(sb.toString());
                    }
                    // 如果消息發(fā)送失敗,拋出異常
                    if(e != null) {
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(100);
        }
        producer.close();
    }
}

3. 多線程生產者

在數據量比較大同時對發(fā)送消息的順序沒有嚴格要求時,可以使用多線程的方式發(fā)送數據,實現(xiàn)多線程生產者有兩種方式:1. 實例化一個 KafkaProducer 對象運行多個線程共享該對象發(fā)送消息;2. 實例化多個 KafkaProducer 對象。
由于 Kafka Producer 是線程安全的,所以多個線程共享一個 Kafka Producer 對象在性能上要好很多。

(1) 線程類實現(xiàn)

package com.bonc.rdpe.kafka110.thread;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * @Title KafkaProducerThread.java 
 * @Description 多線程生產者的線程類實現(xiàn)
 * @Author YangYunhe
 * @Date 2018-06-25 13:54:38
 */
public class KafkaProducerThread implements Runnable {
    
    private KafkaProducer<String, String> producer;
    private ProducerRecord<String, String> record;
    
    public KafkaProducerThread(KafkaProducer<String, String> producer, ProducerRecord<String, String> record) {
        this.producer = producer;
        this.record = record;
    }

    @Override
    public void run() {
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception != null) {
                    System.out.println("exception occurs when sending message: " + exception);
                }
                if(metadata != null) {
                    StringBuilder result = new StringBuilder();
                    result.append("message[" + record.value() + "] has been sent successfully! ")
                        .append("send to partition ").append(metadata.partition())
                        .append(", offset = ").append(metadata.offset());
                    System.out.println(result.toString());
                }
            }
        });
    }
}

(2) 發(fā)送消息的具體實現(xiàn)

package com.bonc.rdpe.kafka110.producer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.bonc.rdpe.kafka110.thread.KafkaProducerThread;

/**
 * @Title MultiProducer.java 
 * @Description 多線程生產者的測試代碼
 * @Author YangYunhe
 * @Date 2018-06-25 14:30:58
 */
public class MultiProducer {
    
    private static final int THREADS_NUMS = 10;
    
    public static void main(String[] args) {
        
        ExecutorService executor = Executors.newFixedThreadPool(THREADS_NUMS);
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        ProducerRecord<String, String> record;
        
        try {
            for(int i = 0; i < 100; i++) {
                record = new ProducerRecord<>("dev3-yangyunhe-topic001", "hello" + i);
                executor.submit(new KafkaProducerThread(producer, record));
                Thread.sleep(1000);
            }
        }catch (Exception e) {
            System.out.println("exception occurs when sending message: " + e);
        }finally {
            producer.close();
            executor.shutdown();
        }
    }
}

(3) 運行結果:

message[hello0] has been sent successfully! send to partition 1, offset = 705
message[hello1] has been sent successfully! send to partition 0, offset = 705
message[hello2] has been sent successfully! send to partition 2, offset = 704
message[hello3] has been sent successfully! send to partition 1, offset = 706
message[hello4] has been sent successfully! send to partition 0, offset = 706

......

4. Kafka Producer 常用配置(kafka-1.1.0)

(1) acks

  • 類型:string
  • 默認值:1
  • 可設置值:[all, -1, 0, 1]
  • 重要性:高
  • 說明:
    • 0:生產者在成功寫入消息之前不會等待任何來自服務器的響應。也就是說,如果當中出現(xiàn)了問題,導致服務器沒有收到消息,那么生產者就無從得知,消息也就丟失了。不過,因為生產者不需要等待服務器的響應,所以它可以以網絡能夠支持的最大速度發(fā)送消息,從而達到很高的吞吐量。
    • 1:只要集群的首領節(jié)點收到消息,生產者就會收到一個來自服務器的成功響應。如果消息無法到達首領節(jié)點(比如首領節(jié)點崩潰,新的首領還沒有被選舉出來),生產者會收到一個錯誤響應,為了避免數據丟失,生產者會重發(fā)消息。不過,如果一個沒有收到消息的節(jié)點成為新首領,消息還是會丟失。這個時候的吞吐量取決于使用的是同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務器的響應(通過調用 Future 對象的 get() 方法),顯然會增加延遲(在網絡上傳輸一個來回的延遲)。如果客戶端使用回調,延遲問題就可以得到緩解,不過吞吐量還是會受發(fā)送中消息數量的限制(比如,生產者在收到服務器響應之前可以發(fā)送多少個消息)。
    • all:只有當所有參與復制的節(jié)點全部收到消息時,生產者才會收到一個來自服務器的成功響應。這種模式是最安全的,它可以保證不止一個服務器收到消息,就算有服務器發(fā)生崩潰,整個集群仍然可以運行。不過,它的延遲比 acks=1 時更高,因為我們要等待不只一個服務器節(jié)點接收消息。
    • -1:作用與"all"是一樣的。

(2) buffer.memory

  • 類型:long
  • 默認值:33554432(32M)
  • 可設置值:[0,...]
  • 重要性:高
  • 說明:該參數用來設置生產者內存緩沖區(qū)的大小,生產者用它緩沖要發(fā)送到服務器的消息。如果應用程序發(fā)送消息的速度超過發(fā)送到服務器的速度,會導致生產者空間不足。
    這個時候,send()方法調用要么被阻塞,要么拋出異常,取決于如何設置 max.block.ms (類型:long,默認值:60000(1分鐘),可設置值:[0,...],重要性:中等)參數。表示在拋出異常之前可以阻塞的時間。

(3) compression.type

  • 類型:string
  • 默認值:none
  • 可設置值:[none, gzip, snappy, lz4]
  • 重要性:高
  • 說明:該參數可以指定消息被發(fā)送給 broker 之前使用哪一種壓縮算法進行壓縮。snappy 壓縮算法由 Google 發(fā)明,它占用較少的 CPU,卻能提供較好的性能和相當可觀的壓縮比,如果比較關注性能和網絡帶寬,可以使用這種算法。gzip 壓縮算法一般會占用較多的 CPU,但會提供更高的壓縮比,所以如果網絡帶寬比較有限,可以使用這種算法。使用壓縮可以降低網絡傳輸開銷和存儲開銷,而這往往是向 Kafka 發(fā)送消息的瓶頸所在。

(4) retries

  • 類型:int
  • 默認值:0
  • 可設置值:[0,...,2147483647]
  • 重要性:高
  • 說明:生產者從服務器收到的錯誤有可能是臨時性的錯誤(比如分區(qū)找不到首領)。在這種情況下,retries 參數的值決定了生產者可以重發(fā)消息的次數,如果達到這個次數,生產者會放棄重試并返回錯誤。默認情況下,生產者會在每次重試之間等待 100ms,不過可以通過 retry.backoff.ms(類型:long,默認值:100, 可設置值:[0,...],重要性:低) 參數來改變這個時間間隔。
    建議在設置重試次數和重試時間間隔之前,先測試一下恢復一個崩潰節(jié)點需要多少時間(比如所有分區(qū)選舉出首領需要多長時間),讓總的重試時間比 Kafka 集群從崩潰中恢復的時間長,否則生產者會過早地放棄重試。不過有些錯誤不是臨時性錯誤,沒辦法通過重試來解決(比如"消息太大"錯誤)。一般情況下,因為生產者會自動進行重試,所以就沒必要在代碼邏輯里處理那些可重試的錯誤。你只需要處理那些不可重試的錯誤或重試次數超出上限的情況。

(5) batch.size

  • 類型:int
  • 默認值:16384(16K)
  • 可設置值:[0,...]
  • 重要性:中等
  • 說明:當有多個消息需要被發(fā)送到同一個分區(qū)時,生產者會把它們放在同一個批次里。該參數指定了一個批次可以使用的內存大小,按照字節(jié)數計算(而不是消息個數)。當批次被填滿,批次里的所有消息會被發(fā)送出去。不過生產者并不一定都會等到批次被填滿才發(fā)送,半滿的批次,甚至只包含一個消息的批次也有可能被發(fā)送。所以就算把批次大小設置得很大,也不會造成延遲,只是會占用更多的內存而已。但如果設置得太小,因為生產者需要更頻繁地發(fā)送消息,會增加一些額外的開銷。

(6) linger.ms

  • 類型:long
  • 默認值:0
  • 可設置值:[0,...]
  • 重要性:中等
  • 說明:該參數指定了生產者在發(fā)送批次之前等待更多消息加入批次的時間。KafkaProducer 會在批次填滿或 linger.ms 達到上限時把批次發(fā)送出去。默認情況下,只要有可用的線程,生產者就會把消息發(fā)送出去,就算批次里只有一個消息。把 linger.ms 設置成比 0 大的數,讓生產者在發(fā)送批次之前等待一會兒,使更多的消息加入到這個批次。雖然這樣會增加延遲,但也會提升吞吐量(因為一次性發(fā)送更多的消息,每個消息的開銷就變小了)。

(7) max.in.flight.requests.per.connection

  • 類型:int
  • 默認值:5
  • 可設置值:[1,...]
  • 重要性:低
  • 說明:該參數指定了生產者在收到服務器響應之前可以發(fā)送多少個消息。它的值越高,就會占用越多的內存,不過也會提升吞吐量。
    把它設為 1 可以保證消息是按照發(fā)送的順序寫入服務器的,即使發(fā)生了重試。

(8) max.request.size

  • 類型:int
  • 默認值:1048576
  • 可設置值:[0,...]
  • 重要性:中等
  • 說明:該參數用于控制生產者發(fā)送的請求大小。它可以指能發(fā)送的單個消息的最大值,也可以指單個請求里所有消息總的大小。例如,假設這個值為 1MB,那么可以發(fā)送的單個最大消息為 1MB,或者生產者可以在單個請求里發(fā)送一個批次,該批次包含了 1000 個消息,每個消息大小為 1KB。另外,broker 對可接收的消息最大值也有自己的限制(message.max.bytes(類型:int,默認值:1000012,大約0.95M,可設置值:[0,...],重要性:高)),所以兩邊的配置最好可以匹配,避免生產者發(fā)送的消息被 broker 拒絕。

(9) receive.buffer.bytes 和 send.buffer.bytes

receive.buffer.bytes

  • 類型:int
  • 默認值:32768(32K)
  • 可設置值:[-1,...]
  • 重要性:中等

send.buffer.bytes

  • 類型:int
  • 默認值:131072(128K)
  • 可設置值:[-1,...]
  • 重要性:中等

說明:這兩個參數分別指定了 TCP socket 接收和發(fā)送數據包的緩沖區(qū)大小。如果它們被設為 -1,就使用操作系統(tǒng)的默認值。如果生產者或消費者與 broker 處于不同的數據中心,那么可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

(10) client.id

  • 類型:string
  • 默認值:""
  • 可設置值:任意字符串
  • 重要性:中等
  • 說明:該參數可以是任意的字符串,服務器會用它來識別消息的來源。

(11) request.timeout.ms

  • 類型:int
  • 默認值:30000
  • 可設置值:[0,...]
  • 重要性:中等
  • 說明:該參數指定了生產者在發(fā)送數據時等待服務器返回響應的時間。如果等待響應超時,那么生產者要么重試發(fā)送數據,要么返回一個錯誤(拋出異?;驁?zhí)行回調)。[metadata.fetch.timeout.ms] and [timeout.ms] have been removed. They were initially deprecated in Kafka 0.9.0.0.

(12) max.block.ms

  • 類型:long
  • 默認值:60000
  • 可設置值:[0,...]
  • 重要性:中等
  • 說明:該參數指定了在調用 send() 方法或使用 partitionsFor() 方法獲取元數據時生產者的阻塞時間。當生產者的發(fā)送緩沖區(qū)已滿,或者沒有可用的元數據時,這些方法就會阻塞。在阻塞時間達到 max.block.ms 時,生產者會拋出超時異常。

(13) connections.max.idle.ms

  • 類型:long
  • 默認值:540000
  • 可設置值:[0,...]
  • 重要性:中等
  • 說明:關閉空閑連接的等待時間,檢測到空閑的連接后,默認等待9分鐘才會關閉這個連接。

(14) metadata.max.age.ms

  • 類型:long
  • 默認值:300000
  • 可設置值:[0,...]
  • 重要性:低
  • 說明:更新元數據的時間間隔,在等待該參數配置的時間后,即使 producer 沒有發(fā)現(xiàn)任何 partition 或 leader 的變化,也會強制刷新元數據。

(15) reconnect.backoff.ms

  • 類型:long
  • 默認值:50
  • 可設置值:[0,...]
  • 重要性:低
  • 說明:嘗試重新連接 broker 的時間間隔。

(16) reconnect.backoff.max.ms

  • 類型:long
  • 默認值:1000
  • 可設置值:[0,...]
  • 重要性:低
  • 說明:如果重新連接的時間累積到達該參數的配置時間還沒有連接到 broker,那么宣告連接失敗。
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評論 19 139
  • 姓名:周小蓬 16019110037 轉載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,899評論 13 425
  • kafka的定義:是一個分布式消息系統(tǒng),由LinkedIn使用Scala編寫,用作LinkedIn的活動流(Act...
    時待吾閱讀 5,539評論 1 15
  • 如果在一個相當長的時間里,我們要把自己穩(wěn)定在一定水平,在剛剛過去的25分鐘,寫了大約4500字,按理來說,30分鐘...
    劍飛在思考閱讀 465評論 0 0
  • 今天媽媽去了家長會,媽媽拿回來了我的考試成績。語文考了96數學考了93。媽媽說我的語文進步很大,但是數學沒有考10...
    王靖童閱讀 158評論 0 0

友情鏈接更多精彩內容