kafka的實(shí)現(xiàn)原理

什么是Kafka

Kafka是一款分布式消息發(fā)布和訂閱系統(tǒng),它的特點(diǎn)是高性能、高吞吐量。

最早設(shè)計(jì)的目的是作為L(zhǎng)inkedIn的活動(dòng)流和運(yùn)營(yíng)數(shù)據(jù)的處理管道。這些數(shù)據(jù)主要是用來(lái)對(duì)用戶做用戶畫像分析以及服務(wù)器性能數(shù)據(jù)的一些監(jiān)控。

所以kafka一開(kāi)始設(shè)計(jì)的目標(biāo)就是作為一個(gè)分布式、高吞吐量的消息系統(tǒng),所以適合運(yùn)用在大數(shù)據(jù)傳輸場(chǎng)景。

Kafka的應(yīng)用場(chǎng)景

由于kafka具有更好的吞吐量、內(nèi)置分區(qū)、冗余及容錯(cuò)性的優(yōu)點(diǎn)(kafka每秒可以處理幾十萬(wàn)消息),讓kafka成為了一個(gè)很好的大規(guī)模消息處理應(yīng)用的解決方案。所以在企業(yè)級(jí)應(yīng)用長(zhǎng),主要會(huì)應(yīng)用于如下幾個(gè)方面

  • 行為跟蹤:kafka可以用于跟蹤用戶瀏覽頁(yè)面、搜索及其他行為。通過(guò)發(fā)布-訂閱模式實(shí)時(shí)記錄到對(duì)應(yīng)的topic中,通過(guò)后端大數(shù)據(jù)平臺(tái)接入處理分析,并做更進(jìn)一步的實(shí)時(shí)處理和監(jiān)控

  • 日志收集:日志收集方面,有很多比較優(yōu)秀的產(chǎn)品,比如Apache Flume,很多公司使用kafka代理日志聚合。日志聚合表示從服務(wù)器上收集日志文件,然后放到一個(gè)集中的平臺(tái)(文件服務(wù)器)進(jìn)行處理。在實(shí)際應(yīng)用開(kāi)發(fā)中,我們應(yīng)用程序的log都會(huì)輸出到本地的磁盤上,排查問(wèn)題的話通過(guò)linux命令來(lái)搞定,如果應(yīng)用程序組成了負(fù)載均衡集群,并且集群的機(jī)器有幾十臺(tái)以上,那么想通過(guò)日志快速定位到問(wèn)題,就是很麻煩的事情了。所以一般都會(huì)做一個(gè)日志統(tǒng)一收集平臺(tái)管理log日志用來(lái)快速查詢重要應(yīng)用的問(wèn)題。所以很多公司的套路都是把應(yīng)用日志集中到kafka上,然后分別導(dǎo)入到es和hdfs上,用來(lái)做實(shí)時(shí)檢索分析和離線統(tǒng)計(jì)數(shù)據(jù)備份等。而另一方面,kafka本身又提供了很好的api來(lái)集成日志并且做日志收集。


    image.png

Kafka的架構(gòu)

一個(gè)典型的kafka集群包含若干Producer(可以是應(yīng)用節(jié)點(diǎn)產(chǎn)生的消息,也可以是通過(guò)Flume收集日志產(chǎn)生的事件),若干個(gè)Broker(kafka支持水平擴(kuò)展)、若干個(gè)Consumer Group,以及一個(gè)zookeeper集群。kafka通過(guò)zookeeper管理集群配置及服務(wù)協(xié)同。Producer使用push模式將消息發(fā)布到broker,consumer通過(guò)監(jiān)聽(tīng)使用pull模式從broker訂閱并消費(fèi)消息。

多個(gè)broker協(xié)同工作,producer和consumer部署在各個(gè)業(yè)務(wù)邏輯中。三者通過(guò)zookeeper管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā)。這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)。

圖上有一個(gè)細(xì)節(jié)是和其他mq中間件不同的點(diǎn),producer 發(fā)送消息到broker的過(guò)程是push,而consumer從broker消費(fèi)消息的過(guò)程是pull,主動(dòng)去拉數(shù)據(jù)。而不是broker把數(shù)據(jù)主動(dòng)發(fā)送給consumer。


image.png

名詞解釋

1)Broker
Kafka集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為broker。broker端不維護(hù)數(shù)據(jù)的消費(fèi)狀態(tài),提升了性能。直接使用磁盤進(jìn)行存儲(chǔ),線性讀寫,速度快:避免了數(shù)據(jù)在JVM內(nèi)存和系統(tǒng)內(nèi)存之間的復(fù)制,減少耗性能的創(chuàng)建對(duì)象和垃圾回收。
2)Producer
負(fù)責(zé)發(fā)布消息到Kafka broker
3)Consumer
消息消費(fèi)者,向Kafka broker讀取消息的客戶端,consumer從broker拉取(pull)數(shù)據(jù)并進(jìn)行處理。
4)Topic
每條發(fā)布到Kafka集群的消息都有一個(gè)類別,這個(gè)類別被稱為Topic。(物理上不同Topic的消息分開(kāi)存儲(chǔ),邏輯上一個(gè)Topic的消息雖然保存于一個(gè)或多個(gè)broker上但用戶只需指定消息的Topic即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
5)Partition
Parition是物理上的概念,每個(gè)Topic包含一個(gè)或多個(gè)Partition.
6)Consumer Group
每個(gè)Consumer屬于一個(gè)特定的Consumer Group(可為每個(gè)Consumer指定group name,若不指定group name則屬于默認(rèn)的group)
7)Topic & Partition
Topic在邏輯上可以被認(rèn)為是一個(gè)queue,每條消費(fèi)都必須指定它的Topic,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè)queue里。為了使得Kafka的吞吐率可以線性提高,物理上把Topic分成一個(gè)或多個(gè)Partition,每個(gè)Partition在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè)Partition的所有消息和索引文件。若創(chuàng)建topic1和topic2兩個(gè)topic,且分別有13個(gè)和19個(gè)分區(qū),則整個(gè)集群上會(huì)相應(yīng)會(huì)生成共32個(gè)文件夾(本文所用集群共8個(gè)節(jié)點(diǎn),此處topic1和topic2 replication-factor均為1)。

Java中使用kafka進(jìn)行通信

依賴

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.0.0</version>
</dependency>

發(fā)送端代碼

public class Producer extends Thread{
      private final KafkaProducer<Integer,String> producer;
      private final String topic;
      public Producer(String topic) {
            Properties properties=new Properties();
 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
            properties.put(ProducerConfig.CLIENT_ID_CONFIG,"practice-producer");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        IntegerSerializer.class.getName());
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class.getName());
            producer=new KafkaProducer<Integer, String>(properties);
            this.topic = topic;
         
    }
    @Override
      public void run() {
            int num=0;
            while(num<50){
                  String msg="pratice test message:"+num;
                  try {
                        producer.send(new ProducerRecord<Integer, String>
                (topic,msg)).get();
                        TimeUnit.SECONDS.sleep(2);
                        num++;
                     
            }
            catch (InterruptedException e) {
                        e.printStackTrace();
                     
            }
            catch (ExecutionException e) {
                        e.printStackTrace();
                     
            }              
        }        
    }
      public static void main(String[] args) {
            new Producer("test").start();        
    }
}

消費(fèi)端代碼

