org.apache.kafka.clients.consumer.KafkaConsumer (Part 1)

* A client that consumes records from a Kafka cluster.

* The Kafka client consumes (i.e. fetches) records from a Kafka cluster.


* This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions it fetches migrate within the cluster. This client also interacts with the broker to allow groups of consumers to load balance consumption using consumer groups.

* This client transparently handles broker failures and transparently adapts as the topic partitions it fetches migrate within the cluster. It also interacts with the brokers so that groups of consumers can use consumer groups to load balance consumption.


* The consumer maintains TCP connections to the necessary brokers to fetch data. Failure to close the consumer after use will leak these connections. The consumer is not thread-safe. See Multi-threaded Processing for more details.

* Cross-Version Compatibility

* This client can communicate with brokers that are version 0.10.0 or newer. Older or newer brokers may not support certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because this feature was added in version 0.10.1. You will receive an {@link org.apache.kafka.common.errors.UnsupportedVersionException} when invoking an API that is not available on the running broker version.

* The consumer maintains TCP connections to the brokers it fetches data from; if the consumer is not closed after use, those connections leak. The consumer is not thread-safe; see Multi-threaded Processing for more details.


* This client can communicate with brokers running version 0.10.0 or newer; older or newer brokers may not support certain features. For example, 0.10.0 brokers do not support offsetsForTimes, because that feature was added in version 0.10.1. Invoking an API that is not available on the running broker version yields an {@link org.apache.kafka.common.errors.UnsupportedVersionException}.


* Offsets and Consumer Position

* Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There are actually two notions of position relevant to the user of the consumer:


* Kafka keeps a numeric offset for each record in a partition. The offset uniquely identifies a record within that partition, and also marks the consumer's position in it. For example, a consumer at position 5 has consumed the records with offsets 0 through 4 and will receive the record with offset 5 next. There are really two notions of position relevant to the consumer: the position it is currently at, and the position it has already committed.


* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances every time the consumer receives messages in a call to {@link #poll(long)}.

* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record to be handed out. It is one larger than the highest offset the consumer has seen in that partition, and it advances automatically each time the consumer receives messages from a call to {@link #poll(long)}.
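The offset arithmetic above can be sketched without the real client. This is a minimal illustration (the class name and methods here are hypothetical, not the KafkaConsumer API): after handing out records 0 through 4, the position is 5, the offset of the next record.

```java
// Sketch of how the consumer's position relates to offsets it has received.
// Hypothetical stand-in for illustration; not the actual KafkaConsumer API.
public class PositionSketch {
    private long position = 0; // offset of the next record to hand out

    // Simulates receiving one record at the current position.
    public long receiveNext() {
        long offset = position; // the record handed out carries this offset
        position = offset + 1;  // position advances past the highest offset seen
        return offset;
    }

    public long position() { return position; }

    public static void main(String[] args) {
        PositionSketch p = new PositionSketch();
        // Consume the records with offsets 0 through 4 ...
        for (int i = 0; i < 5; i++) p.receiveNext();
        // ... after which the position is 5: the offset of the next record.
        System.out.println(p.position()); // prints 5
    }
}
```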


* The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).

* The {@link #commitSync() committed position} is the last offset that has been stored securely; if the process fails and restarts, this is the offset the consumer will recover to. The consumer can commit offsets automatically at a regular interval, or control the committed position manually through one of the commit APIs (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).
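The distinction between current and committed position can be sketched as follows. This is a simplified model with hypothetical names (the real client persists commits on the broker, not in a local map): progress made after the last commit is replayed on restart.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: why a restarted consumer resumes from the committed position,
// not its last in-memory position. Hypothetical model, not the real client.
public class CommitSketch {
    private final Map<String, Long> committed = new HashMap<>(); // partition -> committed offset
    private long position = 0;

    public void pollOne() { position++; }                          // consumed one record
    public void commitSync(String partition) { committed.put(partition, position); }

    // On restart, the consumer recovers to the last committed offset.
    public long recover(String partition) {
        position = committed.getOrDefault(partition, 0L);
        return position;
    }

    public static void main(String[] args) {
        CommitSketch c = new CommitSketch();
        c.pollOne(); c.pollOne();
        c.commitSync("demo-topic-0");             // committed position = 2
        c.pollOne();                              // position = 3, but not committed
        System.out.println(c.recover("demo-topic-0")); // prints 2: record 2 is replayed
    }
}
```

This replay-on-recovery behavior is exactly why manual commits give the application control over "at least once" processing.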


* This distinction gives the consumer control over when a record is considered consumed. It is discussed in further detail below.

* Consumer Groups and Topic Subscriptions

* Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and processing records. These processes can either be running on the same machine or they can be distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances sharing the same group.id will be part of the same consumer group.

* This mechanism, by which the consumer controls when a record counts as consumed, is discussed in more detail below.

* Kafka uses consumer groups to divide the work of consuming and processing records. The processes may run on the same machine, or be spread across many machines for better scalability and fault tolerance. All consumer instances sharing the same group.id belong to the same consumer group.


* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.

* Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka delivers each message in the subscribed topics to one process in each consumer group, by balancing the partitions across all members of the group so that each partition is assigned to exactly one consumer in the group. For example, if a topic has four partitions and a consumer group with two processes subscribes to it, each process consumes from two of the partitions.
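The four-partitions-over-two-processes example can be worked through with a small sketch. This models one simple round-robin spread for illustration only; the real client's assignment strategy is pluggable and configurable, and this is not its code.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of balancing partitions over group members so each partition
// goes to exactly one consumer. Round-robin here purely for illustration.
public class AssignSketch {
    public static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> out = new ArrayList<>();
        for (int c = 0; c < consumers; c++) out.add(new ArrayList<>());
        for (int p = 0; p < partitions; p++) out.get(p % consumers).add(p);
        return out;
    }

    public static void main(String[] args) {
        // A topic with four partitions, a group with two processes:
        System.out.println(assign(4, 2)); // prints [[0, 2], [1, 3]] -- two partitions each
    }
}
```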


* Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved from existing consumers to the new one. This is known as rebalancing the group and is discussed in more detail below. Group rebalancing is also used when new partitions are added to one of the subscribed topics or when a new topic matching a {@link #subscribe(Pattern, ConsumerRebalanceListener) subscribed regex} is created. The group will automatically detect the new partitions through periodic metadata refreshes and assign them to members of the group.

* Group membership is maintained dynamically: if a consumer process in the group fails, its partitions are reassigned to the other consumers in the same group; likewise, when a new consumer joins the group, partitions move from the existing consumers to the new one. This is known as "rebalancing" the group and is discussed in more detail below. Rebalancing is also triggered when a new partition or a new matching topic is created; the group detects new partitions through periodic metadata refreshes and assigns them to its members.


* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a given topic without duplicating data (additional consumers are actually quite cheap).

Conceptually, you can think of a consumer group as a single logical subscriber that happens to be made up of multiple processes. As a multi-subscriber system, Kafka supports any number of consumer groups for a given topic without duplicating the data (adding consumers is actually quite cheap).


* This is a slight generalization of the functionality that is common in messaging systems. To get semantics similar to a queue in a traditional messaging system all processes would be part of a single consumer group and hence record delivery would be balanced over the group like with a queue. Unlike a traditional messaging system, though, you can have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would have its own consumer group, so each process would subscribe to all the records published to the topic.


* In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener}, which allows them to finish necessary application-level logic such as state cleanup, manual offset commits, etc. See Storing Offsets Outside Kafka for more details.

* In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener}, allowing them to finish necessary application-level logic such as state cleanup and manual offset commits. See Storing Offsets Outside Kafka for more details.
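The callback flow above can be sketched with a simplified stand-in for the listener. This is not the real ConsumerRebalanceListener interface (whose methods also receive the affected partition collections); it only illustrates the order of the hooks: cleanup and manual commits happen on revocation, before the new assignment is handed over.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the rebalance notification flow. Simplified stand-in interface;
// the real ConsumerRebalanceListener methods take partition collections.
public class RebalanceSketch {
    interface Listener {
        void onPartitionsRevoked();  // app cleanup / manual offset commit goes here
        void onPartitionsAssigned(); // new assignment is live after this
    }

    static void rebalance(Listener listener, List<String> log) {
        listener.onPartitionsRevoked();  // called before partitions are taken away
        listener.onPartitionsAssigned(); // called after the new assignment
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        rebalance(new Listener() {
            public void onPartitionsRevoked()  { log.add("commit offsets"); }
            public void onPartitionsAssigned() { log.add("restore state"); }
        }, log);
        System.out.println(log); // prints [commit offsets, restore state]
    }
}
```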


* It is also possible for the consumer to manually assign specific partitions (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition assignment and consumer group coordination will be disabled.

* Detecting Consumer Failures

* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for a duration of session.timeout.ms, then the consumer will be considered dead and its partitions will be reassigned.

* We can also assign partitions to a consumer manually; with manual assignment, dynamic partition assignment and consumer group coordination are disabled. Once a consumer has subscribed to a set of topics, it joins the group automatically when {@link #poll(long)} is invoked. The poll API is designed to ensure consumer liveness: as long as you keep calling poll, the consumer stays in the group and keeps receiving messages from its assigned partitions. Under the covers, the consumer sends periodic heartbeats to the server; if it crashes or cannot send heartbeats within session.timeout.ms, it is considered dead and its partitions are reassigned.
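The session-timeout rule above reduces to a simple predicate, sketched here with illustrative names (the real check lives in the group coordinator on the broker side):

```java
// Sketch of the liveness rule: a member that has not heartbeated within
// session.timeout.ms is considered dead and its partitions are reassigned.
public class LivenessSketch {
    static boolean isAlive(long lastHeartbeatMs, long nowMs, long sessionTimeoutMs) {
        return nowMs - lastHeartbeatMs <= sessionTimeoutMs;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000; // illustrative value for session.timeout.ms
        System.out.println(isAlive(0, 9_000, sessionTimeoutMs));  // prints true
        System.out.println(isAlive(0, 11_000, sessionTimeoutMs)); // prints false -> partitions reassigned
    }
}
```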


* It is also possible that the consumer could encounter a "livelock" situation where it is continuing to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions indefinitely in this case, we provide a liveness detection mechanism using the max.poll.interval.ms setting. Basically if you don't call poll at least as frequently as the configured max interval, then the client will proactively leave the group so that another consumer can take over its partitions. When this happens, you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}). This is a safety mechanism which guarantees that only active members of the group are able to commit offsets. So to stay in the group, you must continue to call poll.

* A consumer can also hit a "livelock", where it keeps sending heartbeats but consumes no data. To keep such a consumer from holding its partitions indefinitely, a liveness detection mechanism based on the max.poll.interval.ms setting is provided: if poll is not called within that maximum interval, the client proactively leaves the group so another consumer can take over its partitions. When this happens you may see an offset commit failure (a {@link CommitFailedException} thrown from a call to {@link #commitSync()}); this safety mechanism guarantees that only live members of the group can commit offsets. So to stay in the consumer group, you must keep calling poll.
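Note how this check differs from the heartbeat check: it is driven by poll calls, not heartbeats. A sketch of the rule, with illustrative names (the real enforcement happens inside the client's poll machinery):

```java
// Sketch of the livelock check: even with heartbeats flowing, a member that
// has not called poll() within max.poll.interval.ms proactively leaves the
// group, after which its offset commits fail.
public class LivelockSketch {
    static boolean mustLeaveGroup(long lastPollMs, long nowMs, long maxPollIntervalMs) {
        return nowMs - lastPollMs > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // illustrative: 5 minutes
        // Processing one batch took 6 minutes without an intervening poll():
        System.out.println(mustLeaveGroup(0, 360_000, maxPollIntervalMs)); // prints true
    }
}
```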


* The consumer provides two configuration settings to control the behavior of the poll loop:

* max.poll.interval.ms: By increasing the interval between expected polls, you can give the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback is that increasing this value may delay a group rebalance since the consumer will only join the rebalance inside the call to poll. You can use this setting to bound the time to finish a rebalance, but you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.

* max.poll.records: Use this setting to limit the total records returned from a single call to poll. This can make it easier to predict the maximum that must be handled within each poll interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the impact of group rebalancing.

* The consumer controls the poll-loop behavior through two settings. max.poll.interval.ms: increasing the expected interval between polls gives the consumer more time to process a batch of records returned by {@link #poll(long)}. The drawback is that a larger value may delay a group rebalance, since the consumer only joins a rebalance from inside the call to poll; the setting bounds how long a rebalance takes, but if the consumer cannot actually call {@link #poll(long) poll} often enough, you risk slower progress.

* max.poll.records: limits the total number of records returned by a single call to poll, which makes it easier to predict the maximum work that must be handled within each poll interval. By tuning this value you may be able to shorten the poll interval, reducing the impact of group rebalancing.
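The two settings can be written down as plain consumer config properties. The keys are the real Kafka config names; the values below are illustrative examples, not recommendations:

```java
import java.util.Properties;

// Sketch: the two poll-loop settings expressed as consumer config properties.
// Key names are the actual Kafka configs; values are illustrative only.
public class PollConfigSketch {
    public static Properties pollLoopConfig() {
        Properties props = new Properties();
        props.put("max.poll.interval.ms", "300000"); // more time per batch, slower stall detection
        props.put("max.poll.records", "500");        // cap on records returned per poll() call
        return props;
    }

    public static void main(String[] args) {
        Properties p = pollLoopConfig();
        System.out.println(p.getProperty("max.poll.records")); // prints 500
    }
}
```

In the real client these properties would be passed to the KafkaConsumer constructor alongside bootstrap servers, group.id, and deserializers.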
