kafka學(xué)習(xí)筆記

版本選擇

  • 0.7版本:
    只有基礎(chǔ)消息隊(duì)列功能,無(wú)副本;打死也不使用
  • 0.8版本:
    增加了副本機(jī)制,新的producer API;建議使用0.8.2.2版本;不建議使用0.8.2.0之后的producer API
  • 0.9版本:
    增加權(quán)限和認(rèn)證,新的consumer API,Kafka Connect功能;不建議使用consumer API;
  • 0.10版本:
    引入Kafka Streams功能,bug修復(fù);建議版本0.10.2.2;建議使用新版consumer API
  • 0.11版本:
    producer API冪等,事物API,消息格式重構(gòu);建議版本0.11.0.3;謹(jǐn)慎對(duì)待消息格式變化
  • 1.0和2.0版本:
    Kafka Streams改進(jìn);建議版本2.0;

部署需要注意

image.png

重要參數(shù)

  • Broker 端參數(shù)
  1. log.dirs:這是非常重要的參數(shù),指定了 Broker 需要使用的若干個(gè)文件目錄路徑。要知道這個(gè)參數(shù)是沒(méi)有默認(rèn)值的,這說(shuō)明什么?這說(shuō)明它必須由你親自指定。log.dir:注意這是 dir,結(jié)尾沒(méi)有 s,說(shuō)明它只能表示單個(gè)路徑,它是補(bǔ)充上一個(gè)參數(shù)用的。

提升讀寫(xiě)性能:比起單塊磁盤(pán),多塊物理磁盤(pán)同時(shí)讀寫(xiě)數(shù)據(jù)有更高的吞吐量

  • Zookeeper
    zookeeper.connect:zk1:2181,zk2:2181,zk3:2181/kafka1

  • listeners:學(xué)名叫監(jiān)聽(tīng)器
    其實(shí)就是告訴外部連接者要通過(guò)什么協(xié)議訪問(wèn)指定主機(jī)名和端口開(kāi)放的 Kafka 服務(wù)。advertised.listeners:和 listeners 相比多了個(gè) advertised。Advertised 的含義表示宣稱的、公布的,就是說(shuō)這組監(jiān)聽(tīng)器是 Broker 用于對(duì)外發(fā)布的。host.name/port:列出這兩個(gè)參數(shù)就是想說(shuō)你把它們忘掉吧,壓根不要為它們指定值,畢竟都是過(guò)期的參數(shù)了。

最好全部使用主機(jī)名,即 Broker 端和 Client 端應(yīng)用配置中全部填寫(xiě)主機(jī)名

  • topic:
  1. auto.create.topics.enable:是否允許自動(dòng)創(chuàng)建 Topic。推薦設(shè)置為false
    規(guī)避線上自動(dòng)創(chuàng)建topic問(wèn)題
  2. unclean.leader.election.enable:是否允許 Unclean Leader 選舉。推薦為false
    規(guī)避落后的副本自動(dòng)選為leader,導(dǎo)致數(shù)據(jù)丟失.
  3. auto.leader.rebalance.enable:是否允許定期進(jìn)行 Leader 選舉。推薦false
    規(guī)避自動(dòng)切換leader造成不必要的性能開(kāi)銷
  • 消息保存三兄弟
  1. log.retention.{hours|minutes|ms}:這是個(gè)“三兄弟”,都是控制一條消息數(shù)據(jù)被保存多長(zhǎng)時(shí)間。從優(yōu)先級(jí)上來(lái)說(shuō) ms 設(shè)置最高、minutes 次之、hours 最低
  2. log.retention.bytes:這是指定 Broker 為消息保存的總磁盤(pán)容量大小。
  3. message.max.bytes:控制 Broker 能夠接收的最大消息大小。
  • 創(chuàng)建topic時(shí)帶參數(shù)
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

  • 修改topic參數(shù)
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

Kafka jvm設(shè)置推薦參數(shù)

  • KAFKA_HEAP_OPTS:指定堆大小。
  • KAFKA_JVM_PERFORMANCE_OPTS:指定 GC 參數(shù)。

kafka 零拷貝

  • 數(shù)據(jù)在磁盤(pán)和網(wǎng)絡(luò)進(jìn)行傳輸時(shí)避免昂貴的內(nèi)核態(tài)數(shù)據(jù)拷貝,從而實(shí)現(xiàn)快速的數(shù)據(jù)傳輸

