Apache Kafka詳解

一、消息隊(duì)列Message Queue

image.png

兩種模式

  1. 點(diǎn)對(duì)點(diǎn)模式
    點(diǎn)對(duì)點(diǎn)模式是一個(gè)基于拉取或輪詢的消息傳送模型,由消費(fèi)者主動(dòng)拉取數(shù)據(jù),客戶端需要實(shí)時(shí)開啟一個(gè)線程監(jiān)控隊(duì)列中是否有數(shù)據(jù)。
  2. 發(fā)布/訂閱模式
    發(fā)布/訂閱模式是一個(gè)基于推送的消息傳送模型,由MQ主動(dòng)推送消息給所有訂閱者,即使當(dāng)前訂閱者不可用。

優(yōu)點(diǎn)

  • 解耦
    允許你獨(dú)立的擴(kuò)展或修改兩邊的處理過程,只要確保它們遵循同樣的接口約束。
  • 冗余
    MQ把數(shù)據(jù)進(jìn)行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風(fēng)險(xiǎn)。
  • 擴(kuò)展
    因?yàn)镸Q解耦了你的處理過程,所以增大消息入隊(duì)和處理的頻率是很容易的,只要另外增加處理過程即可,提高了靈活性和峰值處理能力。
  • 可恢復(fù)性
    消息隊(duì)列降低了進(jìn)程間的耦合度,所以即使一個(gè)處理消息的進(jìn)程掛掉,加入隊(duì)列中的消息仍然可以在系統(tǒng)恢復(fù)后被處理。
  • 順序保證
    隊(duì)列本身就遵循FIFO的原則,保證了數(shù)據(jù)的處理順序。
  • 緩沖
    有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)者和消費(fèi)者處理速度不一致的問題。
  • 異步通信
    MQ提供了異步處理機(jī)制,允許用戶把消息放入隊(duì)列但不立即處理它,直到需要時(shí)再去處理。

二、Kafka簡(jiǎn)介

  1. Apache Kafka是一個(gè)開源消息系統(tǒng),由Scala寫成。是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源消息系統(tǒng)項(xiàng)目。
  2. Kafka最初是由LinkedIn公司開發(fā),并于2011年初開源。2012年10月從Apache Incubator畢業(yè)。該項(xiàng)目的目標(biāo)是為處理實(shí)時(shí)數(shù)據(jù)提供一個(gè)統(tǒng)一、高通量、低等待的平臺(tái)。
  3. Kafka是一個(gè)Distributed Message Queue。Kafka對(duì)消息保存時(shí)根據(jù)Topic進(jìn)行歸類,發(fā)送消息者稱為Producer,消息接收者稱為Consumer,此外Kafka集群有多個(gè)Kafka實(shí)例組成,每個(gè)實(shí)例稱為Broker。
  4. Kafka需要依賴于Zookeeper保存一些Meta信息,來保證系統(tǒng)可用性。
  5. 在流式計(jì)算中,Kafka一般用來緩存數(shù)據(jù),Storm通過消費(fèi)Kafka的數(shù)據(jù)進(jìn)行計(jì)算。

三、Kafka架構(gòu)

image.png
No. 組件 說明
1 Producer 消息生產(chǎn)者,向Kafka中發(fā)消息的客戶端。
2 Consumer 消息消費(fèi)者,從Kafka中取消息的客戶端。
3 Broker 一臺(tái)Kafka服務(wù)器就是一個(gè)Broker,一個(gè)集群由多個(gè)Broker組成,一個(gè)Broker可以容納多個(gè)Topic。
4 Topic 屬于特定類別的消息流稱為Topic, 數(shù)據(jù)存儲(chǔ)在Topic中,可以理解為一個(gè)隊(duì)列。
5 Partition 為了實(shí)現(xiàn)負(fù)載均衡和高并發(fā),一個(gè)非常大的Topic可以通過分為多個(gè)Partition分布到多個(gè)Broker上,每個(gè)Partition都是一個(gè)有序的隊(duì)列。
6 Consumer Group(CG) 這是Kafka用來實(shí)現(xiàn)一個(gè)消息的廣播(發(fā)給所有Consumer)和單播(發(fā)給任意一個(gè)Consumer)的手段。一個(gè)Topic可以有多個(gè)CG,Topic中的消息會(huì)復(fù)制(不是真的復(fù)制,是概念上的)到所有的CG,但每個(gè)Partition只會(huì)把消息發(fā)給該CG中的一個(gè)Consumer。如果需要實(shí)現(xiàn)廣播,只要每個(gè)Consumer都有一個(gè)獨(dú)立的CG即可,需要實(shí)現(xiàn)單播,只要所有Consumer都在同一個(gè)CG即可。用CG還可以將Consumer進(jìn)行自由分組而不需要重復(fù)發(fā)送消息到不同的Topic。
7 Offset Partition中的每條消息都會(huì)被分配一個(gè)有序的id(Offset)。Kafka只保證每個(gè)Partition中的順序,不保證多個(gè)Partition的順序。
8 Leader 負(fù)責(zé)給定Partition的所有讀取和寫入的節(jié)點(diǎn)。
9 Follower 跟隨Leader指令的節(jié)點(diǎn)被稱為Follower。 如果Leader節(jié)點(diǎn)宕機(jī),其中一個(gè)Follower將通過選舉自動(dòng)成為新的Leader。

