「kafka」kafka-clients,java編寫消費(fèi)者客戶端及原理剖析

與生產(chǎn)者對應(yīng)的是消費(fèi)者,應(yīng)用程序通過KafkaConsumer來訂閱主題,并從訂閱的主題中拉取消息。不過我們需要先了解消費(fèi)者和消費(fèi)組的概念,否則無法理解如何使用KafkaConsumer。

消費(fèi)者與消費(fèi)組

每個(gè)消費(fèi)者對應(yīng)一個(gè)消費(fèi)組,當(dāng)消息發(fā)布到主題后,只會(huì)被投遞給訂閱它的每個(gè)消費(fèi)組中的一個(gè)消費(fèi)者。如下圖所示:



主題中,共有四個(gè)分區(qū),P0、P1、P2、P3,有兩個(gè)消費(fèi)組A和B都訂閱了這個(gè)主題,消費(fèi)組A中有4個(gè)消費(fèi)者(C0、C1、C2、C3),消費(fèi)者B有兩個(gè)消費(fèi)者(C4、C5)。按照Kafka默認(rèn)的規(guī)則,消費(fèi)組A中的每一個(gè)消費(fèi)者分配到一個(gè)分區(qū),消費(fèi)組B中每一個(gè)消費(fèi)者分配到兩個(gè)分區(qū),兩個(gè)消費(fèi)組之間互不影響。
每個(gè)消費(fèi)者只能消費(fèi)被分配到的分區(qū)中的消息。換言之,每個(gè)分區(qū)只能被一個(gè)消費(fèi)組中的一個(gè)消費(fèi)者所消費(fèi)。
再來看一下消費(fèi)組內(nèi)消費(fèi)者的個(gè)數(shù)變化時(shí)所對應(yīng)分區(qū)分配的演變。假設(shè)目前某消費(fèi)組內(nèi)只有一個(gè)消費(fèi)者C0,訂閱了一個(gè)主題,這個(gè)主題包含7個(gè)分區(qū),P0/P1/P2/P3/P4/P5/P6。也就是說,這個(gè)消費(fèi)者訂閱了7個(gè)分區(qū):



此時(shí)消費(fèi)組內(nèi)又增加了新的消費(fèi)者C1,按照既定的邏輯,需要將原來消費(fèi)者C0的部分分區(qū)分配給消費(fèi)者C1消費(fèi):

C0和C1各自消費(fèi)所分配的分區(qū),彼此間并無邏輯上的干擾。緊接著又增加消費(fèi)者C2:



消費(fèi)者與消費(fèi)組這種模型可以讓整體的消費(fèi)能力具備橫向伸縮,我們可以增加(減少)消費(fèi)者的個(gè)數(shù)來提高(或降低)整體的消費(fèi)能力。對于分區(qū)數(shù)固定的情況,一味地增加消費(fèi)者并不能讓消費(fèi)能力一直得到增強(qiáng),如果消費(fèi)者過多,出現(xiàn)了消費(fèi)者個(gè)數(shù)大于分區(qū)個(gè)數(shù)的情況,就會(huì)有消費(fèi)者分配不到任何分區(qū)。如下圖所示:

以上分配策略都是基于默認(rèn)的分區(qū)分配策略進(jìn)行分析的,可以通過消費(fèi)者客戶端參數(shù)partition.assignment.strategy來設(shè)置消費(fèi)者與訂閱主題之間的分區(qū)分配策略。

對于消息中間件而言,一般有兩種消息投遞模式:點(diǎn)對點(diǎn)模式和發(fā)布/訂閱模式。點(diǎn)對點(diǎn)模式是基于隊(duì)列的,消息生產(chǎn)者發(fā)送消息到隊(duì)列,消息消費(fèi)者從隊(duì)列中接收消息。發(fā)布訂閱模式以主題為內(nèi)容節(jié)點(diǎn),主題可以認(rèn)為是消息傳遞的中介,使得消息訂閱者和發(fā)布者保持獨(dú)立,不需要進(jìn)行接觸即可保持消息的傳遞,在消息的一對多廣播時(shí)采用。

  • 如果消費(fèi)者都屬于同一消費(fèi)組,那么所有的消息都會(huì)被均衡的投遞給每一個(gè)消費(fèi)者,即每條消息都只會(huì)被一個(gè)消費(fèi)者處理,這就相當(dāng)于點(diǎn)對點(diǎn)模式的應(yīng)用。
  • 如果所有消費(fèi)者都隸屬于不同的消費(fèi)組,那么所有的消息都會(huì)被廣播給所有的消費(fèi)者,即每條消息都會(huì)被所有的消費(fèi)者處理,這就相當(dāng)于訂閱/發(fā)布應(yīng)用。

可以通過消費(fèi)者客戶端參數(shù)group.id來配置,默認(rèn)值為空字符串。消費(fèi)組是邏輯上的概念,它將消費(fèi)者進(jìn)行歸類,消費(fèi)者并非邏輯上的概念,它是實(shí)際上的應(yīng)用實(shí)例,它可以是一個(gè)線程,也可以是一個(gè)進(jìn)程,同一個(gè)消費(fèi)組內(nèi)的消費(fèi)者可以部署在同一臺(tái)機(jī)器上,也可以部署在不同的機(jī)器上。

客戶端開發(fā)

采用目前流行的新消費(fèi)者(java語言編寫)客戶端。
一個(gè)正產(chǎn)的消費(fèi)邏輯需要以下幾個(gè)步驟

  1. 配置消費(fèi)者客戶端參數(shù)及創(chuàng)建響應(yīng)的客戶端實(shí)例。
  2. 訂閱主題。
  3. 拉取消息并消費(fèi)。
  4. 提交消費(fèi)位移。
  5. 關(guān)閉消費(fèi)者實(shí)例。
    一個(gè)基本的消費(fèi)者案例如下:
public class Consumer {
    public static final String brokerList = "192.168.0.138:9092";
    public static final String topic = "topic-demo";
    public static final String group = "group-id";
    public static final String client = "client-id";

