Kafka流程分析-生產(chǎn)者

Kafka 工作流程分析

1、Kafka生產(chǎn)過(guò)程分析

(1)寫入方式

producer采用推(push)模式將消息發(fā)布到broker,每條消息都被追加(append)到分區(qū)(patition)中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障kafka吞吐率)

(2)partition

說(shuō)明:

  • 消息發(fā)送時(shí)都被發(fā)送到一個(gè)topic,其本質(zhì)就是一個(gè)目錄,而topic是由一些Partition Logs(分區(qū)日志)組成.
  • 每個(gè)Partition中的消息都是有序的,生產(chǎn)的消息被不斷追加到Partition log上,其中的每一個(gè)消息都被賦予了一個(gè)唯一的offset值。

分區(qū)原因:

  • 提升拓展性:每個(gè)Partition可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了
  • 提高吞吐能力:在進(jìn)行數(shù)據(jù)寫入時(shí)以 Partition 為單位進(jìn)行寫入。

分區(qū)依據(jù):

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        // (1) 指定了patition,則直接使用該 Partition
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }
  • 對(duì)于已經(jīng)指定了 partition 的,則直接使用該partition;
  • 未指定patition但指定key,通過(guò)對(duì)key的value進(jìn)行hash出一個(gè)patition;
  • patition和key都未指定,使用輪詢選出一個(gè)patition;
(3)Replica(副本)
Replica流程
(4)寫入流程

流程圖:

producer寫入流程

流程描述:

  • producer先從zookeeper的 "/brokers/.../state"節(jié)點(diǎn)找到該partition的leader
  • producer將消息發(fā)送給該leader
  • leader將消息寫入本地log
  • followers從leader pull消息
  • Follower將 pull到的消息寫入本地log
  • Follower 寫入成功值后向leader發(fā)送ACK
  • leader收到所有ISR中的replication的ACK后,增加HW
  • 向producer發(fā)送ACK

2、 Broker 保存消息

(1)存儲(chǔ)說(shuō)明
  • 物理上把topic分成一個(gè)或多個(gè)patition,每個(gè)patition物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該patition的所有消息和索引文件)
  • Kafka讀取特定消息的時(shí)間復(fù)雜度為O(1);
  • 消息數(shù)據(jù)是存儲(chǔ)在partition文件夾下的*.log文件中的;
  • 消息存儲(chǔ)時(shí)常有兩個(gè)策略,分別為:
基于時(shí)間存儲(chǔ)策略:默認(rèn)保留168小時(shí)(log.retention.hours=168)  
基于大小保留策略:默認(rèn)保留 1G(log.retention.bytes=1073741824)
(2)Zk存儲(chǔ)結(jié)構(gòu)
Zk存儲(chǔ)結(jié)構(gòu)

3、consumer flow

(1) 高級(jí)API與低級(jí)API
  • kafka提供了兩套consumer API:高級(jí)Consumer API和低級(jí)Consumer API。
  • 高級(jí)API不需要自行去管理offset,partition replica等,系統(tǒng)通過(guò)Zk自行管理。(低級(jí) API反之)
(2)Consumer Group(消費(fèi)者組)

流程圖:

Consumer Group

描述說(shuō)明:

  • Consumer Group 由多個(gè)Consumer 組成,同時(shí)一個(gè)Consumer只有屬于一個(gè)Consumer Group。
  • Consumer Group 保證了其訂閱的Topic partition 會(huì)被該Consumer Group 中的Consumer消費(fèi)。對(duì)于多個(gè)Consumer Group訂閱了同一個(gè)Topic,每個(gè)Consumer Group之間互不影響。
  • 如果要實(shí)現(xiàn)一個(gè)消息被多個(gè) consumer 消費(fèi),則可以將當(dāng)consumer 單獨(dú)添加到單獨(dú)的Consumer Group中(反之,如果要實(shí)現(xiàn)一個(gè)消息 被一個(gè) consumer 消費(fèi),則可以將當(dāng)consumer 添加到同一個(gè)Consumer Group中)
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容