四、Kafka工作流程

Producer生產(chǎn)過程

  1. 寫入方式
    Producer采用Push模式將消息發(fā)布到Broker,每條消息都被追加(append)到Partition中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障Kafka吞吐率)。
  2. 分區(qū)(Partition)
    消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄,而topic是由一些Partition Logs(分區(qū)日志)組成,其組織結(jié)構(gòu)如下圖所示:


    image.png

    image.png

    我們可以看到,每個(gè)Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。

    1. 分區(qū)的原因
      1. 負(fù)載均衡,方便在集群中擴(kuò)展,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)Topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了。
      2. 可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了。
    2. 分區(qū)的原則
      1. 已指定Partition,則直接使用該P(yáng)artition。
      2. 未指定Partition但指定了Key,則通過對(duì)Key進(jìn)行哈希計(jì)算得出一個(gè)Partition。
      3. Partition和Key都未指定,則輪詢選出一個(gè)Partition。
  3. 副本(Replication)
    同一個(gè)Partition可能會(huì)有多個(gè)Replication(對(duì)應(yīng)server.properties配置中的default.replication.factor=N)。沒有Replication的情況下,一旦Broker宕機(jī),其上所有Partition的數(shù)據(jù)都不可被Consumer消費(fèi),同時(shí)Producer也不能再將數(shù)據(jù)存于其上的Partition。引入Replication之后,同一個(gè)Partition可能會(huì)有多個(gè)Replication,而這時(shí)需要在這些Replication之間選出一個(gè)Leader,Producer和Consumer只與這個(gè)Leader交互,其它Replication作為Follower從Leader中復(fù)制數(shù)據(jù)。
  4. 寫入流程


    image.png

Broker保存過程

  1. 存儲(chǔ)方式
    物理上把Topic分成一個(gè)或多個(gè)Partition(對(duì)應(yīng)server.properties配置中的num.partitions=3),每個(gè)Partition物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該P(yáng)artition的所有消息和索引文件),如下:
[atguigu@hadoop102 logs]$ ll
drwxrwxr-x. 2 atguigu atguigu  4096 8月   6 14:37 first-0
drwxrwxr-x. 2 atguigu atguigu  4096 8月   6 14:35 first-1
drwxrwxr-x. 2 atguigu atguigu  4096 8月   6 14:37 first-2
[atguigu@hadoop102 logs]$ cd first-0
[atguigu@hadoop102 first-0]$ ll
-rw-rw-r–. 1 atguigu atguigu 10485760 8月   6 14:33 00000000000000000000.index
-rw-rw-r–. 1 atguigu atguigu      219 8月   6 15:07 00000000000000000000.log
-rw-rw-r–. 1 atguigu atguigu 10485756 8月   6 14:33 00000000000000000000.timeindex
-rw-rw-r–. 1 atguigu atguigu        8 8月   6 14:37 leader-epoch-checkpoint
  1. 存儲(chǔ)策略
    無論消息是否被消費(fèi),Kafka都會(huì)保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

    1. 基于時(shí)間:log.retention.hours=168
    2. 基于大小:log.retention.bytes=1073741824

    需要注意的是,因?yàn)镵afka讀取特定消息的時(shí)間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高Kafka性能無關(guān)。

  2. Zookeeper存儲(chǔ)結(jié)構(gòu)


    image.png

    注意:只有Broker和Consumer在Zookeeper中注冊(cè),Producer不在Zookeeper中注冊(cè)。

