(4)kafka的應(yīng)用

下面展示的例子是kafka的客戶(hù)端的使用,包含了發(fā)送端的同步發(fā)送消息和異步發(fā)送消息的使用,以及接收端的消費(fèi)消息的使用,以及自定分區(qū)的使用

1.環(huán)境的搭建

需要配置kafka的集群環(huán)境: 可以參考http://www.itdecent.cn/p/d39ade36f606
需要依賴(lài)kafka的客戶(hù)端的jar,maven的依賴(lài)如下:

   <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.0.0</version>
    </dependency>

2.kafka的發(fā)送端的同步發(fā)送和異步發(fā)送

這里可以參考kafkaProducer的api : http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

在代碼執(zhí)行之前,我默認(rèn)在kafka自己創(chuàng)建了三個(gè)分區(qū)的mytopic,并且副本為1

 bin/kafka-topics.sh  --zookeeper 192.168.44.129:2181 --partitions 3 --replication-factor 1 --create --topic my-topic

代碼入下:

/**
 * @Project: kafka
 * @description:  kafka的producer的同步發(fā)送和異步發(fā)送
 * @author: sunkang
 * @create: 2018-12-16 21:24
 * @ModificationHistory who      when       What
 **/
public class KafkaProducerDemo extends Thread {
    private final KafkaProducer<Integer,String> producer;
    private  String topic;
    //是否為異步發(fā)送
    private boolean async;

    public KafkaProducerDemo(String topic,boolean async){
        Properties properties = new Properties();
        //bootstrap.servers  kafka的集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
        //client.id 客戶(hù)端id
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
        //acks =-1,表示集群中的所有成員都需要確認(rèn)
        properties.put(ProducerConfig.ACKS_CONFIG,"-1");
        //發(fā)送到同一分區(qū),批量發(fā)送數(shù)據(jù)包的大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
        //兩次發(fā)送的時(shí)間間隔內(nèi),把所有的request進(jìn)行聚合在發(fā)送
        properties.put(ProducerConfig.LINGER_MS_CONFIG,10);
        //發(fā)送的消息的key的序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.IntegerSerializer");
        //發(fā)送消息的value的序列化
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        producer  = new KafkaProducer<Integer, String>(properties);
        this.topic = topic;
        this.async = async;
    }

    public void run(){
        int num  = 0;
        while (true){
            String message = "message_"+num;
            System.out.println("begin send message"+ message);
            //異步發(fā)送
            if(async){
                producer.send(new ProducerRecord<Integer, String>(topic, num, message), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(e != null){
                            e.printStackTrace();
                        }else{
                            System.out.println("async-offset:"+ recordMetadata.offset()+"partition:"+recordMetadata.partition());
                        }
                    }
                });
            }else{   //同步發(fā)送
                Future<RecordMetadata>  recordMetadataFuture = producer.send(new ProducerRecord<Integer, String>(topic,num,message));
                try {
                    RecordMetadata  recordMetadata =  recordMetadataFuture.get();
                    System.out.println("sync-offset:"+ recordMetadata.offset()+"partition:"+ recordMetadata.partition());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            num++;
            try {//間隔一秒之后在發(fā)送
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        //異步發(fā)送
        new KafkaProducerDemo("my-topic",true).start();
    }
 }

輸出如下:默認(rèn)是有三個(gè)分區(qū)的,可以看到消息存儲(chǔ)的分區(qū)都不一樣,實(shí)現(xiàn)了消息的分片的作用

begin send messagemessage_0
async-offset:56->partition:1
begin send messagemessage_1
async-offset:59->partition:0
begin send messagemessage_2
async-offset:57->partition:2

3.kafka的消費(fèi)端的消費(fèi)

消費(fèi)端的例子可以參考官網(wǎng)KafkaConsumer的api: http://kafka.apache.org/21/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

  • 消費(fèi)端的自動(dòng)偏移量提交和手動(dòng)偏移提交
/**
 * @Project: kafka
 * @description:消費(fèi)端的自動(dòng)偏移量提交和手動(dòng)偏移提交
 * @author: sunkang
 * @create: 2018-12-16 21:34
 * @ModificationHistory who      when       What
 **/
public class KafkaConsumerDemo  extends Thread{

    private final KafkaConsumer kafkaConsumer;

    private  final   boolean autoOffesetCommit;


    public KafkaConsumerDemo(String topic, boolean autoOffesetCommitt) {
        Properties properties=new Properties();
        //服務(wù)地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
        this.autoOffesetCommit = autoOffesetCommitt;
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,  autoOffesetCommit == true? "true":"false");
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.IntegerDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        kafkaConsumer=new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Collections.singletonList(topic));
    }