    public  static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }
    public static void main(String[] args) {

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(initConfig());
        consumer.subscribe(Collections.singletonList(topic));
        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record->{
                    System.out.println("topic="+record.topic()+",  partition="+record.partition()+",  offset="+record.offset());
                    System.out.println("key="+record.key()+", value="+record.value());
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
  • bootstrap.servers:指定kafka集群地址,可以設(shè)置一個(gè)或多個(gè),用逗號(hào)隔開。注意這里不需要設(shè)置集群中全部的broker地址,消費(fèi)者會(huì)從現(xiàn)有的配置中查找全部的集群成員。如果只設(shè)置一個(gè)地址,啟動(dòng)時(shí)為避免該地址機(jī)器宕機(jī)連接不到集群,最好設(shè)置兩個(gè)或兩個(gè)以上地址。

  • key.deserializer和value.deserializer:與生產(chǎn)者客戶端參數(shù)相對應(yīng)。消費(fèi)者從kafka獲取到的消息格式都是字節(jié)數(shù)組(byte[]),所以需要執(zhí)行相應(yīng)的反序列化操作才能還原成原有的對象格式。

  • client.id:客戶端id,如果不設(shè)置,會(huì)自動(dòng)生成一個(gè)非空字符串,內(nèi)容形式為consumer-1,consumer-2這種格式。

消費(fèi)者客戶端參數(shù)眾多,在這里羅列講解沒有意義,之后會(huì)一一詳解。

訂閱主題與分區(qū)

一個(gè)消費(fèi)者可以訂閱一個(gè)或多個(gè)主題。如上代碼示例,通過consumer.subscribe方式訂閱主題,對于這個(gè)方法而言,既可以以集合的方式訂閱多個(gè)主題,也可以以正則表達(dá)式的形式訂閱特定模式的主題。subscribe的幾個(gè)重載的方法如下:

void subscribe(Collection<String> var1);
void subscribe(Collection<String> var1, ConsumerRebalanceListener var2);
void assign(Collection<TopicPartition> var1);
void subscribe(Pattern var1, ConsumerRebalanceListener var2);
void subscribe(Pattern var1);

如果消費(fèi)者采用的是正則表達(dá)式的方式訂閱,在之后的創(chuàng)建過程中,如果有人又創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么這個(gè)消費(fèi)者就可以消費(fèi)到新添加的主題中的消息。如果應(yīng)用程序需要消費(fèi)多個(gè)主題,并且可以處理不同的類型,那么這種訂閱方式就很有效。在kafka和其他系統(tǒng)之間進(jìn)行數(shù)據(jù)賦值時(shí),這種正則表達(dá)式的方式顯得很常見。

consumer.subscribe(Pattern.compile("topic-.*"));

重載方法中有一個(gè)ConsumerRebalanceListener ,這個(gè)是用來設(shè)置相應(yīng)的再均衡監(jiān)聽器的,之后會(huì)講。

消費(fèi)者不但可以訂閱主題,還可以直接訂閱主題的特定分區(qū),通過assign方法來實(shí)現(xiàn)這一功能。

void assign(Collection<TopicPartition> partitions);

這個(gè)方法只接受一個(gè)參數(shù)partitions,用來指定分區(qū)集合。關(guān)于TopicPartition類,用來表示分區(qū),這個(gè)類的內(nèi)部結(jié)果如下所示:

public final class TopicPartition implements Serializable {
    private final int partition;
    private final String topic;
其他省略

有兩個(gè)屬性,partition和topic,分別代表自身的分區(qū)編號(hào)和主題名稱,這個(gè)類和我們所說的主題-分區(qū)概念對應(yīng)起來。在案例代碼清單中,我們使用assign方法替代subscribe方法,訂閱主題topic-demo的分區(qū)0。

consumer.assign(Arrays.asList(new TopicPartition("topic-demo",0)));//訂閱主題topic-demo的分區(qū)0

如果,我們事先不知道主題中有多少個(gè)分區(qū)怎么辦?partitionsFor方法可以用來查詢指定主題的元數(shù)據(jù)信息,定義如下:

 List<PartitionInfo> partitionsFor(String topic);

其中,PartitionInfo類型即為主題的分區(qū)元數(shù)據(jù)信息:

public class PartitionInfo {
    private final String topic;//主題名稱
    private final int partition;//分區(qū)編號(hào)
    private final Node leader;//leader副本所在的位置
    private final Node[] replicas;//分區(qū)的AR集合
    private final Node[] inSyncReplicas;//分區(qū)的ISR集合
    private final Node[] offlineReplicas;//分區(qū)的OSR集合

通過partitionsFor方法的協(xié)助,我們可以通過assign方法來實(shí)現(xiàn)訂閱主題全部分區(qū)的功能:

List<TopicPartition> partitions = new ArrayList<>();
 //獲取主題的全部分區(qū)
consumer.partitionsFor("topic-demo").forEach(partitionInfo -> {
      System.out.println("分區(qū):"+partitionInfo.partition());
      partitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
    });
    //通過assign方法來實(shí)現(xiàn)訂閱主題全部分區(qū)
   consumer.assign(partitions);

既然有訂閱,那就有取消訂閱,可以使用unsubscribe方法取消訂閱。

//取消訂閱
consumer.unsubscribe();

集合訂閱的方式、正則表達(dá)式的訂閱方式和指定分區(qū)的訂閱方式,分別代表了3種不同的訂閱狀態(tài):AUTO_TOPICS、AUTO_PATTERN、USER_ASSIGNED,如果沒有訂閱那么狀態(tài)為NONE。這三種狀態(tài)是互斥的,在一個(gè)消費(fèi)者中,只能使用其中的一種。通過sbscribe方法訂閱的主題具有消費(fèi)者自動(dòng)再均衡的功能,在多個(gè)消費(fèi)者的情況下根據(jù)分區(qū)策略來自動(dòng)分配各個(gè)消費(fèi)者與分區(qū)的關(guān)系。當(dāng)消費(fèi)組內(nèi)的消費(fèi)者增加或減少時(shí),分區(qū)分配關(guān)系會(huì)自動(dòng)調(diào)整,以實(shí)現(xiàn)消費(fèi)負(fù)載均衡及故障自動(dòng)轉(zhuǎn)移。而通過assign方法訂閱分區(qū)時(shí),是不具備消費(fèi)者自動(dòng)均衡的功能。

反序列化

「kafka」kafka-clients,java編寫生產(chǎn)者客戶端及原理剖析我們講過了生產(chǎn)者的序列化與消費(fèi)者的反序列化程序demo。Kafka提供的反序列器有ByteBufferDeserializer、ByteArrayDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、LongDeserializer、ShortDeserializer、StringDeserializer,這些反序列化器都實(shí)現(xiàn)了Deserializer接口,該接口有三個(gè)方法:

 void configure(Map<String, ?> var1, boolean var2);//用來配置當(dāng)前類
 T deserialize(String var1, byte[] var2);//用來執(zhí)行反序列化
 void close();//關(guān)閉當(dāng)前序列化器

我們來看一下StringDeserilizer的源碼:

public class StringDeserializer implements Deserializer<String> {
    private String encoding = "UTF8";

    public StringDeserializer() {
    }

    public void configure(Map<String, ?> configs, boolean isKey) {
        String propertyName = isKey ? "key.deserializer.encoding" : "value.deserializer.encoding";
        Object encodingValue = configs.get(propertyName);
        if (encodingValue == null) {
            encodingValue = configs.get("deserializer.encoding");
        }

        if (encodingValue instanceof String) {
            this.encoding = (String)encodingValue;
        }

    }
    public String deserialize(String topic, byte[] data) {
        try {
            return data == null ? null : new String(data, this.encoding);
        } catch (UnsupportedEncodingException var4) {
            throw new SerializationException("Error when deserializing byte[] to string due to unsupported encoding " + this.encoding);
        }
    }
    public void close() {
    }
}

configure方法用來定義編碼格式,默認(rèn)就UTF-8就好了,不用管這個(gè)。我們看一下自定義反序列化器,只要實(shí)現(xiàn)了Deserializer接口即可:

public class UserDeserializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public User deserialize(String s, byte[] bytes) {
        ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
        int nameLength = byteBuffer.getInt();
        byte name[] = new byte[nameLength];
        byteBuffer.get(name,0,nameLength);
        int age = byteBuffer.getInt();

        return new User().setAge(age).setName(new String(name));
    }

    @Override
    public void close() {

    }
}


public class User {
    private String name;
    private int age = -1;

    public String getName() {
        return name;
    }

    public User setName(String name) {
        this.name = name;
        return this;
    }

    public int getAge() {
        return age;
    }

    public User setAge(int age) {
        this.age = age;return this;
    }
}

總之,就是將kafka返回的字節(jié)序列轉(zhuǎn)化成你的業(yè)務(wù)對象。關(guān)于序列化,我會(huì)在之后寫一篇當(dāng)下流行的序列化方法匯總的博文,比如Avro、JSON、Thrif、ProtoBuf或Protostuff等,歡迎關(guān)注。
這里簡單舉一例,用Protostuff來實(shí)現(xiàn)序列化與反序列化:

//依賴
    <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>1.7.2</version>
        </dependency>
        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>1.7.2</version>
        </dependency>

序列化:

public class ProtostuffUserSerializer implements Serializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String s, User user) {
        if (user == null){
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(user.getClass());
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        byte[] protostuff = null;
        protostuff = ProtostuffIOUtil.toByteArray(user,schema,buffer);
        buffer.clear();
        return protostuff;
    }

    @Override
    public void close() {

    }
}

反序列化

public class ProtostuffUserDesirializer implements Deserializer<User> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public User deserialize(String s, byte[] bytes) {

        if (bytes == null) {
            return null;
        }
        Schema schema = RuntimeSchema.getSchema(User.getClass());
        User user = new User();
        ProtostuffIOUtil.mergeFrom(bytes, user, schema);
        return user;
    }

    @Override
    public void close() {

    }
}

消息消費(fèi)

Kafka的消費(fèi)是基于拉模式的。消息的消費(fèi)一般有兩種模式:推模式和拉模式。推模式是服務(wù)器主動(dòng)將消息推送給消費(fèi)者,拉模式是消費(fèi)者向服務(wù)端發(fā)送請求拉取消息。

從代碼示例中可以看出,消費(fèi)是一個(gè)不斷輪詢的過程,消費(fèi)者重復(fù)調(diào)用poll方法,返回的是所訂閱主題(分區(qū))上的一組消息。

返回的消息類型為ConsumerRecord,源碼如下所示:


public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = -1L;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;
    private final String topic;//主題
    private final int partition;//分區(qū)
    private final long offset;//所屬分區(qū)偏移量
    private final long timestamp;//時(shí)間戳
    //兩種類型,CreateTime  和 LogAppendTime
   //分別代表消息創(chuàng)建的時(shí)間,追加到日志的時(shí)間
    private final TimestampType timestampType;
    private final int serializedKeySize;//key經(jīng)過序列化后的大小,如果key為空,該值為-1
    private final int serializedValueSize;//value經(jīng)過序列化后的大小,如果value為空,該值為-1
    private final Headers headers;//消息的頭部內(nèi)容
    private final K key;//消息的鍵
    private final V value;//消息的值
    private final Optional<Integer> leaderEpoch;
    private volatile Long checksum;//CRC32的校驗(yàn)值
部分省略

實(shí)例代碼中,我們通過遍歷消息集合處理每一條消息,除此之外,我們還可以按照分區(qū)維度來進(jìn)行消費(fèi),這一點(diǎn)很有用,在手動(dòng)提交位移時(shí)尤為明顯,ConsumerRecords提供了一個(gè)records(TopicPartition)方法來獲取消息中指定分區(qū)的消息,此方法的定義如下:

    public List<ConsumerRecord<K, V>> records(TopicPartition partition) {
        List<ConsumerRecord<K, V>> recs = (List)this.records.get(partition);
        return recs == null ? Collections.emptyList() : Collections.unmodifiableList(recs);
    }

修改實(shí)例代碼,將所有消息按分區(qū)處理:

 //按分區(qū)處理消息
   for (TopicPartition tp : records.partitions()){//獲取所有分區(qū)
        for (ConsumerRecord<String, String> record : records.records(tp)){//獲取指定分區(qū)的消息
            System.out.println("partition:"+record.partition()+"----value:"+record.value());
        }
    }

此外,ConsumerRecords類中還提供了幾個(gè)方法來方便開發(fā)人員對消息集進(jìn)行處理:

  • count方法,獲取消息個(gè)數(shù)
  • isEmpty方法,判斷返回的消息是否為空
  • empty方法,獲取一個(gè)空的消息集

到目前為止,可以建單人位,poll方法只是拉取一下消息而已,但就其內(nèi)部邏輯而言并不簡單,它涉及消費(fèi)位移、消費(fèi)者協(xié)調(diào)器、組協(xié)調(diào)器、消費(fèi)者選舉、分區(qū)分配的分發(fā)、再均衡的邏輯、心跳等內(nèi)容。后續(xù)會(huì)詳細(xì)介紹。

位移提交

對于Kafka的分區(qū)而言,它的每條消息都有唯一的offset,用來表示消息在分區(qū)中對應(yīng)的位置。消費(fèi)者使用offset來表示消費(fèi)到分區(qū)中某個(gè)消息所在的位置。offset,顧名思義,偏移量,也可翻譯為位移。在每次調(diào)用poll()方法時(shí),它返回的是還沒有消費(fèi)過的消息集,要做到這一點(diǎn),就需要記錄上一次消費(fèi)過的位移。并且這個(gè)位移必須做持久化保存,而不是單單保存在內(nèi)存中,否則消費(fèi)者重啟之后就無法知道之前的消費(fèi)位移了。

當(dāng)加入新的消費(fèi)者的時(shí),必然會(huì)有再均衡的動(dòng)作,對于同一分區(qū)而言,它可能在再均衡動(dòng)作之后分配給新的消費(fèi)者,如果不持久化保存消費(fèi)位移,那么這個(gè)新的消費(fèi)者也無法知道之前的消費(fèi)位移。消費(fèi)者位移存儲(chǔ)在Kafka內(nèi)部的主題_consumer_offsets中。

這種把消費(fèi)位移存儲(chǔ)起來(持久化)的動(dòng)作稱為“提交”,消費(fèi)者再消費(fèi)完消息之后需要執(zhí)行消費(fèi)位移的提交。
如下圖,假設(shè)當(dāng)前消費(fèi)者已經(jīng)消費(fèi)了x位置的消息,那么我們就可以說消費(fèi)者的消費(fèi)位移為x。


不過,需要明確的是,當(dāng)前消費(fèi)者需要提交的消費(fèi)位移并不是x,而是x+1,對應(yīng)上圖的position,他表示下一條需要拉取的消息的位置。在消費(fèi)者中還有一個(gè)commited offset的概念,它表示已經(jīng)提交過的消費(fèi)位移。

KafkaConsumer類提供了position(TopicPartition)和commited(TopicPartition)兩個(gè)方法來分別獲取上面所說的position和commiited offset的值。
為了論證lastConsumedOffset、commited offset 和position之間的關(guān)系,我們使用上面兩個(gè)方法來做相關(guān)演示。我們向主題中分區(qū)編號(hào)為0的分區(qū)發(fā)送若干消息,之后再創(chuàng)建一個(gè)消費(fèi)者去消費(fèi)其中的消息,等待消費(fèi)完這些消息之后,同步提交消費(fèi)位移。最后觀察上面三者的值。

//定義主題topic-demo,分區(qū)編號(hào)為0
 TopicPartition topicPartition = new TopicPartition("topic-demo",0);
 consumer.assign(Arrays.asList(topicPartition));//訂閱主題topic-demo的分區(qū)0
 long lastConsumedOffset = -1;//當(dāng)前消費(fèi)到的位移
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
 //獲取主題topic-demo的分區(qū)0的消息
 List<ConsumerRecord<String, String>> partitionRecords = records.records(topicPartition);
//獲取最后一條消息的偏移量,該消息是已經(jīng)消費(fèi)的最后一條消息
 lastConsumedOffset = partitionRecords.get(partitionRecords.size()-1).offset();
 consumer.commitSync();//同步提交消費(fèi)位移
 System.out.println("consumed Offset is  "+lastConsumedOffset);
 OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
 System.out.println("commited Offset is "+offsetAndMetadata.offset());
 long position = consumer.position(topicPartition);
 System.out.println("the offset of the netxt record is  "+position);

打印結(jié)果

consumed Offset is  182
commited Offset is 183
the offset of the netxt record is  183

可以看出,消費(fèi)者消費(fèi)到此分區(qū)的最大偏移量為182,對應(yīng)的消費(fèi)位移lastConsumedOffset 也就是182。在消費(fèi)完之后執(zhí)行同步提交,但是最終結(jié)果顯示所提交的位移commited offset 為183,并且下一次所要拉取的消息的起始偏移量position為183,結(jié)論
position = commited offset = lastConsumedOffset + 1
當(dāng)然,position和commited offset的值不會(huì)一直相同,這一點(diǎn)會(huì)在下面的示例中有所體現(xiàn)。

對于位移提交具體時(shí)機(jī)的把握也很有講究,有可能造成重復(fù)消費(fèi)和消息丟失的現(xiàn)象。參考下圖所示,x代表上一次提交的消費(fèi)位移,說明已經(jīng)完成了x-1之前的所有消息的消費(fèi)。x+3表示當(dāng)前正在處理的位置。如果poll拉取到消息之后就進(jìn)行了位移提交,即提交了x+7,那么當(dāng)前消費(fèi)x+3的時(shí)候遇到了異常,在故障恢復(fù)之后, 我們重新拉取到的消息是從x+7開始的。也就是說,x+3到x+6之間的消息并未消費(fèi),如此便發(fā)生了消息丟失的現(xiàn)象。


再考慮另一種情形,位移提交的動(dòng)作是在消費(fèi)完所有拉取到的消息之后才執(zhí)行的,那么當(dāng)消費(fèi)x+3的時(shí)候遇到了異常,在故障恢復(fù)之后,我們重新拉取的消息是從x開始的。也就是說 x到x+2之間的消息又重新消費(fèi)了一遍,故而發(fā)生了重復(fù)消費(fèi)的現(xiàn)象。

而實(shí)際情況可能更加復(fù)雜。在kafka中默認(rèn)的消費(fèi)位移的提交方式是自動(dòng)提交,這個(gè)由消費(fèi)客戶端參數(shù)enable.auto.commit配置,默認(rèn)為true。當(dāng)然這個(gè)默認(rèn)的自動(dòng)提交不是每消費(fèi)一條消息就提交一次,而是定期提交,這個(gè)定期的周期時(shí)間由客戶端參數(shù)auto.commit.interval.ms配置,默認(rèn)值為5秒,此參數(shù)生效的前提是enable.auto.commit為true。

在默認(rèn)情況下,消費(fèi)者客戶端每隔5秒會(huì)將拉取到的每個(gè)分區(qū)中的最大的消息位移進(jìn)行提交。自動(dòng)位移提交的動(dòng)作實(shí)在poll方法的邏輯里完成的,在每次真正向服務(wù)器發(fā)起拉取請求之前會(huì)檢查是否可以進(jìn)行位移提交,如果可以,那么就會(huì)提交上一次輪詢的位移。

在kafka消費(fèi)的編程邏輯中位移是一大難點(diǎn),自動(dòng)提交消費(fèi)位移的方式非常簡便,它免去了復(fù)雜的位移提交邏輯,讓代碼更簡潔。但隨之而來的是重復(fù)消費(fèi)和消費(fèi)丟失的問題。假設(shè)剛提交完一次消費(fèi)位移,然后拉取一批消息進(jìn)行消費(fèi),在下一次自動(dòng)提交消費(fèi)位移之前,消費(fèi)者崩潰了,那么又得從上一次位移提交的地方重新開始消費(fèi)。我們可以通過減少位移提交的時(shí)間間隔來減少重復(fù)消息的窗口大小,但這樣并不能避免重復(fù)消費(fèi)的發(fā)送,而且也會(huì)使位移提交更加頻繁。

自動(dòng)位移提交的方式在正常情況下不會(huì)發(fā)生消息丟失和重復(fù)消費(fèi)的現(xiàn)象,但是在編程的世界里異常不可避免。自動(dòng)提交無法做到精確的位移管理。Kafka提供了手動(dòng)管理位移提交的操作,這樣可以使開發(fā)人員對消費(fèi)位移的管理控制更加靈活。很多時(shí)候并不是說poll拉取到消息就算消費(fèi)完成,而是需要將消息寫入到數(shù)據(jù)庫、寫入本地緩存,或者是更加復(fù)雜的業(yè)務(wù)處理。在這些場景下,所有的業(yè)務(wù)處理完成才能認(rèn)為消息被成功消費(fèi),手動(dòng)的提交方式讓開發(fā)人員根據(jù)程序的邏輯在合適的地方進(jìn)行位移提交。手動(dòng)提交功能的前提是enable.auto.commit配置為false,手動(dòng)提交分為同步提交和異步提交,對應(yīng)于KafkaConsumer中的commitSync和commitAsync兩個(gè)方法。

  • commitSync
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//                按分區(qū)處理消息
  for (TopicPartition tp : records.partitions()){
          for (ConsumerRecord<String, String> record : records.records(tp)){
                System.out.println("partition:"+record.partition()+"----value:"+record.value());
          }
  }
  consumer.commitSync();

先將拉取到的每一條消息進(jìn)行處理,然后對整個(gè)消息集做同步提交。針對上面的示例還可以修改為批量處理+批量提交的方式。

final int minBatchSize = 200;
List<ConsumerRecord> buffer = new Arraylist<>();
whie(true){
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
  for (TopicPartition tp : records.partitions()){
          for (ConsumerRecord<String, String> record : records.records(tp)){
            buffer.add(record);          
          }
    }
if(buffer.size() >= minBatchSize){
    //do local processing with buffer
  consumer.commitSync();
  buffer.clear;
}
}

上面的示例中,將拉取到的消息存入緩存buffer,等到累積到足夠多的時(shí)候,再做相應(yīng)的批量處理,之后再做批量提交。

這兩個(gè)示例都有重復(fù)消費(fèi)的問題,如果在業(yè)務(wù)邏輯處理完之后,并且在同步位移提交之前,程序出現(xiàn)了崩潰。那么恢復(fù)之后,只能從上一次位移提交的地方拉取消息。

commitSync方法會(huì)根據(jù)poll拉取到的最新位移來進(jìn)行提交,即position的位置,只要沒有發(fā)生不可恢復(fù)的錯(cuò)誤,它就會(huì)阻塞消費(fèi)者線程直至位移提交完成。對于不可恢復(fù)的錯(cuò)誤,如CommitFailedException/WakeupException/InterruptException/AuthenticationException/AuthorizationException等,我們可以將其捕獲并做針對性的處理。

commitSync提交位移的頻率和拉取批次消息、處理批次消息的頻率是一致的,如果想尋求更細(xì)粒度、更準(zhǔn)確的提交,那么就需要commitSync另一個(gè)含參的方法,

    public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets)

