Kafka 新版消費(fèi)者 API(四):優(yōu)雅的退出消費(fèi)者程序、多線程消費(fèi)者以及獨(dú)立消費(fèi)者

1. 優(yōu)雅的退出消費(fèi)者程序

package com.bonc.rdpe.kafka110.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/**
 * @author YangYunhe
 * @date 2018-07-17 11:05:39
 * @description: 優(yōu)雅的退出消費(fèi)者
 */
public class QuitConsumer {

    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", "dev3-yangyunhe-topic001-group005");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
        consumer.subscribe(Arrays.asList("dev3-yangyunhe-topic001"));
        
        final Thread mainThread = Thread.currentThread();
        
        /*
         * 退出循環(huán)需要通過另一個(gè)線程調(diào)用consumer.wakeup()方法
         * 調(diào)用consumer.wakeup()可以退出poll(),并拋出WakeupException異常
         * 我們不需要處理 WakeupException,因?yàn)樗皇怯糜谔鲅h(huán)的一種方式
         * consumer.wakeup()是消費(fèi)者唯一一個(gè)可以從其他線程里安全調(diào)用的方法
         * 如果循環(huán)運(yùn)行在主線程里,可以在 ShutdownHook里調(diào)用該方法
         */ 
        Runtime.getRuntime().addShutdownHook(new Thread() {
             public void run() {
                 System.out.println("Starting exit...");
                 consumer.wakeup();
                 try {
                     // 主線程繼續(xù)執(zhí)行,以便可以關(guān)閉consumer,提交偏移量
                     mainThread.join();
                 } catch (InterruptedException e) {
                     e.printStackTrace();
                 }
             }
        });
        
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic = " + record.topic() + ", partition = " + record.partition() 
                        + ", offset = " + record.offset());
                }
                consumer.commitAsync();
            }
        }catch (WakeupException e) {
            // 不處理異常
        } finally {
            // 在退出線程之前調(diào)用consumer.close()是很有必要的,它會(huì)提交任何還沒有提交的東西,并向組協(xié)調(diào)器發(fā)送消息,告知自己要離開群組。
            // 接下來就會(huì)觸發(fā)再均衡,而不需要等待會(huì)話超時(shí)。
            consumer.commitSync();
            consumer.close();
            System.out.println("Closed consumer and we are done");
        }   
    }
}

2. 多線程消費(fèi)者

KafkaConsumer是非線程安全的,多線程需要處理好線程同步,多線程的實(shí)現(xiàn)方式有多種,這里介紹一種:每個(gè)線程各自實(shí)例化一個(gè)KakfaConsumer對象,這種方式的缺點(diǎn)是:當(dāng)這些線程屬于同一個(gè)消費(fèi)組時(shí),線程的數(shù)量受限于分區(qū)數(shù),當(dāng)消費(fèi)者線程的數(shù)量大于分區(qū)數(shù)時(shí),就有一部分消費(fèi)線程一直處于空閑狀態(tài)

多線程消費(fèi)者的線程實(shí)現(xiàn)類代碼如下:

package com.bonc.rdpe.kafka110.thread;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * @author YangYunhe
 * @date 2018-07-17 10:48:45
 * @description: 多線程消費(fèi)者的線程實(shí)現(xiàn)類
 */
public class ConsumerLoop implements Runnable {

    private final KafkaConsumer<String, String> consumer;
    private final List<String> topics;
    private final int id;

    public ConsumerLoop(int id, String groupId, List<String> topics) {
        this.id = id;
        this.topics = topics;
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<>(props);
      }

