kafka_03_Kafka消息發(fā)送

Kafka消息發(fā)送

1. 構(gòu)建ProducerRecord對象

該類如下:

public class ProducerRecord<K, V> {

    //The topic the record will be appended to
    private final String topic;
    //The partition to which the record should be sent
    private final Integer partition;
    //the headers that will be included in the record
    private final Headers headers;
    //The key that will be included in the record
    private final K key;
    //The record contents
    private final V value;
    //The timestamp of the record, in milliseconds since epoch. If null, the producer will assign the timestamp using System.currentTimeMillis().
    private final Long timestamp;
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null.");
        if (timestamp != null && timestamp < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid timestamp: %d. Timestamp should always be non-negative or null.", timestamp));
        if (partition != null && partition < 0)
            throw new IllegalArgumentException(
                    String.format("Invalid partition: %d. Partition number should always be non-negative or null.", partition));
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = new RecordHeaders(headers);
    }
    
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        this(topic, partition, timestamp, key, value, null);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {
        this(topic, partition, null, key, value, headers);
    }
    
    public ProducerRecord(String topic, Integer partition, K key, V value) {
        this(topic, partition, null, key, value, null);
    }
    
    public ProducerRecord(String topic, K key, V value) {
        this(topic, null, null, key, value, null);
    }
    
    //Create a record with no key
    public ProducerRecord(String topic, V value) {
        this(topic, null, null, null, value, null);
    }
    //省略getters和setters
}

這里有5個構(gòu)造方法,但是最后用的就是其中的一個。實際應(yīng)用中,構(gòu)建ProducerRecord對象是非常頻繁的操作。

2. 發(fā)送消息

發(fā)送消息有三種模式,發(fā)后即忘(fire-and-forget), 同步(sync),異步(async)

2.1 fire-and-forget(上一篇博客介紹的就是發(fā)后即忘)

 producer.send(record);

特點: 效率高,可靠性差

2.2 同步

代碼如下:

package com.ghq.kafka.server;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * 消息的發(fā)送
 */
public class ProducerSendMessage {

    public static final String brokerList = "192.168.52.135:9092";
    public static final String topic = "topic-demo";

    public static Properties initProperties(){
        Properties prop = new Properties();
        prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        prop.put(ProducerConfig.CLIENT_ID_CONFIG,"producer.client.id.demo");
        /**
         *
         * 配置重試次數(shù),這里重試10次,重試10次之后如果消息還是發(fā)送不成功,那么還是會拋出異常
         * 那些類可以重試呢?
         * org.apache.kafka.common.errors.RetriableException 及其子類
         *
         */
        prop.put(ProducerConfig.RETRIES_CONFIG,10);
        return prop;
    }

    public static void sync() {

        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        //3. 發(fā)送消息
        while (true){
            /**
             * send方法本身就是異步的
             */
            Future<RecordMetadata> future = producer.send(record);

            try {
                /**
                 * get方法是阻塞的
                 * 這里返回 RecordMetadata,包含了發(fā)送消息的元數(shù)據(jù)信息
                 */
                RecordMetadata metadata = future.get();
                System.out.println("topic:"+metadata.topic());
                System.out.println("partition:"+metadata.partition());
                System.out.println("offset:"+metadata.offset());
                System.out.println("hasTimestamp:"+metadata.hasTimestamp());
                System.out.println("-----------------------------------------");
                Thread.sleep(1000);
            } catch (ExecutionException | InterruptedException e) {
                e.printStackTrace();
            }

        }
        //4. 關(guān)閉資源
        //producer.close();

    }
}

特點:性能差,可靠性高

注:什么異??梢灾卦嚕縍etriableException

RetriableException.jpg

2.3 異步

public static void async() {
        Properties prop = initProperties();
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Hello,World");

        Future<RecordMetadata> future = producer.send(record, new Callback() {

            /**
             * metadata 和 exception 互斥
             * 消息發(fā)送成功:metadata != null exception == null
             * 消息發(fā)送失?。簃etadata == null exception != null
             */
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {

                if (exception != null) {
                    System.out.println("消息發(fā)送失敗:"+metadata);
                }else {
                    System.out.println("消息發(fā)送成功:"+metadata);
                }
            }
        });
        
        //這里采用lambda表達式
        producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                System.out.println("消息2發(fā)送失?。?+metadata);
            }else {
                System.out.println("消息2發(fā)送成功:"+metadata);
            }
        });
        producer.close();
    }

輸出結(jié)果如下:

消息1發(fā)送成功:topic-demo-0@22
消息2發(fā)送成功:topic-demo-2@24

特點:性能 :同步 < 異步 < 發(fā)后即忘,可靠性:同步 > 異步 > 發(fā)后即忘

結(jié)束。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 在協(xié)作越來越重要的今天,需要高效,快速的建立關(guān)系和信任,最有效的方法就是樹立個人品牌,以下10項原則也許有所幫助。...
    嚴小愛閱讀 586評論 0 0
  • 首先主布局文件是SwipeRefreshLayout+ListView 加載更多作為腳布局添加到ListView ...
    碼圣閱讀 552評論 2 5
  • 簡單吃頓“農(nóng)家菜”已經(jīng)無法在競爭中難以站穩(wěn)腳。 較為典型的是,不少地方“農(nóng)家樂”已從最初提供吃農(nóng)家菜、住農(nóng)家屋等簡...
    6959cef3343c閱讀 534評論 0 0
  • 即是建立多角度的框架 從多個角度去解決一個問題 問自己有哪幾種方法,其中哪種方法更加簡單方便
    老菜頭_dca8閱讀 144評論 0 0
  • 新時代文明實踐,我為山村小伙伴送溫暖 今天上午,作為孩子班級的家委會主任,組織實驗小學2015級8班的學生、家長一...
    見山聞道閱讀 316評論 0 4

友情鏈接更多精彩內(nèi)容