參數(shù)offsets用來提交指定分區(qū)的位移。無參的commitSync方法只能提交當(dāng)前批次對應(yīng)的position值。如果需要提交一個(gè)中間值,比如業(yè)務(wù)每消費(fèi)一條消息就提交一次位移,那么就可以使用這種方式

  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
  records.forEach(record->{
       System.out.println("topic="+record.topic()+",  partition="+record.partition()+",  offset="+record.offset());
       long offset = record.offset();
       TopicPartition partition = new TopicPartition(record.topic(), record.partition());
       //每消費(fèi)一條消息提交一次位移
       consumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset+1)));
       System.out.println("_______________________________");
    });

在實(shí)際應(yīng)用中,很少有這種每消費(fèi)一條消息,就提交一次消費(fèi)位移的場景。commitSync方法本身是同步進(jìn)行的,會(huì)消耗一定的性能。更多的時(shí)候,是按照分區(qū)的粒度劃分提交位移的界限,

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
// 按分區(qū)處理消息
for (TopicPartition tp : records.partitions()){
   //獲取當(dāng)前分區(qū)的所有消息
  List<ConsumerRecord<String,String>> partitionRecords = records.records(tp);
  for (ConsumerRecord<String, String> record : partitionRecords){
       System.out.println("partition:"+record.partition()+"----value:"+record.value());
  }
  //當(dāng)前分區(qū)最后一條消息的位移
  long lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
  //按分區(qū)的粒度,進(jìn)行位移提交
  consumer.commitSync(Collections.singletonMap(tp,new OffsetAndMetadata(lastConsumedOffset+1)));
 }