    @Override
    public void run() {
        try {
            consumer.subscribe(topics);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (ConsumerRecord<String, String> record : records) {
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    data.put("value", record.value());
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore for shutdown
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }

}

多線程消費(fèi)者主程序代碼如下:

package com.bonc.rdpe.kafka110.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.bonc.rdpe.kafka110.thread.ConsumerLoop;

/**
 * @author YangYunhe
 * @date 2018-07-17 10:39:25
 * @description: 多線程消費(fèi)者主程序
 */
public class MultiThreadConsumer {

    public static void main(String[] args) { 
          
        int numConsumers = 3;
        String groupId = "dev3-yangyunhe-topic001-group004";
        List<String> topics = Arrays.asList("dev3-yangyunhe-topic001");
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);

        final List<ConsumerLoop> consumers = new ArrayList<>();
        for (int i = 0; i < numConsumers; i++) {
            ConsumerLoop consumer = new ConsumerLoop(i, groupId, topics);
            consumers.add(consumer);
            executor.submit(consumer);
        }

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                for (ConsumerLoop consumer : consumers) {
                    consumer.shutdown();
                } 
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

運(yùn)行結(jié)果:
2: {partition=2, offset=1216, value=...}
......
1: {partition=1, offset=1329, value=...}
......
0: {partition=0, offset=1292, value=...}
......

3. 獨(dú)立消費(fèi)者

有時(shí)候你可能只需要一個(gè)消費(fèi)者從一個(gè)主題的所有分區(qū)或者某個(gè)特定的分區(qū)讀取數(shù)據(jù)。這個(gè)時(shí)候就不需要消費(fèi)者群組和再均衡了,只需要把主題或者分區(qū)分配給消費(fèi)者,然后開始讀取消息并提交偏移量。如果是這樣的話,就不需要訂閱主題,取而代之的是為自己分配分區(qū)。一個(gè)消費(fèi)者可以訂閱主題(并加入消費(fèi)者群組),或者為自己分配分區(qū),但不能同時(shí)做這兩件事情。以下是獨(dú)立消費(fèi)者的示例代碼:

package com.bonc.rdpe.kafka110.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

/**
 * @author YangYunhe
 * @date 2018-07-17 12:44:50
 * @description: 獨(dú)立消費(fèi)者
 */
public class AloneConsumer {
    
    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put("bootstrap.servers", "rdpecore4:9092,rdpecore5:9092,rdpecore6:9092");
        // 獨(dú)立消費(fèi)者不需要設(shè)置消費(fèi)組
        // props.put("group.id", "dev3-yangyunhe-topic001-group003");
        props.put("auto.offset.reset", "earliest");
        props.put("auto.commit.offset", false);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        /*
         * consumer.partitionsFor(topic)用于獲取topic的分區(qū)信息
         * 當(dāng)有新的分區(qū)加入或者原有的分區(qū)被改變后,這個(gè)方法是不能動(dòng)態(tài)感知的
         * 所以要么周期性的執(zhí)行這個(gè)方法,要么當(dāng)分區(qū)數(shù)改變的時(shí)候,你需要重新執(zhí)行這個(gè)程序
         */
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("dev3-yangyunhe-topic001");
        List<TopicPartition> partitions = new ArrayList<>();
        
        if(partitionInfos != null && partitionInfos.size() != 0) {
            for (PartitionInfo partition : partitionInfos) {
                partitions.add(new TopicPartition(partition.topic(), partition.partition()));
            }
            
            consumer.assign(partitions);
            
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("partition = " + record.partition() + ", offset = " + record.offset());
                    }
                    consumer.commitAsync();
                }
            } finally {
                consumer.commitSync();
                consumer.close();
            } 
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 原文地址 當(dāng)kafka最初被創(chuàng)建的時(shí)候,它附帶一個(gè)Scala的生產(chǎn)者和消費(fèi)者客戶端。隨著時(shí)間的推移,我們逐漸意識(shí)到...
    堯字節(jié)閱讀 6,720評(píng)論 0 9
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,635評(píng)論 19 139
  • 1. 訂閱主題 (1)訂閱主題的全部分區(qū) (2) 用正則表達(dá)式來訂閱主題的全部分區(qū) (3) 訂閱指定的分區(qū) 2. ...
    CoderJed閱讀 3,129評(píng)論 0 2
  • 騎行中,無意間看到一棵樹,瞬間有一種被震撼到的感覺。也許是落日相映,也許是心有所悟,仿佛在蒼穹之中勃然英發(fā)。 獨(dú)自...
    牧戎云越閱讀 368評(píng)論 0 0
  • 也許一個(gè)人最好的狀態(tài),就是你的本事配的上你的情懷,你可以腳踏實(shí)地,又可以仰望星空,從容不迫的與歲月相處。而你心中所...
    左眼殤暮光閱讀 275評(píng)論 5 12

友情鏈接更多精彩內(nèi)容