Kafka之java API

Kafka是一個分布式流媒體平臺。發(fā)布和訂閱記錄流,類似于消息隊列或企業(yè)消息傳遞系統(tǒng)。以容錯持久的方式存儲記錄流。處理記錄發(fā)生的流。本文講述如何使用java API 操作kafka集群

主要內(nèi)容:

  • 1.消息生產(chǎn)者
  • 2.消息消費者
  • 3.測試

1.消息生產(chǎn)者

1.1.引入依賴

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>

1.2.編寫生產(chǎn)者

public class KafkaProducerClient {
    public static void main(String[] args) {
        //1、準備配置文件 參考:ProducerConfig.java
        Properties props = new Properties();
        // kakfa 服務(wù)列表
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        /**
         * 當生產(chǎn)者將ack設(shè)置為“全部”(或“-1”)時,min.insync.replicas指定必須確認寫入被認為成功的最小副本數(shù)。
         * 如果這個最小值不能滿足,那么生產(chǎn)者將會引發(fā)一個異常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
         * 當一起使用時,min.insync.replicas和acks允許您執(zhí)行更大的耐久性保證。
         * 一個典型的情況是創(chuàng)建一個復(fù)制因子為3的主題,將min.insync.replicas設(shè)置為2,并使用“全部”選項來產(chǎn)生。
         * 這將確保生產(chǎn)者如果大多數(shù)副本沒有收到寫入引發(fā)異常。
         */
        props.put("acks", "all");
        /**
         * 設(shè)置一個大于零的值,將導(dǎo)致客戶端重新發(fā)送任何失敗的記錄
         */
        props.put("retries", 0);
        /**
         * 只要有多個記錄被發(fā)送到同一個分區(qū),生產(chǎn)者就會嘗試將記錄一起分成更少的請求。
         * 這有助于客戶端和服務(wù)器的性能。該配置以字節(jié)為單位控制默認的批量大小。
         */
        props.put("batch.size", 16384);
        /**
         * 在某些情況下,即使在中等負載下,客戶端也可能希望減少請求的數(shù)量。
         * 這個設(shè)置通過添加少量的人工延遲來實現(xiàn)這一點,即不是立即發(fā)出一個記錄,
         * 而是等待達到給定延遲的記錄,以允許發(fā)送其他記錄,以便發(fā)送可以一起批量發(fā)送
         */
        props.put("linger.ms", 1);
        /**
         * 生產(chǎn)者可用于緩沖等待發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)。
         * 如果記錄的發(fā)送速度比發(fā)送給服務(wù)器的速度快,那么生產(chǎn)者將會阻塞,max.block.ms之后會拋出異常。
         * 這個設(shè)置應(yīng)該大致對應(yīng)于生產(chǎn)者將使用的總內(nèi)存,但不是硬性限制,
         * 因為不是所有生產(chǎn)者使用的內(nèi)存都用于緩沖。
         * 一些額外的內(nèi)存將被用于壓縮(如果壓縮被啟用)以及用于維護正在進行的請求。
         */
        props.put("buffer.memory", 33554432);
        // key的序列化類型
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // value的序列化類型
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //2、創(chuàng)建KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(props);
        for (int i = 0; i < 100; i++) {
            //3、發(fā)送數(shù)據(jù)
            kafkaProducer.send(new ProducerRecord("TEST_JAVA", "key"+i, "value" + i));
        }

        kafkaProducer.flush();
        kafkaProducer.close();
    }
}

2.消息消費者

2.1.引入依賴

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.1</version>
</dependency>

2.2.編寫消費者

public class KafkaConsumerClient {
    public static void main(String[] args) {
        //1、準備配置文件 參考:ConsumerConfig.scala
        Properties props = new Properties();
        // kakfa 服務(wù)列表
        props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
        //一個字符串用來指示一組consumer所在的組
        props.put("group.id", "test");
        // 如果true,consumer定期地往zookeeper寫入每個分區(qū)的offset
        props.put("enable.auto.commit", "true");
        // 往zookeeper上寫offset的頻率
        props.put("auto.commit.interval.ms", "1000");
        // key的反序列化類型
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的反序列化類型
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //2、創(chuàng)建KafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(props);

        // 3、訂閱數(shù)據(jù),這里的topic可以是多個
        kafkaConsumer.subscribe(Arrays.asList("TEST_JAVA"));

        // 4、獲取數(shù)據(jù)
        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s,offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
            }
        }
    }
}

3.測試

3.1.啟動zookeeper

參考:Zookeeper集群安裝

運行一鍵啟動腳本:startzk.sh

3.2.啟動kafka

參考:Kafka之集群安裝

運行一鍵啟動腳本:startkafka.sh

3.3.啟動消費者

直接run KafkaConsumerClient.main 就會一直阻塞等待消息


3.4.啟動生產(chǎn)者

直接run KafkaProducerClient.main 就會往名為TEST_JAVA的Topic發(fā)送100條消息

可以看到消費者已經(jīng)消費了所有的數(shù)據(jù)


image.png

3.5.查看topics

kafka-topics.sh --describe --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka

其中TEST_JAVA就是我們剛才默認建的Topic

參考

1.Kafka 配置參數(shù)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容