kafka基礎(chǔ)

1. kafka架構(gòu)圖

kafka架構(gòu)圖

2. 角色分析

1. Broker

kafka作為一個(gè)消息中間件,用于存儲(chǔ)和轉(zhuǎn)發(fā)消息,可以把它想象成一個(gè)中介,股票經(jīng)紀(jì)人就叫做broker。默認(rèn)端口是9092,生產(chǎn)者和消費(fèi)者都需要跟這個(gè)Broker建立連接才可以實(shí)現(xiàn)消息的收發(fā)。

2. 消息

客戶端之間傳輸?shù)臄?shù)據(jù)稱之為消息, 或者說是記錄(record)。請記住,對于kafka來說,不管是消費(fèi)者還是生產(chǎn)者都是客戶端。 在客戶端的代碼中,Record可以是一個(gè)key-value鍵值對,生產(chǎn)者對應(yīng)的封裝類是ProducerRecord, 消費(fèi)者對應(yīng)的封裝類是ConsumerRecord。消息在傳輸?shù)倪^程中需要序列化,所有需要我們在代碼中執(zhí)行序列化工具。消息在服務(wù)端中存儲(chǔ)的格式(RecordBatch和Record)。

3. 生產(chǎn)者

我們將發(fā)送消息的一方稱之為生產(chǎn)者,接收消息的乙方稱之為消費(fèi)者,為了提升消息發(fā)送的速率,生產(chǎn)者并不是組條發(fā)送消息到broker中,而是批量發(fā)送的。多少條發(fā)送一次,由配置中的一個(gè)參數(shù)決定。

props.put("batch.size", 16384);

4. 消費(fèi)者

一般來說,消費(fèi)者獲取消息存在兩種方式,一種是pull, 一種是push。kafka采用的是pull模式。WHY?
\color{red}{在push模式下,如果消息產(chǎn)生的速度遠(yuǎn)大于消費(fèi)者消費(fèi)的速度,消費(fèi)者會(huì)不堪重負(fù),最終掛掉。}
消費(fèi)者可以控制自己一次消費(fèi)多少條消息

max.poll.record=500    #默認(rèn)是500條

5. \color{red}{TOPIC}

生產(chǎn)者和消費(fèi)者之間每條消息之間是如何關(guān)聯(lián)起來的呢?也就是消費(fèi)者怎么就知道自己需要消費(fèi)什么消息?
隊(duì)列的存在就是解決這個(gè)問題的。在kafka里面這個(gè)隊(duì)列就是topic,\color{red}{它是一個(gè)邏輯概念}。
生產(chǎn)者和Topic,Topic和消費(fèi)者的關(guān)系都是多對多(不建議這么做)。
當(dāng)生產(chǎn)者發(fā)送消息時(shí),沒有對應(yīng)的Topic,這個(gè)時(shí)候會(huì)自動(dòng)創(chuàng)建Topic。可以通過參數(shù)控制

auto.enable.topics.enable=true   #默認(rèn)時(shí)true

6. partition和Cluster

分區(qū)其實(shí)是一種數(shù)據(jù)庫分片的思想。試想一下,如果一個(gè)topic中消息過多,會(huì)產(chǎn)生什么樣的問題。

  • 不方便橫向擴(kuò)展,通過擴(kuò)展機(jī)器而不是升級硬件擴(kuò)展。
  • 并發(fā)負(fù)載,所有的客戶端都操作同一個(gè)topic,在高并發(fā)的場景下,性能瓶頸
    kafka分區(qū)概念---partition。一個(gè)topic可以劃分成多個(gè)分區(qū),分區(qū)在創(chuàng)建topic的時(shí)候指定,每個(gè)topic至少有一個(gè)分區(qū)。
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic xiong

如果沒有指定分區(qū)數(shù),默認(rèn)分區(qū)是1個(gè),可通過下述參數(shù)修改

num.partitions=1

