KafkaConsumer (0.10.11) Source Code Analysis

Let's start with basic usage: instantiate a KafkaConsumer object, subscribe to a topic with its subscribe method, fetch records with poll and process them, and once processing is done, call commitSync to commit the offsets of the records just consumed.


    // initialization
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(new Properties());
    // subscribe
    consumer.subscribe(Collections.singletonList("topic"));
    while (Constant.MQTT_SERVICE_STARTED) { // loop forever, fetching continuously
        ConsumerRecords<String, String> records = consumer.poll(10000); // fetch records
        for (TopicPartition partition : records.partitions()) { // process records
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                if (record != null && null != record.value()) {
                    // TODO: process the record
                }
            }
            // commit the offset
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }

With that in mind, let's start from the KafkaConsumer constructor. Opening KafkaConsumer.java, you can see that every constructor ultimately delegates to the one below.

Briefly, this constructor reads the relevant settings from the supplied properties and initializes metadata, client, subscriptions, coordinator, fetcher, and so on. The source is shown below; it is a fair amount of code, so a quick skim is enough for now — we will come back to this method later.


    private KafkaConsumer(ConsumerConfig config,
                          Deserializer<K> keyDeserializer,
                          Deserializer<V> valueDeserializer) {
        try {
            this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            int sessionTimeOutMs = config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG);
            int fetchMaxWaitMs = config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            if (this.requestTimeoutMs <= sessionTimeOutMs || this.requestTimeoutMs <= fetchMaxWaitMs)
                throw new ConfigException(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG + " should be greater than " + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG + " and " + ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG);
            this.time = new SystemTime();

            String clientId = config.getString(ConsumerConfig.CLIENT_ID_CONFIG);
            if (clientId.length() <= 0)
                clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
            this.clientId = clientId;
            Map<String, String> metricsTags = new LinkedHashMap<>();
            metricsTags.put("client-id", clientId);
            MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                    .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                    .tags(metricsTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                    MetricsReporter.class);
            this.metrics = new Metrics(metricConfig, reporters, time);
            this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);

            // load interceptors and make sure they get clientId
            Map<String, Object> userProvidedConfigs = config.originals();
            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ConsumerInterceptor.class);
            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
            if (keyDeserializer == null) {
                this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.keyDeserializer.configure(config.originals(), true);
            } else {
                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
                this.keyDeserializer = keyDeserializer;
            }
            if (valueDeserializer == null) {
                this.valueDeserializer = config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                        Deserializer.class);
                this.valueDeserializer.configure(config.originals(), false);
            } else {
                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
                this.valueDeserializer = valueDeserializer;
            }
            ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList);
            this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners);
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
            this.metadata.update(Cluster.bootstrap(addresses), 0);
            String metricGrpPrefix = "consumer";
            ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
            NetworkClient netClient = new NetworkClient(
                    new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                    this.metadata,
                    clientId,
                    100, // a fixed large enough value will suffice
                    config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                    config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
            this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                    config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
            OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
            this.subscriptions = new SubscriptionState(offsetResetStrategy);
            List<PartitionAssignor> assignors = config.getConfiguredInstances(
                    ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                    PartitionAssignor.class);
            this.coordinator = new ConsumerCoordinator(this.client,
                    config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
                    config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
                    config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG),
                    assignors,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    retryBackoffMs,
                    new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                    config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
                    config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                    this.interceptors,
                    config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
            this.fetcher = new Fetcher<>(this.client,
                    config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
                    config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                    config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                    config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
                    this.keyDeserializer,
                    this.valueDeserializer,
                    this.metadata,
                    this.subscriptions,
                    metrics,
                    metricGrpPrefix,
                    this.time,
                    this.retryBackoffMs);

            config.logUnused();
            AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

            log.debug("Kafka consumer created");
        } catch (Throwable t) {
            // call close methods if internal objects are already constructed
            // this is to prevent resource leak. see KAFKA-2121
            close(true);
            // now propagate the exception
            throw new KafkaException("Failed to construct kafka consumer", t);
        }
    }

With initialization done, the next step is to call KafkaConsumer's subscribe method to subscribe to a topic. Let's see what subscribe does.


    @Override
    public void subscribe(Collection<String> topics) {
        subscribe(topics, new NoOpConsumerRebalanceListener());
    }
    @Override
    public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
        acquire();
        try {
            if (topics == null) {
                throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
            } else if (topics.isEmpty()) {
                // treat subscribing to empty topic list as the same as unsubscribing
                this.unsubscribe();
            } else {
                for (String topic : topics) {
                    if (topic == null || topic.trim().isEmpty())
                        throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
                }
                log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
                this.subscriptions.subscribe(new HashSet<>(topics), listener);
                metadata.setTopics(subscriptions.groupSubscription());
            }
        } finally {
            release();
        }
    }

As the subscribe method shows, it simply calls the subscribe method of the subscriptions object initialized in the KafkaConsumer constructor, so let's follow it straight into that method.


    public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
        if (listener == null)
            throw new IllegalArgumentException("RebalanceListener cannot be null");

        setSubscriptionType(SubscriptionType.AUTO_TOPICS);

        this.listener = listener;

        changeSubscription(topics);
    }

Look at the first call, setSubscriptionType(SubscriptionType.AUTO_TOPICS), which passes in SubscriptionType.AUTO_TOPICS. Let's briefly go over these subscription modes.

These modes are defined in the SubscriptionType enum. KafkaConsumer has three subscription modes for determining the partitions to consume; only AUTO_TOPICS and AUTO_PATTERN need to join a group to obtain their partitions, while USER_ASSIGNED does not. Briefly:

  • AUTO_TOPICS mode: the consumer subscribes to the given topics, joins the specified group, and the GroupCoordinator assigns the partitions to consume. AUTO_TOPICS subscribes at topic granularity.
  • AUTO_PATTERN mode: the user supplies a pattern; the consumer fetches all topics and matches them against the pattern, and the matching topics are the ones to consume. It then joins the group to obtain its partitions, just like AUTO_TOPICS. AUTO_PATTERN also subscribes at topic granularity.
  • USER_ASSIGNED mode: the user calls KafkaConsumer#assign() directly to specify the topic-partitions to consume. USER_ASSIGNED subscribes at partition granularity.

In AUTO_TOPICS mode, the consumer pulls metadata for the specified topics from the broker. In AUTO_PATTERN mode, it pulls metadata for all topics and then matches against the pattern to determine which topics it actually needs to consume.

Here, all that happens is that the subscriptionType value is recorded in subscriptions.


    private enum SubscriptionType {
        NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
    }

    private void setSubscriptionType(SubscriptionType type) {
        if (this.subscriptionType == SubscriptionType.NONE)
            this.subscriptionType = type;
        else if (this.subscriptionType != type)
            throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
    }

Now the second call, changeSubscription. Simply put, it records the topics for both the member and the group:


    /* the list of topics the user has requested */
    private Set<String> subscription;

    /* the list of topics the group has subscribed to (set only for the leader on join group completion) */
    private final Set<String> groupSubscription;

    private void changeSubscription(Set<String> topicsToSubscribe) {
        if (!this.subscription.equals(topicsToSubscribe)) {
            this.subscription = topicsToSubscribe;
            this.groupSubscription.addAll(topicsToSubscribe);
        }
    }
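
The asymmetry between the two sets — subscription is replaced on every change, while groupSubscription only accumulates — can be seen in a toy model (ChangeSubscriptionDemo is an invented class; in the real SubscriptionState, groupSubscription is reset by the leader on rebalance):

```java
import java.util.*;

public class ChangeSubscriptionDemo {
    Set<String> subscription = new HashSet<>();        // what this member asked for
    final Set<String> groupSubscription = new HashSet<>(); // union seen so far

    void changeSubscription(Set<String> topicsToSubscribe) {
        if (!subscription.equals(topicsToSubscribe)) {
            subscription = topicsToSubscribe;               // replace
            groupSubscription.addAll(topicsToSubscribe);    // accumulate
        }
    }

    public static void main(String[] args) {
        ChangeSubscriptionDemo s = new ChangeSubscriptionDemo();
        s.changeSubscription(new HashSet<>(Arrays.asList("topicA")));
        s.changeSubscription(new HashSet<>(Arrays.asList("topicB")));
        System.out.println(s.subscription);      // only topicB remains
        System.out.println(s.groupSubscription); // both topicA and topicB
    }
}
```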

At this point the KafkaConsumer object is initialized and the topics are subscribed and recorded, so the next step is connecting and fetching data.

The usage flow was sketched at the start of this article, so let's follow the line ConsumerRecords<String, String> records = consumer.poll(10000) to see how data is fetched, starting with the poll method in KafkaConsumer.java.


    @Override
    public ConsumerRecords<K, V> poll(long timeout) {
        acquire();
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
                throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
                if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    fetcher.sendFetches();
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start;
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }

Here we have a do...while loop: each iteration calls pollOnce, and if records come back, it sends the next round of fetch requests and returns the records to the application layer. Each iteration also measures the elapsed time and compares it against the timeout to enforce the deadline.
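
The timing of that loop can be sketched with stub logic (pollOnceStub stands in for the real pollOnce; the 40 ms cost per call and the success on the third call are made-up values for illustration):

```java
import java.util.*;

public class PollLoopDemo {
    static int attempts = 0;

    // Stub for pollOnce: costs 40 simulated ms per call, returns data on the 3rd call.
    static List<String> pollOnceStub(long remaining) {
        attempts++;
        return attempts >= 3 ? Arrays.asList("record") : Collections.<String>emptyList();
    }

    static List<String> poll(long timeout) {
        long elapsed = 0;
        long remaining = timeout;
        do {
            List<String> records = pollOnceStub(remaining);
            if (!records.isEmpty())
                return records;   // got data: return immediately, don't wait out the timeout
            elapsed += 40;        // simulated time spent inside pollOnce
            remaining = timeout - elapsed;
        } while (remaining > 0);
        return Collections.emptyList(); // timeout expired with no data
    }

    public static void main(String[] args) {
        System.out.println(poll(200)); // [record], after 3 pollOnce calls
        System.out.println(attempts);  // 3
    }
}
```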

Steady on, one step at a time — first, the logic inside pollOnce.


    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();

        return fetcher.fetchedRecords();
    }

pollOnce again takes a timeout, bounded by the argument to KafkaConsumer.poll(long timeout); the outer loop decrements it by the elapsed time, and the deadline is enforced there as well.

Roughly, this method does the following:

  • 1. Call the poll method of the coordinator initialized in the KafkaConsumer constructor, which establishes the necessary connections and sends any pending unsent requests.
  • 2. Take out records with fetcher.fetchedRecords (fetcher is also initialized in the constructor — look back at it if needed).
  • 3. If the fetched records are non-empty, return them immediately.
  • 4. If they are empty, send one fetch request with fetcher.sendFetches, then call fetcher.fetchedRecords once more and return the result.

At this point it is clear that the records come from fetcher.fetchedRecords. But before diving into that method, let's follow the code in order and see what the first line, coordinator.poll, does. Its source is as follows:


    public void poll(long now) {
        invokeCompletedOffsetCommitCallbacks();

        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
            ensureCoordinatorReady();
            now = time.milliseconds();
        }

        if (needRejoin()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription())
                client.ensureFreshMetadata();

            ensureActiveGroup();
            now = time.milliseconds();
        }

        pollHeartbeat(now);
        maybeAutoCommitOffsetsAsync(now);
    }

Starting again from the first line: what does invokeCompletedOffsetCommitCallbacks() do? As the name suggests, it invokes the callbacks for completed offset commits. Its source:


    void invokeCompletedOffsetCommitCallbacks() {
        while (true) {
            OffsetCommitCompletion completion = completedOffsetCommits.poll();
            if (completion == null)
                break;
            completion.invoke();
        }
    }

    private static class OffsetCommitCompletion {
        private final OffsetCommitCallback callback;
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final Exception exception;

        public OffsetCommitCompletion(OffsetCommitCallback callback, Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            this.callback = callback;
            this.offsets = offsets;
            this.exception = exception;
        }

        public void invoke() {
            if (callback != null)
                callback.onComplete(offsets, exception);
        }
    }

completedOffsetCommits is just a ConcurrentLinkedQueue, initialized in the ConsumerCoordinator constructor, and completion.invoke simply runs the onComplete method of the callback queued there. Interesting — in plain terms, its whole job is callback dispatch.

Entries are added to this queue mainly in the following cases. We will see these call sites later; for now it is enough to understand what the queue does.

  • 1. When committing offsets and the coordinator lookup fails because the node is unavailable, an OffsetCommitCompletion carrying the exception is added.
  • 2. When an offset commit succeeds, an OffsetCommitCompletion without an exception is added.
  • 3. When an offset commit fails, an OffsetCommitCompletion carrying the exception is added.
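
The enqueue-then-drain pattern can be modeled in miniature (CommitCallbackQueueDemo and its simplified types are invented; the real code queues completions from network-response handling and drains them on the polling thread):

```java
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CommitCallbackQueueDemo {
    interface OffsetCommitCallback { void onComplete(Map<String, Long> offsets, Exception e); }

    static class Completion {
        final OffsetCommitCallback callback;
        final Map<String, Long> offsets;
        final Exception exception;
        Completion(OffsetCommitCallback cb, Map<String, Long> offsets, Exception e) {
            this.callback = cb; this.offsets = offsets; this.exception = e;
        }
        void invoke() { if (callback != null) callback.onComplete(offsets, exception); }
    }

    static final ConcurrentLinkedQueue<Completion> completed = new ConcurrentLinkedQueue<>();
    static int fired = 0;

    static void invokeCompletedOffsetCommitCallbacks() {
        Completion c;
        while ((c = completed.poll()) != null)  // drain until the queue is empty
            c.invoke();
    }

    public static void main(String[] args) {
        OffsetCommitCallback cb = (offsets, e) -> fired++;
        completed.add(new Completion(cb, Collections.singletonMap("t-0", 42L), null));  // success case
        completed.add(new Completion(cb, null, new RuntimeException("commit failed"))); // failure case
        invokeCompletedOffsetCommitCallbacks();
        System.out.println(fired); // 2
    }
}
```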

Continuing through coordinator.poll, the second block it executes is:


    if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
        ensureCoordinatorReady();
        now = time.milliseconds();
    }

    public boolean partitionsAutoAssigned() {
        return this.subscriptionType == SubscriptionType.AUTO_TOPICS || this.subscriptionType == SubscriptionType.AUTO_PATTERN;
    }

Stepping into subscriptions.partitionsAutoAssigned, we find it just checks which subscription mode is in use. From the earlier analysis of subscribe, we know that only AUTO_TOPICS and AUTO_PATTERN need to join a group to obtain their partitions, while USER_ASSIGNED does not — so this condition effectively excludes USER_ASSIGNED mode.

When we subscribe via consumer.subscribe, setSubscriptionType is called with SubscriptionType.AUTO_TOPICS, so we need to look at what ensureCoordinatorReady does. Its source follows, together with the methods it calls:


    public synchronized void ensureCoordinatorReady() {
        while (coordinatorUnknown()) {
            RequestFuture<Void> future = lookupCoordinator();
            client.poll(future);

            if (future.failed()) {
                if (future.isRetriable())
                    client.awaitMetadataUpdate();
                else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }
        }
    }

    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            // find a node to ask about the coordinator
            Node node = this.client.leastLoadedNode();
            if (node == null) {
                // TODO: If there are no brokers left, perhaps we should use the bootstrap set
                // from configuration?
                return RequestFuture.noBrokersAvailable();
            } else
                findCoordinatorFuture = sendGroupCoordinatorRequest(node);
        }
        return findCoordinatorFuture;
    }

    private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
        // initiate the group metadata request
        log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
        GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
        return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
                     .compose(new GroupCoordinatorResponseHandler());
    }

    private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {

        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {

            GroupCoordinatorResponse groupCoordinatorResponse = new GroupCoordinatorResponse(resp.responseBody());
            // use MAX_VALUE - node.id as the coordinator id to mimic separate connections
            // for the coordinator in the underlying network client layer
            // TODO: this needs to be better handled in KAFKA-1935
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            clearFindCoordinatorFuture();
            if (error == Errors.NONE) {
                synchronized (AbstractCoordinator.this) {
                    AbstractCoordinator.this.coordinator = new Node(
                            Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                            groupCoordinatorResponse.node().host(),
                            groupCoordinatorResponse.node().port());
                    log.info("Discovered coordinator {} for group {}.", coordinator, groupId);
                    client.tryConnect(coordinator);
                    heartbeat.resetTimeouts(time.milliseconds());
                }
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                log.debug("Group coordinator lookup for group {} failed: {}", groupId, error.message());
                future.raise(error);
            }
        }

        @Override
        public void onFailure(RuntimeException e, RequestFuture<Void> future) {
            clearFindCoordinatorFuture();
            super.onFailure(e, future);
        }
    }

To summarize, the rough logic is:

  • 1. Call lookupCoordinator: use client.leastLoadedNode() to find the node with the fewest in-flight requests, send it an ApiKeys.GROUP_COORDINATOR request, and attach a GroupCoordinatorResponseHandler callback.
  • 2. Send the request via client.poll.
  • 3. Handle the result in GroupCoordinatorResponseHandler's onSuccess: if the response carries no error, connect with client.tryConnect.
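
The retry loop wrapped around these steps can be sketched with stubs (CoordinatorLookupDemo and its counters are invented; the real loop also waits on metadata updates or sleeps for retryBackoffMs between attempts):

```java
public class CoordinatorLookupDemo {
    static int lookups = 0;
    static String coordinator = null;

    // Stub for lookupCoordinator + client.poll(future): fails twice, succeeds on the 3rd try.
    static boolean lookupCoordinator() {
        lookups++;
        if (lookups < 3)
            return false;          // retriable failure: caller would await metadata and retry
        coordinator = "broker-0";  // coordinator discovered and connection attempted
        return true;
    }

    static void ensureCoordinatorReady() {
        while (coordinator == null) {
            if (!lookupCoordinator()) {
                // real code: client.awaitMetadataUpdate() or time.sleep(retryBackoffMs) here
            }
        }
    }

    public static void main(String[] args) {
        ensureCoordinatorReady();
        System.out.println(lookups);     // 3
        System.out.println(coordinator); // broker-0
    }
}
```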

Notice that every request here goes through the client object: client.leastLoadedNode, client.send, client.tryConnect, client.poll, client.awaitMetadataUpdate.

Where does this object come from? Look back at the KafkaConsumer constructor: client is initialized there and passed into both coordinator and fetcher when they are constructed — so the client used here is the one initialized in KafkaConsumer.

The client methods invoked here mostly do what their names say, so let's just look at client.send. It mainly builds API requests — in sendGroupCoordinatorRequest above, the type passed in is ApiKeys.GROUP_COORDINATOR. Following into client.send, the request is put into an unsent HashMap to wait for sending; whenever trySend is called, it loops over and sends everything in unsent. The source of client.send:


    private RequestFuture<ClientResponse> send(Node node,
                                              ApiKeys api,
                                              short version,
                                              AbstractRequest request) {
        long now = time.milliseconds();
        RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
        RequestHeader header = client.nextRequestHeader(api, version);
        RequestSend send = new RequestSend(node.idString(), header, request.toStruct());
        put(node, new ClientRequest(now, true, send, completionHandler));

        // wakeup the client in case it is blocking in poll so that we can send the queued request
        client.wakeup();
        return completionHandler.future;
    }

    private void put(Node node, ClientRequest request) {
        synchronized (this) {
            List<ClientRequest> nodeUnsent = unsent.get(node);
            if (nodeUnsent == null) {
                nodeUnsent = new ArrayList<>();
                unsent.put(node, nodeUnsent);
            }
            nodeUnsent.add(request);
        }
    }
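
That queue-then-flush pattern can be modeled with a toy class (UnsentQueueDemo and its String "requests" are invented for illustration; the real code queues ClientRequest objects and gates sending on client.ready(node, now)):

```java
import java.util.*;

public class UnsentQueueDemo {
    final Map<String, List<String>> unsent = new HashMap<>();
    final List<String> sent = new ArrayList<>();

    // send() only queues the request per node; there is no I/O here.
    synchronized void send(String node, String request) {
        List<String> nodeUnsent = unsent.get(node);
        if (nodeUnsent == null) {
            nodeUnsent = new ArrayList<>();
            unsent.put(node, nodeUnsent);
        }
        nodeUnsent.add(request);
    }

    // trySend() later flushes everything queued to nodes that are ready.
    synchronized boolean trySend(Set<String> readyNodes) {
        boolean requestsSent = false;
        for (Map.Entry<String, List<String>> e : unsent.entrySet()) {
            if (!readyNodes.contains(e.getKey()))
                continue;  // node not ready: leave its requests queued
            Iterator<String> it = e.getValue().iterator();
            while (it.hasNext()) {
                sent.add(it.next());  // stands in for the actual network send
                it.remove();
                requestsSent = true;
            }
        }
        return requestsSent;
    }

    public static void main(String[] args) {
        UnsentQueueDemo c = new UnsentQueueDemo();
        c.send("node-1", "GROUP_COORDINATOR");
        c.send("node-1", "JOIN_GROUP");
        c.send("node-2", "HEARTBEAT");
        // only node-1 is ready, so node-2's request stays queued
        c.trySend(new HashSet<>(Arrays.asList("node-1")));
        System.out.println(c.sent);                 // node-1's two requests, in order
        System.out.println(c.unsent.get("node-2")); // [HEARTBEAT]
    }
}
```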
    

So where is trySend called? It turns out that only ConsumerNetworkClient.poll calls trySend to actually push these requests out. In step 2 of the ensureCoordinatorReady logic above, client.poll is called, which sends the queued requests along the way. Below is the source of ConsumerNetworkClient.poll.


    public void poll(long timeout, long now, PollCondition pollCondition) {
        // there may be handlers which need to be invoked if we woke up the previous call to poll
        firePendingCompletedRequests();

        synchronized (this) {
            // send all the requests we can send now
            trySend(now);

            // check whether the poll is still needed by the caller. Note that if the expected completion
            // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
            // handler), the client will be woken up.
            if (pollCondition == null || pollCondition.shouldBlock()) {
                // if there are no requests in flight, do not block longer than the retry backoff
                if (client.inFlightRequestCount() == 0)
                    timeout = Math.min(timeout, retryBackoffMs);
                client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
                now = time.milliseconds();
            } else {
                client.poll(0, now);
            }

            // handle any disconnects by failing the active requests. note that disconnects must
            // be checked immediately following poll since any subsequent call to client.ready()
            // will reset the disconnect status
            checkDisconnects(now);

            // trigger wakeups after checking for disconnects so that the callbacks will be ready
            // to be fired on the next call to poll()
            maybeTriggerWakeup();

            // try again to send requests since buffer space may have been
            // cleared or a connect finished in the poll
            trySend(now);

            // fail requests that couldn't be sent if they have expired
            failExpiredRequests(now);
        }

        // called without the lock to avoid deadlock potential if handlers need to acquire locks
        firePendingCompletedRequests();
    }

Calling trySend in turn calls the send method in NetworkClient.java, which sends the request via selector.send. The relevant source:


    private boolean trySend(long now) {
        // send any requests that can be sent now
        boolean requestsSent = false;
        for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) {
            Node node = requestEntry.getKey();
            Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
            while (iterator.hasNext()) {
                ClientRequest request = iterator.next();
                if (client.ready(node, now)) {
                    client.send(request, now);
                    iterator.remove();
                    requestsSent = true;
                }
            }
        }
        return requestsSent;
    }

    @Override
    public void send(ClientRequest request, long now) {
        String nodeId = request.request().destination();
        if (!canSendRequest(nodeId))
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        doSend(request, now);
    }

    private void doSend(ClientRequest request, long now) {
        request.setSendTimeMs(now);
        this.inFlightRequests.add(request);
        selector.send(request.request());
    }

    //selector
    public void send(Send send) {
        KafkaChannel channel = channelOrFail(send.destination());
        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(send.destination());
            close(channel);
        }
    }
    
    //KafkaChannel

    private final TransportLayer transportLayer;
    private Send send;

    public void setSend(Send send) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
    }

    public Send write() throws IOException {
        Send result = null;
        if (send != null && send(send)) {
            result = send;
            send = null;
        }
        return result;
    }

    private boolean send(Send send) throws IOException {
        send.writeTo(transportLayer);
        if (send.completed())
            transportLayer.removeInterestOps(SelectionKey.OP_WRITE);

        return send.completed();
    }

So the data is stored into KafkaChannel's send field; then selector.poll and selector.pollSelectionKeys invoke KafkaChannel.write and KafkaChannel.send, and send.writeTo finally writes the data out.
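
The one-pending-send-per-channel invariant enforced by setSend can be shown with a stripped-down model (ChannelSendDemo uses a String in place of a Send object and omits the transport layer and OP_WRITE bookkeeping):

```java
public class ChannelSendDemo {
    private String send;   // at most one send in flight per channel

    void setSend(String s) {
        if (this.send != null)
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = s;     // real code also registers OP_WRITE interest here
    }

    String write() {
        String result = send; // real code writes the bytes to the transport layer
        send = null;          // slot freed; real code removes OP_WRITE interest
        return result;
    }

    public static void main(String[] args) {
        ChannelSendDemo ch = new ChannelSendDemo();
        ch.setSend("request-bytes");
        boolean threw = false;
        try { ch.setSend("another"); } catch (IllegalStateException e) { threw = true; }
        System.out.println(threw);      // true: second setSend rejected
        System.out.println(ch.write()); // request-bytes
        ch.setSend("another");          // fine once the first send has completed
    }
}
```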

Good — at this point, let's treat the request as sent. If you are interested, dig further into KafkaChannel and Send; here we stay focused on KafkaConsumer's data flow.

We have wound through quite a lot of code by now, yet following it we have only connected to a node — the consumer has not joined a group yet. A quick recap:

  • 1. Check that the mode is AUTO_TOPICS or AUTO_PATTERN, i.e. the consumer should join a group.
  • 2. Find the node with the fewest in-flight requests, send it a GROUP_COORDINATOR request, and on response, connect to that node.

Now back to coordinator.poll, and on to the next block it executes.


    if (needRejoin()) {
        // due to a race condition between the initial metadata fetch and the initial rebalance,
        // we need to ensure that the metadata is fresh before joining initially. This ensures
        // that we have matched the pattern against the cluster's topics at least once before joining.
        if (subscriptions.hasPatternSubscription())
            client.ensureFreshMetadata();

        ensureActiveGroup();
        now = time.milliseconds();
    }

Here is clearly the join-group logic, so let's step straight into AbstractCoordinator's ensureActiveGroup method.

The first thing it does is start a heartbeat thread. This thread mainly sends heartbeat requests, and also handles checking node availability and reconnecting, detecting a dead coordinator, and joining or leaving the group. The heartbeat request likewise goes through client.send, with ApiKeys.HEARTBEAT.


    public void ensureActiveGroup() {
        // always ensure that the coordinator is ready because we may have been disconnected
        // when sending heartbeats and does not necessarily require us to rejoin the group.
        ensureCoordinatorReady();
        startHeartbeatThreadIfNeeded();
        joinGroupIfNeeded();
    }

    private synchronized void startHeartbeatThreadIfNeeded() {
        if (heartbeatThread == null) {
            heartbeatThread = new HeartbeatThread();
            heartbeatThread.start();
        }
    }


    //HeartbeatThread
    synchronized RequestFuture<Void> sendHeartbeatRequest() {
        HeartbeatRequest req = new HeartbeatRequest(this.groupId, this.generation.generationId, this.generation.memberId);
        return client.send(coordinator, ApiKeys.HEARTBEAT, req)
                .compose(new HeartbeatResponseHandler());
    }
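The bookkeeping the heartbeat thread relies on can be sketched as a small state object: track when the last heartbeat was sent and when the last response arrived, then derive "should we heartbeat now?" and "has the session expired?". This is a simplified model with illustrative interval values, not Kafka's actual Heartbeat class:

```java
public class HeartbeatStateDemo {
    static class Heartbeat {
        final long intervalMs, sessionTimeoutMs;
        long lastSend, lastReceive;
        Heartbeat(long intervalMs, long sessionTimeoutMs, long now) {
            this.intervalMs = intervalMs;
            this.sessionTimeoutMs = sessionTimeoutMs;
            this.lastSend = now;
            this.lastReceive = now;
        }
        // A heartbeat is due once the configured interval has elapsed.
        boolean shouldHeartbeat(long now) { return now - lastSend >= intervalMs; }
        // With no response within the session timeout, the member is considered dead.
        boolean sessionExpired(long now)  { return now - lastReceive >= sessionTimeoutMs; }
        void sentHeartbeat(long now)      { lastSend = now; }
        void receivedHeartbeat(long now)  { lastReceive = now; }
    }

    public static void main(String[] args) {
        Heartbeat hb = new Heartbeat(3000, 10000, 0);
        System.out.println(hb.shouldHeartbeat(1000));  // false: not due yet
        System.out.println(hb.shouldHeartbeat(3000));  // true: interval elapsed
        hb.sentHeartbeat(3000);
        System.out.println(hb.sessionExpired(9000));   // false
        System.out.println(hb.sessionExpired(10000));  // true: no response in time
    }
}
```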

Now focus on the logic in joinGroupIfNeeded. It calls initiateJoinGroup, which sends an ApiKeys.JOIN_GROUP request via sendJoinGroupRequest. Notice that this again goes through client.send, and as the earlier analysis showed, client.send only places the request in the unsent collection; the actual write happens in trySend, which is only invoked from client.poll. That is why, right after initiateJoinGroup, the code calls client.poll to flush the request out. The source follows:


    void joinGroupIfNeeded() {
        while (needRejoin() || rejoinIncomplete()) {
            ensureCoordinatorReady();

            // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
            // time if the client is woken up before a pending rebalance completes. This must be called
            // on each iteration of the loop because an event requiring a rebalance (such as a metadata
            // refresh which changes the matched subscription set) can occur while another rebalance is
            // still in progress.
            if (needsJoinPrepare) {
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }

            RequestFuture<ByteBuffer> future = initiateJoinGroup();
            client.poll(future);
            resetJoinGroupFuture();

            if (future.succeeded()) {
                needsJoinPrepare = true;
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
            } else {
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs);
            }
        }
    }   

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        // we store the join future in case we are woken up by the user after beginning the
        // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
        // to rejoin before the pending rebalance has completed.
        if (joinFuture == null) {
            // fence off the heartbeat thread explicitly so that it cannot interfere with the join group.
            // Note that this must come after the call to onJoinPrepare since we must be able to continue
            // sending heartbeats if that callback takes some time.
            disableHeartbeatThread();

            state = MemberState.REBALANCING;
            joinFuture = sendJoinGroupRequest();
            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    // handle join completion in the callback so that the callback will be invoked
                    // even if the consumer is woken up before finishing the rebalance
                    synchronized (AbstractCoordinator.this) {
                        log.info("Successfully joined group {} with generation {}", groupId, generation.generationId);
                        state = MemberState.STABLE;

                        if (heartbeatThread != null)
                            heartbeatThread.enable();
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // we handle failures below after the request finishes. if the join completes
                    // after having been woken up, the exception is ignored and we will rejoin
                    synchronized (AbstractCoordinator.this) {
                        state = MemberState.UNJOINED;
                    }
                }
            });
        }
        return joinFuture;
    }

    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())
            return RequestFuture.coordinatorNotAvailable();

        // send a join group request to the coordinator
        log.info("(Re-)joining group {}", groupId);
        JoinGroupRequest request = new JoinGroupRequest(
                groupId,
                this.sessionTimeoutMs,
                this.rebalanceTimeoutMs,
                this.generation.memberId,
                protocolType(),
                metadata());

        log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
        return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
                .compose(new JoinGroupResponseHandler());
    }

With that, the join-group logic is complete.
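The buffered-send behavior that showed up repeatedly above (client.send only enqueues; the request actually leaves the process during poll) can be modeled with a toy client. This is a sketch of the pattern under those assumptions, not ConsumerNetworkClient itself:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class UnsentQueueDemo {
    // Simplified model of ConsumerNetworkClient: send() only buffers the
    // request; nothing goes "on the wire" until poll() drains the queue.
    static class MiniClient {
        private final Queue<String> unsent = new ArrayDeque<>();
        private final List<String> wire = new ArrayList<>(); // stands in for the socket

        void send(String request) { unsent.add(request); }   // buffer only

        List<String> poll() {                                // trySend happens here
            while (!unsent.isEmpty())
                wire.add(unsent.poll());
            return wire;
        }
    }

    public static void main(String[] args) {
        MiniClient client = new MiniClient();
        client.send("JOIN_GROUP");         // buffered, not yet sent
        System.out.println(client.poll()); // prints [JOIN_GROUP]
    }
}
```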

Back in coordinator.poll once more: assuming everything above succeeded, the consumer is now connected to the coordinator and has joined the group, but it still has not fetched any records. Next, follow the logic behind the last line, maybeAutoCommitOffsetsAsync.

maybeAutoCommitOffsetsAsync checks the enable.auto.commit configuration to decide whether to commit offsets automatically; if it is set to false, offsets are never committed automatically. Since we are here, let's step in and see how it works. Also keep in mind invokeCompletedOffsetCommitCallbacks, the very first line of coordinator.poll, which needs explaining as well.

First look at doAutoCommitOffsetsAsync: maybeAutoCommitOffsetsAsync delegates to it, and it in turn calls commitOffsetsAsync:

    private void doAutoCommitOffsetsAsync() {
        commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception != null) {
                    log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                    if (exception instanceof RetriableException)
                        nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
                } else {
                    log.debug("Completed autocommit of offsets {} for group {}", offsets, groupId);
                }
            }
        });
    }

    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        invokeCompletedOffsetCommitCallbacks();

        if (!coordinatorUnknown()) {
            doCommitOffsetsAsync(offsets, callback);
        } else {
            // we don't know the current coordinator, so try to find it and then send the commit
            // or fail (we don't want recursive retries which can cause offset commits to arrive
            // out of order). Note that there may be multiple offset commits chained to the same
            // coordinator lookup request. This is fine because the listeners will be invoked in
            // the same order that they were added. Note also that AbstractCoordinator prevents
            // multiple concurrent coordinator lookup requests.
            lookupCoordinator().addListener(new RequestFutureListener<Void>() {
                @Override
                public void onSuccess(Void value) {
                    doCommitOffsetsAsync(offsets, callback);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    completedOffsetCommits.add(new OffsetCommitCompletion(callback, offsets, new RetriableCommitFailedException(e)));
                }
            });
        }

        // ensure the commit has a chance to be transmitted (without blocking on its completion).
        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
        // explicitly allow heartbeats through delayed task execution.
        client.pollNoWakeup();
    }

In commitOffsetsAsync you can see that when the coordinator is unknown, it looks the coordinator up again before committing; if that lookup fails, it creates an OffsetCommitCompletion and adds it to the completedOffsetCommits queue.

Remember invokeCompletedOffsetCommitCallbacks, the first line of coordinator.poll? It simply walks completedOffsetCommits and invokes each OffsetCommitCallback in turn; the callback in question is the one constructed in doAutoCommitOffsetsAsync above.
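That enqueue-then-drain pattern can be sketched as follows. The names here are illustrative, not Kafka's actual types: response handlers enqueue completions from wherever they run, and the poll thread later drains the queue and fires each callback in FIFO order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

public class CompletionQueueDemo {
    // Models completedOffsetCommits: completions are queued as they finish
    // and their callbacks run later, on the thread that calls the drain method.
    interface Callback { void onComplete(String result); }

    static final ConcurrentLinkedQueue<Runnable> completed = new ConcurrentLinkedQueue<>();

    static void enqueueCompletion(Callback cb, String result) {
        completed.add(() -> cb.onComplete(result));
    }

    // Equivalent of invokeCompletedOffsetCommitCallbacks()
    static void invokeCompletedCallbacks() {
        Runnable r;
        while ((r = completed.poll()) != null)
            r.run();
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        enqueueCompletion(log::add, "commit-1 ok");
        enqueueCompletion(log::add, "commit-2 ok");
        invokeCompletedCallbacks(); // callbacks fire here, in order
        System.out.println(log);    // prints [commit-1 ok, commit-2 ok]
    }
}
```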

When the coordinator is known, commitOffsetsAsync proceeds through doCommitOffsetsAsync, which calls sendOffsetCommitRequest to send the commit request. Note that whether the request succeeds or fails, it also enqueues an OffsetCommitCompletion onto completedOffsetCommits, so the corresponding callback runs the next time invokeCompletedOffsetCommitCallbacks is called.


    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
        this.subscriptions.needRefreshCommits();
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
        final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
        future.addListener(new RequestFutureListener<Void>() {
            @Override
            public void onSuccess(Void value) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);

                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, null));
            }

            @Override
            public void onFailure(RuntimeException e) {
                Exception commitException = e;

                if (e instanceof RetriableException)
                    commitException = new RetriableCommitFailedException(e);

                completedOffsetCommits.add(new OffsetCommitCompletion(cb, offsets, commitException));
            }
        });
    }

Now let's see how the offset commit request is actually sent. The method above delegates to sendOffsetCommitRequest, and a manual synchronous commit via consumer.commitSync goes through this same method, so it is worth stepping inside.

    private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty())
            return RequestFuture.voidSuccess();

        Node coordinator = coordinator();
        if (coordinator == null)
            return RequestFuture.coordinatorNotAvailable();

        // create the offset commit request
        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsetData = new HashMap<>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : offsets.entrySet()) {
            OffsetAndMetadata offsetAndMetadata = entry.getValue();
            offsetData.put(entry.getKey(), new OffsetCommitRequest.PartitionData(
                    offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
        }

        final Generation generation;
        if (subscriptions.partitionsAutoAssigned())
            generation = generation();
        else
            generation = Generation.NO_GENERATION;

        // if the generation is null, we are not part of an active group (and we expect to be).
        // the only thing we can do is fail the commit and let the user rejoin the group in poll()
        if (generation == null)
            return RequestFuture.failure(new CommitFailedException());

        OffsetCommitRequest req = new OffsetCommitRequest(
                this.groupId,
                generation.generationId,
                generation.memberId,
                OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                offsetData);

        log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);

        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, req)
                .compose(new OffsetCommitResponseHandler(offsets));
    }

In the end this method also goes through client.send, this time with ApiKeys.OFFSET_COMMIT, and attaches an OffsetCommitResponseHandler as the response callback.

So what does the OffsetCommitResponseHandler class actually do? Let's look at how it processes the response once the ApiKeys.OFFSET_COMMIT request succeeds.


    private class OffsetCommitResponseHandler extends CoordinatorResponseHandler<OffsetCommitResponse, Void> {

        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            sensors.commitLatency.record(response.requestLatencyMs());
            Set<String> unauthorizedTopics = new HashSet<>();

            for (Map.Entry<TopicPartition, Short> entry : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();

                Errors error = Errors.forCode(entry.getValue());
                if (error == Errors.NONE) {
                    log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
                    if (subscriptions.isAssigned(tp))
                        // update the local cache only if the partition is still assigned
                        subscriptions.committed(tp, offsetAndMetadata);
                } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    log.error("Not authorized to commit offsets for group {}", groupId);
                    future.raise(new GroupAuthorizationException(groupId));
                    return;
                } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(tp.topic());
                } else if (error == Errors.OFFSET_METADATA_TOO_LARGE
                        || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    // raise the error to the user
                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                    future.raise(error);
                    return;
                } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    // just retry
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    future.raise(error);
                    return;
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
                        || error == Errors.NOT_COORDINATOR_FOR_GROUP
                        || error == Errors.REQUEST_TIMED_OUT) {
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    coordinatorDead();
                    future.raise(error);
                    return;
                } else if (error == Errors.UNKNOWN_MEMBER_ID
                        || error == Errors.ILLEGAL_GENERATION
                        || error == Errors.REBALANCE_IN_PROGRESS) {
                    // need to re-join group
                    log.debug("Offset commit for group {} failed: {}", groupId, error.message());
                    resetGeneration();
                    future.raise(new CommitFailedException());
                    return;
                } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
                    future.raise(new KafkaException("Partition " + tp + " may not exist or user may not have Describe access to topic"));
                    return;
                } else {
                    log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
                    future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                    return;
                }
            }

            if (!unauthorizedTopics.isEmpty()) {
                log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }

In the handle method, it retrieves the OffsetAndMetadata, the offset metadata for the committed records. Considering only the error-free path: it ultimately calls subscriptions.committed(tp, offsetAndMetadata) to store that metadata in the assignment. So what exactly is this subscriptions object?

Looking back at the KafkaConsumer constructor, this is where it is created:

    
    OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
    this.subscriptions = new SubscriptionState(offsetResetStrategy);
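As a rough model of what subscriptions ends up holding (ignoring everything except the committed offset per partition, with made-up partition names), and of the lastOffset + 1 convention used in the intro example, where the committed value is the offset of the next record to consume:

```java
import java.util.HashMap;
import java.util.Map;

public class CommittedOffsetsDemo {
    // Simplified stand-in for SubscriptionState's per-partition state:
    // only the committed offset survives here; names are illustrative.
    static final Map<String, Long> committed = new HashMap<>();

    static void commit(String topicPartition, long offset) {
        committed.put(topicPartition, offset); // like subscriptions.committed(tp, meta)
    }

    public static void main(String[] args) {
        // Commit lastOffset + 1: the position of the next record to read.
        long lastProcessed = 41;
        commit("topic-0", lastProcessed + 1);
        System.out.println(committed.get("topic-0")); // prints 42
    }
}
```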

Now we know what subscriptions is, and that the offset metadata lives in its assignment. That completes the walk through coordinator.poll, so it is time to go up one level to KafkaConsumer's pollOnce. The method's source is repeated here so you don't have to scroll back:


    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        coordinator.poll(time.milliseconds());

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        // if data is available already, return it immediately
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty())
            return records;

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        long now = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

        client.poll(pollTimeout, now, new PollCondition() {
            @Override
            public boolean shouldBlock() {
                // since a fetch might be completed by the background thread, we need this poll condition
                // to ensure that we do not block unnecessarily in poll()
                return !fetcher.hasCompletedFetches();
            }
        });

        // after the long poll, we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.needRejoin())
            return Collections.emptyMap();

        return fetcher.fetchedRecords();
    }

This method ultimately returns fetcher.fetchedRecords(). After coordinator.poll finishes, it first tries fetcher.fetchedRecords(); if records are already buffered, they are returned immediately. Otherwise it calls fetcher.sendFetches() to issue fetch requests, waits in client.poll for responses, and then returns fetcher.fetchedRecords() again. In short: send the fetch requests, then pull the data.
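The send-then-fetch flow just described can be compressed into a small sketch. The broker interaction is faked with an in-memory queue and an immediate "response"; in the real code the waiting happens inside client.poll:

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

public class PollOnceDemo {
    // Skeleton of the pollOnce flow above: return buffered records if any,
    // otherwise issue a fetch and check the buffer again after "waiting".
    static final Queue<List<String>> buffered = new ArrayDeque<>();

    static void sendFetches() {
        // Pretend the broker answered immediately; real code waits in client.poll.
        buffered.add(Collections.singletonList("record-1"));
    }

    static List<String> pollOnce() {
        List<String> records = buffered.poll();   // fetcher.fetchedRecords()
        if (records != null)
            return records;                       // data was already available
        sendFetches();                            // fetcher.sendFetches()
        // ... client.poll(timeout) would block here until responses arrive ...
        records = buffered.poll();
        return records != null ? records : Collections.<String>emptyList();
    }

    public static void main(String[] args) {
        System.out.println(pollOnce()); // prints [record-1]
    }
}
```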

Where does this fetcher come from? It, too, is created in the KafkaConsumer constructor. Let's first step into fetcher.sendFetches() to see what sending a fetch request actually involves.


    public void sendFetches() {
        for (Map.Entry<Node, FetchRequest> fetchEntry : createFetchRequests().entrySet()) {
            final FetchRequest request = fetchEntry.getValue();
            final Node fetchTarget = fetchEntry.getKey();

            client.send(fetchTarget, ApiKeys.FETCH, request)
                    .addListener(new RequestFutureListener<ClientResponse>() {
                        @Override
                        public void onSuccess(ClientResponse resp) {
                            FetchResponse response = new FetchResponse(resp.responseBody());
                            if (!matchesRequestedPartitions(request, response)) {
                                // obviously we expect the broker to always send us valid responses, so this check
                                // is mainly for test cases where mock fetch responses must be manually crafted.
                                log.warn("Ignoring fetch response containing partitions {} since it does not match " +
                                        "the requested partitions {}", response.responseData().keySet(),
                                        request.fetchData().keySet());
                                return;
                            }

                            Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                            FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                            for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                                TopicPartition partition = entry.getKey();
                                long fetchOffset = request.fetchData().get(partition).offset;
                                FetchResponse.PartitionData fetchData = entry.getValue();
                                completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator));
                            }

                            sensors.fetchLatency.record(resp.requestLatencyMs());
                            sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            log.debug("Fetch request to {} failed", fetchTarget, e);
                        }
                    });
        }
    }

As you can see, it once again calls client.send, this time with an ApiKeys.FETCH request, and processes the response data in onSuccess.

Inside the listener, the for loop wraps each partition's fetched data in a CompletedFetch and adds it to the completedFetches queue. That queue is exactly what fetcher.fetchedRecords() consumes, so let's look at its retrieval logic next.


    private PartitionRecords<K, V> nextInLineRecords = null;

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        int recordsRemaining = maxPollRecords;

        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                CompletedFetch completedFetch = completedFetches.poll();
                if (completedFetch == null)
                    break;

                nextInLineRecords = parseFetchedData(completedFetch);
            } else {
                TopicPartition partition = nextInLineRecords.partition;

                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
                    if (currentRecords == null) {
                        drained.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        drained.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }

        return drained;
    }

This method first polls one entry off the completedFetches queue and parses it with parseFetchedData to obtain its PartitionRecords.


    private PartitionRecords<K, V> parseFetchedData(CompletedFetch completedFetch) {
        TopicPartition tp = completedFetch.partition;
        FetchResponse.PartitionData partition = completedFetch.partitionData;
        long fetchOffset = completedFetch.fetchedOffset;
        int bytes = 0;
        int recordsCount = 0;
        PartitionRecords<K, V> parsedRecords = null;
        Errors error = Errors.forCode(partition.errorCode);

        try {
            if (!subscriptions.isFetchable(tp)) {
                // this can happen when a rebalance happened or a partition consumption paused
                // while fetch is still in-flight
                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", tp);
            } else if (error == Errors.NONE) {
                // we are interested in this fetch only if the beginning offset matches the
                // current consumed position
                Long position = subscriptions.position(tp);
                if (position == null || position != fetchOffset) {
                    log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
                            "the expected offset {}", tp, fetchOffset, position);
                    return null;
                }

                ByteBuffer buffer = partition.recordSet;
                MemoryRecords records = MemoryRecords.readableRecords(buffer);
                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                for (LogEntry logEntry : records) {
                    // Skip the messages earlier than current position.
                    if (logEntry.offset() >= position) {
                        parsed.add(parseRecord(tp, logEntry));
                        bytes += logEntry.size();
                    }
                }

                recordsCount = parsed.size();
                this.sensors.recordTopicFetchMetrics(tp.topic(), bytes, recordsCount);

                if (!parsed.isEmpty()) {
                    log.trace("Adding fetched record for partition {} with offset {} to buffered record list", tp, position);
                    parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed);
                    ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1);
                    this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset());
                }
            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                this.metadata.requestUpdate();
            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition " +
                        "may not exist or the user may not have Describe access to it", tp);
                this.metadata.requestUpdate();
            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                if (fetchOffset != subscriptions.position(tp)) {
                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
                    subscriptions.needOffsetReset(tp);
                } else {
                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
                }
            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                log.warn("Not authorized to read from topic {}.", tp.topic());
                throw new TopicAuthorizationException(Collections.singleton(tp.topic()));
            } else if (error == Errors.UNKNOWN) {
                log.warn("Unknown error fetching data for topic-partition {}", tp);
            } else {
                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
            }
        } finally {
            completedFetch.metricAggregator.record(tp, bytes, recordsCount);
        }

        // we move the partition to the end if we received some bytes or if there was an error. This way, it's more
        // likely that partitions for the same topic can remain together (allowing for more efficient serialization).
        if (bytes > 0 || error != Errors.NONE)
            subscriptions.movePartitionToEnd(tp);

        return parsedRecords;
    }

Note the subscriptions.isFetchable(tp) check here. It consults the assignment inside subscriptions, the same per-partition state where the OffsetAndMetadata from sendOffsetCommitRequest is saved in the commit callback after an offset commit completes.

    public boolean isFetchable(TopicPartition tp) {
        return isAssigned(tp) && assignedState(tp).isFetchable();
    }

    public boolean isAssigned(TopicPartition tp) {
        return assignment.contains(tp);
    }

Next, follow the normal path where error == Errors.NONE. Based on the current position of this TopicPartition in subscriptions, it builds a parsedRecords object and hands it back to the while loop in fetcher.fetchedRecords() for further processing. Here is fetcher.fetchedRecords() once more:


    private PartitionRecords<K, V> nextInLineRecords = null;

    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        int recordsRemaining = maxPollRecords;

        while (recordsRemaining > 0) {
            if (nextInLineRecords == null || nextInLineRecords.isDrained()) {
                CompletedFetch completedFetch = completedFetches.poll();
                if (completedFetch == null)
                    break;

                nextInLineRecords = parseFetchedData(completedFetch);
            } else {
                TopicPartition partition = nextInLineRecords.partition;

                List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining);
                if (!records.isEmpty()) {
                    List<ConsumerRecord<K, V>> currentRecords = drained.get(partition);
                    if (currentRecords == null) {
                        drained.put(partition, records);
                    } else {
                        // this case shouldn't usually happen because we only send one fetch at a time per partition,
                        // but it might conceivably happen in some rare cases (such as partition leader changes).
                        // we have to copy to a new list because the old one may be immutable
                        List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                        newRecords.addAll(currentRecords);
                        newRecords.addAll(records);
                        drained.put(partition, newRecords);
                    }
                    recordsRemaining -= records.size();
                }
            }
        }

        return drained;
    }

Now follow the while loop. When data was fetched successfully, nextInLineRecords has been assigned and is non-null, so the else branch runs: it extracts records via drainRecords, adds them to the drained HashMap, and decrements recordsRemaining; once the limit is reached or the buffer runs dry, the loop exits and the data is returned.


    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int maxRecords) {
        if (partitionRecords.isDrained())
            return Collections.emptyList();

        if (!subscriptions.isAssigned(partitionRecords.partition)) {
            // this can happen when a rebalance happened before fetched records are returned to the consumer's poll call
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", partitionRecords.partition);
        } else {
            // note that the consumed position should always be available as long as the partition is still assigned
            long position = subscriptions.position(partitionRecords.partition);
            if (!subscriptions.isFetchable(partitionRecords.partition)) {
                // this can happen when a partition is paused before fetched records are returned to the consumer's poll call
                log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", partitionRecords.partition);
            } else if (partitionRecords.fetchOffset == position) {
                // we are ensured to have at least one record since we already checked for emptiness
                List<ConsumerRecord<K, V>> partRecords = partitionRecords.drainRecords(maxRecords);
                long nextOffset = partRecords.get(partRecords.size() - 1).offset() + 1;

                log.trace("Returning fetched records at offset {} for assigned partition {} and update " +
                        "position to {}", position, partitionRecords.partition, nextOffset);

                subscriptions.position(partitionRecords.partition, nextOffset);
                return partRecords;
            } else {
                // these records aren't next in line based on the last consumed position, ignore them
                // they must be from an obsolete request
                log.debug("Ignoring fetched records for {} at offset {} since the current position is {}",
                        partitionRecords.partition, partitionRecords.fetchOffset, position);
            }
        }

        partitionRecords.drain();
        return Collections.emptyList();
    }
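The key guard in drainRecords is the fetchOffset == position comparison: a fetched batch is handed to the application only if it starts exactly at the last consumed position, and the position is then advanced past the batch; anything else is treated as the result of an obsolete fetch and dropped. A standalone sketch of that rule, with illustrative names that are not from the Kafka source:

```java
import java.util.Collections;
import java.util.List;

public class PositionGuardSketch {
    long position; // next offset the consumer expects to consume

    PositionGuardSketch(long start) { this.position = start; }

    // Return the batch only if it starts at the expected position, then
    // advance the position past its last record; otherwise the batch is
    // from an obsolete fetch request and is ignored.
    List<Long> drain(long fetchOffset, List<Long> offsets) {
        if (fetchOffset != position || offsets.isEmpty())
            return Collections.emptyList();
        position = offsets.get(offsets.size() - 1) + 1;
        return offsets;
    }

    public static void main(String[] args) {
        PositionGuardSketch s = new PositionGuardSketch(100);
        System.out.println(s.drain(100, List.of(100L, 101L, 102L))); // [100, 101, 102]
        System.out.println(s.position);                              // 103
        System.out.println(s.drain(100, List.of(100L, 101L)));       // stale fetch: []
    }
}
```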

That completes the walk-through of the fetch path. One correction to keep in mind: enable.auto.commit defaults to true, so the consumer auto-commits offsets unless you explicitly set it to false in the Properties passed to new KafkaConsumer&lt;&gt;(...). When it is set to false, offsets are not committed automatically, and after fetching and processing the data you must commit manually by calling consumer.commitSync. To finish, let's step into the source of the manual synchronous commit.


    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        invokeCompletedOffsetCommitCallbacks();

        if (offsets.isEmpty())
            return;

        while (true) {
            ensureCoordinatorReady();

            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
            client.poll(future);

            if (future.succeeded()) {
                if (interceptors != null)
                    interceptors.onCommit(offsets);
                return;
            }

            if (!future.isRetriable())
                throw future.exception();

            time.sleep(retryBackoffMs);
        }
    }

Under the hood this still calls sendOffsetCommitRequest, the same path taken by maybeAutoCommitOffsetsAsync from coordinator.poll discussed earlier. The difference is that this call is synchronous: it blocks until the commit completes and a result is returned, retrying retriable failures after a backoff and rethrowing non-retriable ones, so we won't repeat the analysis here.
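The synchronous commit loop reduces to a simple pattern: send, wait, return on success, rethrow on a non-retriable error, otherwise back off and retry. Here is a generic sketch of that pattern; the Supplier-based API and the Result enum are illustrative stand-ins, not the Kafka client's types.

```java
import java.util.function.Supplier;

public class SyncRetrySketch {
    // Outcome of one attempt: succeeded, or failed retriably / fatally.
    enum Result { SUCCESS, RETRIABLE_FAILURE, FATAL_FAILURE }

    // Keep retrying until success; surface a fatal failure to the caller.
    // Mirrors commitOffsetsSync's while(true) / future.isRetriable() loop.
    static int commitSync(Supplier<Result> attempt, long backoffMs) throws InterruptedException {
        int attempts = 0;
        while (true) {
            attempts++;
            Result r = attempt.get();
            if (r == Result.SUCCESS)
                return attempts;
            if (r == Result.FATAL_FAILURE)
                throw new RuntimeException("non-retriable commit failure");
            Thread.sleep(backoffMs); // stand-in for time.sleep(retryBackoffMs)
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int[] calls = {0};
        // Fail twice with a retriable error, then succeed on the third attempt.
        int attempts = commitSync(
                () -> ++calls[0] < 3 ? Result.RETRIABLE_FAILURE : Result.SUCCESS, 1);
        System.out.println(attempts); // 3
    }
}
```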

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一.概述KafkaConsumer類 Kafka Consumer不是個(gè)線程安全的類。為了便于分析,我們認(rèn)為消費(fèi)者...
    陳陽001閱讀 1,652評(píng)論 0 1
  • Consumer 示例 一個(gè)比較常見的 Consumer 示例代碼如下所示,其主要包含一下幾個(gè)步驟: 構(gòu)造 Pro...
    tracy_668閱讀 357評(píng)論 0 3
  • 通過前面的介紹,我們知道了offset操作的原理。這一節(jié)主要介紹消費(fèi)者如何從服務(wù)端獲取消息,KafkaConsum...
    陳陽001閱讀 2,322評(píng)論 0 3
  • 1. 從本質(zhì)上來說,大腦發(fā)育的核心在于寶寶與一位有愛心、能做出積極回應(yīng)的成年人之間的互動(dòng)。 2. 如果父母只顧往孩...
    向日葵01閱讀 160評(píng)論 0 1
  • 父親的一言一行對(duì)子女影響至深,尤其是對(duì)兒子。 蘇軾的父親蘇洵(1009—1066年),字明允,他“為人聰明,辯智過...
    開心墻頭草閱讀 1,810評(píng)論 3 17

友情鏈接更多精彩內(nèi)容