分區(qū)策略

  • 輪詢策略
    生產(chǎn)者程序會(huì)按照輪詢的方式在主題的所有分區(qū)間均勻地“碼放”消息。
    輪詢策略有非常優(yōu)秀的負(fù)載均衡表現(xiàn),它總是能保證消息最大限度地被平均分配到所有分區(qū)上,故默認(rèn)情況下它是最合理的分區(qū)策略,也是我們最常用的分區(qū)策略之一
  • 隨機(jī)策略
    我們隨意地將消息放置到任意一個(gè)分區(qū)上
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
  • 按消息鍵保序策略[官方?jīng)]有該策略]
    Kafka 允許為每條消息定義消息鍵,簡(jiǎn)稱為 Key。這個(gè) Key 的作用非常大,它可以是一個(gè)有著明確業(yè)務(wù)含義的字符串,比如客戶代碼、部門(mén)編號(hào)或是業(yè)務(wù) ID 等;也可以用來(lái)表征消息元數(shù)據(jù)。特別是在 Kafka 不支持時(shí)間戳的年代,在一些場(chǎng)景中,工程師們都是直接將消息創(chuàng)建時(shí)間封裝進(jìn) Key 里面的。一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

kafka生產(chǎn)者優(yōu)化

  • 開(kāi)啟壓縮

 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 // 開(kāi)啟GZIP壓縮
 props.put("compression.type", "gzip");
  Producer<String, String> producer = new KafkaProducer<>(props);
  • 壓縮參數(shù)


    image.png

結(jié)論:

  1. 吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在壓縮比方面,zstd > LZ4 > GZIP > Snappy。
  2. 壓縮比方面: zstd > LZ4 > GZIP > snappy

kafka消息不丟失保證策略

  1. 生產(chǎn)者 選擇合適的api
    Producer 永遠(yuǎn)要使用帶有回調(diào)通知的發(fā)送 API,也就是說(shuō)不要使用 producer.send(msg),而要使用 producer.send(msg, callback)
  2. 消費(fèi)者維護(hù)好偏移量
    維持先消費(fèi)消息(閱讀),再更新位移(書(shū)簽)的順序
    如果是多線程異步處理消費(fèi)消息,Consumer 程序不要開(kāi)啟自動(dòng)提交位移,而是要應(yīng)用程序手動(dòng)提交位移
    3.最佳實(shí)踐
  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。記住,一定要使用帶有回調(diào)通知的 send 方法。
  • 設(shè)置 acks = all。acks 是 Producer 的一個(gè)參數(shù),代表了你對(duì)“已提交”消息的定義。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級(jí)的“已提交”定義。
  • 設(shè)置 retries 為一個(gè)較大的值。這里的 retries 同樣是 Producer 的參數(shù),對(duì)應(yīng)前面提到的 Producer 自動(dòng)重試。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動(dòng)重試消息發(fā)送,避免消息丟失。
  • 設(shè)置 unclean.leader.election.enable = false。這是 Broker 端的參數(shù),它控制的是哪些 Broker 有資格競(jìng)選分區(qū)的 Leader。如果一個(gè) Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會(huì)造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。
  • 設(shè)置 replication.factor >= 3。這也是 Broker 端的參數(shù)。其實(shí)這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機(jī)制就是冗余。
  • 設(shè)置 min.insync.replicas > 1。這依然是 Broker 端參數(shù),控制的是消息至少要被寫(xiě)入到多少個(gè)副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實(shí)際環(huán)境中千萬(wàn)不要使用默認(rèn)值 1。
  • 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無(wú)法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。
  • 推薦設(shè)置成 replication.factor = min.insync.replicas + 1。確保消息消費(fèi)完成再提交。
  • Consumer 端有個(gè)參數(shù) enable.auto.commit,最好把它設(shè)置成 false,并采用手動(dòng)提交位移的方式。就像前面說(shuō)的,這對(duì)于單 Consumer 多線程處理的場(chǎng)景而言是至關(guān)重要的。
image.png

kafka中的攔截器

  • 參數(shù)設(shè)置