partition負(fù)載的實(shí)現(xiàn)。舉例說明,Topic有三個(gè)分區(qū),生產(chǎn)者發(fā)送了9條消息,第一個(gè)分區(qū)存儲(chǔ)了1 4 7, 第二個(gè)分區(qū)存儲(chǔ)了2 5 8,第三個(gè)分區(qū)存儲(chǔ)了3 6 9。這種情況下其實(shí)就是負(fù)載的一種體現(xiàn)

每個(gè)partition都會(huì)有一個(gè)物理目錄。kafka的配置文件下可以配置日志的存儲(chǔ)路徑,默認(rèn)存儲(chǔ)在/tmp/kafka-logs下,假設(shè)topic=xiongTopic, 每個(gè)分區(qū)的存儲(chǔ)目錄就是xiongTopic-0、xiongTopic-1.....

\color{red}{與rabbitMq不同的是,Partition中的消息被讀取以后不會(huì)被刪除,kafka是通過一個(gè)類似游標(biāo)的東} \color{red}{西用來記錄當(dāng)前消息讀取的位置偏移量信息。同時(shí),同一批消息在一個(gè)partition里面是順序追加寫入的。} \color{red}{這里也是kafka吞吐量大的一個(gè)重要原因.}

7. 副本機(jī)制

如果partition的數(shù)據(jù)只存儲(chǔ)了一份,在發(fā)生網(wǎng)絡(luò)或者硬件故障的時(shí)候,該分區(qū)的數(shù)據(jù)會(huì)無法訪問或者無法恢復(fù)了。kafka在0.8版本之后增加了副本機(jī)制, 每個(gè)partiotion可以有若干個(gè)副本,\color{red}{副本必須在不同的broker上}。一般我們說的副本包括其中的主節(jié)點(diǎn)。
由replication-factor指定一個(gè)Topic的副本數(shù):

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partition --topic testxiong

服務(wù)端有個(gè)參數(shù)控制默認(rèn)的副本數(shù)

offsets.topic.replication.factor=3
分區(qū)、副本

leader用粉紅色標(biāo)識(shí),follower用綠色標(biāo)識(shí),leader是由選舉得出。
生產(chǎn)者消費(fèi)者消息傳遞都是通過leader來操作,follower的數(shù)據(jù)是通過leader同步過來的。

8. Segment

kafka的數(shù)據(jù)是放在后綴為.log的文件中,試想一下,kafka的數(shù)據(jù)在同一個(gè)partition中是順序?qū)懭氲?,我們不斷的追加?shù)據(jù),那保存數(shù)據(jù)的文件就會(huì)越來越大,這個(gè)時(shí)候檢索的效率就會(huì)越來越低。
所以,kafka這塊干脆對partition再次進(jìn)行了切分,切分出來的單位就就做段(segment),實(shí)際上kafka數(shù)據(jù)的存儲(chǔ)是分段的。我們可以在kafka的存儲(chǔ)目錄下看到這三個(gè)文件都是成對出現(xiàn)的:


segment

這其中是一個(gè)數(shù)據(jù)文件,2個(gè)索引文件。segment的默認(rèn)存儲(chǔ)大小是1G,可以通過一下參數(shù)進(jìn)行控制。

log.segment.bytes=1073741824

9. Consumer Group

在kafka中,消費(fèi)者是以消費(fèi)者組的形式對消息進(jìn)行接收。每個(gè)消費(fèi)者組都會(huì)由一個(gè)group id與對應(yīng)的topic進(jìn)行綁定。
\color{blue}{注意: 同一個(gè)group中的消費(fèi)者不能消費(fèi)相同的partition,可以將partition比作一個(gè)座位,一個(gè)座位最多坐一個(gè)人。}

  • 消費(fèi)者組中,消費(fèi)者數(shù)量比partition數(shù)量少的情況下,一個(gè)消費(fèi)者同時(shí)消費(fèi)多個(gè)partition。
  • 消費(fèi)者組中,消費(fèi)者數(shù)量比partition數(shù)量多的情況下,存在消費(fèi)者空閑。

這兩種情況都不是效率最高的情況,只有消費(fèi)者數(shù)量和partition數(shù)量保持一致才是最好的選擇。如果想要消費(fèi)同一個(gè)partition,就需要另一個(gè)消費(fèi)者組來進(jìn)行。