public class Consumer extends Thread{
      private final KafkaConsumer<Integer,String> consumer;
      private final String topic;
      public Consumer(String topic){
            Properties properties=new Properties();
          
         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192
.168.13.103:9092,192.168.13.104:9092");
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //設(shè)置offset自動(dòng)提交
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //自動(dòng)提交間隔時(shí)間
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        //對(duì)于當(dāng)前groupid來(lái)說(shuō),消息的offset從最早的消息開(kāi)始消費(fèi)
            consumer= new KafkaConsumer<>(properties);
            this.topic=topic;
         
    }
    @Override
      public void run() {
            while(true) {
                  consumer.subscribe(Collections.singleton(this.topic));
                  ConsumerRecords<Integer, String> records =
            consumer.poll(Duration.ofSeconds(1));
                  records.forEach(record -> {
                        System.out.println(record.key() + " " + record.value() + " ->
offset:" + record.offset());
                     
            }
            );
               
        }
         
    }
      public static void main(String[] args) {
            new Consumer("test").start();
         
    }
}

異步發(fā)送

kafka對(duì)于消息的發(fā)送,可以支持同步和異步,前面演示的案例中,我們是基于同步發(fā)送消息。同步會(huì)需要阻塞,而異步不需要等待阻塞的過(guò)程。
從本質(zhì)上來(lái)說(shuō),kafka都是采用異步的方式來(lái)發(fā)送消息到broker,但是kafka并不是每次發(fā)送消息都會(huì)直接發(fā)送到broker上,而是把消息放到了一個(gè)發(fā)送隊(duì)列中,然后通過(guò)一個(gè)后臺(tái)線程不斷從隊(duì)列取出消息進(jìn)行發(fā)送,發(fā)送成功后會(huì)觸發(fā)callback。kafka客戶端會(huì)積累一定量的消息統(tǒng)一組裝成一個(gè)批量消息發(fā)送出去,觸發(fā)條件是前面提到的batch.size和linger.ms。
而同步發(fā)送的方法,無(wú)非就是通過(guò)future.get()來(lái)等待消息的發(fā)送返回結(jié)果,但是這種方法會(huì)嚴(yán)重影響消息發(fā)送的性能。

public void run() {
        int num=0;
        while(num<50){
              String msg="pratice test message:"+num;
              try {
                    producer.send(new ProducerRecord<>(topic, msg), new Callback() {
                          @Override
                          public void onCompletion(RecordMetadata recordMetadata,
                Exception e) {
                                System.out.println("callback:
"+recordMetadata.offset()+"->"+recordMetadata.partition());
                             
                }
                       
            }
            );
                    TimeUnit.SECONDS.sleep(2);
                    num++;
                 
        }
        catch (InterruptedException e) {
                    e.printStackTrace();
                 
        }
           
    }
     
}

batch.size

生產(chǎn)者發(fā)送多個(gè)消息到broker上的同一個(gè)分區(qū)時(shí),為了減少網(wǎng)絡(luò)請(qǐng)求帶來(lái)的性能開(kāi)銷,通過(guò)批量的方式來(lái)提交消息,可以通過(guò)這個(gè)參數(shù)來(lái)控制批量提交的字節(jié)數(shù)大小,默認(rèn)大小是16384byte,也就是16kb,意味著當(dāng)一批消息大小達(dá)到指定的batch.size的時(shí)候會(huì)統(tǒng)一發(fā)送

linger.ms

Producer默認(rèn)會(huì)把兩次發(fā)送時(shí)間間隔內(nèi)收集到的所有Requests進(jìn)行一次聚合然后再發(fā)送,以此提高吞吐量,而linger.ms就是為每次發(fā)送到broker的請(qǐng)求增加一些delay,以此來(lái)聚合更多的Message請(qǐng)求。這個(gè)有點(diǎn)想TCP里面的Nagle算法,在TCP協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了Nagle算法,也就是基于小包的等-停協(xié)議。
batch.size和linger.ms這兩個(gè)參數(shù)是kafka性能優(yōu)化的關(guān)鍵參數(shù),batch.size和linger.ms這兩者的作用是一樣的,如果兩個(gè)都配置了,那么怎么工作的呢?實(shí)際上,當(dāng)二者都配置的時(shí)候,只要滿足其中一個(gè)要求,就會(huì)發(fā)送請(qǐng)求到broker上

一些基礎(chǔ)配置分析

group.id

consumer group是kafka提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制。既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個(gè)公共的ID,即group ID。組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂閱主題(subscribed topics)的所有分區(qū)(partition)。當(dāng)然,每個(gè)分區(qū)只能由同一個(gè)消費(fèi)組內(nèi)的一個(gè)consumer來(lái)消費(fèi).如下圖所示,分別有三個(gè)消費(fèi)者,屬于兩個(gè)不同的group,那么對(duì)于firstTopic這個(gè)topic來(lái)說(shuō),這兩個(gè)組的消費(fèi)者都能同時(shí)消費(fèi)這個(gè)topic中的消息,對(duì)于此時(shí)的架構(gòu)來(lái)說(shuō),這個(gè)firstTopic就類似于ActiveMQ中的topic概念。如右圖所示,如果3個(gè)消費(fèi)者都屬于同一個(gè)group,那么此時(shí)firstTopic就是一個(gè)Queue的概念


image.png
image.png

enable.auto.commit

消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后,該消息才不會(huì)被再次接收到,還可以配合auto.commit.interval.ms控制自動(dòng)提交的頻率。
當(dāng)然,我們也可以通過(guò)consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交

auto.offset.reset

這個(gè)參數(shù)是針對(duì)新的groupid中的消費(fèi)者而言的,當(dāng)有新groupid的消費(fèi)者來(lái)消費(fèi)指定的topic時(shí),對(duì)于該參數(shù)的配置,會(huì)有不同的語(yǔ)義。
auto.offset.reset=latest情況下,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的offset處開(kāi)始消費(fèi)Topic下的消息。
auto.offset.reset= earliest情況下,新的消費(fèi)者會(huì)從該topic最早的消息開(kāi)始消費(fèi)。
auto.offset.reset=none情況下,新的消費(fèi)者加入以后,由于之前不存在offset,則會(huì)直接拋出異常。

max.poll.records

此設(shè)置限制每次調(diào)用poll返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次poll間隔要處理的最大值。通過(guò)調(diào)整此值,可以減少poll間隔

原理分析

從前面的整個(gè)演示過(guò)程來(lái)看,只要不是超大規(guī)模的使用kafka,那么基本上沒(méi)什么大問(wèn)題,否則,對(duì)于kafka本身的運(yùn)維的挑戰(zhàn)會(huì)很大,同時(shí),針對(duì)每一個(gè)參數(shù)的調(diào)優(yōu)也顯得很重要。
據(jù)我了解,快手在使用kafka集群規(guī)模是挺大的,他們?cè)?9年的開(kāi)發(fā)者大會(huì)上有提到

總機(jī)器數(shù)大概2000 臺(tái);30 多個(gè)集群;topic 12000 個(gè);一共大概 20 萬(wàn) TP(topic partition);每天總處理的消息數(shù)超過(guò) 4 萬(wàn)億條;峰值超過(guò) 1 億條

文章出處

關(guān)于Topic和Partition

Topic

在kafka中,topic是一個(gè)存儲(chǔ)消息的邏輯概念,可以認(rèn)為是一個(gè)消息集合。每條消息發(fā)送到kafka集群的消息都有一個(gè)類別。物理上來(lái)說(shuō),不同的topic的消息是分開(kāi)存儲(chǔ)的,
每個(gè)topic可以有多個(gè)生產(chǎn)者向它發(fā)送消息,也可以有多個(gè)消費(fèi)者去消費(fèi)其中的消息。


image.png

Partition

每個(gè)topic可以劃分多個(gè)分區(qū)(每個(gè)Topic至少有一個(gè)分區(qū)),同一topic下的不同分區(qū)包含的消息是不同的。每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè)offset(稱之為偏移量),它是消息在此分區(qū)中的唯一編號(hào),kafka通過(guò)offset保證消息在分區(qū)內(nèi)的順序,offset的順序不跨分區(qū),即kafka只保證在同一個(gè)分區(qū)內(nèi)的消息是有序的。

下圖中,對(duì)于名字為test的topic,做了3個(gè)分區(qū),分別是p0、p1、p2.
每一條消息發(fā)送到broker時(shí),會(huì)根據(jù)partition的規(guī)則選擇存儲(chǔ)到哪一個(gè)partition。如果partition規(guī)則設(shè)置合理,那么所有的消息會(huì)均勻的分布在不同的partition中,這樣就有點(diǎn)類似數(shù)據(jù)庫(kù)的分庫(kù)分表的概念,把數(shù)據(jù)做了分片處理。


image.png

Topic&Partition的存儲(chǔ)

Partition是以文件的形式存儲(chǔ)在文件系統(tǒng)中,比如創(chuàng)建一個(gè)名為firstTopic的topic,其中有3個(gè)partition,那么在kafka的數(shù)據(jù)目錄(/tmp/kafka-log)中就有3個(gè)目錄,firstTopic-0~3, 命名規(guī)則是<topic_name>-<partition_id>

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic

關(guān)于消息分發(fā)

kafka消息分發(fā)策略

消息是kafka中最基本的數(shù)據(jù)單元,在kafka中,一條消息由key、value兩部分構(gòu)成,在發(fā)送一條消息時(shí),我們可以指定這個(gè)key,那么producer會(huì)根據(jù)key和partition機(jī)制來(lái)判斷當(dāng)前這條消息應(yīng)該發(fā)送并存儲(chǔ)到哪個(gè)partition中。我們可以根據(jù)需要進(jìn)行擴(kuò)展producer的partition機(jī)制。

自定義Partitioner

public class MyPartitioner implements Partitioner {
      private Random random = new Random();
      @Override
      public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
            //獲取集群中指定topic的所有分區(qū)信息
            List<PartitionInfo> partitionInfos=cluster.partitionsForTopic(s);
            int numOfPartition=partitionInfos.size();
            int partitionNum=0;
            if(o==null){
                   //key沒(méi)有設(shè)置
                  partitionNum=random.nextint(numOfPartition);
                //隨機(jī)指定分區(qū)               
            } else{
                  partitionNum=Math.abs((o1.hashCode()))%numOfPartition;               
            }
            System.out.println("key->"+o+",value->"+o1+"->send to partition:"+partitionNum);
            return partitionNum;         
    }
}

發(fā)送端代碼添加自定義分區(qū)

public KafkaProducerDemo(String topic,Boolean isAysnc){
      Properties properties=new Properties();
      properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
          "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
      properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
      properties.put(ProducerConfig.ACKS_CONFIG,"-1");
      properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.IntegerSerializer");
      properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer");
     properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.wei.kafka.MyPartitioner");
      producer=new KafkaProducer<Integer, String>(properties);
      this.topic=topic;
      this.isAysnc=isAysnc;
}

消息默認(rèn)的分發(fā)機(jī)制

默認(rèn)情況下,kafka采用的是hash取模的分區(qū)算法。如果Key為null,則會(huì)隨機(jī)分配一個(gè)分區(qū)。這個(gè)隨機(jī)是在這個(gè)參數(shù)”metadata.max.age.ms”的時(shí)間范圍內(nèi)隨機(jī)選擇一個(gè)。對(duì)于這個(gè)時(shí)間段內(nèi),如果key為null,則只會(huì)發(fā)送到唯一的分區(qū)。這個(gè)值值哦默認(rèn)情況下是10分鐘更新一次。