    public void run(){
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        //一次性批量確認(rèn)
        final int minBatchSize = 10;

        while (true){
            if(this.autoOffesetCommit){//自動(dòng)偏移提交
                //從broker拉取消息
                ConsumerRecords<Integer,String> consumerRecords= kafkaConsumer.poll(100);
                for(ConsumerRecord record : consumerRecords ){
                    System.out.println("message  receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition());                }
            }else{//需要手動(dòng)偏移量控制
                ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("message  receive:"+ record.value()+"->offset:"+record.offset()+"->partition:"+record.partition());
                    buffer.add(record);
                }
                if (buffer.size() >= minBatchSize) {
                  //當(dāng)接收消息需要處理的進(jìn)行邏輯處理的時(shí)候,需要手動(dòng)偏移量控制,比如當(dāng)消息插入數(shù)據(jù)庫(kù)完全成功的時(shí)候, 才認(rèn)為消息完全消費(fèi)了
                 //   insertIntoDb(buffer);
                    kafkaConsumer.commitSync();
                    buffer.clear();
                }
            }

        }
    }
    public static void main(String[] args) {
        new KafkaConsumerDemo("my-topic",true).start();
    }

}

4.kafka自定義分區(qū)

自定義分區(qū)策略是根據(jù)消息的key來(lái)映射具體的分區(qū),需要實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口

/**
 * 自定義分區(qū)策略
 */
public class MyPartition implements Partitioner {
    private Random random=new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //獲得分區(qū)列表
        List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(topic);
        int partitionNum=0;
        if(key==null){
            partitionNum=random.nextInt(partitionInfos.size()); //隨機(jī)分區(qū)
        }else{
            partitionNum=Math.abs((key.hashCode())%partitionInfos.size());
        }
        System.out.println("key ->"+key+"->value->"+value+"->"+partitionNum);
        return partitionNum;  //指定發(fā)送的分區(qū)值
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}

如果要指定自己實(shí)現(xiàn)的自定義分區(qū)策略,需要增加partitioner.class的配置屬性

Properties properties=new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.kafka.partion.MyPartition");
producer=new KafkaProducer<Integer, String>(properties);

5.配置信息分析

發(fā)送端的可選配置信息分析
  • acks

acks 配置表示 producer 發(fā)送消息到 broker 上以后的確認(rèn)值。有三個(gè)可選項(xiàng) :
0:表示 producer 不需要等待 broker 的消息確認(rèn)。這個(gè)選項(xiàng)時(shí)延最小但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng) server 宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)。
1:表示 producer 只需要獲得 kafka 集群中的 leader 節(jié)點(diǎn)確認(rèn)即可,這個(gè)選擇時(shí)延較小同時(shí)確保了 leader 節(jié)點(diǎn)確認(rèn)接收成功
all(-1):需要 ISR 中所有的 Replica 給予接收確認(rèn),速度最慢,安全性最

但是由于 ISR 可能會(huì)縮小到僅包含一個(gè) Replica,所以設(shè)置參數(shù)為 all 并不能一定避免數(shù)據(jù)丟失

  • batch.size

生產(chǎn)者發(fā)送多個(gè)消息到 broker 上的同一個(gè)分區(qū)時(shí),為了減少網(wǎng)絡(luò)請(qǐng)求帶來(lái)的性能開(kāi)銷(xiāo),通過(guò)批量的方式來(lái)提交消息,可以通過(guò)這個(gè)參數(shù)來(lái)控制批量提交的字節(jié)數(shù)大小,默認(rèn)大小是 16384byte,也就是 16kb,意味著當(dāng)一批消息大小達(dá)到指定的 batch.size 的時(shí)候會(huì)統(tǒng)一發(fā)送

  • linger.ms

Producer 默認(rèn)會(huì)把兩次發(fā)送時(shí)間間隔內(nèi)收集到的所有 Requests 進(jìn)行一次聚合然后再發(fā)送,以此提高吞吐量,而 linger.ms 就是為每次發(fā)送到 broker 的請(qǐng)求,增加一些 delay,以此來(lái)聚合更多的Message 請(qǐng)求。 這個(gè)有點(diǎn)想 TCP 里面的Nagle 算法,在 TCP 協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了Nagle 算法,也就是基于小包的等停協(xié)議

batch.size 和 linger.ms 這兩個(gè)參數(shù)是 kafka 性能優(yōu)化的關(guān)鍵參數(shù),很多同學(xué)會(huì)發(fā)現(xiàn) batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個(gè)都配置了,那么怎么工作的呢?實(shí)際上,當(dāng)二者都配置的時(shí)候,只要滿(mǎn)足其中一個(gè)要求,就會(huì)發(fā)送請(qǐng)求到 broker 上

  • max.request.size

設(shè)置請(qǐng)求的數(shù)據(jù)的最大字節(jié)數(shù),為了防止發(fā)生較大的數(shù)據(jù)包影響到吞吐量,默認(rèn)值為 1MB