與commitSync相反,異步提交的方式commitAsync在執(zhí)行的時(shí)候,消費(fèi)者線程不會(huì)阻塞,可能在提交消費(fèi)位移的結(jié)果返回之前就開始了新一輪的拉取操作??梢允窍M(fèi)者的性能增強(qiáng)。

void commitAsync();
void commitAsync(OffsetCommitCallback var1);
void commitAsync(Map<TopicPartition, OffsetAndMetadata> var1, OffsetCommitCallback var2);

第一個(gè)無參的方法和第三個(gè)方法中的offsets都很好理解,對照commitSync方法即可。關(guān)鍵是這里第二個(gè)方法和第三個(gè)方法中的OffsetCommitCallback參數(shù),它提供了一個(gè)異步提交的回調(diào)方法,當(dāng)位移提交完成后回調(diào)OffsetCommitCallback里的onComplete方法:

      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
//                按分區(qū)處理消息
                for (TopicPartition tp : records.partitions()){
                    //獲取當(dāng)前分區(qū)的所有消息
                    List<ConsumerRecord<String,String>> partitionRecords = records.records(tp);
                    for (ConsumerRecord<String, String> record : partitionRecords){
                        System.out.println("partition:"+record.partition()+"----value:"+record.value());
                    }
                    //當(dāng)前分區(qū)最后一條消息的位移
                    long lastConsumedOffset = partitionRecords.get(partitionRecords.size() -1).offset();
                    //按分區(qū)的粒度,進(jìn)行位移提交
                    consumer.commitAsync(Collections.singletonMap(tp, new OffsetAndMetadata(lastConsumedOffset + 1)), new OffsetCommitCallback() {
                        @Override
                        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                            if(e == null){
                                System.out.println(map);
                            }else{
                                System.out.println("提交失敗");
                            }
                        }
                    });
                }

