Let's start with basic usage: instantiate a KafkaConsumer, subscribe to a topic via its subscribe method, fetch records with poll and process them, and once processing is done, call commitSync to commit the offsets of the records just consumed.
// Initialization
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(new Properties());
// Subscribe
consumer.subscribe(Collections.singletonList("topic"));
while (Constant.MQTT_SERVICE_STARTED) { // loop forever, polling for data
    ConsumerRecords<String, String> records = consumer.poll(10000); // fetch records
    for (TopicPartition partition : records.partitions()) { // process per partition
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            if (record != null && record.value() != null) {
                // TODO: process the record
            }
        }
        // commit the offset of the last record in this partition
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}
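Note that the snippet above passes an empty Properties, which would actually fail at construction time: the consumer needs at least bootstrap servers, a group id, and the two deserializers. A minimal sketch of the required configuration — the broker address and group name here are placeholders, not values from the original article:

```java
import java.util.Properties;

class ConsumerConfigSketch {
    static Properties minimalProps() {
        Properties props = new Properties();
        // broker list and group name are placeholders -- substitute your own
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        // without deserializers the KafkaConsumer constructor throws ConfigException
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // matches the manual commitSync in the loop above
        props.put("enable.auto.commit", "false");
        return props;
    }
}
```

Passing `new ConsumerConfigSketch().minimalProps()`-style properties instead of an empty Properties lets the constructor below complete.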
With that in mind, let's start from the KafkaConsumer constructor. Opening KafkaConsumer.java, you can see that every public constructor eventually delegates to the one below.
To summarize what this constructor does: it reads the relevant settings from the properties passed in and initializes the metadata, client, subscriptions, coordinator, fetcher, and so on. The source is shown below; it is fairly long, so a quick skim is enough for now — we will come back to it later.
private KafkaConsumer(ConsumerConfig config,
                      Deserializer<K> keyDeserializer,
                      Deserializer<V> valueDeserializer) {
    try {
        this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
        int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
            throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
        this.time = new SystemTime();
        String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
        this.clientId = clientId;
        Map<String, String> metricsTags = new LinkedHashMap<>();
        metricsTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricsTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        this.metrics = new Metrics(metricConfig, reporters, time);
        this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
        // load interceptors and make sure they get clientId
        Map<String, Object> userProvidedConfigs = config.originals();
        userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
        List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ConsumerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
        if (keyDeserializer == null) {
            this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.keyDeserializer.configure(config.originals(), true);
        } else {
            config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
            this.keyDeserializer = keyDeserializer;
        }
        if (valueDeserializer == null) {
            this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    Deserializer.class);
            this.valueDeserializer.configure(config.originals(), false);
        } else {
            config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
            this.valueDeserializer = valueDeserializer;
        }
        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
        this.metadata.update(Cluster.bootstrap(addresses), 0);
        String metricGrpPrefix = "consumer";
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        NetworkClient netClient = new NetworkClient(
                new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                this.metadata,
                clientId,
                100, // a fixed large enough value will suffice
                config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
        this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
        OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
        this.subscriptions = new SubscriptionState(offsetResetStrategy);
        List<PartitionAssignor> assignors = config.getConfiguredInstances(
                ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                PartitionAssignor.class);
        this.coordinator = new ConsumerCoordinator(this.client,
                config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                assignors,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                retryBackoffMs,
                new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                this.interceptors,
                config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
        this.fetcher = new Fetcher<>(this.client,
                config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                this.keyDeserializer,
                this.valueDeserializer,
                this.metadata,
                this.subscriptions,
                metrics,
                metricGrpPrefix,
                this.time,
                this.retryBackoffMs);
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka consumer created");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka consumer", t);
    }
}
Now that construction is done, the next step is to call KafkaConsumer's subscribe method to subscribe to a topic. Let's see what subscribe does.
@Override
public void subscribe(Collection<String> topics) {
    subscribe(topics, new NoOpConsumerRebalanceListener());
}

@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    acquire();
    try {
        if (topics == null) {
            throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
        } else if (topics.isEmpty()) {
            // treat subscribing to empty topic list as the same as unsubscribing
            this.unsubscribe();
        } else {
            for (String topic : topics) {
                if (topic == null || topic.trim().isEmpty())
                    throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
            }
            log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
            this.subscriptions.subscribe(new HashSet<>(topics), listener);
            metadata.setTopics(subscriptions.groupSubscription());
        }
    } finally {
        release();
    }
}
As the code shows, subscribe ultimately delegates to the subscribe method of the subscriptions object initialized in the KafkaConsumer constructor, so let's step straight into that method.
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");
    setSubscriptionType(SubscriptionType.AUTO_TOPICS);
    this.listener = listener;
    changeSubscription(topics);
}
Look at the first call, setSubscriptionType(SubscriptionType.AUTO_TOPICS). It is passed SubscriptionType.AUTO_TOPICS; the subscription modes are briefly introduced below.
These modes are defined in the SubscriptionType enum. KafkaConsumer has three subscription modes for determining which partitions to consume. Only AUTO_TOPICS and AUTO_PATTERN require joining a group to obtain the partitions to consume; USER_ASSIGNED does not. The three modes are:
- AUTO_TOPICS: subscribe to specific topics and join the given group; the GroupCoordinator then assigns the partitions to consume. AUTO_TOPICS is topic-granularity subscription.
- AUTO_PATTERN: the user supplies a pattern; the consumer fetches the full list of topics, matches it against the pattern, and the matching topics are the ones to consume. It then joins the group to obtain its partitions, just as in AUTO_TOPICS. AUTO_PATTERN is also topic-granularity subscription.
- USER_ASSIGNED: call KafkaConsumer#assign() directly to specify the topic-partitions to consume. USER_ASSIGNED is partition-granularity subscription.
In AUTO_TOPICS mode, the consumer fetches metadata for the specified topics from the broker. In AUTO_PATTERN mode, it pulls metadata for all topics and then matches against the pattern to determine which topics to actually consume.
Here, all that happens is that the subscriptionType value is recorded on the subscriptions object.
private enum SubscriptionType {
    NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
}

private void setSubscriptionType(SubscriptionType type) {
    if (this.subscriptionType == SubscriptionType.NONE)
        this.subscriptionType = type;
    else if (this.subscriptionType != type)
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
}
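The guard in setSubscriptionType is what makes subscribe() and assign() mutually exclusive on a single consumer instance: the first mode set wins, and any later call with a different mode fails. A stripped-down, standalone sketch of that little state machine (names mirror the source, but this is an illustration, not the Kafka class itself):

```java
class SubscriptionStateSketch {
    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    private SubscriptionType subscriptionType = SubscriptionType.NONE;

    // mirrors SubscriptionState#setSubscriptionType: first caller wins,
    // any later call with a different type throws
    void setSubscriptionType(SubscriptionType type) {
        if (subscriptionType == SubscriptionType.NONE)
            subscriptionType = type;
        else if (subscriptionType != type)
            throw new IllegalStateException(
                "Subscription to topics, partitions and pattern are mutually exclusive");
    }
}
```

This is why calling assign() after subscribe() on the same KafkaConsumer raises an IllegalStateException.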
Now the second call, changeSubscription. Put simply, it records the topics for both the user subscription and the group subscription:
/* the list of topics the user has requested */
private Set<String> subscription;
/* the list of topics the group has subscribed to (set only for the leader on join group completion) */
private final Set<String> groupSubscription;

private void changeSubscription(Set<String> topicsToSubscribe) {
    if (!this.subscription.equals(topicsToSubscribe)) {
        this.subscription = topicsToSubscribe;
        this.groupSubscription.addAll(topicsToSubscribe);
    }
}
At this point the KafkaConsumer object is initialized and the topic subscription is recorded, so the next step is connecting and fetching data.
The usage flow was sketched at the start of this article, so let's now follow ConsumerRecords<String, String> records = consumer.poll(10000) to see how data is fetched, starting with the poll method in KafkaConsumer.java.
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    acquire();
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");
        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                fetcher.sendFetches();
                client.pollNoWakeup();
                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
            }
            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed;
        } while (remaining > 0);
        return ConsumerRecords.empty();
    } finally {
        release();
    }
}
As you can see, there is a do...while loop here: each iteration calls pollOnce, and if records come back, the consumer sends off the next round of fetches and returns the records to the caller. Each iteration also subtracts the elapsed time from timeout to implement the timeout logic.
Steady on — one step at a time. First, let's look at what happens inside pollOnce.
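The timeout bookkeeping in poll — recompute the elapsed time after each attempt and keep looping while budget remains — is a general deadline-loop pattern. A self-contained sketch of just the loop shape (not Kafka code; the attempt callback stands in for pollOnce):

```java
import java.util.function.LongFunction;

class DeadlineLoop {
    // Repeatedly invokes attempt with the remaining budget until it returns a
    // non-null result or the overall timeout is used up; mirrors the
    // do...while loop in KafkaConsumer.poll.
    static <T> T pollUntil(long timeoutMs, LongFunction<T> attempt) {
        long start = System.currentTimeMillis();
        long remaining = timeoutMs;
        do {
            T result = attempt.apply(remaining); // like pollOnce(remaining)
            if (result != null)
                return result;
            long elapsed = System.currentTimeMillis() - start;
            remaining = timeoutMs - elapsed;
        } while (remaining > 0);
        return null; // like returning ConsumerRecords.empty()
    }
}
```

The important property is that each retry is handed only the remaining budget, so the total time spent never exceeds the caller's timeout.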
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds());
    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions());
    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
    if (!records.isEmpty())
        return records;
    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches();
    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });
    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();
    return fetcher.fetchedRecords();
}
pollOnce also takes a timeout, bounded by the argument to KafkaConsumer.poll(long timeout); the caller decrements the elapsed time on each loop iteration, so the timeout handling happens there as well.
Roughly, this method does the following:
- 1. Calls the poll method of the coordinator initialized in the KafkaConsumer constructor, which sets up the connection and sends any pending unsent requests.
- 2. Pulls records out with fetcher.fetchedRecords (the fetcher is also created in the constructor — refer back to it if needed).
- 3. If records were fetched, returns them immediately.
- 4. Otherwise, sends one fetch request via fetcher.sendFetches, calls fetcher.fetchedRecords once more, and returns the result.
So at this point we can see that records are obtained through fetcher.fetchedRecords. But before diving into that method, let's follow the code in order and see what the first line, coordinator.poll, does. Its source:
public void poll(long now) {
    invokeCompletedOffsetCommitCallbacks();
    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        ensureCoordinatorReady();
        now = time.milliseconds();
    }
    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();
        ensureActiveGroup();
        now = time.milliseconds();
    }
    pollHeartbeat(now);
    maybeAutoCommitOffsetsAsync(now);
}
Starting with the first line again: what does invokeCompletedOffsetCommitCallbacks() do? As the name suggests, it invokes the callbacks for completed offset commits. Its source:
void invokeCompletedOffsetCommitCallbacks() {
    while (true) {
        OffsetCommitCompletion completion = completedOffsetCommits.poll();
        if (completion == null)
            break;
        completion.invoke();
    }
}

private static class OffsetCommitCompletion {
    private final OffsetCommitCallback callback;
    private final Map<TopicPartition, OffsetAndMetadata> offsets;
    private final Exception exception;

    public OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        this.callback = callback;
        this.offsets = offsets;
        this.exception = exception;
    }

    public void invoke() {
        if (callback != null)
            callback.onComplete(offsets, exception);
    }
}
completedOffsetCommits is simply a ConcurrentLinkedQueue, initialized in the ConsumerCoordinator constructor. Calling completion.invoke just runs the onComplete method of the callback that was enqueued — interesting: this method does nothing but fire callbacks.
Entries are added to this queue mainly in the following cases. We will see the call sites later; for now it is enough to understand what the queue does.
- 1. When committing offsets and the coordinator node is unavailable so the lookup fails: an OffsetCommitCompletion carrying the exception is enqueued.
- 2. When an offset commit succeeds: an OffsetCommitCompletion without an exception is enqueued.
- 3. When an offset commit fails: an OffsetCommitCompletion carrying the exception is enqueued.
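The pattern at work here — whichever thread finishes an operation enqueues the completion, and the caller's own thread later drains the queue and fires the callbacks — keeps user callbacks off the network thread. A minimal standalone sketch of that completion-queue idea (not the Kafka classes, just the mechanism):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

class CompletionQueueSketch {
    interface Callback { void onComplete(String result, Exception error); }

    // completed operations queued by whichever thread finished them
    private final ConcurrentLinkedQueue<Runnable> completed = new ConcurrentLinkedQueue<>();

    // like enqueueing an OffsetCommitCompletion (with or without an exception)
    void enqueue(Callback cb, String result, Exception error) {
        completed.add(() -> cb.onComplete(result, error));
    }

    // mirrors invokeCompletedOffsetCommitCallbacks: drain the queue and
    // invoke each callback on the calling thread
    void invokeCompleted() {
        Runnable r;
        while ((r = completed.poll()) != null)
            r.run();
    }
}
```

Nothing runs until invokeCompleted is called, which is why coordinator.poll drains the queue as its very first step.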
Continuing through coordinator.poll, the second code block it executes is:
if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
    ensureCoordinatorReady();
    now = time.milliseconds();
}

public boolean partitionsAutoAssigned() {
    return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
}
Stepping into subscriptions.partitionsAutoAssigned, we find it simply checks which subscription mode is in use. As noted when analyzing subscribe, only AUTO_TOPICS and AUTO_PATTERN need to join a group to obtain their partitions; USER_ASSIGNED does not. So this check effectively filters out USER_ASSIGNED mode.
When subscribing via consumer.subscribe, setSubscriptionType is called with SubscriptionType.AUTO_TOPICS by default, so we need to look at what ensureCoordinatorReady does. Its source follows, together with the methods it calls.
public synchronized void ensureCoordinatorReady() {
    while (coordinatorUnknown()) {
        RequestFuture<Void> future = lookupCoordinator();
        client.poll(future);
        if (future.failed()) {
            if (future.isRetriable())
                client.awaitMetadataUpdate();
            else
                throw future.exception();
        } else if (coordinator != null && client.connectionFailed(coordinator)) {
            // we found the coordinator, but the connection has failed, so mark
            // it dead and backoff before retrying discovery
            coordinatorDead();
            time.sleep(retryBackoffMs);
        }
    }
}

protected synchronized RequestFuture<Void> lookupCoordinator() {
    if (findCoordinatorFuture == null) {
        // find a node to ask about the coordinator
        Node node = this.client.leastLoadedNode();
        if (node == null) {
            // TODO: If there are no brokers left, perhaps we should use the bootstrap set
            // from configuration?
            return RequestFuture.noBrokersAvailable();
        } else
            findCoordinatorFuture = sendGroupCoordinatorRequest(node);
    }
    return findCoordinatorFuture;
}

private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
    // initiate the group metadata request
    log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
    GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
    return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
            .compose(new GroupCoordinatorResponseHandler());
}

private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
    @Override
    public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
        GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
        // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
        // for the coordinator in the underlying network client layer
        // TODO: this needs to be better handled in KAFKA-1935
        Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
        clearFindCoordinatorFuture();
        if (error == Errors.NONE) {
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.coordinator = new Node(
                        Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                        groupCoordinatorResponse.node().host(),
                        groupCoordinatorResponse.node().port());
                log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                client.tryConnect(coordinator);
                heartbeat.resetTimeouts(time.milliseconds());
            }
            future.complete(null);
        } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
            future.raise(new GroupAuthorizationException(groupId));
        } else {
            log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
            future.raise(error);
        }
    }

    @Override
    public void onFailure(RuntimeException e, RequestFuture<Void> future) {
        clearFindCoordinatorFuture();
        super.onFailure(e, future);
    }
}
To summarize, the logic is roughly:
- 1. Call lookupCoordinator, which uses client.leastLoadedNode() to pick the node with the fewest in-flight requests, sends it an ApiKeys.GROUP_COORDINATOR request, and attaches a GroupCoordinatorResponseHandler callback.
- 2. Send the request via client.poll.
- 3. Handle the result in GroupCoordinatorResponseHandler.onSuccess: if the response carries no error, call client.tryConnect to connect to the coordinator.
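Throughout this flow, a RequestFuture is handed back and handlers are attached via compose or addListener; the listeners fire when complete() or raise() is eventually called. A bare-bones standalone version of that one-shot future idea (greatly simplified and not thread-safe, unlike the real class):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniRequestFuture<T> {
    private boolean done;
    private T value;
    private RuntimeException exception;
    private final List<Consumer<MiniRequestFuture<T>>> listeners = new ArrayList<>();

    // like RequestFuture.complete: record the value and fire listeners once
    void complete(T v) { value = v; finish(); }

    // like RequestFuture.raise: record the error and fire listeners once
    void raise(RuntimeException e) { exception = e; finish(); }

    private void finish() {
        done = true;
        for (Consumer<MiniRequestFuture<T>> l : listeners) l.accept(this);
    }

    void addListener(Consumer<MiniRequestFuture<T>> l) {
        if (done) l.accept(this); // already finished: fire immediately
        else listeners.add(l);
    }

    boolean succeeded() { return done && exception == null; }
    boolean failed() { return done && exception != null; }
    T value() { return value; }
}
```

The network layer completes the future from its response handler, and the caller either blocks on it (client.poll(future)) or reacts in a listener, which is exactly the shape we will see again in the join-group code below.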
Notice that every request here goes through the client object: client.leastLoadedNode, client.send, client.tryConnect, client.poll, client.awaitMetadataUpdate.
Where does that object come from? Look back at the KafkaConsumer constructor: client is initialized there and passed into both coordinator and fetcher when they are created, so the client used here is the one initialized in KafkaConsumer.
Most of these client methods are self-explanatory from their names, so let's just look at client.send. It mainly builds API requests; sendGroupCoordinatorRequest above passed in ApiKeys.GROUP_COORDINATOR. Inside client.send, the request is put into an unsent HashMap to await sending; whenever trySend is called, every request in unsent is sent out in a loop. The source of client.send:
private RequestFuture<ClientResponse> send(Node node,
                                           ApiKeys api,
                                           short version,
                                           AbstractRequest request) {
    long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    RequestHeader header = client.nextRequestHeader(api, version);
    RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
    put(node, new ClientRequest(now, true, send, completionHandler));
    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}

private void put(Node node, ClientRequest request) {
    synchronized (this) {
        List<ClientRequest> nodeUnsent = unsent.get(node);
        if (nodeUnsent == null) {
            nodeUnsent = new ArrayList<>();
            unsent.put(node, nodeUnsent);
        }
        nodeUnsent.add(request);
    }
}
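The unsent buffer is just a per-node list of queued requests: send() appends, and a later trySend-style pass drains whatever each ready node will accept. A simplified standalone sketch of that buffering (strings stand in for ClientRequest, and a predicate stands in for client.ready):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

class UnsentBufferSketch {
    // per-node queues of pending requests, like ConsumerNetworkClient.unsent
    private final Map<String, List<String>> unsent = new HashMap<>();

    // mirrors ConsumerNetworkClient.put: queue the request under its target node
    void send(String node, String request) {
        unsent.computeIfAbsent(node, k -> new ArrayList<>()).add(request);
    }

    // mirrors trySend: deliver every queued request whose node is ready
    List<String> trySend(Predicate<String> nodeReady) {
        List<String> sent = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : unsent.entrySet()) {
            if (!nodeReady.test(e.getKey()))
                continue;
            Iterator<String> it = e.getValue().iterator();
            while (it.hasNext()) {
                sent.add(it.next());
                it.remove();
            }
        }
        return sent;
    }
}
```

Requests for nodes that are not ready simply stay buffered until a later poll drains them, which is why client.send alone never puts bytes on the wire.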
So where is trySend called? It turns out that only ConsumerNetworkClient.poll calls trySend to actually push these requests out. In step 2 of the ensureCoordinatorReady logic above, client.poll is invoked, which sends the queued data. Here is the source of ConsumerNetworkClient.poll:
public void poll(long timeout, long now, PollCondition pollCondition) {
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    firePendingCompletedRequests();
    synchronized (this) {
        // send all the requests we can send now
        trySend(now);
        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pollCondition == null || pollCondition.shouldBlock()) {
            // if there are no requests in flight, do not block longer than the retry backoff
            if (client.inFlightRequestCount() == 0)
                timeout = Math.min(timeout, retryBackoffMs);
            client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
            now = time.milliseconds();
        } else {
            client.poll(0, now);
        }
        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);
        // trigger wakeups after checking for disconnects so that the callbacks will be ready
        // to be fired on the next call to poll()
        maybeTriggerWakeup();
        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(now);
        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);
    }
    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}
trySend in turn calls the send method of NetworkClient.java, which hands the request to selector.send. The relevant source:
private boolean trySend(long now) {
    // send any requests that can be sent now
    boolean requestsSent = false;
    for (Map.Entry<Node, List<ClientRequest>> requestEntry : unsent.entrySet()) {
        Node node = requestEntry.getKey();
        Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
        while (iterator.hasNext()) {
            ClientRequest request = iterator.next();
            if (client.ready(node, now)) {
                client.send(request, now);
                iterator.remove();
                requestsSent = true;
            }
        }
    }
    return requestsSent;
}

@Override
public void send(ClientRequest request, long now) {
    String nodeId = request.request().destination();
    if (!canSendRequest(nodeId))
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    doSend(request, now);
}

private void doSend(ClientRequest request, long now) {
    request.setSendTimeMs(now);
    this.inFlightRequests.add(request);
    selector.send(request.request());
}

// Selector
public void send(Send send) {
    KafkaChannel channel = channelOrFail(send.destination());
    try {
        channel.setSend(send);
    } catch (CancelledKeyException e) {
        this.failedSends.add(send.destination());
        close(channel);
    }
}

// KafkaChannel
private final TransportLayer transportLayer;
private Send send;

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

public Send write() throws IOException {
    Send result = null;
    if (send != null && send(send)) {
        result = send;
        send = null;
    }
    return result;
}

private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}
So the data is stored in the KafkaChannel's send field; later, selector.poll drives selector.pollSelectionKeys, which calls KafkaChannel.write and then KafkaChannel.send, where send.writeTo actually writes the data out.
For our purposes, we can treat the request as sent at this point. If you're interested, dig further into KafkaChannel and Send; here we stay focused on KafkaConsumer's data flow.
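Worth noting in the code above: KafkaChannel allows only one in-flight Send per connection. setSend rejects a second payload while the first is still unwritten, and write() frees the slot once the payload has been flushed. A tiny standalone sketch of that invariant (the NIO interest-ops bookkeeping is omitted):

```java
class SingleSendSlotSketch {
    private String send; // at most one pending payload, like KafkaChannel.send

    // mirrors KafkaChannel.setSend: refuse a second send while one is pending
    void setSend(String s) {
        if (send != null)
            throw new IllegalStateException("prior send operation still in progress");
        send = s;
    }

    // mirrors KafkaChannel.write: "write" the payload and free the slot
    String write() {
        String result = send;
        send = null;
        return result;
    }
}
```

This single-slot design is part of why requests queue up in the unsent map: a node's channel can only accept a new send after the previous one has drained.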
We've covered a lot of code by now, and so far we have only connected to a node — the consumer has not yet joined a group. A quick summary:
- 1. If the mode is AUTO_TOPICS or AUTO_PATTERN, prepare to join a group.
- 2. Pick the node with the fewest in-flight requests, send it a GROUP_COORDINATOR request, and connect to that coordinator once the response arrives.
Back in coordinator.poll, the next code block to execute is:
if (needRejoin()) {
    // due to a race condition between the initial metadata fetch and the initial rebalance,
    // we need to ensure that the metadata is fresh before joining initially. This ensures
    // that we have matched the pattern against the cluster's topics at least once before joining.
    if (subscriptions.hasPatternSubscription())
        client.ensureFreshMetadata();
    ensureActiveGroup();
    now = time.milliseconds();
}
Here the group-join logic is plainly visible, so let's step into AbstractCoordinator's ensureActiveGroup method.
The first thing it does is start a heartbeat thread. This thread mainly sends heartbeat requests, and also handles checking node availability and reconnecting, marking the coordinator dead, and joining or leaving the group. Heartbeat requests, too, go through client.send, with ApiKeys.HEARTBEAT.
public void ensureActiveGroup() {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    ensureCoordinatorReady();
    startHeartbeatThreadIfNeeded();
    joinGroupIfNeeded();
}

private synchronized void startHeartbeatThreadIfNeeded() {
    if (heartbeatThread == null) {
        heartbeatThread = new HeartbeatThread();
        heartbeatThread.start();
    }
}

// HeartbeatThread
synchronized RequestFuture<Void> sendHeartbeatRequest() {
    HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
    return client.send(coordinator, ApiKeys.HEARTBEAT, req)
            .compose(new HeartbeatResponseHandler());
}
The key method here is joinGroupIfNeeded. It calls initiateJoinGroup, which sends an ApiKeys.JOIN_GROUP request via sendJoinGroupRequest. Notice that this is again client.send — as analyzed above, client.send only puts the request into the unsent collection, waiting for trySend, which is only driven by client.poll. That is why, after initiateJoinGroup, client.poll is called to actually send the data. Source:
void joinGroupIfNeeded() {
    while (needRejoin() || rejoinIncomplete()) {
        ensureCoordinatorReady();
        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        if (needsJoinPrepare) {
            onJoinPrepare(generation.generationId, generation.memberId);
            needsJoinPrepare = false;
        }
        RequestFuture<ByteBuffer> future = initiateJoinGroup();
        client.poll(future);
        resetJoinGroupFuture();
        if (future.succeeded()) {
            needsJoinPrepare = true;
            onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
        } else {
            RuntimeException exception = future.exception();
            if (exception instanceof UnknownMemberIdException ||
                    exception instanceof RebalanceInProgressException ||
                    exception instanceof IllegalGenerationException)
                continue;
            else if (!future.isRetriable())
                throw exception;
            time.sleep(retryBackoffMs);
        }
    }
}

private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
        // Note that this must come after the call to onJoinPrepare since we must be able to continue
        // sending heartbeats if that callback takes some time.
        disableHeartbeatThread();
        state = MemberState.REBALANCING;
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                    state = MemberState.STABLE;
                    if (heartbeatThread != null)
                        heartbeatThread.enable();
                }
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin
                synchronized (AbstractCoordinator.this) {
                    state = MemberState.UNJOINED;
                }
            }
        });
    }
    return joinFuture;
}

private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    // send a join group request to the coordinator
    log.info("(Re-)joining group {}", groupId);
    JoinGroupRequest request = new JoinGroupRequest(
            groupId,
            this.sessionTimeoutMs,
            this.rebalanceTimeoutMs,
            this.generation.memberId,
            protocolType(),
            metadata());
    log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
    return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
            .compose(new JoinGroupResponseHandler());
}
With that, the group-join logic is complete.
Back to coordinator.poll once more. Assuming everything above went normally, the consumer has connected to a node and joined the group, but it still has not received any data to consume. Next, let's look at the logic in the last line, maybeAutoCommitOffsetsAsync.
maybeAutoCommitOffsetsAsync decides whether to auto-commit offsets based on the enable.auto.commit setting — in other words, if it is configured to false, offsets are not committed automatically. Since we're here, let's follow this method to see how it works, not least because invokeCompletedOffsetCommitCallbacks on the first line of coordinator.poll still needs some context.
maybeAutoCommitOffsetsAsync goes through doAutoCommitOffsetsAsync, which in turn calls commitOffsetsAsync:
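Auto-commit is driven by a simple deadline check inside poll: once the current time reaches nextAutoCommitDeadline, a commit is triggered and the deadline is pushed forward by auto.commit.interval.ms. A standalone sketch of that timer logic (field names mirror the source, but the commit itself is stubbed out as a counter; this is an assumption-laden simplification, not the real method):

```java
class AutoCommitTimerSketch {
    private final boolean autoCommitEnabled;   // enable.auto.commit
    private final long intervalMs;             // auto.commit.interval.ms
    private long nextAutoCommitDeadline;
    private int commits;                       // stub: count instead of sending a request

    AutoCommitTimerSketch(boolean enabled, long intervalMs, long now) {
        this.autoCommitEnabled = enabled;
        this.intervalMs = intervalMs;
        this.nextAutoCommitDeadline = now + intervalMs;
    }

    // mirrors maybeAutoCommitOffsetsAsync: no-op unless enabled and the deadline has passed
    void maybeAutoCommit(long now) {
        if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
            commits++; // the real code calls doAutoCommitOffsetsAsync() here
            nextAutoCommitDeadline = now + intervalMs;
        }
    }

    int commitCount() { return commits; }
}
```

With enable.auto.commit=false the check never fires, which is why the manual commitSync in the usage example at the top is required in that configuration.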
private void doAutoCommitOffsetsAsync() {
    commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                if (exception instanceof RetriableException)
                    nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
            } else {
                log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
            }
        }
    });
}
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
    invokeCompletedOffsetCommitCallbacks();
    if (!coordinatorUnknown()) {
        doCommitOffsetsAsync(offsets, callback);
    } else {
        // we don't know the current coordinator, so try to find it and then send the commit
        // or fail (we don't want recursive retries which can cause offset commits to arrive
        // out of order). Note that there may be multiple offset commits chained to the same
        // coordinator lookup request. This is fine because the listeners will be invoked in
        // the same order that they were added. Note also that AbstractCoordinator prevents
        // multiple concurrent coordinator lookup requests.
        lookupCoordinator().addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                doCommitOffsetsAsync(offsets, callback);
            }

            @Override
            public void onFailure(RuntimeException e) {
                completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
            }
        });
    }
    // ensure the commit has a chance to be transmitted (without blocking on its completion).
    // Note that commits are treated as heartbeats by the coordinator, so there is no need to
    // explicitly allow heartbeats through delayed task execution.
    client.pollNoWakeup();
}
From commitOffsetsAsync we can see that when the coordinator is unknown, it first looks the coordinator up again; if that lookup fails, the failure is wrapped in an OffsetCommitCompletion and added to the completedOffsetCommits queue.
Remember what invokeCompletedOffsetCommitCallbacks, the first line of coordinator.poll, does? It simply walks completedOffsetCommits and invokes each OffsetCommitCallback in turn — the very callback that was constructed in doAutoCommitOffsetsAsync.
When the coordinator is known, commitOffsetsAsync proceeds to doCommitOffsetsAsync, which calls sendOffsetCommitRequest to send the commit request. Notice that whether the request succeeds or fails, an OffsetCommitCompletion is added to the completedOffsetCommits queue, and its callback is run on the next call to invokeCompletedOffsetCommitCallbacks.
private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
this.subscriptions.needRefreshCommits();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
future.addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
if (interceptors != null)
interceptors.onCommit(offsets);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
}
@Override
public void onFailure(RuntimeException e) {
Exception commitException = e;
if (e instanceof RetriableException)
commitException = new RetriableCommitFailedException(e);
completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
}
});
}
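The completedOffsetCommits pattern above can be sketched with plain Java collections. This is a hypothetical, heavily simplified model — the names are mine, not the real ConsumerCoordinator's — but it shows the essential shape: commit results are enqueued as completions when they arrive, and the callbacks only fire later, when the next poll drains the queue, so user callbacks always run on the consumer's own thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the completedOffsetCommits pattern:
// commit results are enqueued as completions by the network layer, and
// their callbacks are only invoked later, when the next poll drains the
// queue, so user callbacks always run on the consumer's own thread.
public class DeferredCommitCallbacks {
    interface CommitCallback {
        void onComplete(long offset, Exception exception);
    }

    static class Completion {
        final CommitCallback callback;
        final long offset;
        final Exception error;
        Completion(CommitCallback callback, long offset, Exception error) {
            this.callback = callback;
            this.offset = offset;
            this.error = error;
        }
    }

    private final Queue<Completion> completedCommits = new ArrayDeque<>();

    // Called when a commit finishes (success or failure): just enqueue.
    public void completeCommit(CommitCallback cb, long offset, Exception error) {
        completedCommits.add(new Completion(cb, offset, error));
    }

    // Called at the start of the next poll: drain the queue, fire callbacks.
    public int invokeCompletedCallbacks() {
        int fired = 0;
        Completion c;
        while ((c = completedCommits.poll()) != null) {
            c.callback.onComplete(c.offset, c.error);
            fired++;
        }
        return fired;
    }
}
```

The point of the indirection is thread confinement: the future listeners may complete on the network path, but the user-visible OffsetCommitCallback is deferred until the application thread calls poll again.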
Now let's see how the offset-commit request is actually sent. The method above delegates to sendOffsetCommitRequest; this is also the code path taken when you commit offsets manually and synchronously via consumer.commitSync, so it is worth stepping inside.
private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
if (offsets.isEmpty())
return RequestFuture.voidSuccess();
Node coordinator = coordinator();
if (coordinator == null)
return RequestFuture.coordinatorNotAvailable();
// create the offset commit request
Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
OffsetAndMetadata offsetAndMetadata = entry.getValue();
offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
}
final Generation generation;
if (subscriptions.partitionsAutoAssigned())
generation = generation();
else
generation = Generation.NO_GENERATION;
// if the generation is null, we are not part of an active group (and we expect to be).
// the only thing we can do is fail the commit and let the user rejoin the group in poll()
if (generation == null)
return RequestFuture.failure(new CommitFailedException());
OffsetCommitRequest req = new OffsetCommitRequest(
this.groupId,
generation.generationId,
generation.memberId,
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
.compose(new OffsetCommitResponseHandler(offsets));
}
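One detail worth keeping in mind about the offsets map this method serializes: by Kafka convention, the committed offset for a partition is the offset of the next record to consume, which is why callers commit lastOffset + 1 after processing a batch. A tiny illustrative sketch of that convention (a hypothetical helper, not part of the client):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper illustrating the commit convention, not client code:
// the committed offset for a partition is the offset of the NEXT record to
// consume, i.e. lastProcessedOffset + 1 — which is what the offsets map
// handed to sendOffsetCommitRequest is expected to contain per partition.
public class CommitOffsetConvention {
    // Offsets of the records just processed for one partition (ascending,
    // non-empty) -> the offset value to commit for that partition.
    public static long offsetToCommit(List<Long> processedOffsets) {
        long last = processedOffsets.get(processedOffsets.size() - 1);
        return last + 1;  // resume from the record after the last one handled
    }

    public static Map<String, Long> buildCommitMap(Map<String, List<Long>> processedByPartition) {
        Map<String, Long> toCommit = new HashMap<>();
        for (Map.Entry<String, List<Long>> e : processedByPartition.entrySet())
            toCommit.put(e.getKey(), offsetToCommit(e.getValue()));
        return toCommit;
    }
}
```

Committing last + 1 rather than last means that on restart the consumer resumes after the last processed record instead of reprocessing it.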
The method ends with client.send issuing an ApiKeys.OFFSET_COMMIT request, composed with an OffsetCommitResponseHandler to process the response.
So what does OffsetCommitResponseHandler actually do? Let's look at how it handles the response once the ApiKeys.OFFSET_COMMIT request completes.
private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {
private final Map<TopicPartition, OffsetAndMetadata> offsets;
public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
this.offsets = offsets;
}
@Override
public OffsetCommitResponse parse(ClientResponse response) {
return new OffsetCommitResponse(response.responseBody());
}
@Override
public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
sensors.commitLatency.record(response.requestLatencyMs());
Set<String> unauthorizedTopics = new HashSet<>();
for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
TopicPartition tp = entry.getKey();
OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
long offset = offsetAndMetadata.offset();
Errors error = Errors.forCode(entry.getValue());
if (error == Errors.NONE) {
log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
log.error("Not authorized to commit offsets for group {}", groupId);
future.raise(new GroupAuthorizationException(groupId));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
unauthorizedTopics.add(tp.topic());
} else if (error == Errors.OFFSET_METADATA_TOO_LARGE
|| error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
// raise the error to the user
log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// just retry
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP
|| error == Errors.REQUEST_TIMED_OUT) {
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
coordinatorDead();
future.raise(error);
return;
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION
|| error == Errors.REBALANCE_IN_PROGRESS) {
// need to re-join group
log.debug("Offset commit for group {} failed: {}", groupId, error.message());
resetGeneration();
future.raise(new CommitFailedException());
return;
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
return;
} else {
log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
return;
}
}
if (!unauthorizedTopics.isEmpty()) {
log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
} else {
future.complete(null);
}
}
}
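Before moving on, the long error dispatch inside handle can be condensed into a few reaction buckets. The sketch below is a hypothetical simplification — authorization failures and the unexpected-error catch-all are folded into a single FATAL bucket here, whereas the real handler raises distinct exception types for them:

```java
// Hypothetical condensation of OffsetCommitResponseHandler's error handling.
// Authorization failures and unknown errors are all folded into FATAL here;
// the real handler raises distinct exception types for them.
public class CommitErrorDispatch {
    enum Reaction {
        OK,                     // commit succeeded: update the local committed cache
        RETRY,                  // transient broker state: retry the commit
        MARK_COORDINATOR_DEAD,  // rediscover the group coordinator, then retry
        REJOIN_GROUP,           // reset generation and rejoin the group
        FATAL                   // surface an exception to the user
    }

    public static Reaction classify(String error) {
        switch (error) {
            case "NONE":
                return Reaction.OK;
            case "GROUP_LOAD_IN_PROGRESS":
                return Reaction.RETRY;
            case "GROUP_COORDINATOR_NOT_AVAILABLE":
            case "NOT_COORDINATOR_FOR_GROUP":
            case "REQUEST_TIMED_OUT":
                return Reaction.MARK_COORDINATOR_DEAD;
            case "UNKNOWN_MEMBER_ID":
            case "ILLEGAL_GENERATION":
            case "REBALANCE_IN_PROGRESS":
                return Reaction.REJOIN_GROUP;
            default:
                return Reaction.FATAL;
        }
    }
}
```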
In the handle method, the handler looks up the OffsetAndMetadata for each partition — the committed offset metadata. Considering only the error-free path, it ends up calling subscriptions.committed(tp, offsetAndMetadata), which caches the committed offset in the assignment. So what exactly is this subscriptions object?
Going back to the KafkaConsumer constructor, this is where it is created:
OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
Now that we know what subscriptions is, and that the committed offset metadata is stored in its assignment, the walk through coordinator.poll is complete. Time to move one level back up to KafkaConsumer's pollOnce; here is its source again so there is no need to scroll back:
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
coordinator.poll(time.milliseconds());
// fetch positions if we have partitions we're subscribed to that we
// don't know the offset for
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());
// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
long now = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);
client.poll(pollTimeout, now, new PollCondition() {
@Override
public boolean shouldBlock() {
// since a fetch might be completed by the background thread, we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
}
});
// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();
return fetcher.fetchedRecords();
}
This method ultimately returns fetcher.fetchedRecords(). After coordinator.poll finishes, it first calls fetcher.fetchedRecords(); if data is already buffered, it returns immediately. Otherwise it calls fetcher.sendFetches() to issue fetch requests, polls the network client, and then returns fetcher.fetchedRecords() again. You can think of it simply as: send the request, then drain the data.
Where does this fetcher come from? Looking back at the KafkaConsumer constructor once more, that is where it is created. So first, let's step into fetcher.sendFetches() and see what sending a fetch request actually does.
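That "buffered data first" shape can be modeled in a few lines. This is a hypothetical sketch (my own names, a single in-flight fetch, the broker's response passed in directly) rather than the real Fetcher/NetworkClient interaction:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical, heavily simplified model of pollOnce's shape: return
// already-buffered records immediately if any; otherwise send one fetch,
// let the network poll complete it, and drain whatever arrived.
public class PollOnceSketch {
    private final Queue<List<String>> completedFetches = new ArrayDeque<>();
    private boolean fetchInFlight = false;

    // Stand-in for fetcher.fetchedRecords(): drain one buffered response.
    List<String> fetchedRecords() {
        List<String> records = completedFetches.poll();
        return records == null ? Collections.<String>emptyList() : records;
    }

    // Stand-in for fetcher.sendFetches().
    void sendFetches() { fetchInFlight = true; }

    // Stand-in for client.poll(): the "broker" answers the in-flight fetch.
    void networkPoll(List<String> brokerRecords) {
        if (fetchInFlight) {
            completedFetches.add(brokerRecords);
            fetchInFlight = false;
        }
    }

    List<String> pollOnce(List<String> brokerRecords) {
        List<String> records = fetchedRecords();
        if (!records.isEmpty())
            return records;        // data already buffered: return it immediately
        sendFetches();             // otherwise issue a new fetch request
        networkPoll(brokerRecords);
        return fetchedRecords();   // and drain whatever the poll completed
    }
}
```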
public void sendFetches() {
for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
final FetchRequest request = fetchEntry.getValue();
final Node fetchTarget = fetchEntry.getKey();
client.send(fetchTarget, ApiKeys.FETCH, request)
.addListener(new RequestFutureListener<ClientResponse>() {
@Override
public void onSuccess(ClientResponse resp) {
FetchResponse response = new FetchResponse(resp.responseBody());
if (!matchesRequestedPartitions(request, response)) {
// obviously we expect the broker to always send us valid responses, so this check
// is mainly for test cases where mock fetch responses must be manually crafted.
log.warn("Ignoring fetch response containing partitions {} since it does not match " +
"the requested partitions {}", response.responseData().keySet(),
request.fetchData().keySet());
return;
}
Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);
for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
TopicPartition partition = entry.getKey();
long fetchOffset = request.fetchData().get(partition).offset;
FetchResponse.PartitionData fetchData = entry.getValue();
completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
}
sensors.fetchLatency.record(resp.requestLatencyMs());
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
}
@Override
public void onFailure(RuntimeException e) {
log.debug("Fetch request to {} failed", fetchTarget, e);
}
});
}
}
As you can see, it again calls client.send, this time with an ApiKeys.FETCH request, and processes the result in onSuccess.
Inside the listener's for loop, each partition's fetched data is wrapped in a CompletedFetch and added to the completedFetches queue. That queue exists precisely to feed fetcher.fetchedRecords(), so next let's look at the retrieval logic in fetcher.fetchedRecords().
private PartitionRecords<K, V> nextInLineRecords = null;
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch);
} else {
TopicPartition partition = nextInLineRecords.partition;
List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
if (currentRecords == null) {
drained.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
return drained;
}
This method first polls one entry off the completedFetches queue and parses it with parseFetchedData to obtain its PartitionRecords.
private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
TopicPartition tp = completedFetch.partition;
FetchResponse.PartitionData partition = completedFetch.partitionData;
long fetchOffset = completedFetch.fetchedOffset;
int bytes = 0;
int recordsCount = 0;
PartitionRecords<K, V> parsedRecords = null;
Errors error = Errors.forCode(partition.errorCode);
try {
if (!subscriptions.isFetchable(tp)) {
// this can happen when a rebalance happened or a partition consumption paused
// while fetch is still in-flight
log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
} else if (error == Errors.NONE) {
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
ByteBuffer buffer = partition.recordSet;
MemoryRecords records = MemoryRecords.readableRecords(buffer);
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
for (LogEntry logEntry : records) {
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
bytes += logEntry.size();
}
}
recordsCount = parsed.size();
this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);
if (!parsed.isEmpty()) {
log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
}
} else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
this.metadata.requestUpdate();
} else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
"may not exist or the user may not have Describe access to it", tp);
this.metadata.requestUpdate();
} else if (error == Errors.OFFSET_OUT_OF_RANGE) {
if (fetchOffset != subscriptions.position(tp)) {
log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
"does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
} else if (subscriptions.hasDefaultOffsetResetPolicy()) {
log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
subscriptions.needOffsetReset(tp);
} else {
throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
}
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
log.warn("Not authorized to read from topic {}.", tp.topic());
throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
} else if (error == Errors.UNKNOWN) {
log.warn("Unknown error fetching data for topic-partition {}", tp);
} else {
throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
}
} finally {
completedFetch.metricAggregator.record(tp, bytes, recordsCount);
}
// we move the partition to the end if we received some bytes or if there was an error. This way, it's more
// likely that partitions for the same topic can remain together (allowing for more efficient serialization).
if (bytes > 0 || error != Errors.NONE)
subscriptions.movePartitionToEnd(tp);
return parsedRecords;
}
這里面有一個(gè)判斷subscriptions.isFetchable(tp),就是通過sendOffsetCommitRequest獲取到的OffsetAndMetadata,它保存的就是在subscriptions的assignment中,就是提交偏移量后,在回調(diào)里進(jìn)行保存的數(shù)據(jù)。
public boolean isFetchable(TopicPartition tp) {
return isAssigned(tp) && assignedState(tp).isFetchable();
}
public boolean isAssigned(TopicPartition tp) {
return assignment.contains(tp);
}
Next, follow the normal path where error == Errors.NONE: based on the partition's current position in subscriptions, it builds a parsedRecords and hands it back to the while loop in fetcher.fetchedRecords() for further processing. Back in fetcher.fetchedRecords(), here is its code once more:
private PartitionRecords<K, V> nextInLineRecords = null;
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
nextInLineRecords = parseFetchedData(completedFetch);
} else {
TopicPartition partition = nextInLineRecords.partition;
List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
if (!records.isEmpty()) {
List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
if (currentRecords == null) {
drained.put(partition, records);
} else {
// this case shouldn't usually happen because we only send one fetch at a time per partition,
// but it might conceivably happen in some rare cases (such as partition leader changes).
// we have to copy to a new list because the old one may be immutable
List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
newRecords.addAll(currentRecords);
newRecords.addAll(records);
drained.put(partition, newRecords);
}
recordsRemaining -= records.size();
}
}
}
return drained;
}
Now follow the while loop. When data was parsed successfully, nextInLineRecords is assigned a non-null value, so on the next iteration execution takes the else branch: drainRecords extracts the records and adds them to the drained HashMap, and once the loop exits, the map is returned.
private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
if (partitionRecords.isDrained())
return Collections.emptyList();
if (!subscriptions.isAssigned(partitionRecords.partition)) {
// this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
} else {
// note that the consumed position should always be available as long as the partition is still assigned
long position = subscriptions.position(partitionRecords.partition);
if (!subscriptions.isFetchable(partitionRecords.partition)) {
// this can happen when a partition is paused before fetched records are returned to the consumer's poll call
log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
} else if (partitionRecords.fetchOffset == position) {
// we are ensured to have at least one record since we already checked for emptiness
List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;
log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
"position to {}", position, partitionRecords.partition, nextOffset);
subscriptions.position(partitionRecords.partition, nextOffset);
return partRecords;
} else {
// these records aren't next in line based on the last consumed position, ignore them
// they must be from an obsolete request
log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
partitionRecords.partition, partitionRecords.fetchOffset, position);
}
}
partitionRecords.drain();
return Collections.emptyList();
}
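The interaction between the maxPollRecords budget, the position check, and the position advance can be sketched for a single partition as follows. The types are hypothetical, and unlike the real code, leftovers of a partially drained fetch are simply dropped here instead of being kept in nextInLineRecords:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical single-partition sketch of fetchedRecords/drainRecords:
// drain buffered fetches under a maxPollRecords budget, accept only a
// fetch whose starting offset matches the current consume position, and
// advance the position to lastReturnedOffset + 1. Unlike the real code,
// leftovers of a partially drained fetch are dropped for brevity.
public class DrainSketch {
    static class Fetch {
        final long fetchOffset;    // offset the fetch request asked for
        final List<Long> offsets;  // offsets of the returned records, ascending
        Fetch(long fetchOffset, List<Long> offsets) {
            this.fetchOffset = fetchOffset;
            this.offsets = offsets;
        }
    }

    long position;                 // next offset to consume
    final Queue<Fetch> completedFetches = new ArrayDeque<>();

    DrainSketch(long position) { this.position = position; }

    List<Long> drain(int maxPollRecords) {
        List<Long> drained = new ArrayList<>();
        int remaining = maxPollRecords;
        while (remaining > 0) {
            Fetch fetch = completedFetches.poll();
            if (fetch == null)
                break;                                    // nothing buffered
            if (fetch.offsets.isEmpty() || fetch.fetchOffset != position)
                continue;                                 // stale or empty fetch: skip it
            List<Long> taken = fetch.offsets.subList(0, Math.min(remaining, fetch.offsets.size()));
            drained.addAll(taken);
            position = taken.get(taken.size() - 1) + 1;   // advance past what we returned
            remaining -= taken.size();
        }
        return drained;
    }
}
```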
And with that, the entire data-fetching flow has been traced. As we now know, if enable.auto.commit is set to false in the Properties passed to new KafkaConsumer<>(new Properties()), offsets are not committed automatically, so after fetching and processing data you must commit the offset manually via consumer.commitSync. To wrap up, let's step into the source of the manual, synchronous commit.
public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
invokeCompletedOffsetCommitCallbacks();
if (offsets.isEmpty())
return;
while (true) {
ensureCoordinatorReady();
RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
client.poll(future);
if (future.succeeded()) {
if (interceptors != null)
interceptors.onCommit(offsets);
return;
}
if (!future.isRetriable())
throw future.exception();
time.sleep(retryBackoffMs);
}
}
Under the hood it still calls sendOffsetCommitRequest — the same logic as the maybeAutoCommitOffsetsAsync path inside coordinator.poll — except that here the call is synchronous: it blocks until the commit completes and a result comes back, retrying retriable failures after sleeping retryBackoffMs. There is no need to analyze it again.
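The synchronous retry shape — keep retrying while failures are retriable, rethrow everything else — can be sketched like this. The signature is hypothetical; the real method loops until success and sleeps retryBackoffMs between tries, whereas this sketch caps the number of attempts so it always terminates:

```java
import java.util.function.Supplier;

// Hedged sketch of commitOffsetsSync's retry loop (hypothetical signature):
// keep retrying while failures are retriable, rethrow anything else. The
// real method loops until success and sleeps retryBackoffMs between tries;
// this sketch caps the number of attempts so it always terminates.
public class SyncCommitRetry {
    static class RetriableCommitException extends RuntimeException {}

    // attempt.get() returns true on success and throws on failure.
    // Returns the number of attempts that were needed.
    public static int commitSync(Supplier<Boolean> attempt, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            try {
                if (attempt.get())
                    return i;
            } catch (RetriableCommitException e) {
                // retriable failure: loop around (the real code backs off here)
            }
        }
        throw new RuntimeException("commit did not succeed within " + maxAttempts + " attempts");
    }
}
```

A non-retriable exception thrown by attempt.get() propagates out immediately, mirroring the `if (!future.isRetriable()) throw future.exception();` line above.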