Properties props = new Properties();
List<String> interceptors = new ArrayList<>();
interceptors.add("com.yourcompany.kafkaproject.interceptors.AddTimestampInterceptor"); // 攔截器1
interceptors.add("com.yourcompany.kafkaproject.interceptors.UpdateCounterInterceptor"); // 攔截器2
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

  • 生產(chǎn)者
public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> {
    private Jedis jedis; // 省略Jedis初始化
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        jedis.incr("totalSentMessage");
        return record;
    }
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    }
    @Override
    public void close() {
    }
    @Override
    public void configure(Map<java.lang.String, ?> configs) {
    }
  • 消費(fèi)者
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> {
    private Jedis jedis; //省略Jedis初始化


    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        long lantency = 0L;
        for (ConsumerRecord<String, String> record : records) {
            lantency += (System.currentTimeMillis() - record.timestamp());
        }
        jedis.incrBy("totalLatency", lantency);
        long totalLatency = Long.parseLong(jedis.get("totalLatency"));
        long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
        jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        return records;
    }
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }
    @Override
    public void configure(Map<String, ?> configs) {

java生產(chǎn)者是如何管理tcp連接的?

Apache Kafka的所有通信都是基于TCP的,而不是于HTTP或其他協(xié)議的
1 為什采用TCP?
(1)TCP擁有一些高級(jí)功能,如多路復(fù)用請(qǐng)求和同時(shí)輪詢多個(gè)連接的能力。
(2)很多編程語(yǔ)言的HTTP庫(kù)功能相對(duì)的比較簡(jiǎn)陋。
名詞解釋:
多路復(fù)用請(qǐng)求:multiplexing request,是將兩個(gè)或多個(gè)數(shù)據(jù)合并到底層—物理連接中的過(guò)程。TCP的多路復(fù)用請(qǐng)求會(huì)在一條物理連接上創(chuàng)建若干個(gè)虛擬連接,每個(gè)虛擬連接負(fù)責(zé)流轉(zhuǎn)各自對(duì)應(yīng)的數(shù)據(jù)流。嚴(yán)格講:TCP并不能多路復(fù)用,只是提供可靠的消息交付語(yǔ)義保證,如自動(dòng)重傳丟失的報(bào)文。

2 何時(shí)創(chuàng)建TCP連接?
(1)在創(chuàng)建KafkaProducer實(shí)例時(shí),
A:生產(chǎn)者應(yīng)用會(huì)在后臺(tái)創(chuàng)建并啟動(dòng)一個(gè)名為Sender的線程,該Sender線程開(kāi)始運(yùn)行時(shí),首先會(huì)創(chuàng)建與Broker的連接。
B:此時(shí)不知道要連接哪個(gè)Broker,kafka會(huì)通過(guò)METADATA請(qǐng)求獲取集群的元數(shù)據(jù),連接所有的Broker。
(2)還可能在更新元數(shù)據(jù)后,或在消息發(fā)送時(shí)
3 何時(shí)關(guān)閉TCP連接
(1)Producer端關(guān)閉TCP連接的方式有兩種:用戶主動(dòng)關(guān)閉,或kafka自動(dòng)關(guān)閉。
A:用戶主動(dòng)關(guān)閉,通過(guò)調(diào)用producer.close()方關(guān)閉,也包括kill -9暴力關(guān)閉。
B:Kafka自動(dòng)關(guān)閉,這與Producer端參數(shù)connection.max.idles.ms的值有關(guān),默認(rèn)為9分鐘,9分鐘內(nèi)沒(méi)有任何請(qǐng)求流過(guò),就會(huì)被自動(dòng)關(guān)閉。這個(gè)參數(shù)可以調(diào)整。
C:第二種方式中,TCP連接是在Broker端被關(guān)閉的,但這個(gè)連接請(qǐng)求是客戶端發(fā)起的,對(duì)TCP而言這是被動(dòng)的關(guān)閉,被動(dòng)關(guān)閉會(huì)產(chǎn)生大量的CLOSE_WAIT連接。

代碼實(shí)現(xiàn)冪等性

props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。

代碼開(kāi)啟生產(chǎn)者事務(wù)

開(kāi)啟 enable.idempotence = true。
設(shè)置 Producer 端參數(shù) transactional. id。最好為其設(shè)置一個(gè)有意義的名字

producer.initTransactions();
try {
            producer.beginTransaction();
            producer.send(record1);
            producer.send(record2);
            producer.commitTransaction();
} catch (KafkaException e) {
            producer.abortTransaction();
}

消費(fèi)組平衡問(wèn)題

  1. 組成員數(shù)量發(fā)生變化。
  2. 訂閱主題數(shù)量發(fā)生變化。
  3. 訂閱主題的分區(qū)數(shù)發(fā)生變化。
image.png

kafka位移

Committing Offsets:Consumer 需要向 Kafka 匯報(bào)自己的位移數(shù)據(jù),這個(gè)匯報(bào)過(guò)程被稱為提交位移

  • 自動(dòng)提交位移
Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
    // 開(kāi)啟自動(dòng)提交位移 
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "2000");
     
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
  • 手動(dòng)提交

   try {
           while(true) {
                ConsumerRecords<String, String> records =     consumer.poll(Duration.ofSeconds(1));                    
                process(records); // 處理消息
                commitAysnc(); // 使用異步提交規(guī)避阻塞
            }
} catch(Exception e) {
            handle(e); // 處理異常
} finally {
            try {
               consumer.commitSync(); // 最后一次提交使用同步阻塞式提交
  } finally {
       consumer.close();
}
}
  • 精細(xì)提交位移
private Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
int count = 0;
……
while (true) {
            ConsumerRecords<String, String> records = 
  consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record: records) {
                        process(record);  // 處理消息
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                   new OffsetAndMetadata(record.offset() + 1);
                       if(count % 100 == 0)
                                    consumer.commitAsync(offsets, null); // 回調(diào)處理邏輯是null
                        count++;
  }
}

對(duì)于一次要處理很多消息的 Consumer 而言,它會(huì)關(guān)心社區(qū)有沒(méi)有方法允許它在消費(fèi)的中間進(jìn)行位移提交。比如前面這個(gè) 5000 條消息的例子,你可能希望每處理完 100 條消息就提交一次位移,這樣能夠避免大批量的消息重新消費(fèi)。

image.png

CommitFailedException異常怎么處理?

  • 場(chǎng)景一
    當(dāng)消息處理的總時(shí)間超過(guò)預(yù)設(shè)的 max.poll.interval.ms 參數(shù)值時(shí),Kafka Consumer 端會(huì)拋出 CommitFailedException 異常
    四種處理方式
  1. 縮短單條消息處理的時(shí)間
  2. 增加 Consumer 端允許下游系統(tǒng)消費(fèi)一批消息的最大時(shí)長(zhǎng)
  3. 減少下游系統(tǒng)一次性消費(fèi)的消息總數(shù)
  4. 下游系統(tǒng)使用多線程來(lái)加速消費(fèi)


    image.png
  • 場(chǎng)景二
    Kafka Java Consumer 端還提供了一個(gè)名為 Standalone Consumer 的獨(dú)立消費(fèi)者。它沒(méi)有消費(fèi)者組的概念,每個(gè)消費(fèi)者實(shí)例都是獨(dú)立工作的,彼此之間毫無(wú)聯(lián)系。不過(guò),你需要注意的是,獨(dú)立消費(fèi)者的位移提交機(jī)制和消費(fèi)者組是一樣的,因此獨(dú)立消費(fèi)者的位移提交也必須遵守之前說(shuō)的那些規(guī)定,比如獨(dú)立消費(fèi)者也要指定 group.id 參數(shù)才能提交位移.
image.png