控制或關(guān)閉消費(fèi)

KafkaConsumer提供了對消費(fèi)速度進(jìn)行控制的方法,有些場景,需要我們暫停某些分區(qū)的消費(fèi)而先消費(fèi)其他分區(qū),當(dāng)達(dá)到一定條件時(shí)再恢復(fù)這些分區(qū)的消費(fèi)。pause()和resume()方法來分別實(shí)現(xiàn)暫停某些分區(qū)在拉取操作時(shí)返回?cái)?shù)據(jù)給客戶端和恢復(fù)某些分區(qū)向客戶端返回?cái)?shù)據(jù)的操作。

    void pause(Collection<TopicPartition> var1);
    void resume(Collection<TopicPartition> var1);

還有一個(gè)無參的paused方法返回被暫停的分區(qū)集合

Set<TopicPartition> paused();

之前的示例展示的都是使用一個(gè)while循環(huán)來包裹住poll方法及相應(yīng)的消費(fèi)邏輯,如何優(yōu)雅的退出這個(gè)循環(huán)也很有考究。還有一種方式是調(diào)用KafkaConsumer的wakeup方法,調(diào)用該方法可以退出poll的邏輯,并拋出WakeupException異常,我們不需要處理這個(gè)異常,它只是跳出循環(huán)的方式。

