Original article: https://kafka.apache.org/0101/documentation.html#apidesign
Producer APIs
The Producer API wraps two lower-level producers:
- kafka.producer.SyncProducer
- kafka.producer.async.AsyncProducer
class Producer {
  /* Sends the data, partitioned by key to the topic using either the */
  /* synchronous or the asynchronous producer */
  public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);

  /* Sends a list of data, partitioned by key to the topic using either */
  /* the synchronous or the asynchronous producer */
  public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);

  /* Closes the producer and cleans up */
  public void close();
}
The goal is to expose all producer functionality to the client through a single, simple API. The Kafka producer can:

- Queue/buffer multiple send requests and dispatch them asynchronously in batches:

kafka.producer.Producer can batch multiple send requests (producer.type=async) before serializing them and dispatching them to the appropriate partition. The batch size can be tuned with a few configuration parameters. As events enter the queue, they are buffered there until either queue.time elapses or batch.size is reached. A background thread (kafka.producer.async.ProducerSendThread) dequeues the data, serializes it using kafka.producer.EventHandler, and sends it to the appropriate partition. A custom event handler can be plugged in via the event.handler parameter. Callbacks can be injected at the various stages of the producer queue pipeline, for custom logging/tracing code or monitoring logic; this is done by implementing the kafka.producer.async.CallbackHandler interface and setting the callback.handler parameter.

- Serialize data using a user-specified Encoder:
interface Encoder<T> {
  public Message toMessage(T data);
}
kafka.serializer.DefaultEncoder is used by default.
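A minimal custom encoder in the spirit of this interface might look as follows; note that the Message class here is a simplified stand-in for Kafka's actual message format, used only for illustration:

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for Kafka's Message class (illustrative only).
class Message {
    final byte[] payload;
    Message(byte[] payload) { this.payload = payload; }
}

// The Encoder interface from the text.
interface Encoder<T> {
    Message toMessage(T data);
}

// A hypothetical encoder that serializes strings as UTF-8 bytes.
class StringEncoder implements Encoder<String> {
    public Message toMessage(String data) {
        return new Message(data.getBytes(StandardCharsets.UTF_8));
    }
}
```

Such an encoder would then be wired in via the serializer.class-style configuration rather than called directly.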
- Provide software load balancing through an optionally user-specified Partitioner:

The partition routing decision is made by kafka.producer.Partitioner.
interface Partitioner<T> {
  int partition(T key, int numPartitions);
}
The partition API uses the key and the number of available partitions to pick a partition (returning the chosen partition id). The id is used to pick the final partition, from the sorted list of partitions, to send the data to. The default partitioning strategy is hash(key) % numPartitions. If the key is null, a random partition is chosen. A specific partitioning strategy can be configured via the partitioner.class parameter.
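The queue/batch behavior and the default partitioning strategy described above can be sketched in plain Java. This is an illustrative sketch only; the class and method names are invented here and are not Kafka's actual implementation:

```java
import java.util.*;
import java.util.concurrent.*;

/**
 * Illustrative sketch (not Kafka's real classes) of the async producer path:
 * events are buffered in a queue and flushed when batchSize events have
 * accumulated (analogous to batch.size) or the wait time elapses (analogous
 * to queue.time), then routed to a partition.
 */
class AsyncProducerSketch<K, V> {
    private final BlockingQueue<Map.Entry<K, V>> queue = new LinkedBlockingQueue<>();
    private final int batchSize;     // analogous to batch.size
    private final long queueTimeMs;  // analogous to queue.time

    AsyncProducerSketch(int batchSize, long queueTimeMs) {
        this.batchSize = batchSize;
        this.queueTimeMs = queueTimeMs;
    }

    /** Enqueue an event; a background thread would drain it later. */
    void send(K key, V value) {
        queue.add(new AbstractMap.SimpleEntry<>(key, value));
    }

    /** Default strategy from the text: hash(key) % numPartitions, random for null keys. */
    static int partition(Object key, int numPartitions) {
        if (key == null) return new Random().nextInt(numPartitions);
        return (key.hashCode() & 0x7fffffff) % numPartitions;  // keep hash non-negative
    }

    /**
     * One iteration of the background send thread: wait until batchSize events
     * have arrived or queueTimeMs elapses, then return whatever accumulated.
     */
    List<Map.Entry<K, V>> drainBatch() {
        List<Map.Entry<K, V>> batch = new ArrayList<>();
        long deadline = System.currentTimeMillis() + queueTimeMs;
        while (batch.size() < batchSize) {
            long wait = deadline - System.currentTimeMillis();
            if (wait <= 0) break;  // queue.time elapsed
            try {
                Map.Entry<K, V> e = queue.poll(wait, TimeUnit.MILLISECONDS);
                if (e == null) break;  // timed out waiting for more events
                batch.add(e);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return batch;
    }
}
```

In the real producer the drained batch is then serialized by the EventHandler and sent to the partition chosen by the Partitioner; here the two concerns are just shown side by side.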
Consumer APIs
We have two levels of consumer APIs. The low-level "simple" API maintains a connection to a single broker and has a close correspondence to the network requests sent to the server. This API is stateless: every request carries the offset, which leaves the user free to maintain this metadata themselves.
The high-level API hides the broker details from the consumer and allows consuming data from the cluster without worrying about the underlying topology. It also maintains the state of what has already been consumed. The high-level API additionally provides the ability to subscribe to topics that match a filter expression (i.e., a whitelist or a blacklist).
Low-level API
class SimpleConsumer {
  /* Send fetch request to a broker and get back a set of messages. */
  public ByteBufferMessageSet fetch(FetchRequest request);

  /* Send a list of fetch requests to a broker and get back a response set. */
  public MultiFetchResponse multifetch(List<FetchRequest> fetches);

  /**
   * Get a list of valid offsets (up to maxSize) before the given time.
   * The result is a list of offsets, in descending order.
   * @param time: time in millisecs,
   *              if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest offset available.
   *              if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest offset available.
   */
  public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
The low-level API is used to implement the high-level API, and is also used directly by some "offline" consumers that have particular requirements around maintaining state.
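The key property of the low-level API, that the broker keeps no per-consumer state and every fetch carries its own offset, can be illustrated with a plain-Java mock. SimpleBrokerMock is a hypothetical stand-in, not a Kafka class, and the "offsets" here are simple list indexes rather than Kafka's log offsets:

```java
import java.util.*;

/**
 * Illustrative mock (not Kafka's wire protocol) of the low-level API's
 * stateless design: the broker holds only the log, and the consumer
 * passes its own offset on every fetch.
 */
class SimpleBrokerMock {
    private final List<String> log = new ArrayList<>();  // one partition's log

    void append(String msg) { log.add(msg); }

    /** Every fetch carries the offset; the broker keeps no per-consumer state. */
    List<String> fetch(int offset, int maxMessages) {
        if (offset >= log.size()) return Collections.emptyList();
        int end = Math.min(offset + maxMessages, log.size());
        return new ArrayList<>(log.subList(offset, end));
    }
}
```

A consumer built on this then just loops: fetch at its current offset, process the messages, and advance the offset by the number of messages received, persisting that offset wherever it likes.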
High-level API
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);

interface ConsumerConnector {
  /**
   * This method is used to get a list of KafkaStreams, which are iterators over
   * MessageAndMetadata objects from which you can obtain messages and their
   * associated metadata (currently only topic).
   * Input: a map of <topic, #streams>
   * Output: a map of <topic, list of message streams>
   */
  public Map<String, List<KafkaStream>> createMessageStreams(Map<String, Integer> topicCountMap);

  /**
   * You can also obtain a list of KafkaStreams, that iterate over messages
   * from topics that match a TopicFilter. (A TopicFilter encapsulates a
   * whitelist or a blacklist which is a standard Java regex.)
   */
  public List<KafkaStream> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

  /* Commit the offsets of all messages consumed so far. */
  public void commitOffsets();

  /* Shut down the connector */
  public void shutdown();
}
This API is centered around iterators, implemented by the KafkaStream class. A KafkaStream represents a stream of messages from one or more partitions (possibly spread across multiple brokers). Each stream is consumed by a single thread, and the client can provide the desired number of streams at creation time. Thus a stream may be backed by multiple partitions, but a partition only ever feeds one stream.
The createMessageStreams call registers the consumer for the topics, which triggers a consumer/broker rebalance. The API encourages creating many streams in a single call in order to minimize the number of rebalances. The createMessageStreamsByFilter call (additionally) registers watchers to discover topics that match its filter. Note that each iterator returned by createMessageStreamsByFilter may iterate over messages from multiple topics (i.e., all the topics that match the filter).
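The stream/partition relationship described above, where a stream may span several partitions but each partition feeds exactly one stream, can be sketched as a simple assignment function. This is an illustrative sketch (round-robin assignment), not Kafka's actual rebalancing algorithm:

```java
import java.util.*;

/**
 * Illustrative sketch of the high-level API's stream model: each stream is
 * consumed by one thread and may be fed by several partitions, but each
 * partition feeds exactly one stream.
 */
class StreamAssignmentSketch {
    /** Map each stream id to the list of partitions feeding it. */
    static Map<Integer, List<Integer>> assign(int numPartitions, int numStreams) {
        Map<Integer, List<Integer>> streams = new HashMap<>();
        for (int s = 0; s < numStreams; s++) streams.put(s, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            streams.get(p % numStreams).add(p);  // partition p goes to exactly one stream
        }
        return streams;
    }
}
```

With 5 partitions and 2 streams, one stream ends up backed by three partitions and the other by two; each is then consumed by its own thread, which is why requesting more streams than partitions leaves some streams idle.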