消費(fèi)端的可選配置分析
  • group.id

consumer group 是 kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個(gè)公共的 ID,即 group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個(gè)分區(qū)只能由同一個(gè)消費(fèi)

如下圖所示,分別有三個(gè)消費(fèi)者,屬于兩個(gè)不同的 group,那么對(duì)于 firstTopic 這個(gè) topic 來(lái)說(shuō),這兩個(gè)組的消費(fèi)者都能同時(shí)消費(fèi)這個(gè) topic 中的消息,對(duì)于此事的架構(gòu)來(lái)說(shuō),這個(gè) firstTopic 就類(lèi)似于 ActiveMQ 中的 topic 概念。

如最下圖所示,如果 3 個(gè)消費(fèi)者都屬于同一個(gè)group,那么此事 firstTopic 就是一個(gè) Queue 的概念

  • enable.auto.commit

消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后,該消息才不會(huì)被再次接收到,還可以配合 auto.commit.interval.ms 控制自動(dòng)提交的頻率。

當(dāng)然,我們也可以通過(guò) consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交

  • auto.offset.reset

這個(gè)參數(shù)是針對(duì)新的 groupid 中的消費(fèi)者而言的,當(dāng)有新 groupid 的消費(fèi)者來(lái)消費(fèi)指定的 topic 時(shí),對(duì)于該參數(shù)的配置,會(huì)有不同的語(yǔ)義

auto.offset.reset=latest 情況下,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的offset 處開(kāi)始消費(fèi) Topic 下的消息
auto.offset.reset= earliest 情況下,新的消費(fèi)者會(huì)從該 topic 最早的消息開(kāi)始消費(fèi)
auto.offset.reset=none 情況下,新的消費(fèi)者加入以后,由于之前不存在offset,則會(huì)直接拋出異常。

  • max.poll.records

此設(shè)置限制每次調(diào)用 poll 返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次 poll 間隔要處理的最大值。通過(guò)調(diào)整此值,可以減少 poll 間隔

6.與spring-kafka集成

  • 依賴(lài)的maven的配置如下
 <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.1.0</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>2.1.7.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-context</artifactId>
      <version>5.0.4.RELEASE</version>
    </dependency>
  • 發(fā)送端的producerKafka.xml的spring配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="producerProperties" class="java.util.HashMap">
      <constructor-arg>
          <map>
                <entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
                <entry key="client.id" value="sping-kafka-producer"/>
                <entry key="acks" value="-1"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
          </map>
      </constructor-arg>

    </bean>

    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg ref="producerProperties"/>
    </bean>

    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
    </bean>

</beans>
  • 消費(fèi)端的配置consumerKafka.xml的配置如下
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">

    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="192.168.44.129:9092,192.168.44.129:9093,192.168.44.129:9094"/>
                <entry key="group.id" value="registryConsumer"/>
                <entry key="enable.auto.commit" value="true"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>

    </bean>

    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg ref="consumerProperties"/>
    </bean>


    <bean id="registryListener" class="com.kafka.spring.RegistryListener"/>

    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg name="topics" value="my-topic"/>
        <property name="messageListener" ref="registryListener"/>

    </bean>

    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
    </bean>
</beans>
  • 發(fā)送端的啟動(dòng)代碼
/**
 * @Project: kafka
 * @description:  通過(guò)KafkaTemplate進(jìn)行發(fā)送消息
 * @author: sunkang
 * @create: 2018-12-23 18:52
 * @ModificationHistory who      when       What
 **/
public class SpringKafkaProducerDemo {
    public static void main(String[] args) {

        ApplicationContext context  = new ClassPathXmlApplicationContext("classpath:producerKafka.xml");

        KafkaTemplate  kafkaTemplate =  context.getBean("kafkaTemplate", KafkaTemplate.class);

        kafkaTemplate.send("my-topic",1,"message_1");
        kafkaTemplate.send("my-topic",2,"message_2");
    }
}
  • 消費(fèi)端的啟動(dòng)代碼

/**
 * @Project: kafka
 * @description:  消費(fèi)端的啟動(dòng)代碼
 * @author: sunkang
 * @create: 2018-12-23 18:51
 * @ModificationHistory who      when       What
 **/
public class SpringKafkaConsumerDemo {
    public static void main(String[] args) {
        ApplicationContext context  = new ClassPathXmlApplicationContext("consumerKafka.xml");
    }
}
//設(shè)置消息監(jiān)聽(tīng)類(lèi)
class RegistryListener implements MessageListener<Integer,String> {

    @Override
    public void onMessage(ConsumerRecord<Integer, String> integerStringConsumerRecord) {
        System.out.println("收到了消息");
        System.out.println("key:"+integerStringConsumerRecord.key()+"->value:"+integerStringConsumerRecord.value());
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容