Consumer消費(fèi)過程

  1. 消費(fèi)者組


    image.png

    Consumer是以Consumer Group的方式工作,由一個(gè)或者多個(gè)Consumer組成一個(gè)Group,共同消費(fèi)一個(gè)Topic。每個(gè)Partition在同一時(shí)間只能由Group中的一個(gè)Consumer讀取,但是多個(gè)Group可以同時(shí)消費(fèi)同一個(gè)Partition。在圖中,有一個(gè)由三個(gè)Consumer組成的Group,有一個(gè)Consumer讀取Topic中的兩個(gè)Partition,另外兩個(gè)Consumer分別讀取一個(gè)Partition。某個(gè)Consumer讀取某個(gè)Partition,也可以叫做某個(gè)Consumer是某個(gè)Partition的擁有者。
    在這種情況下,Consumer可以通過水平擴(kuò)展的方式同時(shí)讀取大量的消息。另外,如果一個(gè)Consumer失敗了,那么Group中其它的Consumer會(huì)自動(dòng)負(fù)載均衡讀取之前失敗的Consumer讀取的Partition。

  2. 消費(fèi)方式
    Consumer采用Pull模式從Broker中讀取數(shù)據(jù)。
    Push模式很難適應(yīng)消費(fèi)速率不同的Consumer,因?yàn)橄l(fā)送速率是由Broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成Consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而Pull模式則可以根據(jù)Consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
    對(duì)于Kafka而言,Pull模式更合適,它可簡(jiǎn)化Broker的設(shè)計(jì),Consumer可自主控制消費(fèi)消息的速率,同時(shí)Consumer可以自己控制消費(fèi)方式——即可以批量消費(fèi)也可以逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語(yǔ)義。
    Pull模式不足之處是,如果Kafka沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直等待數(shù)據(jù)到達(dá)。為了避免這種情況,可以Pull請(qǐng)求中設(shè)置參數(shù),允許Consumer請(qǐng)求在等待數(shù)據(jù)到達(dá)的“長(zhǎng)輪詢”中進(jìn)行阻塞(并且可選地等待到給定的字節(jié)數(shù),以確保大的傳輸大?。?/li>

五、Kafka案例

生產(chǎn)者

package com.fas.kafka.service;

import org.apache.log4j.Logger;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import com.fas.kafka.util.KafkaUtil;

@Service
@Log4j
public class KafkaProducerService {
    private KafkaTemplate<String, String> kafkaTemplate = KafkaUtil.getKafkaTemplate();

    public void send(String topic, String data) {
        ListenableFuture<SendResult<String, String>> listenableFuture = kafkaTemplate.send(topic, data);
        listenableFuture.addCallback(result -> log.info("Send data to kafka [success] - " + topic + " - " + data), result -> log.info("Send data to kafka [error] - " + topic + " - " + data));
    }
}

消費(fèi)者

package com.fas.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.MessageListener;

import com.alibaba.fastjson.JSONObject;
import com.fas.kafka.entity.DeviceOnlineData;

import lombok.extern.log4j.Log4j;

@Log4j
public class KafkaConsumer implements MessageListener<String, String> {
    public static final BlockingQueue<DeviceOnlineData> DATA_ACCESS_BLOCKING_QUEUE = new LinkedBlockingQueue<>();
    public static final String KAFKA_DATA_ACCESS_TOPIC = PropertiesUtil.KAFKA_PROPERTIES.getProperty("kafka.data_access.topic");
    public static final String KAFKA_TEST_TOPIC = PropertiesUtil.KAFKA_PROPERTIES.getProperty("kafka.test.topic");

    @Override
    public void onMessage(ConsumerRecord<String, String> consumerRecord) {
        String topic = consumerRecord.topic();
        log.info("topic: " + topic);
        String value = consumerRecord.value();
        log.info("value: " + value);

        if (KAFKA_DATA_ACCESS_TOPIC.equals(topic)) {
            JSONObject.parseArray(value, DeviceOnlineData.class).stream().forEach(deviceOnlineData -> {
                try {
                    DATA_ACCESS_BLOCKING_QUEUE.put(deviceOnlineData);
                    log.info("Put into blocking queue");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        } else if (KAFKA_TEST_TOPIC.equals(topic)) {
            log.info("Test success");
        } else {
            log.info("Unknown topic");
        }
    }
}

六、常用命令

  1. 查看所有Topic
    ./bin/kafka-topics.sh --list --zookeeper zk01:2181
  2. 創(chuàng)建Topic
    ./bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
  3. 刪除Topic
    ./bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
    需要server.properties中設(shè)置delete.topic.enable=true,否則只是標(biāo)記刪除,或者直接重啟
  4. 通過Shell命令發(fā)送消息
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
  5. 通過Shell消費(fèi)消息
    ./bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test
  6. 查看消費(fèi)位置
    ./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
  7. 查看某個(gè)Topic的詳情
    ./bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
  8. 對(duì)分區(qū)數(shù)進(jìn)行修改
    ./bin/kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容