多線程開(kāi)發(fā)消費(fèi)者實(shí)例

    1. 消費(fèi)者程序啟動(dòng)多個(gè)線程,每個(gè)線程維護(hù)專屬的 KafkaConsumer 實(shí)例,負(fù)責(zé)完整的消息獲取、消息處理流程
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
      ConsumerRecords records = 
        consumer.poll(Duration.ofMillis(10000));
                 //  執(zhí)行消息處理邏輯
             }
         } catch (WakeupException e) {
             // Ignore exception if closing
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
     // Shutdown hook which can be called from a separate thread
     public void shutdown() {
         closed.set(true);
         consumer.wakeup();
     }
  • 2.消費(fèi)者程序使用單或多線程獲取消息,同時(shí)創(chuàng)建多個(gè)消費(fèi)線程執(zhí)行消息處理邏

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...


private int workerNum = ...;
executors = new ThreadPoolExecutor(
  workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
  new ArrayBlockingQueue<>(1000), 
  new ThreadPoolExecutor.CallerRunsPolicy());


...
while (true)  {
  ConsumerRecords<String, String> records = 
    consumer.poll(Duration.ofSeconds(1));
  for (final ConsumerRecord record : records) {
    executors.submit(new Worker(record));
  }
}
..
image.png

kafka副本機(jī)制

image.png

kafka如何控制請(qǐng)求?

https://www.processon.com/view/link/5d481e6be4b07c4cf3031755

消費(fèi)組重平衡

  • heartbeat.interval.ms
    這個(gè)參數(shù)的真正作用是控制重平衡通知的頻率。如果你想要消費(fèi)者實(shí)例更迅速地得到通知,那么就可以給這個(gè)參數(shù)設(shè)置一個(gè)非常小的值,這樣消費(fèi)者就能更快地感知到重平衡已經(jīng)開(kāi)啟了

費(fèi)者組狀態(tài)機(jī)

image.png

image.png

Broker 端重平衡場(chǎng)景剖析

場(chǎng)景一:新成員入組。


image.png

場(chǎng)景二:組成員主動(dòng)離組


image.png

場(chǎng)景四:重平衡時(shí)協(xié)調(diào)者對(duì)組內(nèi)成員提交位移的處理。

image.png

控制器

1作用:
控制器組件(Controller),是Apache Kafka的核心組件。它的主要作用是Apache Zookeeper的幫助下管理和協(xié)調(diào)整個(gè)Kafka集群。
集群中任意一臺(tái)Broker都能充當(dāng)控制器的角色,但在運(yùn)行過(guò)程中,只能有一個(gè)Broker成為控制器。

2 特點(diǎn):控制器是重度依賴Zookeeper。
3 產(chǎn)生:
控制器是被選出來(lái)的,Broker在啟動(dòng)時(shí),會(huì)嘗試去Zookeeper中創(chuàng)建/controller節(jié)點(diǎn)。Kafka當(dāng)前選舉控制器的規(guī)則是:第一個(gè)成功創(chuàng)建/controller節(jié)點(diǎn)的Broker會(huì)被指定為控制器。

4 功能:
A :主題管理(創(chuàng)建,刪除,增加分區(qū))
當(dāng)執(zhí)行kafka-topics腳本時(shí),大部分的后臺(tái)工作都是控制器來(lái)完成的。
B :分區(qū)重分配
Kafka-reassign-partitions腳本提供的對(duì)已有主題分區(qū)進(jìn)行細(xì)粒度的分配功能。
C :Preferred領(lǐng)導(dǎo)者選舉
Preferred領(lǐng)導(dǎo)者選舉主要是Kafka為了避免部分Broker負(fù)載過(guò)重而提供的一種換Leade的方案。
D :集群成員管理(新增Broker,Broker主動(dòng)關(guān)閉,Broker宕機(jī))
控制器組件會(huì)利用watch機(jī)制檢查Zookeeper的/brokers/ids節(jié)點(diǎn)下的子節(jié)點(diǎn)數(shù)量變更。當(dāng)有新Broker啟動(dòng)后,它會(huì)在/brokers下創(chuàng)建專屬的znode節(jié)點(diǎn)。一旦創(chuàng)建完畢,Zookeeper會(huì)通過(guò)Watch機(jī)制將消息通知推送給控制器,這樣,控制器就能自動(dòng)地感知到這個(gè)變化。進(jìn)而開(kāi)啟后續(xù)新增Broker作業(yè)。
偵測(cè)Broker存活性則是依賴于剛剛提到的另一個(gè)機(jī)制:臨時(shí)節(jié)點(diǎn)。每個(gè)Broker啟動(dòng)后,會(huì)在/brokers/ids下創(chuàng)建一個(gè)臨時(shí)的znode。當(dāng)Broker宕機(jī)或主機(jī)關(guān)閉后,該Broker與Zookeeper的會(huì)話結(jié)束,這個(gè)znode會(huì)被自動(dòng)刪除。同理,Zookeeper的Watch機(jī)制將這一變更推送給控制器,這樣控制器就能知道有Broker關(guān)閉或宕機(jī)了,從而進(jìn)行善后。

E :數(shù)據(jù)服務(wù)
控制器上保存了最全的集群元數(shù)據(jù)信息,其他所有Broker會(huì)定期接收控制器發(fā)來(lái)的元數(shù)據(jù)更新請(qǐng)求,從而更新其內(nèi)存中的緩存數(shù)據(jù)。

5 控制器保存的數(shù)據(jù)

控制器中保存的這些數(shù)據(jù)在Zookeeper中也保存了一份。每當(dāng)控制器初始化時(shí),它都會(huì)從Zookeeper上讀取對(duì)應(yīng)的元數(shù)據(jù)并填充到自己的緩存中。

6 控制器故障轉(zhuǎn)移(Failover)
故障轉(zhuǎn)移是指:當(dāng)運(yùn)行中的控制器突然宕機(jī)或意外終止時(shí),Kafka能夠快速地感知到,并立即啟用備用控制器來(lái)替代之前失敗的控制器。

7 內(nèi)部設(shè)計(jì)原理
A :控制器的內(nèi)部設(shè)計(jì)相當(dāng)復(fù)雜
控制器是多線程的設(shè)計(jì),會(huì)在內(nèi)部創(chuàng)建很多線程。如:
(1)為每個(gè)Broker創(chuàng)建一個(gè)對(duì)應(yīng)的Socket連接,然后在創(chuàng)建一個(gè)專屬的線程,用于向這些Broker發(fā)送特定的請(qǐng)求。
(2)控制連接zookeeper,也會(huì)創(chuàng)建單獨(dú)的線程來(lái)處理Watch機(jī)制通知回調(diào)。
(3)控制器還會(huì)為主題刪除創(chuàng)建額外的I/O線程。
這些線程還會(huì)訪問(wèn)共享的控制器緩存數(shù)據(jù),為了維護(hù)數(shù)據(jù)安全性,控制在代碼中大量使用ReetrantLock同步機(jī)制,進(jìn)一步拖慢了整個(gè)控制器的處理速度。

B :在0.11版對(duì)控制器的低沉設(shè)計(jì)進(jìn)了重構(gòu)。

(1)最大的改進(jìn)是:把多線程的方案改成了單線程加事件對(duì)列的方案。

a. 單線程+隊(duì)列的實(shí)現(xiàn)方式:社區(qū)引入了一個(gè)事件處理線程,統(tǒng)一處理各種控制器事件,然后控制器將原來(lái)執(zhí)行的操作全部建模成一個(gè)個(gè)獨(dú)立的事件,發(fā)送到專屬的事件隊(duì)列中,供此線程消費(fèi)。
b. 單線程不代表之前提到的所有線程都被干掉了,控制器只是把緩存狀態(tài)變更方面的工作委托給了這個(gè)線程而已。
(2)第二個(gè)改進(jìn):將之前同步操作Zookeeper全部改為異步操作。
a. Zookeeper本身的API提供了同步寫(xiě)和異步寫(xiě)兩種方式。同步操作zk,在有大量主題分區(qū)發(fā)生變更時(shí),Zookeeper容易成為系統(tǒng)的瓶頸。

高水位的討論

Leader 副本保持同步條件

  1. 該遠(yuǎn)程 Follower 副本在 ISR 中。
  2. 該遠(yuǎn)程 Follower 副本 LEO 值落后于 Leader 副本 LEO 值的時(shí)間,不超過(guò) Broker 端參數(shù) replica.lag.time.max.ms 的值。如果使用默認(rèn)值的話,就是不超過(guò) 10 秒。
image.png

kafka調(diào)優(yōu)

1.操作系統(tǒng)調(diào)優(yōu)
系統(tǒng)時(shí)禁掉 atime 更新
至少選擇 ext4 或 XFS
ulimit -n 和 vm.max_map_count
操作系統(tǒng)頁(yè)緩存

2.JVM 層調(diào)優(yōu)
設(shè)置堆大小 6-8G
GC 收集器 G1

  1. Broker 端調(diào)優(yōu)
    Producer -> Broker -> Consumer三端kafka版本要保持一致

4.應(yīng)用層調(diào)優(yōu)
不要頻繁地創(chuàng)建 Producer 和 Consumer 對(duì)象實(shí)例
用完及時(shí)關(guān)閉
合理利用多線程來(lái)改善性能

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容