10. Comsumer Offset

我們前面談到,在Kafka中消息是順序寫入的,并且消費(fèi)的消息是不會(huì)被刪除的。那么,如果消費(fèi)者突然掛掉,或者進(jìn)行下次讀寫時(shí),如何知道自己已經(jīng)讀取了哪些信息,該從何處繼續(xù)讀取消息呢?
既然消息是有序的,那我們就可以給消息進(jìn)行編號,來唯一標(biāo)識(shí)一條消息。

offset

這里的編號我們就稱之為offset,偏移量。offset記錄著下一條將要發(fā)送給consumer的消息序號。offset的保存是保存在服務(wù)端的,并不是保存在ZK上面。

3. Kafka Java開發(fā)

生產(chǎn)者:

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.128:9092");
        // 設(shè)置key  value序列化的工具
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //設(shè)置消息接收確認(rèn)模式  0 發(fā)出就立刻確認(rèn), 1 leader接收到就確認(rèn)  all 所有follower同步完成再確認(rèn)
        props.put("acks","1");
        // 異常重試次數(shù)
        props.put("retries", 3);
        // 設(shè)置批量發(fā)送數(shù)據(jù)一次,數(shù)據(jù)大小,默認(rèn)16k
        props.put("batch.size",16384);
        // 設(shè)置批量發(fā)送等待時(shí)間
        props.put("linger.ms", 5);
        // 設(shè)置客戶端緩沖區(qū)大小,默認(rèn)是32M,滿了以后也會(huì)出發(fā)消息發(fā)送
        props.put("buffer.memory", 33554432);
        // 獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間,超時(shí)后拋出異常
        props.put("max.block.ms", 3000);

        Producer<String, String> producer = new KafkaProducer<String, String>(props);

        for (int i=0; i < 100; i ++) {
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

消費(fèi)者

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.182.128:9092");
        props.put("group.id", "xiong-group");
        // 是否自動(dòng)提交偏移量,只有commit之后才更新消費(fèi)者組的offset
        props.put("enable.auto.commit", "true");
        // 消費(fèi)者自動(dòng)提交的時(shí)間間隔
        props.put("auto.commit.interval.ms", "1000");
        // 從最早的數(shù)據(jù)開始消費(fèi)earliest | latest | none
        props.put("auto.offset.reset", "earliest");
        // 設(shè)置key  value反序列化的工具
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //訂閱隊(duì)列
        consumer.subscribe(Arrays.asList("mytopic"));
        try{
            while(true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d, key=%s, value=%s, partition=%s%n",
                            record.offset(), record.key(), record.value(), record.partition());
                }
            }

        } finally {
            consumer.close();
        }
    }
}

查詢消費(fèi)者相關(guān)偏移量數(shù)據(jù):

./kafka-consumer-groups.sh --bootstrap-server 192.168.182.128:9092 --describe --group xiong-group

4. 消息冪等性

什么叫做消息冪等性?
簡單來說就是,消息發(fā)送一次的結(jié)果和發(fā)送多次的結(jié)果是一樣的。
有時(shí)候消息消費(fèi)失敗的情況下,我們可能會(huì)采用消息重發(fā)的機(jī)制。但是生產(chǎn)者有時(shí)候是不知道消息是不是真的消費(fèi)失敗時(shí),這時(shí)候消息的重發(fā)可能會(huì)產(chǎn)生消息重復(fù)的情況。
kafka實(shí)現(xiàn)消息的冪等性是在broker中實(shí)現(xiàn)的,而不是消費(fèi)者端實(shí)現(xiàn),大大的解放了消費(fèi)者的雙手。
如何實(shí)現(xiàn)消息的去重?
去重是需要依賴生產(chǎn)者消息的唯一標(biāo)識(shí)的,不然我們沒法知道是否是同一條消息,kafka中可以通過如下配置來產(chǎn)生唯一標(biāo)識(shí),將producer升級成冪等性的producer。

props.put("enable.idempotence", true);