關(guān)于Metadata,這個(gè)之前沒(méi)講過(guò),簡(jiǎn)單理解就是Topic/Partition和broker的映射關(guān)系,每一個(gè)topic的每一個(gè)partition,需要知道對(duì)應(yīng)的broker列表是什么,leader是誰(shuí)、follower是誰(shuí)。這些信息都是存儲(chǔ)在Metadata這個(gè)類里面。

消費(fèi)端如何消費(fèi)指定的分區(qū)

通過(guò)下面的代碼,就可以消費(fèi)指定該topic下的0號(hào)分區(qū)。其他分區(qū)的數(shù)據(jù)就無(wú)法接收

//消費(fèi)指定分區(qū)的時(shí)候,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費(fèi)指定的分區(qū)
TopicPartition topicPartition=new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));

消息的消費(fèi)原理

在實(shí)際生產(chǎn)過(guò)程中,每個(gè)topic都會(huì)有多個(gè)partitions,多個(gè)partitions的好處在于,一方面能夠?qū)roker上的數(shù)據(jù)進(jìn)行分片有效減少了消息的容量從而提升io性能。另外一方面,為了提高消費(fèi)端的消費(fèi)能力,一般會(huì)通過(guò)多個(gè)consumer去消費(fèi)同一個(gè)topic ,也就是消費(fèi)端的負(fù)載均衡機(jī)制,也就是我們接下來(lái)要了解的,在多個(gè)partition以及多個(gè)consumer的情況下,消費(fèi)者是如何消費(fèi)消息的。
kafka存在consumer group的概念,也就是group.id一樣的consumer,這些consumer屬于一個(gè)consumer group,組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來(lái)消費(fèi)訂閱主題的所有分區(qū)。當(dāng)然每一個(gè)分區(qū)只能由同一個(gè)消費(fèi)組內(nèi)的consumer來(lái)消費(fèi),那么同一個(gè)consumergroup里面的consumer是怎么去分配該消費(fèi)哪個(gè)分區(qū)里的數(shù)據(jù)的呢?如下圖所示,3個(gè)分區(qū),3個(gè)消費(fèi)者,那么哪個(gè)消費(fèi)者消分哪個(gè)分區(qū)?


image.png

對(duì)于上面這個(gè)圖來(lái)說(shuō),這3個(gè)消費(fèi)者會(huì)分別消費(fèi)test這個(gè)topic 的3個(gè)分區(qū),也就是每個(gè)consumer消費(fèi)一個(gè)partition。

  • 演示1(3個(gè)partiton對(duì)應(yīng)3個(gè)consumer)
    ? 創(chuàng)建一個(gè)帶3個(gè)分區(qū)的topic
    ? 啟動(dòng)3個(gè)消費(fèi)者消費(fèi)同一個(gè)topic,并且這3個(gè)consumer屬于同一個(gè)組
    ? 啟動(dòng)發(fā)送者進(jìn)行消息發(fā)送

演示結(jié)果:consumer1會(huì)消費(fèi)partition0分區(qū)、consumer2會(huì)消費(fèi)partition1分區(qū)、consumer3會(huì)消費(fèi)partition2分區(qū)
如果是2個(gè)consumer消費(fèi)3個(gè)partition呢?會(huì)是怎么樣的結(jié)果?

  • 演示2(3個(gè)partiton對(duì)應(yīng)2個(gè)consumer)
    ? 基于上面演示的案例的topic不變
    ? 啟動(dòng)2個(gè)消費(fèi)這消費(fèi)該topic
    ? 啟動(dòng)發(fā)送者進(jìn)行消息發(fā)送
    演示結(jié)果:consumer1會(huì)消費(fèi)partition0/partition1分區(qū)、consumer2會(huì)消費(fèi)partition2分區(qū)

  • 演示3(3個(gè)partition對(duì)應(yīng)4個(gè)或以上consumer)
    演示結(jié)果:仍然只有3個(gè)consumer對(duì)應(yīng)3個(gè)partition,其他的consumer無(wú)法消費(fèi)消息
    通過(guò)這個(gè)演示的過(guò)程,引出接下來(lái)需要了解的kafka的分區(qū)分配策略(Partition Assignment Strategy)

consumer和partition的數(shù)量建議

  1. 如果consumer比partition多,是浪費(fèi),因?yàn)閗afka的設(shè)計(jì)是在一個(gè)partition上是不允許并發(fā)的,所以consumer數(shù)不要大于partition數(shù)
  2. 如果consumer比partition少,一個(gè)consumer會(huì)對(duì)應(yīng)于多個(gè)partitions,這里主要合理分配consumer數(shù)和partition數(shù),否則會(huì)導(dǎo)致partition里面的數(shù)據(jù)被取的不均勻。最好partiton數(shù)目是consumer數(shù)目的整數(shù)倍,所以partition數(shù)目很重要,比如取24,就很容易設(shè)定consumer數(shù)目
  3. 如果consumer從多個(gè)partition讀到數(shù)據(jù),不保證數(shù)據(jù)間的順序性,kafka只保證在一個(gè)partition上數(shù)據(jù)是有序的,但多個(gè)partition,根據(jù)你讀的順序會(huì)有不同
  4. 增減consumer,broker,partition會(huì)導(dǎo)致rebalance,所以rebalance后consumer對(duì)應(yīng)的partition會(huì)發(fā)生變化

什么是分區(qū)分配策略

通過(guò)前面的案例演示,我們應(yīng)該能猜到,同一個(gè)group中的消費(fèi)者對(duì)于一個(gè)topic中的多個(gè)partition,存在一定的分區(qū)分配策略。
在kafka中,存在三種分區(qū)分配策略,一種是Range(默認(rèn))、 另一種是RoundRobin(輪詢)、StickyAssignor(粘性)。 在消費(fèi)端中的ConsumerConfig中,通過(guò)這個(gè)屬性來(lái)指定分區(qū)分配策略

public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";

RangeAssignor(范圍分區(qū))

Range策略是對(duì)每個(gè)主題而言的,首先對(duì)同一個(gè)主題里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序。

假設(shè)n = 分區(qū)數(shù)/消費(fèi)者數(shù)量
m= 分區(qū)數(shù)%消費(fèi)者數(shù)量
那么前m個(gè)消費(fèi)者每個(gè)分配n+l個(gè)分區(qū),后面的(消費(fèi)者數(shù)量-m)個(gè)消費(fèi)者每個(gè)分配n個(gè)分區(qū)

假設(shè)我們有10個(gè)分區(qū),3個(gè)消費(fèi)者,排完序的分區(qū)將會(huì)是0, 1, 2, 3, 4, 5, 6, 7, 8, 9;消費(fèi)者線程排完序?qū)?huì)是C1-0, C2-0, C3-0。然后將partitions的個(gè)數(shù)除于消費(fèi)者線程的總數(shù)來(lái)決定每個(gè)消費(fèi)者線程消費(fèi)幾個(gè)分區(qū)。如果除不盡,那么前面幾個(gè)消費(fèi)者線程將會(huì)多消費(fèi)一個(gè)分區(qū)。在我們的例子里面,我們有10個(gè)分區(qū),3個(gè)消費(fèi)者線程, 10 / 3 = 3,而且除不盡,那么消費(fèi)者線程 C1-0 將會(huì)多消費(fèi)一個(gè)分區(qū).
結(jié)果看起來(lái)是這樣的:
C1-0 將消費(fèi) 0, 1, 2, 3 分區(qū)
C2-0 將消費(fèi) 4, 5, 6 分區(qū)
C3-0 將消費(fèi) 7, 8, 9 分區(qū)

假如我們有11個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來(lái)是這樣的:
C1-0 將消費(fèi) 0, 1, 2, 3 分區(qū)
C2-0 將消費(fèi) 4, 5, 6, 7 分區(qū)
C3-0 將消費(fèi) 8, 9, 10 分區(qū)

假如我們有2個(gè)主題(T1和T2),分別有10個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來(lái)是這樣的:
C1-0 將消費(fèi) T1主題的 0, 1, 2, 3 分區(qū)以及 T2主題的 0, 1, 2, 3分區(qū)
C2-0 將消費(fèi) T1主題的 4, 5, 6 分區(qū)以及 T2主題的 4, 5, 6分區(qū)
C3-0 將消費(fèi) T1主題的 7, 8, 9 分區(qū)以及 T2主題的 7, 8, 9分區(qū)

可以看出,C1-0 消費(fèi)者線程比其他消費(fèi)者線程多消費(fèi)了2個(gè)分區(qū),這就是Range strategy的一個(gè)很明顯的弊端

RoundRobinAssignor(輪詢分區(qū))

輪詢分區(qū)策略是把所有partition和所有consumer線程都列出來(lái),然后按照hashcode進(jìn)行排序。最后通過(guò)輪詢算法分配partition給消費(fèi)線程。如果所有consumer實(shí)例的訂閱是相同的,那么partition會(huì)均勻分布。

