Kafka Consumer API

1. Kafka Core APIs

The figure below, taken from the official documentation, illustrates the kinds of clients that can integrate with Kafka.


Kafka's five categories of client APIs are:

  • AdminClient API: manage and inspect topics, brokers, and other Kafka objects; similar in purpose to Kafka's built-in command-line scripts.
  • Producer API: publish messages to one or more topics; the API used by producers (publishers).
  • Consumer API: subscribe to one or more topics and process the messages they carry; the API used by consumers (subscribers).
  • Streams API: efficiently transform input streams into output streams; typically used in stream-processing scenarios.
  • Connector API: pull data from source systems or applications into Kafka, such as the DB in the figure above.

In this article we focus on the Consumer API.

2. Consumer API

Reliability on the consuming side is easy to guarantee, because data is persisted in Kafka, so there is no need to worry about losing it.

However, a consumer may fail mid-consumption (power loss, crash, and so on). After recovering, it must resume from where it left off, so the consumer needs to keep a real-time record of the offset it has consumed up to in order to continue after a failure.

Maintaining the offset is therefore an unavoidable concern when consuming data.
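As a minimal illustration of what "resuming from the committed offset" means (plain Java, no broker involved; the class and method names are hypothetical), a restarted consumer simply continues scanning the partition log from the committed position:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetResumeDemo {

    // Simulates a restarted consumer re-reading a partition from its last
    // committed offset up to (but not including) the log-end offset.
    static List<Long> consumeFrom(long committedOffset, long logEndOffset) {
        List<Long> consumed = new ArrayList<>();
        for (long offset = committedOffset; offset < logEndOffset; offset++) {
            consumed.add(offset); // stand-in for processing the record at this offset
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Before the crash, offsets 0..4 were processed and offset 5 was committed.
        // After the restart, consumption resumes at 5 and nothing is skipped.
        System.out.println(consumeFrom(5, 8)); // [5, 6, 7]
    }
}
```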

2.1 Adding the Dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>

Classes we will use:

  • KafkaConsumer: the consumer object used to consume data.
  • ConsumerConfig: provides the required configuration parameter names.
  • ConsumerRecord: each record is wrapped in a ConsumerRecord object.

Parameters related to automatic offset commits:

  • enable.auto.commit: whether to enable automatic offset commits
  • auto.commit.interval.ms: the interval between automatic offset commits

2.2 Automatic Offset Commit

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Seen in production code, but not recommended
 */
public static void helloWorld(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //Commit offsets automatically
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    //Auto-commit interval
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    //Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //Poll for messages at a fixed interval
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                    record.partition(),record.offset(),record.key(),record.value());
        }
    }
}

Although automatic offset commits are simple and convenient, they happen on a timer, which makes it hard for developers to control when offsets are committed. Kafka therefore also provides APIs for committing offsets manually.

There are two ways to commit offsets manually: commitSync (synchronous, blocking) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (commits can still fail for uncontrollable reasons), whereas commitAsync has no retry mechanism and may therefore fail to commit.
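Why doesn't commitAsync simply retry on failure? Because asynchronous requests can complete out of order, a retried commit of an older offset could arrive after a newer commit and roll the group's position back. The sketch below is a plain-Java thought experiment (no Kafka involved, names are illustrative) that models the broker as storing whichever commit request it receives last:

```java
import java.util.Arrays;
import java.util.List;

public class AsyncCommitOrderDemo {

    // The group coordinator stores the offset of the last commit request it
    // receives; it does not compare it with the value already stored.
    static long finalCommitted(List<Long> arrivalOrder) {
        long committed = -1;
        for (long offset : arrivalOrder) {
            committed = offset;
        }
        return committed;
    }

    public static void main(String[] args) {
        // commitSync: each commit completes before the next starts, so
        // requests arrive in order and the newest offset wins.
        System.out.println(finalCommitted(Arrays.asList(100L, 200L)));       // 200
        // commitAsync with a naive retry: the retried commit of 100 arrives
        // after the commit of 200 and rolls the committed offset back.
        System.out.println(finalCommitted(Arrays.asList(100L, 200L, 100L))); // 100
    }
}
```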

2.3 Manual Offset Commit

  • Although synchronous commits are more reliable, they block the current thread until the commit succeeds, which hurts throughput considerably. In most cases, asynchronous commits are therefore preferred.

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually
 */
public static void commitedOffset(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                //Process the message
                System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(),record.offset(),record.key(),record.value());

                //If business processing fails, do not commit
                //throw new RuntimeException("business processing failed");
            }
            //Commit the offset manually
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.4 Manual Offset Commit with Offset Reset

auto.offset.reset controls where the consumer starts when the group has no committed offset. The easiest way to see it take effect is to switch the consumer to a new consumer group.

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually, with offset reset
 */
public static void restCommitedOffset(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

    //Where to start when the group has no committed offset; easiest to observe with a new consumer group
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                //Process the message
                System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(),record.offset(),record.key(),record.value());

                //If business processing fails, do not commit
                //throw new RuntimeException("business processing failed");
            }
            //Commit the offset manually
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.5 Manual Offset Commit per Partition

  • Synchronous commits retry on failure, which makes them more reliable.
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually, per partition
 */
public static void commitedOffsetWithPartition(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //Subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    //If business processing fails, do not commit
                    //throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //Build the offset to commit for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //Commit the offset for this partition individually
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " end---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.6 Manual Offset Commit with Manually Assigned Partitions

  • Manually assign one or more specific partitions, and commit offsets manually.
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually, with manually assigned partitions
 */
public static void commitedOffsetWithPartition2(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //Disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    //Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);

    //Assign a specific partition of the topic
    consumer.assign(Arrays.asList(p0));

    //Alternatively, subscribe to one or more topics
//        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    //If business processing fails, do not commit
                    //throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //Build the offset to commit for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //Commit the offset for this partition individually
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " end---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.7 Manually Setting the Starting Offset, with Manual Commit

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Manually set the starting offset and commit offsets manually
 */
public static void controlOffset(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //Disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    //Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);

    //Assign a specific partition of the topic
    consumer.assign(Arrays.asList(p0));

    while(true){
        try {

            /**
             * 1. Control the starting offset manually
             * 2. If the program fails, the batch is simply consumed again
             */
            /**
             * A typical design:
             * 1. The first run starts from offset 0
             * 2. After consuming, say, 100 records (offsets 0..99), store offset 100 in Redis
             * 3. Before each poll, fetch the latest offset from Redis
             * 4. Start consuming from that position
             */
            //Set the starting offset manually (hard-coded to 30 for this demo)
            consumer.seek(p0,30);

            //Poll for messages at a fixed interval
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    //If business processing fails, do not commit
                    //throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //Build the offset to commit for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //Commit the offset for this partition individually
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " end---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
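The Redis-based scheme sketched in the comments above reduces to a tiny bookkeeping helper. Below is a hypothetical stand-in (a plain HashMap plays the role of Redis; the class and method names are illustrative, not part of any library). The key detail is storing lastProcessedOffset + 1, so the next seek() resumes after the last record that was handled rather than repeating it:

```java
import java.util.HashMap;
import java.util.Map;

public class ExternalOffsetStoreDemo {

    // Stand-in for Redis: "topic-partition" -> next offset to consume.
    private final Map<String, Long> store = new HashMap<>();

    // Read the offset to seek() to before each poll; the first run starts at 0.
    long nextOffset(String topic, int partition) {
        return store.getOrDefault(topic + "-" + partition, 0L);
    }

    // After processing a batch, record lastProcessedOffset + 1 so the next
    // seek() resumes after the last handled record.
    void saveProgress(String topic, int partition, long lastProcessedOffset) {
        store.put(topic + "-" + partition, lastProcessedOffset + 1);
    }

    public static void main(String[] args) {
        ExternalOffsetStoreDemo demo = new ExternalOffsetStoreDemo();
        System.out.println(demo.nextOffset("yibo_topic", 0)); // 0 (first run)
        demo.saveProgress("yibo_topic", 0, 99);               // processed offsets 0..99
        System.out.println(demo.nextOffset("yibo_topic", 0)); // 100
    }
}
```

In the real loop, nextOffset() would replace the hard-coded 30 passed to consumer.seek(), and saveProgress() would run after (or instead of) commitSync().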

2.8 Flow Control (Rate Limiting)

Add the Guava dependency:

<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>

private static final String TOPIC_NAME = "yibo_topic";

/*** Token generation rate, tokens per second */
public static final int permitsPerSecond = 1;

/*** Rate limiter */
private static final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond);

/**
 * Flow control / rate limiting
 */
public static void controlPause(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //Consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //Disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    //Deserializer classes for key and value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);

    //Assign specific partitions of the topic
    consumer.assign(Arrays.asList(p0,p1));

    //Alternatively, subscribe to one or more topics
//        consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //Poll for messages at a fixed interval
        try {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));

            //Handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {
                    /**
                     * 1. After receiving a record, try to take a token from the bucket
                     * 2. If a token is acquired, continue with business processing
                     * 3. If no token is available, pause() and wait for tokens
                     * 4. Once the bucket has enough tokens again, resume() the consumer
                     */
                    // Rate limiting
                    if (!LIMITER.tryAcquire()) {
                        System.out.println("No token available, pausing consumption");
                        consumer.pause(Arrays.asList(p0, p1));
                    } else {
                        System.out.println("Token acquired, resuming consumption");
                        consumer.resume(Arrays.asList(p0, p1));
                    }

                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());

                    //If business processing fails, do not commit
//                    throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //Build the offset to commit for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //Commit the offset for this partition individually
                consumer.commitSync(offset);

                System.out.println("--------------------- partition - " + partition + " end---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}

2.9 Concurrency with Multiple Consumers

  • The classic pattern: each thread creates its own KafkaConsumer, which keeps usage thread-safe.
public class ConsumerThreadSample {

    private static final String TOPIC_NAME = "yibo_topic";

    /**
     * The classic pattern: each thread creates its own KafkaConsumer, which keeps usage thread-safe
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);

        t1.start();

        Thread.sleep(15000);

        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{

        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String,String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.174.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<>(props);

            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);

            consumer.assign(Arrays.asList(p0, p1));
        }

        @Override
        public void run() {
            try {
                while(!closed.get()){
                    //Poll for messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));

                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        // Process the records of each partition
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }

                        // Report the new offset back to Kafka
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        // Note the +1
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } catch (WakeupException e) {
                // Expected when shutdown() interrupts a poll; rethrow otherwise
                if(!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}

2.10 One Consumer, Multi-threaded Record Processing

  • Records are handed off to a thread pool for asynchronous processing. With this approach you cannot control offset commits manually, so end-to-end message consistency is not guaranteed.
public class ConsumerRecordThreadSample {

    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.174.128:9092";
        String groupId = "test";
        int workerNum = 5;

        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        // execute() polls in an endless loop, so run it on its own thread;
        // otherwise main would block here and shutdown() would never be reached
        new Thread(() -> consumers.execute(workerNum)).start();

        Thread.sleep(1000000);

        consumers.shutdown();
    }

    // The polling consumer
    public static class ConsumerExecutor {
        private final KafkaConsumer<String, String> consumer;
        private ExecutorService executors;
        private volatile boolean stopped = false;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());

            try {
                while (!stopped) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                    for (final ConsumerRecord record : records) {
                        executors.submit(new ConsumerRecordWorker(record));
                    }
                }
            } catch (WakeupException e) {
                // expected when shutdown() interrupts a poll
            } finally {
                // KafkaConsumer is not thread-safe, so close it here on the polling thread
                consumer.close();
            }
        }

        public void shutdown() {
            stopped = true;
            // wakeup() is the only KafkaConsumer method safe to call from another thread
            consumer.wakeup();
            if (executors != null) {
                executors.shutdown();
                try {
                    if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("Timeout.... Ignore for this case");
                    }
                } catch (InterruptedException ignored) {
                    System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    // Record processing
    public static class ConsumerRecordWorker implements Runnable {

        private ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord record) {
            this.record = record;
        }

        @Override
        public void run() {
            // Actual business logic goes here, e.g. writing to a database
            System.out.println("Thread - "+ Thread.currentThread().getName());
            System.err.printf("partition = %d , offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }

    }
}

2.11 Missed and Duplicate Consumption

Whether offsets are committed synchronously or asynchronously, messages can still be missed or consumed more than once. Committing the offset before processing can cause messages to be missed; processing before committing can cause messages to be consumed twice.
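The two failure modes can be made concrete with a small simulation (plain Java, no broker; names are illustrative): a batch of offsets [from, to) is polled, the consumer crashes after handling only part of it, then restarts from whatever offset was committed:

```java
import java.util.ArrayList;
import java.util.List;

public class DeliverySemanticsDemo {

    // Commit-then-process: the batch's end offset is committed before any
    // processing, so the restart resumes at `to` and the unhandled tail is lost.
    static List<Long> commitFirst(long from, long to, int handledBeforeCrash) {
        List<Long> processed = new ArrayList<>();
        for (long off = from; off < from + handledBeforeCrash; off++) {
            processed.add(off);
        }
        // restart: committed offset is `to`, nothing in [from, to) is re-read
        return processed;
    }

    // Process-then-commit: the crash happens before the commit, so the restart
    // resumes at `from` and the records handled before the crash repeat.
    static List<Long> processFirst(long from, long to, int handledBeforeCrash) {
        List<Long> processed = new ArrayList<>();
        for (long off = from; off < from + handledBeforeCrash; off++) {
            processed.add(off);
        }
        for (long off = from; off < to; off++) {
            processed.add(off); // the whole batch is re-read after the restart
        }
        return processed;
    }

    public static void main(String[] args) {
        // Batch [0, 5), crash after handling 2 records:
        System.out.println(commitFirst(0, 5, 2));  // [0, 1] -> offsets 2..4 are lost
        System.out.println(processFirst(0, 5, 2)); // [0, 1, 0, 1, 2, 3, 4] -> 0 and 1 repeat
    }
}
```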

2.12 Custom Offset Storage

Before Kafka 0.9, offsets were stored in ZooKeeper; from 0.9 onward they are stored by default in an internal Kafka topic. Beyond that, Kafka also lets you store offsets in a system of your own choosing.

Maintaining offsets yourself is fairly involved, because you must account for consumer rebalancing. When a new consumer joins the group, an existing consumer leaves it, or the partitions of a subscribed topic change, the partitions are reassigned; this reassignment process is called a rebalance.

After a rebalance, the partitions each consumer owns change, so each consumer must first find out which partitions it has been assigned, then seek to the most recently committed offset of each partition and continue consuming from there.

Custom offset storage is implemented with a ConsumerRebalanceListener. Sample code follows; the methods that commit and fetch offsets must be implemented against whatever storage system you choose.

public class CustomConsumer {

    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {
        //Create the configuration
        Properties props = new Properties();
        //Kafka cluster
        props.put("bootstrap.servers","192.168.174.128:9092");
        //Consumer group: consumers with the same group.id belong to the same group
        props.put("group.id", "test");
        //Disable automatic offset commits
        props.put("enable.auto.commit", "false");
        //Deserializer classes for key and value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //Create a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //Subscribe to the topic
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

            //Called before a rebalance
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //Called after a rebalance
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    //Seek to the most recently committed offset and continue consuming
                    consumer.seek(partition, getOffset(partition));
                }
            }
        });
        while (true) {
            //Poll for records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            commitOffset(currentOffset);//asynchronous commit
        }
    }

    //Fetch the latest offset of a partition from custom storage
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    //Commit the offsets of all partitions owned by this consumer to custom storage
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {

    }
}

3. Spring Boot Integration with Kafka

3.1 Adding the Maven Dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.1</version>
</dependency>

3.2 Configuring application.properties

# Kafka broker address; for a cluster, separate multiple addresses with commas
spring.kafka.bootstrap-servers=192.168.174.128:9092

#=============== producer  =======================
# Number of retries on send failure. When a leader fails, a replica takes over as leader,
# and sends may fail during the transition. With retries=0 the producer does not resend
# (no duplicates); with retries enabled the producer resends once the new leader is in
# place, avoiding message loss.
spring.kafka.producer.retries=0
# Batch size: the producer accumulates records and sends them in batches
spring.kafka.producer.batch-size=16384
# The producer buffers records for batching; data is flushed when the buffer.memory limit is reached
spring.kafka.producer.buffer-memory=33554432

# acks: the number of acknowledgments the producer requires the leader to have received
# before considering a request complete. This controls the durability of records on the server:
# acks=0   The producer does not wait for any acknowledgment from the server; the record is
#          added to the socket buffer and considered sent. There is no guarantee the server
#          received it, the retry setting has no effect (the client generally won't learn of
#          failures), and the offset returned for each record is always -1.
# acks=1   The leader writes the record to its local log and responds without waiting for
#          full acknowledgment from all replicas. If the leader fails right after
#          acknowledging but before the data is replicated, the record is lost.
# acks=all The leader waits for the full set of in-sync replicas to acknowledge the record.
#          The record will not be lost as long as at least one in-sync replica stays alive.
#          This is the strongest guarantee and is equivalent to acks=-1.
# Valid values: all, -1, 0, 1
spring.kafka.producer.acks=1

# Serializers for the message key and value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  =======================
# Default consumer group id; consumers within the same group never read the same message
spring.kafka.consumer.group-id=testGroup
# Where to start when there is no committed offset: earliest reads from the beginning of
# the log, latest reads from the end. earliest is the usual choice.
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit=true would commit offsets automatically; disabled here
spring.kafka.consumer.enable-auto-commit=false
# If enable-auto-commit is true, the frequency (in milliseconds) at which offsets are
# auto-committed to Kafka; the default is 5000.
spring.kafka.consumer.auto-commit-interval=100

# Deserializers for the message key and value
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#=============== listener  =======================
# Number of threads running in the listener container
spring.kafka.listener.concurrency=5
# The listener is responsible for acks; each acknowledge() call commits immediately
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false

3.3 Creating the Consumer

@Component
@Slf4j
public class KafkaDemoConsumer {

    private static final String TOPIC_NAME = "yibo_topic";
    private static final String TOPIC_GROUP1 = "topic_group1";
    private static final String TOPIC_GROUP2 = "topic_group2";

    @KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP1)
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
log.info("topic_test consumed: Topic:" + topic + ", Message:" + msg);
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP2)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
log.info("topic_test1 consumed: Topic:" + topic + ", Message:" + msg);
            ack.acknowledge();
        }
    }
}

References:
https://www.cnblogs.com/L-Test/p/13447269.html