跳出循環(huán)以后一定要顯示執(zhí)行關(guān)閉動(dòng)作以釋放運(yùn)行過程中占用的各種系統(tǒng)資源,包括內(nèi)存資源,socket連接等等。KafkaConsumer提供了close方法實(shí)現(xiàn)關(guān)閉

指定位移消費(fèi)

正是有了消費(fèi)位移的持久化,才使消費(fèi)者在關(guān)閉、崩潰或者遇到再均衡的時(shí)候,可以讓接替的消費(fèi)者能夠根據(jù)存儲(chǔ)的消費(fèi)位移繼續(xù)進(jìn)行消費(fèi)。

當(dāng)一個(gè)新的消費(fèi)組建立的時(shí)候,它根本沒有可以查找的消費(fèi)位移。或者消費(fèi)組內(nèi)的一個(gè)新消費(fèi)者訂閱了一個(gè)新的主題,它也沒有可以查找的消費(fèi)位移。

當(dāng)消費(fèi)者查找不到所記錄的消費(fèi)位移的時(shí)候,就會(huì)根據(jù)消費(fèi)者客戶端參數(shù)auto.offset.reset的配置來決定從何處開始進(jìn)行消費(fèi),這個(gè)參數(shù)的默認(rèn)值為latest,表示從分區(qū)末尾開始消費(fèi)。



如圖,按照默認(rèn)的配置,消費(fèi)者會(huì)從8開始消費(fèi),更加確切的說是從8開始拉取消息。如果將auto.offset.reset設(shè)置成earliest,那么消費(fèi)者會(huì)從起始處,也就是0開始消費(fèi)。

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

到目前為止,我們知道消息的拉取是根據(jù)poll方法中的邏輯來處理的,這個(gè)邏輯對于普通開發(fā)人員來說是個(gè)黑盒子,無法精確的掌控其消費(fèi)的起始位置。有些場景我們需要更細(xì)粒度的掌控,可以讓我們從特定的位移處開始拉取消息,seek方法正好提供了這個(gè)功能,讓我們得以追前消費(fèi)或回溯消費(fèi)。

void seek(TopicPartition var1, long offset);

seek方法中的參數(shù)partition表示分區(qū),而offset參數(shù)用來指定從分區(qū)的哪個(gè)位置開始消費(fèi)。seek方法只能重置消費(fèi)者分配到的分區(qū)的消費(fèi)位置,而分區(qū)的分配是在poll方法的調(diào)用過程中實(shí)現(xiàn)的。也就是說在執(zhí)行seek方法之前需要先執(zhí)行一次poll方法,等到分配到分區(qū)之后才可以重置消費(fèi)位置。

 consumer.poll(Duration.ofMillis(10000));
 Set<TopicPartition> assignment = consumer.assignment();
 for(TopicPartition tp : assignment){
      consumer.seek(tp,2);
 }
 while (true){
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                .....
 }

consumer.seek(tp,2)設(shè)置每個(gè)分區(qū)消費(fèi)的位置是2。

如果消費(fèi)組內(nèi)的消費(fèi)者在啟動(dòng)的時(shí)候能夠找到消費(fèi)位移,除非發(fā)生越界,否則auto.offset.reset參數(shù)并不會(huì)奏效,此時(shí)如果想指定從開頭或末尾開始消費(fèi),就需要seek方法的幫助了,如下代碼所示:

  Set<TopicPartition> set = new HashSet<>();
        while (set.size() == 0){
            consumer.poll(Duration.ofMillis(10000));
            set = consumer.assignment();
        }
         Map<TopicPartition, Long> topicPartitionLongMap = consumer.endOffsets(set);
        for (TopicPartition tp: set){
            consumer.seek(tp,topicPartitionLongMap.get(tp));
        }

endOffsets方法用來獲取指定分區(qū)的末尾的消息位置,與endOffsets對應(yīng)的是beginningOffsets,一個(gè)分區(qū)的起始為止起初是0,但并不代表每時(shí)每刻都是0,因?yàn)槿罩厩謇淼膭?dòng)作會(huì)清理舊的數(shù)據(jù),所以分區(qū)的位置會(huì)自然而然的增加。

有時(shí)候我們并不知道特定的消費(fèi)位置,卻知道一個(gè)相關(guān)的時(shí)間點(diǎn),比如我們想要消費(fèi)昨天8點(diǎn)之后的消息,KafkaConsumer提供了一個(gè)offsetForTimes方法:

    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch);
    Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> var1, Duration timestampsToSearch);

timestampsToSearch是一個(gè)Map類型,key為待查詢的分區(qū),而value為待查詢的時(shí)間戳,該方法會(huì)返回時(shí)間戳大于等于待查詢時(shí)間的第一條消息對應(yīng)的位置和時(shí)間戳,對應(yīng)于OffsetAndTimestamp中的offset和timestamp字段。

 Set<TopicPartition> set = new HashSet<>();
        Map<TopicPartition,Long> map = new HashMap<>();
        while (set.size() == 0){
            consumer.poll(Duration.ofMillis(10000));
            set = consumer.assignment();
        }
        for (TopicPartition tp: set){
            map.put(tp,System.currentTimeMillis()-1*24*3600*1000);
        }
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(map);

        for (TopicPartition tp: set){
             OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp);
             if (offsetAndTimestamp!=null)
                 consumer.seek(tp,offsetAndTimestamp.offset());

        }