在我們的例子里面,假如按照 hashCode 排序完的topic-partitions組依次為T1-5, T1-3, T1-0, T1-8, T1-2, T1-1, T1-4, T1-7, T1-6, T1-9,我們的消費(fèi)者線程排序?yàn)镃1-0, C1-1, C2-0, C2-1,最后分區(qū)分配的結(jié)果為:
C1-0 將消費(fèi) T1-5, T1-2, T1-6 分區(qū);
C1-1 將消費(fèi) T1-3, T1-1, T1-9 分區(qū);
C2-0 將消費(fèi) T1-0, T1-4 分區(qū);
C2-1 將消費(fèi) T1-8, T1-7 分區(qū);

使用輪詢分區(qū)策略必須滿足兩個(gè)條件:

  1. 每個(gè)主題的消費(fèi)者實(shí)例具有相同數(shù)量的流
  2. 每個(gè)消費(fèi)者訂閱的主題必須是相同的

StrickyAssignor 分配策略

kafka在0.11.x版本支持了StrickyAssignor, 翻譯過(guò)來(lái)叫粘滯策略,它主要有兩個(gè)目的:

  • 分區(qū)的分配盡可能的均勻
  • 分區(qū)的分配盡可能和上次分配保持相同

當(dāng)兩者發(fā)生沖突時(shí), 第 一 個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo)。 鑒于這兩個(gè)目標(biāo), StickyAssignor分配策略的具體實(shí)現(xiàn)要比RangeAssignor和RoundRobinAssi gn or這兩種分配策略要復(fù)雜得多,假設(shè)我們有這樣一個(gè)場(chǎng)景:

假設(shè)消費(fèi)組有3個(gè)消費(fèi)者:C0,C1,C2,它們分別訂閱了4個(gè)Topic(t0,t1,t2,t3),并且每個(gè)主題有兩個(gè)分區(qū)(p0,p1),也就是說(shuō),整個(gè)消費(fèi)組訂閱了8個(gè)分區(qū):tOpO 、 tOpl 、 tlpO 、 tlpl 、 t2p0 、t2pl 、t3p0 、 t3pl
那么最終的分配場(chǎng)景結(jié)果為
CO: tOpO、tlpl 、 t3p0
Cl: tOpl、t2p0 、 t3pl
C2: tlpO、t2pl
這種分配方式有點(diǎn)類似于輪詢策略,但實(shí)際上并不是,因?yàn)榧僭O(shè)這個(gè)時(shí)候,C1這個(gè)消費(fèi)者掛了,就勢(shì)必會(huì)造成重新分區(qū)(reblance),如果是輪詢,那么結(jié)果應(yīng)該是
CO: tOpO、tlpO、t2p0、t3p0
C2: tOpl、tlpl、t2pl、t3pl
然后,strickyAssignor它是一種粘滯策略,所以它會(huì)滿足`分區(qū)的分配盡可能和上次分配保持相同,所以分配結(jié)果應(yīng)該是
消費(fèi)者CO: tOpO、tlpl 、 t3p0、t2p0
消費(fèi)者C2: tlpO、t2pl、tOpl、t3pl
也就是說(shuō),C0和C2保留了上一次是的分配結(jié)果,并且把原來(lái)C1的分區(qū)分配給了C0和C2。 這種策略的好處是使得分區(qū)發(fā)生變化時(shí),由于分區(qū)的“粘性,減少了不必要的分區(qū)移動(dòng)

誰(shuí)來(lái)執(zhí)行Rebalance以及管理consumer的group呢?

Kafka提供了一個(gè)角色:coordinator來(lái)執(zhí)行對(duì)于consumer group的管理,Kafka提供了一個(gè)角色:coordinator來(lái)執(zhí)行對(duì)于consumer group的管理,當(dāng)consumer group的第一個(gè)consumer啟動(dòng)的時(shí)候,它會(huì)去和kafka server確定誰(shuí)是它們組的coordinator。之后該group內(nèi)的所有成員都會(huì)和該coordinator進(jìn)行協(xié)調(diào)通信

如何確定coordinator

consumer group如何確定自己的coordinator是誰(shuí)呢, 消費(fèi)者向kafka集群中的任意一個(gè)broker發(fā)送一個(gè)GroupCoordinatorRequest請(qǐng)求,服務(wù)端會(huì)返回一個(gè)負(fù)載最小的broker節(jié)點(diǎn)的id,并將該broker設(shè)置為coordinator

JoinGroup的過(guò)程

在rebalance之前,需要保證coordinator是已經(jīng)確定好了的,整個(gè)rebalance的過(guò)程分為兩個(gè)步驟,Join和Sync

join: 表示加入到consumer group中,在這一步中,所有的成員都會(huì)向coordinator發(fā)送joinGroup的請(qǐng)求。一旦所有成員都發(fā)送了joinGroup請(qǐng)求,那么coordinator會(huì)選擇一個(gè)consumer擔(dān)任leader角色,并把組成員信息和訂閱信息發(fā)送消費(fèi)者
leader選舉算法比較簡(jiǎn)單,如果消費(fèi)組內(nèi)沒(méi)有l(wèi)eader,那么第一個(gè)加入消費(fèi)組的消費(fèi)者就是消費(fèi)者leader,如果這個(gè)時(shí)候leader消費(fèi)者退出了消費(fèi)組,那么重新選舉一個(gè)leader,這個(gè)選舉很隨意,類似于隨機(jī)算法


image.png

protocol_metadata: 序列化后的消費(fèi)者的訂閱信息
leader_id: 消費(fèi)組中的消費(fèi)者,coordinator會(huì)選擇一個(gè)座位leader,對(duì)應(yīng)的就是member_id
member_metadata 對(duì)應(yīng)消費(fèi)者的訂閱信息
members:consumer group中全部的消費(fèi)者的訂閱信息
generation_id: 年代信息,類似于之前講解zookeeper的時(shí)候的epoch是一樣的,對(duì)于每一輪rebalance,generation_id都會(huì)遞增。主要用來(lái)保護(hù)consumer group。隔離無(wú)效的offset提交。也就是上一輪的consumer成員無(wú)法提交offset到新的consumer group中。

每個(gè)消費(fèi)者都可以設(shè)置自己的分區(qū)分配策略,對(duì)于消費(fèi)組而言,會(huì)從各個(gè)消費(fèi)者上報(bào)過(guò)來(lái)的分區(qū)分配策略中選舉一個(gè)彼此都贊同的策略來(lái)實(shí)現(xiàn)整體的分區(qū)分配,這個(gè)"贊同"的規(guī)則是,消費(fèi)組內(nèi)的各個(gè)消費(fèi)者會(huì)通過(guò)投票來(lái)決定

  • 在joingroup階段,每個(gè)consumer都會(huì)把自己支持的分區(qū)分配策略發(fā)送到coordinator
  • coordinator手機(jī)到所有消費(fèi)者的分配策略,組成一個(gè)候選集
  • 每個(gè)消費(fèi)者需要從候選集里找出一個(gè)自己支持的策略,并且為這個(gè)策略投票
  • 最終計(jì)算候選集中各個(gè)策略的選票數(shù),票數(shù)最多的就是當(dāng)前消費(fèi)組的分配策略

Synchronizing Group State階段

完成分區(qū)分配之后,就進(jìn)入了Synchronizing Group State階段,主要邏輯是向GroupCoordinator發(fā)送SyncGroupRequest請(qǐng)求,并且處理SyncGroupResponse響應(yīng),簡(jiǎn)單來(lái)說(shuō),就是leader將消費(fèi)者對(duì)應(yīng)的partition分配方案同步給consumer group 中的所有consumer


image.png

每個(gè)消費(fèi)者都會(huì)向coordinator發(fā)送syncgroup請(qǐng)求,不過(guò)只有l(wèi)eader節(jié)點(diǎn)會(huì)發(fā)送分配方案,其他消費(fèi)者只是打打醬油而已。當(dāng)leader把方案發(fā)給coordinator以后,coordinator會(huì)把結(jié)果設(shè)置到SyncGroupResponse中。這樣所有成員都知道自己應(yīng)該消費(fèi)哪個(gè)分區(qū)。

consumer group的分區(qū)分配方案是在客戶端執(zhí)行的!Kafka將這個(gè)權(quán)利下放給客戶端主要是因?yàn)檫@樣做可以有更好的靈活性

總結(jié)

我們?cè)賮?lái)總結(jié)一下consumer group rebalance的過(guò)程
? 對(duì)于每個(gè)consumer group子集,都會(huì)在服務(wù)端對(duì)應(yīng)一個(gè)GroupCoordinator進(jìn)行管理,GroupCoordinator會(huì)在zookeeper上添加watcher,當(dāng)消費(fèi)者加入或者退出consumer group時(shí),會(huì)修改zookeeper上保存的數(shù)據(jù),從而觸發(fā)GroupCoordinator開(kāi)始Rebalance操作
? 當(dāng)消費(fèi)者準(zhǔn)備加入某個(gè)Consumer group或者GroupCoordinator發(fā)生故障轉(zhuǎn)移時(shí),消費(fèi)者并不知道GroupCoordinator的在網(wǎng)絡(luò)中的位置,這個(gè)時(shí)候就需要確定GroupCoordinator,消費(fèi)者會(huì)向集群中的任意一個(gè)Broker節(jié)點(diǎn)發(fā)送ConsumerMetadataRequest請(qǐng)求,收到請(qǐng)求的broker會(huì)返回一個(gè)response作為響應(yīng),其中包含管理當(dāng)前ConsumerGroup的GroupCoordinator,
? 消費(fèi)者會(huì)根據(jù)broker的返回信息,連接到groupCoordinator,并且發(fā)送HeartbeatRequest,發(fā)送心跳的目的是要要奧噶蘇GroupCoordinator這個(gè)消費(fèi)者是正常在線的。當(dāng)消費(fèi)者在指定時(shí)間內(nèi)沒(méi)有發(fā)送心跳請(qǐng)求,則GroupCoordinator會(huì)觸發(fā)Rebalance操作。

? 發(fā)起join group請(qǐng)求,兩種情況

  • 如果GroupCoordinator返回的心跳包數(shù)據(jù)包含異常,說(shuō)明GroupCoordinator因?yàn)榍懊嬲f(shuō)的幾種情況導(dǎo)致了Rebalance操作,那這個(gè)時(shí)候,consumer會(huì)發(fā)起join group請(qǐng)求
  • 新加入到consumer group的consumer確定好了GroupCoordinator以后消費(fèi)者會(huì)向GroupCoordinator發(fā)起join group請(qǐng)求,GroupCoordinator會(huì)收集全部消費(fèi)者信息之后,來(lái)確認(rèn)可用的消費(fèi)者,并從中選取一個(gè)消費(fèi)者成為group_leader。并把相應(yīng)的信息(分區(qū)分配策略、leader_id、…)封裝成response返回給所有消費(fèi)者,但是只有g(shù)roup leader會(huì)收到當(dāng)前consumer group中的所有消費(fèi)者信息。當(dāng)消費(fèi)者確定自己是group leader以后,會(huì)根據(jù)消費(fèi)者的信息以及選定分區(qū)分配策略進(jìn)行分區(qū)分配
  • 接著進(jìn)入Synchronizing Group State階段,每個(gè)消費(fèi)者會(huì)發(fā)送SyncGroupRequest請(qǐng)求到GroupCoordinator,但是只有Group Leader的請(qǐng)求會(huì)存在分區(qū)分配結(jié)果,GroupCoordinator會(huì)根據(jù)Group Leader的分區(qū)分配結(jié)果形成SyncGroupResponse返回給所有的Consumer。
  • consumer根據(jù)分配結(jié)果,執(zhí)行相應(yīng)的操作

到這里為止,我們已經(jīng)知道了消息的發(fā)送分區(qū)策略,以及消費(fèi)者的分區(qū)消費(fèi)策略和rebalance。對(duì)于應(yīng)用層面來(lái)說(shuō),還有一個(gè)最重要的東西沒(méi)有講解,就是offset,他類似一個(gè)游標(biāo),表示當(dāng)前消費(fèi)的消息的位置。

如何保存消費(fèi)端的消費(fèi)位置

什么是offset

前面在講解partition的時(shí)候,提到過(guò)offset, 每個(gè)topic可以劃分多個(gè)分區(qū)(每個(gè)Topic至少有一個(gè)分區(qū)),同一topic下的不同分區(qū)包含的消息是不同的。每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè)offset(稱之為偏移量),它是消息在此分區(qū)中的唯一編號(hào),kafka通過(guò)offset保證消息在分區(qū)內(nèi)的順序,offset的順序不跨分區(qū),即kafka只保證在同一個(gè)分區(qū)內(nèi)的消息是有序的; 對(duì)于應(yīng)用層的消費(fèi)來(lái)說(shuō),每次消費(fèi)一個(gè)消息并且提交以后,會(huì)保存當(dāng)前消費(fèi)到的最近的一個(gè)offset。那么offset保存在哪里?

image.png

offset在哪里維護(hù)?

在kafka中,提供了一個(gè)consumer_offsets_* 的一個(gè)topic,把offset信息寫入到這個(gè)topic中。
consumer_offsets——按保存了每個(gè)consumer group某一時(shí)刻提交的offset信息。
__consumer_offsets 默認(rèn)有50個(gè)分區(qū)。
根據(jù)前面我們演示的案例,我們?cè)O(shè)置了一個(gè)KafkaConsumerDemo的groupid。首先我們需要找到這個(gè)consumer_group保存在哪個(gè)分區(qū)中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
計(jì)算公式:
Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由于默認(rèn)情況下groupMetadataTopicPartitionCount有50個(gè)分區(qū),計(jì)算得到的結(jié)果為:35, 意味著當(dāng)前的consumer_group的位移信息保存在__consumer_offsets的第35個(gè)分區(qū)
執(zhí)行如下命令,可以查看當(dāng)前consumer_goup中的offset位移提交的信息

kafka-console-consumer.sh --topic __consumer_offsets --partition 15 --bootstrap-server 192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092
--formatter
'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'

從輸出結(jié)果中,我們就可以看到test這個(gè)topic的offset的位移日志

分區(qū)的副本機(jī)制

我們已經(jīng)知道Kafka的每個(gè)topic都可以分為多個(gè)Partition,并且多個(gè)partition會(huì)均勻分布在集群的各個(gè)節(jié)點(diǎn)下。雖然這種方式能夠有效的對(duì)數(shù)據(jù)進(jìn)行分片,但是對(duì)于每個(gè)partition來(lái)說(shuō),都是單點(diǎn)的,當(dāng)其中一個(gè)partition不可用的時(shí)候,那么這部分消息就沒(méi)辦法消費(fèi)。所以kafka為了提高partition的可靠性而提供了副本的概念(Replica),通過(guò)副本機(jī)制來(lái)實(shí)現(xiàn)冗余備份。

每個(gè)分區(qū)可以有多個(gè)副本,并且在副本集合中會(huì)存在一個(gè)leader的副本,所有的讀寫請(qǐng)求都是由leader副本來(lái)進(jìn)行處理。剩余的其他副本都做為follower副本,follower副本會(huì)從leader副本同步消息日志。
這個(gè)有點(diǎn)類似zookeeper中l(wèi)eader和follower的概念,但是具體的時(shí)間方式還是有比較大的差異。所以我們可以認(rèn)為,副本集會(huì)存在一主多從的關(guān)系。

一般情況下,同一個(gè)分區(qū)的多個(gè)副本會(huì)被均勻分配到集群中的不同broker上,當(dāng)leader副本所在的broker出現(xiàn)故障后,可以重新選舉新的leader副本繼續(xù)對(duì)外提供服務(wù)。通過(guò)這樣的副本機(jī)制來(lái)提高kafka集群的可用性。

創(chuàng)建一個(gè)帶副本機(jī)制的topic

通過(guò)下面的命令去創(chuàng)建帶2個(gè)副本的topic

sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic

然后我們可以在/tmp/kafka-log路徑下看到對(duì)應(yīng)topic的副本信息了。我們通過(guò)一個(gè)圖形的方式來(lái)表達(dá)。
針對(duì)secondTopic這個(gè)topic的3個(gè)分區(qū)對(duì)應(yīng)的3個(gè)副本


image.png

如何知道那個(gè)各個(gè)分區(qū)中對(duì)應(yīng)的leader是誰(shuí)呢?

在zookeeper服務(wù)器上,通過(guò)如下命令去獲取對(duì)應(yīng)分區(qū)的信息, 比如下面這個(gè)是獲取secondTopic第1個(gè)
分區(qū)的狀態(tài)信息。

get /brokers/topics/secondTopic/partitions/1/state

{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通過(guò)這個(gè)命令

sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition

leader表示當(dāng)前分區(qū)的leader是那個(gè)broker-id。下圖中。綠色線條的表示該分區(qū)中的leader節(jié)點(diǎn)。其他節(jié)點(diǎn)就為follower


image.png

需要注意的是,kafka集群中的一個(gè)broker中最多只能有一個(gè)副本,leader副本所在的broker節(jié)點(diǎn)的分區(qū)叫l(wèi)eader節(jié)點(diǎn),follower副本所在的broker節(jié)點(diǎn)的分區(qū)叫follower節(jié)點(diǎn)

副本的leader選舉

Kafka提供了數(shù)據(jù)復(fù)制算法保證,如果leader副本所在的broker節(jié)點(diǎn)宕機(jī)或者出現(xiàn)故障,或者分區(qū)的leader節(jié)點(diǎn)發(fā)生故障,這個(gè)時(shí)候怎么處理呢?
那么,kafka必須要保證從follower副本中選擇一個(gè)新的leader副本。那么kafka是如何實(shí)現(xiàn)選舉的呢?
要了解leader選舉,我們需要了解幾個(gè)概念
Kafka分區(qū)下有可能有很多個(gè)副本(replica)用于實(shí)現(xiàn)冗余,從而進(jìn)一步實(shí)現(xiàn)高可用。副本根據(jù)角色的不同可分為3類:

  • leader副本:響應(yīng)clients端讀寫請(qǐng)求的副本
  • follower副本:被動(dòng)地備份leader副本中的數(shù)據(jù),不能響應(yīng)clients端讀寫請(qǐng)求。
  • ISR副本:包含了leader副本和所有與leader副本保持同步的follower副本——如何判定是否與leader同步后面會(huì)提到每個(gè)Kafka副本對(duì)象都有兩個(gè)重要的屬性:LEO和HW。注意是所有的副本,而不只是leader副本。
  • LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消息!也就是說(shuō),如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,leader LEO和follower LEO的更新是有區(qū)別的。我們后面會(huì)詳細(xì)說(shuō)
  • HW:即上面提到的水位值。對(duì)于同一個(gè)副本對(duì)象而言,其HW值不會(huì)大于LEO值。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區(qū)別的
    從生產(chǎn)者發(fā)出的 一 條消息首先會(huì)被寫入分區(qū)的leader 副本,不過(guò)還需要等待ISR集合中的所有follower副本都同步完之后才能被認(rèn)為已經(jīng)提交,之后才會(huì)更新分區(qū)的HW, 進(jìn)而消費(fèi)者可以消費(fèi)到這條消息。

副本協(xié)同機(jī)制

剛剛提到了,消息的讀寫操作都只會(huì)由leader節(jié)點(diǎn)來(lái)接收和處理。follower副本只負(fù)責(zé)同步數(shù)據(jù)以及當(dāng)leader副本所在的broker掛了以后,會(huì)從follower副本中選取新的leader。

寫請(qǐng)求首先由Leader副本處理,之后follower副本會(huì)從leader上拉取寫入的消息,這個(gè)過(guò)程會(huì)有一定的延遲,導(dǎo)致follower副本中保存的消息略少于leader副本,但是只要沒(méi)有超出閾值都可以容忍。但是如果一個(gè)follower副本出現(xiàn)異常,比如宕機(jī)、網(wǎng)絡(luò)斷開(kāi)等原因長(zhǎng)時(shí)間沒(méi)有同步到消息,那這個(gè)時(shí)候,leader就會(huì)把它踢出去。kafka通過(guò)ISR集合來(lái)維護(hù)一個(gè)分區(qū)副本信息


image.png

一個(gè)新leader被選舉并被接受客戶端的消息成功寫入。Kafka確保從同步副本列表中選舉一個(gè)副本為leader;leader負(fù)責(zé)維護(hù)和跟蹤ISR(in-Sync replicas , 副本同步隊(duì)列)中所有follower滯后的狀態(tài)。當(dāng)producer發(fā)送一條消息到broker后,leader寫入消息并復(fù)制到所有follower。消息提交之后才被成功復(fù)制到所有的同步副本。

ISR

ISR表示目前“可用且消息量與leader相差不多的副本集合,這是整個(gè)副本集合的一個(gè)子集”。怎么去理解可用和相差不多這兩個(gè)詞呢?具體來(lái)說(shuō),ISR集合中的副本必須滿足兩個(gè)條件:

  1. 副本所在節(jié)點(diǎn)必須維持著與zookeeper的連接
  2. 副本最后一條消息的offset與leader副本的最后一條消息的offset之間的差值不能超過(guò)指定的閾值
    (replica.lag.time.max.ms) replica.lag.time.max.ms:如果該follower在此時(shí)間間隔內(nèi)一直沒(méi)有追上過(guò)leader的所有消息,則該follower就會(huì)被剔除isr列表
  3. ISR數(shù)據(jù)保存在Zookeeper的 /brokers/topics/<topic>/partitions/<partitionId>/state 節(jié)點(diǎn)中

follower副本把leader副本LEO之前的日志全部同步完成時(shí),則認(rèn)為follower副本已經(jīng)追趕上了leader副本,這個(gè)時(shí)候會(huì)更新這個(gè)副本的lastCaughtUpTimeMs標(biāo)識(shí),kafk副本管理器會(huì)啟動(dòng)一個(gè)副本過(guò)期檢查的定時(shí)任務(wù),這個(gè)任務(wù)會(huì)定期檢查當(dāng)前時(shí)間與副本的lastCaughtUpTimeMs的差值是否大于參數(shù)replica.lag.time.max.ms 的值,如果大于,則會(huì)把這個(gè)副本踢出ISR集合

image.png

如何處理所有的Replica不工作的情況

在ISR中至少有一個(gè)follower時(shí),Kafka可以確保已經(jīng)commit的數(shù)據(jù)不丟失,但如果某個(gè)Partition的所有Replica都宕機(jī)了,就無(wú)法保證數(shù)據(jù)不丟失了

  1. 等待ISR中的任一個(gè)Replica“活”過(guò)來(lái),并且選它作為L(zhǎng)eader
  2. 選擇第一個(gè)“活”過(guò)來(lái)的Replica(不一定是ISR中的)作為L(zhǎng)eader

這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的折衷。
如果一定要等待ISR中的Replica“活”過(guò)來(lái),那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果ISR中的所有Replica都無(wú)法“活”過(guò)來(lái)了,或者數(shù)據(jù)都丟失了,這個(gè)Partition將永遠(yuǎn)不可用。
選擇第一個(gè)“活”過(guò)來(lái)的Replica作為L(zhǎng)eader,而這個(gè)Replica不是ISR中的Replica,那即使它并不保證已經(jīng)包含了所有已commit的消息,它也會(huì)成為L(zhǎng)eader而作為consumer的數(shù)據(jù)源(所有讀寫都由Leader完成)。在我們課堂講的版本中,使用的是第一種策略。

副本數(shù)據(jù)同步原理

了解了副本的協(xié)同過(guò)程以后,還有一個(gè)最重要的機(jī)制,就是數(shù)據(jù)的同步過(guò)程。它需要解決

  1. 怎么傳播消息
  2. 在向消息發(fā)送端返回ack之前需要保證多少個(gè)Replica已經(jīng)接收到這個(gè)消息

下圖中,深紅色部分表示test_replica分區(qū)的leader副本,另外兩個(gè)節(jié)點(diǎn)上淺色部分表示follower副本


image.png

Producer在發(fā)布消息到某個(gè)Partition時(shí),

  • 先通過(guò)ZooKeeper找到該P(yáng)artition的Leader get /brokers/topics/<topic>/partitions/2/state ,然后無(wú)論該Topic的Replication Factor為多少(也即該P(yáng)artition有多少個(gè)Replica),Producer只將該消息發(fā)送到該P(yáng)artition的Leader。
  • Leader會(huì)將該消息寫入其本地Log。每個(gè)Follower都從Leader pull數(shù)據(jù)。這種方式上,F(xiàn)ollower存儲(chǔ)的數(shù)據(jù)順序與Leader保持一致。
  • Follower在收到該消息并寫入其Log后,向Leader發(fā)送ACK。
  • 一旦Leader收到了ISR中的所有Replica的ACK,該消息就被認(rèn)為已經(jīng)commit了,Leader將增加HW(HighWatermark)并且向Producer發(fā)送ACK。

LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值。注意是下一條消息!也就是說(shuō),如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,leader LEO和follower LEO的更新是有區(qū)別的。

HW:即上面提到的水位值(Hight Water)。對(duì)于同一個(gè)副本對(duì)象而言,其HW值不會(huì)大于LEO值。小于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。同理,leader副本和follower副本的HW更新是有區(qū)別的

通過(guò)下面這幅圖來(lái)表達(dá)LEO、HW的含義,隨著follower副本不斷和leader副本進(jìn)行數(shù)據(jù)同步,follower副本的LEO會(huì)主鍵后移并且追趕到leader副本,這個(gè)追趕上的判斷標(biāo)準(zhǔn)是當(dāng)前副本的LEO是否大于或者等于leader副本的HW,這個(gè)追趕上也會(huì)使得被踢出的follower副本重新加入到ISR集合中。
另外, 假如說(shuō)下圖中的最右側(cè)的follower副本被踢出ISR集合,也會(huì)導(dǎo)致這個(gè)分區(qū)的HW發(fā)生變化,變成了3


image.png

數(shù)據(jù)丟失的問(wèn)題

表達(dá)的含義是,至少需要多少個(gè)副本同步才能表示消息是提交的, 所以,當(dāng) min.insync.replicas=1的時(shí)候,一旦消息被寫入leader端log即被認(rèn)為是“已提交”,而延遲一輪FETCH RPC更新HW值的設(shè)計(jì)使得follower HW值是異步延遲更新的,倘若在這個(gè)過(guò)程中l(wèi)eader發(fā)生變更,那么成為新leader的follower的HW值就有可能是過(guò)期的,使得clients端認(rèn)為是成功提交的消息被刪除。


image.png

acks配置表示producer發(fā)送消息到broker上以后的確認(rèn)值。有三個(gè)可選項(xiàng)
0:表示producer不需要等待broker的消息確認(rèn)。這個(gè)選項(xiàng)時(shí)延最小但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng)server宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)。
1:表示producer只需要獲得kafka集群中的leader節(jié)點(diǎn)確認(rèn)即可,這個(gè)選擇時(shí)延較小同時(shí)確保了leader節(jié)點(diǎn)確認(rèn)接收成功。
all(-1):需要ISR中所有的Replica給予接收確認(rèn),速度最慢,安全性最高,但是由于ISR可能會(huì)縮小到僅包含一個(gè)Replica,所以設(shè)置參數(shù)為all并不能一定避免數(shù)據(jù)丟失,

數(shù)據(jù)丟失的解決方案

在kafka0.11.0.0版本之后,引入了一個(gè)leader epoch來(lái)解決這個(gè)問(wèn)題,所謂的leader epoch實(shí)際上是一對(duì)值(epoch,offset),epoch代表leader的版本號(hào),從0開(kāi)始遞增,當(dāng)leader發(fā)生過(guò)變更,epoch就+1,而offset則是對(duì)應(yīng)這個(gè)epoch版本的leader寫入第一條消息的offset,比如
(0,0), (1,50) ,表示第一個(gè)leader從offset=0開(kāi)始寫消息,一共寫了50條。第二個(gè)leader版本號(hào)是1,從offset=50開(kāi)始寫,這個(gè)信息會(huì)持久化在對(duì)應(yīng)的分區(qū)的本地磁盤上,文件名是 /tmp/kafka-log/topic/leader-epoch-checkpoint 。
leader broker中會(huì)保存這樣一個(gè)緩存,并且定期寫入到checkpoint文件中
當(dāng)leader寫log時(shí)它會(huì)嘗試更新整個(gè)緩存: 如果這個(gè)leader首次寫消息,則會(huì)在緩存中增加一個(gè)條目;否則就不做更新。而每次副本重新成為leader時(shí)會(huì)查詢這部分緩存,獲取出對(duì)應(yīng)leader版本的offset

我們基于同樣的情況來(lái)分析,follower宕機(jī)并且恢復(fù)之后,有兩種情況,如果這個(gè)時(shí)候leader副本沒(méi)有掛,也就是意味著沒(méi)有發(fā)生leader選舉,那么follower恢復(fù)之后并不會(huì)去截?cái)嘧约旱娜罩?,而是先發(fā)送一個(gè)OffsetsForLeaderEpochRequest請(qǐng)求給到leader副本,leader副本收到請(qǐng)求之后返回當(dāng)前的LEO。
如果follower副本的leaderEpoch和leader副本的epoch相同, leader的leo只可能大于或者等于follower副本的leo值,所以這個(gè)時(shí)候不會(huì)發(fā)生截?cái)?br> 如果follower副本和leader副本的epoch值不同,那么leader副本會(huì)查找follower副本傳過(guò)來(lái)的epoch+1在本地文件中存儲(chǔ)的StartOffset返回給follower副本,也就是新leader副本的LEO。這樣也避免了數(shù)據(jù)丟失的問(wèn)題
如果leader副本宕機(jī)了重新選舉新的leader,那么原本的follower副本就會(huì)變成leader,意味著epoch從0變成1,使得原本follower副本中LEO的值的到了保留。

Leader副本的選舉過(guò)程

  1. KafkaController會(huì)監(jiān)聽(tīng)ZooKeeper的/brokers/ids節(jié)點(diǎn)路徑,一旦發(fā)現(xiàn)有broker掛了,執(zhí)行下面的邏輯。這里暫時(shí)先不考慮KafkaController所在broker掛了的情況,KafkaController掛了,各個(gè)broker會(huì)重新leader選舉出新的KafkaController
  2. leader副本在該broker上的分區(qū)就要重新進(jìn)行l(wèi)eader選舉,目前的選舉策略是
    a) 優(yōu)先從isr列表中選出第一個(gè)作為leader副本,這個(gè)叫優(yōu)先副本,理想情況下有限副本就是該分區(qū)的leader副本
    b) 如果isr列表為空,則查看該topic的unclean.leader.election.enable配置。
    unclean.leader.election.enable:為true則代表允許選用非isr列表的副本作為leader,那么此時(shí)就意味著數(shù)據(jù)可能丟失,為false的話,則表示不允許,直接拋出NoReplicaOnlineException異常,造成leader副本選舉失敗。
    c) 如果上述配置為true,則從其他副本中選出一個(gè)作為leader副本,并且isr列表只包含該leader副本。一旦選舉成功,則將選舉后的leader和isr和其他副本信息寫入到該分區(qū)的對(duì)應(yīng)的zk路徑上。

消息的存儲(chǔ)

消息發(fā)送端發(fā)送消息到broker上以后,消息是如何持久化的呢?那么接下來(lái)去分析下消息的存儲(chǔ)首先我們需要了解的是,kafka是使用日志文件的方式來(lái)保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一個(gè)offset值來(lái)表示它在分區(qū)中的偏移量。Kafka中存儲(chǔ)的一般都是海量的消息數(shù)據(jù),為了避免日志文件過(guò)大,Log并不是直接對(duì)應(yīng)在一個(gè)磁盤上的日志文件,而是對(duì)應(yīng)磁盤上的一個(gè)目錄,這個(gè)目錄的命名規(guī)則是<topic_name>_<partition_id>

消息的文件存儲(chǔ)機(jī)制

一個(gè)topic的多個(gè)partition在物理磁盤上的保存路徑,路徑保存在 /tmp/kafka-logs/topic_partition,包含日志文件、索引文件和時(shí)間索引文件

image.png

kafka是通過(guò)分段的方式將Log分為多個(gè)LogSegment,LogSegment是一個(gè)邏輯上的概念,一個(gè)LogSegment對(duì)應(yīng)磁盤上的一個(gè)日志文件和一個(gè)索引文件,其中日志文件是用來(lái)記錄消息的。索引文件是用來(lái)保存消息的索引。那么這個(gè)LogSegment是什么呢?

LogSegment

假設(shè)kafka以partition為最小存儲(chǔ)單位,那么我們可以想象當(dāng)kafka producer不斷發(fā)送消息,必然會(huì)引起partition文件的無(wú)線擴(kuò)張,這樣對(duì)于消息文件的維護(hù)以及被消費(fèi)的消息的清理帶來(lái)非常大的挑戰(zhàn),所以kafka 以segment為單位又把partition進(jìn)行細(xì)分。每個(gè)partition相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等的segment數(shù)據(jù)文件中(每個(gè)segment文件中的消息不一定相等),這種特性方便已經(jīng)被消費(fèi)的消息的清理,提高磁盤的利用率。

  • log.segment.bytes=107370 (設(shè)置分段大小),默認(rèn)是1gb,我們把這個(gè)值調(diào)小以后,可以看到日志分段的效果
  • 抽取其中3個(gè)分段來(lái)進(jìn)行分析


    image.png

    segment file由2大部分組成,分別為index file和data file,此2個(gè)文件一一對(duì)應(yīng),成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為segment索引文件、數(shù)據(jù)文件.
    segment文件命名規(guī)則:partion全局的第一個(gè)segment從0開(kāi)始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset值進(jìn)行遞增。數(shù)值最大為64位long大小,20位數(shù)字字符長(zhǎng)度,沒(méi)有數(shù)字用0填充

查看segment文件命名規(guī)則

通過(guò)下面這條命令可以看到kafka消息日志的內(nèi)容

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.log --print-data-log

假如第一個(gè)log文件的最后一個(gè)offset為:5376,所以下一個(gè)segment的文件命名為:
00000000000000005376.log。對(duì)應(yīng)的index為00000000000000005376.index

segment中index和log的對(duì)應(yīng)關(guān)系

從所有分段中,找一個(gè)分段進(jìn)行分析
為了提高查找消息的性能,為每一個(gè)日志文件添加2個(gè)索引索引文件:OffsetIndex 和 TimeIndex,分別對(duì)應(yīng).index以及.timeindex, TimeIndex索引文件格式:它是映射時(shí)間戳和相對(duì)offset
查看索引內(nèi)容:

sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.index --print-data-log
image.png

如圖所示,index中存儲(chǔ)了索引以及物理偏移量。 log存儲(chǔ)了消息的內(nèi)容。索引文件的元數(shù)據(jù)執(zhí)行對(duì)應(yīng)數(shù)據(jù)文件中message的物理偏移地址。舉個(gè)簡(jiǎn)單的案例來(lái)說(shuō),以[4053,80899]為例,在log文件中,對(duì)應(yīng)的是第4053條記錄,物理偏移量(position)為80899. position是ByteBuffer的指針位置

在partition中如何通過(guò)offset查找message

查找的算法是

  1. 根據(jù)offset的值,查找segment段中的index索引文件。由于索引文件命名是以上一個(gè)文件的最后一個(gè)offset進(jìn)行命名的,所以,使用二分查找算法能夠根據(jù)offset快速定位到指定的索引文件。
  2. 找到索引文件后,根據(jù)offset進(jìn)行定位,找到索引文件中的符合范圍的索引。(kafka采用稀疏索引的方式來(lái)提高查找性能)
  3. 得到position以后,再到對(duì)應(yīng)的log文件中,從position出開(kāi)始查找offset對(duì)應(yīng)的消息,將每條消息的offset與目標(biāo)offset進(jìn)行比較,直到找到消息

比如說(shuō),我們要查找offset=2490這條消息,那么先找到00000000000000000000.index, 然后找到[2487,49111]這個(gè)索引,再到log文件中,根據(jù)49111這個(gè)position開(kāi)始查找,比較每條消息的offset是否大于等于2490。最后查找到對(duì)應(yīng)的消息以后返回

Log文件的消息內(nèi)容分析

前面我們通過(guò)kafka提供的命令,可以查看二進(jìn)制的日志文件信息,一條消息,會(huì)包含很多的字段。

offset: 5371 position: 102124 CreateTime: 1531477349286 isvalid: true keysize:
-1 valuesize: 12 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1
sequence: -1 isTransactional: false headerKeys: [] payload: message_5371

offset和position這兩個(gè)前面已經(jīng)講過(guò)了、 createTime表示創(chuàng)建時(shí)間、keysize和valuesize表示key和value的大小、 compresscodec表示壓縮編碼、payload:表示消息的具體內(nèi)容

日志的清除策略以及壓縮策略

日志清除策略

前面提到過(guò),日志的分段存儲(chǔ),一方面能夠減少單個(gè)文件內(nèi)容的大小,另一方面,方便kafka進(jìn)行日志清理。日志的清理策略有兩個(gè):

  1. 根據(jù)消息的保留時(shí)間,當(dāng)消息在kafka中保存的時(shí)間超過(guò)了指定的時(shí)間,就會(huì)觸發(fā)清理過(guò)程
  2. 根據(jù)topic存儲(chǔ)的數(shù)據(jù)大小,當(dāng)topic所占的日志文件大小大于一定的閥值,則可以開(kāi)始刪除最舊的消息。kafka會(huì)啟動(dòng)一個(gè)后臺(tái)線程,定期檢查是否存在可以刪除的消息

通過(guò)log.retention.bytes和log.retention.hours這兩個(gè)參數(shù)來(lái)設(shè)置,當(dāng)其中任意一個(gè)達(dá)到要求,都會(huì)執(zhí)行刪除。
默認(rèn)的保留時(shí)間是:7天

日志壓縮策略

Kafka還提供了“日志壓縮(Log Compaction)”功能,通過(guò)這個(gè)功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況,在很多實(shí)際場(chǎng)景中,消息的key和value的值之間的對(duì)應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫(kù)中的數(shù)據(jù)會(huì)不斷被修改一樣,消費(fèi)者只關(guān)心key對(duì)應(yīng)的最新的value。因此,我們可以開(kāi)啟kafka的日志壓縮功能,服務(wù)端會(huì)在后臺(tái)啟動(dòng)啟動(dòng)Cleaner線程池,定期將相同的key進(jìn)行合并,只保留最新的value值。日志的壓縮原理是


image.png

磁盤存儲(chǔ)的性能問(wèn)題

磁盤存儲(chǔ)的性能優(yōu)化

我們現(xiàn)在大部分企業(yè)仍然用的是機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對(duì)應(yīng)的柱面、磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過(guò)程相對(duì)內(nèi)存來(lái)說(shuō)會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來(lái)的時(shí)間消耗,kafka采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù)。即使是這樣,但是頻繁的I/O操作仍然會(huì)造成磁盤的性能瓶頸

零拷貝

消息從發(fā)送到落地保存,broker維護(hù)的消息日志本身就是文件目錄,每個(gè)文件都是二進(jìn)制保存,生產(chǎn)者和消費(fèi)者使用相同的格式來(lái)處理。在消費(fèi)者獲取消息時(shí),服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動(dòng)的通過(guò)socket發(fā)送給消費(fèi)者。雖然這個(gè)操作描述起來(lái)很簡(jiǎn)單,但實(shí)際上經(jīng)歷了很多步驟。

操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁(yè)緩存:
? 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
? 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到socket緩存中
? 操作系統(tǒng)將數(shù)據(jù)從socket緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出


image.png

通過(guò)“零拷貝”技術(shù),可以去掉這些沒(méi)必要的數(shù)據(jù)復(fù)制操作,同時(shí)也會(huì)減少上下文切換次數(shù)?,F(xiàn)代的unix操作系統(tǒng)提供一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁(yè)緩存?zhèn)鬏數(shù)絪ocket;在Linux中,是通過(guò)sendfile系統(tǒng)調(diào)用來(lái)完成的。Java提供了訪問(wèn)這個(gè)系統(tǒng)調(diào)用的方法:FileChannel.transferTo API
使用sendfile,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁(yè)緩存發(fā)送到網(wǎng)絡(luò)上。所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的


image.png

頁(yè)緩存

頁(yè)緩存是操作系統(tǒng)實(shí)現(xiàn)的一種主要的磁盤緩存,但凡設(shè)計(jì)到緩存的,基本都是為了提升i/o性能,所以頁(yè)緩存是用來(lái)減少磁盤I/O操作的。
磁盤高速緩存有兩個(gè)重要因素:
第一,訪問(wèn)磁盤的速度要遠(yuǎn)低于訪問(wèn)內(nèi)存的速度,若從處理器L1和L2高速緩存訪問(wèn)則速度更快。
第二,數(shù)據(jù)一旦被訪問(wèn),就很有可能短時(shí)間內(nèi)再次訪問(wèn)。正是由于基于訪問(wèn)內(nèi)存比磁盤快的多,所以磁盤的內(nèi)存緩存將給系統(tǒng)存儲(chǔ)性能帶來(lái)質(zhì)的飛越。

當(dāng) 一 個(gè)進(jìn)程準(zhǔn)備讀取磁盤上的文件內(nèi)容時(shí), 操作系統(tǒng)會(huì)先查看待讀取的數(shù)據(jù)所在的頁(yè)(page)是否在頁(yè)緩存(pagecache)中,如果存在(命中)則直接返回?cái)?shù)據(jù), 從而避免了對(duì)物理磁盤的I/0操作;如果沒(méi)有命中, 則操作系統(tǒng)會(huì)向磁盤發(fā)起讀取請(qǐng)求并將讀取的數(shù)據(jù)頁(yè)存入頁(yè)緩存, 之后再將數(shù)據(jù)返回給進(jìn)程。
同樣,如果 一 個(gè)進(jìn)程需要將數(shù)據(jù)寫入磁盤, 那么操作系統(tǒng)也會(huì)檢測(cè)數(shù)據(jù)對(duì)應(yīng)的頁(yè)是否在頁(yè)緩存中,如果不存在, 則會(huì)先在頁(yè)緩存中添加相應(yīng)的頁(yè), 最后將數(shù)據(jù)寫入對(duì)應(yīng)的頁(yè)。 被修改過(guò)后的頁(yè)也就變成了臟頁(yè), 操作系統(tǒng)會(huì)在合適的時(shí)間把臟頁(yè)中的數(shù)據(jù)寫入磁盤, 以保持?jǐn)?shù)據(jù)的 一 致性
Kafka中大量使用了頁(yè)緩存, 這是Kafka實(shí)現(xiàn)高吞吐的重要因素之 一 。 雖然消息都是先被寫入頁(yè)緩存,然后由操作系統(tǒng)負(fù)責(zé)具體的刷盤任務(wù)的, 但在Kafka中同樣提供了同步刷盤及間斷性強(qiáng)制刷盤(fsync),可以通過(guò) log.flush.interval.messages 和 log.flush.interval.ms 參數(shù)來(lái)控制。
同步刷盤能夠保證消息的可靠性,避免因?yàn)殄礄C(jī)導(dǎo)致頁(yè)緩存數(shù)據(jù)還未完成同步時(shí)造成的數(shù)據(jù)丟失。但是實(shí)際使用上,我們沒(méi)必要去考慮這樣的因素以及這種問(wèn)題帶來(lái)的損失,消息可靠性可以由多副本來(lái)解決,同步刷盤會(huì)帶來(lái)性能的影響。 刷盤的操作由操作系統(tǒng)去完成即可

Kafka消息的可靠性

沒(méi)有一個(gè)中間件能夠做到百分之百的完全可靠,可靠性更多的還是基于幾個(gè)9的衡量指標(biāo),比如4個(gè)9、5個(gè)9. 軟件系統(tǒng)的可靠性只能夠無(wú)限去接近100%,但不可能達(dá)到100%。所以kafka如何是實(shí)現(xiàn)最大可能的可靠性呢?

  • 分區(qū)副本, 你可以創(chuàng)建更多的分區(qū)來(lái)提升可靠性,但是分區(qū)數(shù)過(guò)多也會(huì)帶來(lái)性能上的開(kāi)銷,一般來(lái)說(shuō),3個(gè)副本就能滿足對(duì)大部分場(chǎng)景的可靠性要求

  • acks,生產(chǎn)者發(fā)送消息的可靠性,也就是我要保證我這個(gè)消息一定是到了broker并且完成了多副本的持久化,但這種要求也同樣會(huì)帶來(lái)性能上的開(kāi)銷。它有幾個(gè)可選項(xiàng):
    1 ,生產(chǎn)者把消息發(fā)送到leader副本,leader副本在成功寫入到本地日志之后就告訴生產(chǎn)者消息提交成功,但是如果isr集合中的follower副本還沒(méi)來(lái)得及同步leader副本的消息,leader掛了,就會(huì)造成消息丟失。
    -1 ,消息不僅僅寫入到leader副本,并且被ISR集合中所有副本同步完成之后才告訴生產(chǎn)者已經(jīng)提交成功,這個(gè)時(shí)候即使leader副本掛了也不會(huì)造成數(shù)據(jù)丟失。
    0:表示producer不需要等待broker的消息確認(rèn)。這個(gè)選項(xiàng)時(shí)延最小但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng)server宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)。

  • 保障消息到了broker之后,消費(fèi)者也需要有一定的保證,因?yàn)橄M(fèi)者也可能出現(xiàn)某些問(wèn)題導(dǎo)致消息沒(méi)有消費(fèi)到。

  • enable.auto.commit默認(rèn)為true,也就是自動(dòng)提交offset,自動(dòng)提交是批量執(zhí)行的,有一個(gè)時(shí)間窗口,這種方式會(huì)帶來(lái)重復(fù)提交或者消息丟失的問(wèn)題,所以對(duì)于高可靠性要求的程序,要使用手動(dòng)提交。 對(duì)于高可靠要求的應(yīng)用來(lái)說(shuō),寧愿重復(fù)消費(fèi)也不應(yīng)該因?yàn)橄M(fèi)異常而導(dǎo)致消息丟失

    ——學(xué)自咕泡學(xué)院

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容