實(shí)現(xiàn)機(jī)制:

  • PID(Producer ID), 冪等性的生產(chǎn)者每個(gè)客戶端都有一個(gè)唯一的編號。
  • sequence number,冪等性的生產(chǎn)者發(fā)送的每條消息都會(huì)帶sequence number, Server端就是通過這個(gè)值來判斷消息是否重復(fù)。如果server端發(fā)現(xiàn)sequence number的值比服務(wù)端記錄的值要小,那證明這個(gè)消息是重復(fù)的消息。(同一分區(qū)消息順序?qū)懭?,之前如果存在sequence number較小的在后面寫入,那證明之前肯定已經(jīng)有相同的消息已經(jīng)發(fā)送過來過了)。

作用范圍:

  1. sequence number并不是全局有序,不能保證所有時(shí)間上的冪等。只能保證單分區(qū)上的冪等。
  2. 單會(huì)話上的冪等,這里的會(huì)話是指producer進(jìn)程的一次運(yùn)行。當(dāng)producer重啟以后就不能保證了。

5. 生產(chǎn)者事務(wù)

生產(chǎn)者與事務(wù)有關(guān)的方法如下:(kafka 0.11版本以后才支持事務(wù))

對象 描述
initTransactions() 初始化事務(wù)
beginTransaction() 開啟事務(wù)
commitTransaction() 提交事務(wù)
abortTransaction() 中止事務(wù)
sendOffsetsToTransaction() sendOffsetsToTransaction方法是消費(fèi)者和生產(chǎn)者在同一段代碼使用的(從上游接收消息發(fā)送給下游),在提交的時(shí)候把消費(fèi)消息的offset發(fā)送給consumer Corordinator.

代碼示例:

        //事務(wù)的前提是消費(fèi)者的冪等性
        props.put("enable.idempotence", true);
        //設(shè)置事務(wù)id,唯一
        props.put("transactional.id", UUID.randomUUID().toString());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        producer.initTransactions();
        try{
            producer.beginTransaction();
            for (int i=0; i < 100; i ++) {
                producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(i), Integer.toString(i)));
                if (i == 20) {
                    Integer j = 1/0; //制造異常
                }
            }
            producer.send(new ProducerRecord<String, String>("mytopic", Integer.toString(100), Integer.toString(100)));
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
        }
        producer.close();

kafka分布式事務(wù)的實(shí)現(xiàn):

  1. 生產(chǎn)者的消息會(huì)分區(qū),所以這里的事務(wù)屬于分布式事務(wù)。kafka采用的是2PC提交。如果大家都可以commit就提交,否則就abort;
  2. 2PC的情況下,需要一個(gè)協(xié)調(diào)者,在Kafka中這個(gè)角色叫做Transaction Coordinator。
  3. 事務(wù)管理必須有事務(wù)日志來記錄事務(wù)的狀態(tài),以便在Coordinator以外掛掉以后繼續(xù)處理原來的事務(wù)。事務(wù)日志的存儲(chǔ)類似于消費(fèi)者offset的存儲(chǔ),kafka使用了一個(gè)特殊topic--transaction_state來記錄事務(wù)的狀態(tài)信息。
  4. 如果生產(chǎn)者掛了,事務(wù)要在重啟以后繼續(xù)處理就需要有一個(gè)唯一的事務(wù)id來找到對應(yīng)的事務(wù),這個(gè)就是transaction.id。配置了transaction.id,此時(shí)生產(chǎn)者必須是冪等性的生產(chǎn)者。事務(wù)id相同的生產(chǎn)者可以繼續(xù)處理原來的事務(wù)。
事務(wù)處理

步驟描述:
A: 生產(chǎn)者通過initTransactions Api向coordinator注冊事務(wù)id。
B: Corrdinator記錄事務(wù)日志
C: 生產(chǎn)者將消息寫入目標(biāo)分區(qū)
D: 分區(qū)域Coordinator的交互,當(dāng)事務(wù)完成以后消息的狀態(tài)應(yīng)該是已提交。這時(shí)候消費(fèi)者才能消費(fèi)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容