【Kafka官方文檔翻譯】5.5.1. API 設(shè)計(jì)

原文地址:https://kafka.apache.org/0101/documentation.html#apidesign

Producer APIs

Producer API封裝了底層兩個(gè)Producer:

  • kafka.producer.SyncProducer
  • kafka.producer.async.AsyncProducer
    class Producer {

    /* Sends the data, partitioned by key to the topic using either the */
    /* synchronous or the asynchronous producer */
    public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

    /* Sends a list of data, partitioned by key to the topic using either */
    /* the synchronous or the asynchronous producer */
    public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

    /* Closes the producer and cleans up */
    public void close();

    }

這么做的目的是通過一個(gè)簡(jiǎn)答的API暴露把所有Producer的功能暴露給Client。Kafka的Producer可以:

  • 排隊(duì)/緩存多個(gè)發(fā)送請(qǐng)求并且異步的批量分發(fā)出去:
      kafka.producer.Producer提供批量化多個(gè)發(fā)送請(qǐng)求(producer.type=async),之后進(jìn)行序列化并發(fā)送的分區(qū)的能力。批大小可以通過一些配置參數(shù)進(jìn)行設(shè)置。將事件加入到queue中,他們會(huì)被緩沖在queue中,直到滿足queue.time或batch.size達(dá)到配置的值。后臺(tái)線程(kafka.producer.async.ProducerSendThread)從queue中獲取數(shù)據(jù)并使用kafka.producer.EventHandler對(duì)數(shù)據(jù)進(jìn)行序列化并發(fā)送到合適的分區(qū)。可以通過event.handler參數(shù)以插件的形式添加自定義的event handler程序。在producer queue pipeline處理的各個(gè)階段可以注入回調(diào),用于自定義的日志/跟蹤代碼或者監(jiān)控邏輯。這可以通過實(shí)現(xiàn)kafka.producer.async.CallbackHandler接口并設(shè)置callback.handler參數(shù)來實(shí)現(xiàn)。
  • 使用用戶指定的Encoder來序列化數(shù)據(jù):
    interface Encoder<T> {
    public Message toMessage(T data);
    }

默認(rèn)使用kafka.serializer.DefaultEncoder。

  • 通過用戶可選的Partitioner來實(shí)現(xiàn)亂負(fù)載均衡:
      Partition的路由由kafka.producer.Partitioner決定。
    interface Partitioner<T> {
    int partition(T key, int numPartitions);
    }

分區(qū)選擇API使用key和分區(qū)總數(shù)來選擇最終的partition(返回選擇的partition id)。id用于從排序的partition列表中選擇最終的一個(gè)分區(qū)去發(fā)送數(shù)據(jù)。默認(rèn)的分區(qū)策略是hash(key)%numPartitions。如果key是null,會(huì)隨機(jī)選擇一個(gè)分區(qū)??梢酝ㄟ^partitioner.class參數(shù)來配置特定的分區(qū)選擇策略。

Consumer APIs

我們有兩個(gè)級(jí)別的Consumer API。低級(jí)別的“簡(jiǎn)單的”API和單個(gè)Broker之間保持鏈接并且和發(fā)送到服務(wù)端的網(wǎng)絡(luò)請(qǐng)求有緊密的對(duì)應(yīng)關(guān)系。這個(gè)API是無狀態(tài)的,每個(gè)請(qǐng)求都包含offset信息,允許用戶維護(hù)這個(gè)元數(shù)據(jù)。
  高級(jí)別的API在Consumer端隱藏了Broker的細(xì)節(jié),并且允許從集群消費(fèi)數(shù)據(jù)而不關(guān)心底層的拓?fù)浣Y(jié)構(gòu)。同樣維持了“哪些數(shù)據(jù)已經(jīng)被消費(fèi)過”的狀態(tài)。高級(jí)別的API還提供了通過表達(dá)式訂閱的Topic的功能(例如通過白名單或者黑名單的方式訂閱)。

Low-level API

    class SimpleConsumer {

    /* Send fetch request to a broker and get back a set of messages. */
    public ByteBufferMessageSet fetch(FetchRequest request);

    /* Send a list of fetch requests to a broker and get back a response set. */
    public MultiFetchResponse multifetch(List<FetchRequest> fetches);

    /**
    * Get a list of valid offsets (up to maxSize) before the given time.
    * The result is a list of offsets, in descending order.
    * @param time: time in millisecs,
    *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
    *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
    */
    public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
    }

低級(jí)別的API用于實(shí)現(xiàn)高級(jí)別的API,也被直接使用在一些在狀態(tài)上有特殊需求的“離線”Consumer。

High-level API

    /* create a connection to the cluster */
    ConsumerConnector connector = Consumer.create(consumerConfig);

    interface ConsumerConnector {

    /**
    * This method is used to get a list of KafkaStreams, which are iterators over
    * MessageAndMetadata objects from which you can obtain messages and their
    * associated metadata (currently only topic).
    *  Input: a map of <topic, #streams>
    *  Output: a map of <topic, list of message streams>
    */
    public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);

    /**
    * You can also obtain a list of KafkaStreams, that iterate over messages
    * from topics that match a TopicFilter. (A TopicFilter encapsulates a
    * whitelist or a blacklist which is a standard Java regex.)
    */
    public List<KafkaStream> createMessageStreamsByFilter(
        TopicFilter topicFilter, int numStreams);

    /* Commit the offsets of all messages consumed so far. */
    public commitOffsets()

    /* Shut down the connector */
    public shutdown()
    }

這個(gè)API圍繞迭代器,通過KafkaStream類實(shí)現(xiàn)。一個(gè)KafkaStream表示了一個(gè)或多個(gè)分區(qū)(可以分布在不同的Broker上)組成的消息流。每個(gè)Stream被單個(gè)線程處理,客戶端可以在創(chuàng)建流時(shí)提供需要的個(gè)數(shù)。這樣,一個(gè)流背后可以是多個(gè)分區(qū),但是一個(gè)分區(qū)只會(huì)屬于一個(gè)流。
  createMessageStreams調(diào)用會(huì)把Consumer注冊(cè)到Topic,促使Consumer/Broker的重新分配。API鼓勵(lì)在單次調(diào)用中創(chuàng)建多個(gè)Stream以減少充分配的次數(shù)。createMessageStreamsByFilter方法的調(diào)用(另外的)用于注冊(cè)watcher去發(fā)現(xiàn)匹配過濾規(guī)則的topic。createMessageStreamsByFilter返回的迭代器可以迭代來此多個(gè)Topic的消息(如果多個(gè)Topic都符合過濾規(guī)則)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容