消費(fèi)者攔截器

消費(fèi)者攔截器主要在消費(fèi)到消息或在提交消費(fèi)位移的時(shí)候進(jìn)行一些定制化的工作。
消費(fèi)者攔截器需要實(shí)現(xiàn)ConsumerInterceptor接口,該接口有三個(gè)方法:

public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
    void close();
}

KafkaConsumer會(huì)在poll方法返回之前調(diào)用攔截器的onConsume方法來對消息進(jìn)行相應(yīng)的定制化操作,比如修改返回的內(nèi)容、按照某種規(guī)則過濾消息。如果onConsume方法拋出異常,那么會(huì)被捕獲并記錄到日志,但是異常不會(huì)在向上傳遞。

KafkaConsumer會(huì)在提交完消費(fèi)位移之后調(diào)用調(diào)用攔截器的onCommit方法,可以使用這個(gè)方法來記錄跟蹤所提交的位移信息,比如當(dāng)消費(fèi)者調(diào)用commitSync的無參方法時(shí),我們不知道提交的具體細(xì)節(jié),可以使用攔截器onCommit方法做到這一點(diǎn)。

在某些場景中,會(huì)對消息設(shè)置一個(gè)有效期的屬性,如果某條消息在既定的時(shí)間窗口內(nèi)無法到達(dá),那么就被視為無效,它也不需要再被繼續(xù)處理了。下面使用消費(fèi)者攔截器實(shí)現(xiàn)一個(gè)簡單的TTL功能


public class ConsumerInterceptorTTL implements ConsumerInterceptor<String,String> {

    private  static  final  long EXPIRE_INTERVAL = 10 * 1000;
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        long now = System.currentTimeMillis();
        //構(gòu)建新的分區(qū)消息映射表
        Map<TopicPartition, List<ConsumerRecord<String,String>>> newRecords = new HashMap<>();
        //遍歷分區(qū)
        for (TopicPartition tp : consumerRecords.partitions()){
            //獲取分區(qū)內(nèi)的消息
            List<ConsumerRecord<String,String>> tpRecords = consumerRecords.records(tp);
            List<ConsumerRecord<String,String>> newTpRecords = new ArrayList<>();
            //遍歷消息,做判斷
            for (ConsumerRecord<String, String> tpRecord : tpRecords) {
                //拿到10秒以內(nèi)的消息
                if (now - tpRecord.timestamp() < EXPIRE_INTERVAL){
                    newTpRecords.add(tpRecord);
                }
            }
            if (!newTpRecords.isEmpty()){
                newRecords.put(tp, newTpRecords);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach((tp,offset)->{
            System.out.println(tp+":"+offset.offset());
        });
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> map) {
    }
}

我們使用消息的timestamp字段來判定是否過期,如果消息的時(shí)間戳與當(dāng)前的時(shí)間戳相差超過10秒則判定為過期,那么這條消息也就被過濾掉而不返回給消費(fèi)者客戶端。

自定義攔截器實(shí)現(xiàn)后,需要在KafkaConsumer中配置該攔截器,通過參數(shù)interceptor.classes參數(shù)實(shí)現(xiàn):

properties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorTTL.class);

關(guān)于KafkaConsumer的多線程實(shí)現(xiàn)

KafkaProducer是線程安全的,然而KafkaConsumer是非線程安全的。KafkaConsumer當(dāng)中定義了一個(gè)acquire方法,用來檢測當(dāng)前是否只有一個(gè)線程在操作,若有其他線程正在操作則會(huì)拋出異常。KafkaConsumer中的每個(gè)公用方法在執(zhí)行所要執(zhí)行的動(dòng)作之前都會(huì)調(diào)用這個(gè)方法,只有wakeup方法是個(gè)例外。

   private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        } else {
            this.refcount.incrementAndGet();
        }
    }

KafkaConsumer非線程安全并不意味著我們在消費(fèi)消息的時(shí)候只能以單線程的方式運(yùn)行。如果生產(chǎn)者發(fā)送消息的速度大于消費(fèi)者處理消息的速度,那么就會(huì)有越來越多的消息得不到及時(shí)的處理,造成一定的時(shí)延。除此之外,kafka中存在消息保留機(jī)制,有些消息有可能在被消費(fèi)之前就被清理了,從而造成消息的丟失。我們可以通過多線程的方式實(shí)現(xiàn)消息消費(fèi),多線程的目的就是提高整體的消費(fèi)能力。多線程的實(shí)現(xiàn)方式有多種,第一種也是最常見的方式:線程封閉,即為每個(gè)線程實(shí)例化一個(gè)KafkaConsumer對象。

一個(gè)線程對一個(gè)KafkaConsumer實(shí)例,我們可以稱為消費(fèi)線程。一個(gè)消費(fèi)線程可以消費(fèi)一個(gè)或多個(gè)分區(qū)中的消息,所有的消費(fèi)線程都隸屬于同一個(gè)消費(fèi)組。這種方式實(shí)現(xiàn)的并發(fā)度受限于分區(qū)的實(shí)際個(gè)數(shù),文章開頭講過,當(dāng)消費(fèi)者個(gè)數(shù)大于分區(qū)個(gè)數(shù)時(shí),就會(huì)有部分消費(fèi)線程一直處于空閑的狀態(tài)。

第二種方式是,多個(gè)消費(fèi)線程同時(shí)消費(fèi)同一個(gè)分區(qū),這個(gè)通過assign、seek等方法實(shí)現(xiàn),這樣可以打破原有的消費(fèi)線程的個(gè)數(shù)不能超過分區(qū)數(shù)的限制,不過這種方式對于位移提交和順序控制的處理就會(huì)變得非常復(fù)雜,實(shí)際應(yīng)用的很少。一般而言,分區(qū)時(shí)消費(fèi)線程的最小劃分單位。我們通過實(shí)際編碼實(shí)現(xiàn)第一種:


public class MultiConsumerThreadDemo {
    public static final String brokerList = "192.168.3.8:9092";
    public static final String topic = "topic";
    public static final String group = "group-id42";
    public static final String client = "client-id2";
    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消費(fèi)位移自動(dòng)提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i=0;i<consumerThreadNum;i++){
            new KafkaConsumerThread(props,topic).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{

        private  KafkaConsumer<String,String> kafkaConsumer;

        public  KafkaConsumerThread(Properties props,String topic){
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }

        @Override
        public void run() {

            try {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    records.forEach(record->{
                        System.out.println("topic="+record.topic()+",  partition="+record.partition()+",  offset="+record.offset());
                        //消息處理
                    });

                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.kafkaConsumer.close();
            }

        }
    }


}

內(nèi)部類KafkaConsumerThread代表消費(fèi)線程,內(nèi)部包裹著一 個(gè)獨(dú)立的KafkaConsumer實(shí)例。通過main方法來啟動(dòng)多個(gè)消費(fèi)線程,一般來講一個(gè)主題的分區(qū)數(shù)在開發(fā)時(shí)就是確定的,可以將consumerThreadNum設(shè)置成不大于分區(qū)數(shù)的值,如果不知道主題的分區(qū)數(shù),也可以通過之前講的partitionsFor方法來動(dòng)態(tài)獲取。

這種方式的優(yōu)點(diǎn)是每個(gè)線程可以按順序消費(fèi)各個(gè)分區(qū)中的消息。缺點(diǎn)是,每個(gè)消費(fèi)線程都要維護(hù)一個(gè)獨(dú)立的TCP鏈接,如果分區(qū)數(shù)和consumerThreadNum都很大,那么會(huì)造成不小的系統(tǒng)開銷。

如果消費(fèi)者對消息處理的速度很快,那么poll拉取的頻次也會(huì)更高,進(jìn)而整體消費(fèi)性能也會(huì)提升。相反,如果客戶端對消息的處理速度很慢,比如進(jìn)行一個(gè)事務(wù)性操作,或者等待一個(gè)RPC的同步響應(yīng),那么poll的拉取頻次也會(huì)下降,進(jìn)而造成整體的性能下降。一般而言,poll拉取的速度是相當(dāng)快的,而整體消費(fèi)的瓶頸也正是消息處理這一塊,我們可以將處理消息部分改成多線程的實(shí)現(xiàn)方式,如下圖所示



代碼如下:

public class MultiConsumerThreadDemo1 {
    public static final String brokerList = "192.168.3.8:9092";
    public static final String topic = "topic";
    public static final String group = "group-id42";
    public static final String client = "client-id2";
    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消費(fèi)位移自動(dòng)提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i=0;i<consumerThreadNum;i++){
            new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{

        private  KafkaConsumer<String,String> kafkaConsumer;
        private  ExecutorService executorService;
        private int threadNum;

        public  KafkaConsumerThread(Properties props, String topic, int processorNum){
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
            this.threadNum = processorNum;
            executorService = new ThreadPoolExecutor(
                    threadNum,
                    threadNum,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {

            try {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                        //消息處理
                        executorService.submit(new RecordsHandler(records));
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler implements Runnable {
        private final  ConsumerRecords<String,String > records;
        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }
        @Override
        public void run() {
            //真正處理records的地方
        }
    }
}

RecordHandler類是用來處理消息的,而KafkaConsumerThread類對應(yīng)的是一個(gè)消費(fèi)線程,里面通過線程池的方式來調(diào)用RecordHandler處理一批批消息。注意KafkaConsumerThread類中ThreadPollExecutor里的最后一個(gè)參數(shù)設(shè)置的是CallerRunsPolicy,這樣可以防止線程池的總體消費(fèi)能力跟不上poll拉取的能力從而導(dǎo)致異?,F(xiàn)象的發(fā)生。但是這種方式對消息的順序處理能力就比較困難了。注意,代碼中的參數(shù)配置properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);,旨在說明在具體實(shí)現(xiàn)的時(shí)候并沒有考慮位移提交的情況。對于第一種實(shí)現(xiàn)方式而言,如果要做具體的位移提交,直接在KafkaConsumerThread中的run方法里實(shí)現(xiàn)即可。我們引入一個(gè)共享變量offsets來參與提交


每一個(gè)處理消息的RecordHandler類在處理完消息之后都將對應(yīng)的消費(fèi)位移保存到共享變量offsets中,KafkaConsumerThread在每一次poll方法之后都讀取offsets中的內(nèi)容并對其進(jìn)行位移提交。注意在實(shí)現(xiàn)過程中需要對其進(jìn)行加鎖操作,防止出現(xiàn)并發(fā)問題。并且在寫入offsets的時(shí)候需要注意位移覆蓋的問題

public class MultiConsumerThreadDemo1 {
    public static final String brokerList = "192.168.3.8:9092";
    public static final String topic = "topic";
    public static final String group = "group-id42";
    public static final String client = "client-id2";
    public static Properties initConfig(){

        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //消費(fèi)位移自動(dòng)提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        properties.put(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,client);
        return  properties;
    }

    static Map<TopicPartition, OffsetAndMetadata> offsets =new HashMap<>();

    public static void main(String[] args) {
        Properties props = initConfig();
        int consumerThreadNum = 4;
        for (int i=0;i<consumerThreadNum;i++){
            new KafkaConsumerThread(props,topic,Runtime.getRuntime().availableProcessors()).start();
        }
    }
    public static class KafkaConsumerThread extends Thread{

        private  KafkaConsumer<String,String> kafkaConsumer;
        private  ExecutorService executorService;
        private int threadNum;

        public  KafkaConsumerThread(Properties props, String topic, int processorNum){
            this.kafkaConsumer = new KafkaConsumer<String, String>(props);
            this.kafkaConsumer.subscribe(Arrays.asList(topic));
            this.threadNum = processorNum;
            executorService = new ThreadPoolExecutor(
                    threadNum,
                    threadNum,
                    0L,
                    TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {

            try {
                while (true){
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                        //消息處理
                        executorService.submit(new RecordsHandler(records));
                        //位移提交工作
                        synchronized (offsets){
                            if(!offsets.isEmpty()){
                                kafkaConsumer.commitSync(offsets);
                                offsets.clear();
                            }
                        }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                this.kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler implements Runnable {
        private final  ConsumerRecords<String,String > records;
        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }
        @Override
        public void run() {
            //真正處理records的地方

            //處理完后進(jìn)行位移操作
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> tpRecords = this.records.records(partition);
                long lastConsumedOffset = tpRecords.get(tpRecords.size()-1).offset();
                synchronized (offsets){
                    if (!offsets.containsKey(partition)){
                        offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
                    }else{
                        long position = offsets.get(partition).offset();
                        if (position<lastConsumedOffset+1){
                            offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset+1));
                        }
                    }
                }
                
            }
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容