Kafka Source Code Analysis - Consumer (11) - Summary (1)

Let's now review the overall architecture of KafkaConsumer. KafkaConsumer relies on SubscriptionState to manage the set of subscribed topics and the consumption state of each partition. Through ConsumerCoordinator it interacts with the server-side GroupCoordinator to complete rebalance operations and to request the most recently committed offsets. Fetcher is responsible for pulling messages from Kafka and parsing them; it also takes part in resetting the position and provides access to cluster metadata for specific topics.
All of the requests above are buffered and sent through ConsumerNetworkClient. ConsumerNetworkClient also maintains a delayed-task queue that drives the HeartbeatTask and the auto-commit task. When NetworkClient receives a response to one of these requests, it invokes the corresponding callback, which ultimately hands the response to the matching *Handler and to the listeners registered on the RequestFuture.
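The delayed-task queue mentioned above can be modeled as a deadline-ordered priority queue whose due tasks are executed during poll. The sketch below is a simplified stand-in (class and method names are illustrative), not the real ConsumerNetworkClient:

```java
import java.util.PriorityQueue;

// Simplified model of ConsumerNetworkClient's delayed-task queue: tasks such
// as the heartbeat and auto-commit are stored with a deadline and run when
// poll reaches them. Names here are illustrative, not the real client classes.
class DelayedTaskQueue {
    // One scheduled task: task.run() fires once `deadlineMs` has passed.
    static final class Entry implements Comparable<Entry> {
        final long deadlineMs;
        final Runnable task;
        Entry(long deadlineMs, Runnable task) { this.deadlineMs = deadlineMs; this.task = task; }
        public int compareTo(Entry o) { return Long.compare(deadlineMs, o.deadlineMs); }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    void schedule(Runnable task, long deadlineMs) { queue.add(new Entry(deadlineMs, task)); }

    // Run every task whose deadline has passed, earliest first.
    void executeDelayedTasks(long nowMs) {
        while (!queue.isEmpty() && queue.peek().deadlineMs <= nowMs)
            queue.poll().task.run();
    }
}

public class DelayedTaskDemo {
    public static void main(String[] args) {
        DelayedTaskQueue q = new DelayedTaskQueue();
        StringBuilder log = new StringBuilder();
        q.schedule(() -> log.append("heartbeat;"), 100);
        q.schedule(() -> log.append("autocommit;"), 50);
        q.executeDelayedTasks(60);   // only the auto-commit task is due
        q.executeDelayedTasks(120);  // now the heartbeat fires too
        System.out.println(log);     // autocommit;heartbeat;
    }
}
```

In the real client a recurring task such as the heartbeat reschedules itself at its next deadline after each run.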
The overall architecture of KafkaConsumer is shown below:


[Figure: KafkaConsumer overall architecture]

Now let's look at the remaining KafkaConsumer code.
KafkaConsumer is not a thread-safe class. Rather than trying to make concurrent use consistent, it detects multi-threaded access and fails fast. The mechanism consists of two methods, acquire() and release(), whose code is as follows:

    /**
     * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
     * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
     * supported).
     * @throws IllegalStateException if the consumer has been closed
     * @throws ConcurrentModificationException if another thread already has the lock
     */
    private void acquire() {
        ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        // record the current thread id via a CAS operation
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        // track the reentrancy count
        refcount.incrementAndGet();
    }

    /**
     * Release the light lock protecting the consumer from multi-threaded access.
     */
    private void release() {
        if (refcount.decrementAndGet() == 0)
            // clear the owner thread id on the last release
            currentThread.set(NO_CURRENT_THREAD);
    }

These two methods are not a lock implementation; they merely detect concurrent access from multiple threads. The CAS operation on the atomic owner field guarantees visibility between threads.
Next, let's analyze the complete message-consumption flow of KafkaConsumer.poll(). The relevant code is:
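The same guard pattern can be reproduced as a standalone, runnable sketch (the class name and error message are illustrative):

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Standalone sketch of KafkaConsumer's light-weight thread guard. It is not a
// lock: a second thread fails fast instead of blocking. The owner thread may
// re-enter; refcount tracks the reentrancy depth.
class ThreadGuard {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refcount = new AtomicInteger(0);

    void acquire() {
        long threadId = Thread.currentThread().getId();
        // CAS records the owner thread; any other thread fails immediately.
        if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        refcount.incrementAndGet(); // reentrancy counter
    }

    void release() {
        if (refcount.decrementAndGet() == 0)
            currentThread.set(NO_CURRENT_THREAD); // last release clears the owner
    }
}

public class ThreadGuardDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadGuard guard = new ThreadGuard();
        guard.acquire();          // owner thread enters...
        guard.acquire();          // ...and may re-enter
        final boolean[] rejected = {false};
        Thread other = new Thread(() -> {
            try {
                guard.acquire();  // a second thread is rejected, not blocked
            } catch (ConcurrentModificationException e) {
                rejected[0] = true;
            }
        });
        other.start();
        other.join();
        guard.release();
        guard.release();
        System.out.println("second thread rejected: " + rejected[0]); // true
    }
}
```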

    public ConsumerRecords<K, V> poll(long timeout) {
        acquire(); // guard against multi-threaded access
        try {
            if (timeout < 0)
                throw new IllegalArgumentException("Timeout must not be negative");

            // poll for new data until the timeout expires
            long start = time.milliseconds();
            long remaining = timeout;
            do {
                Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); // core method
                if (!records.isEmpty()) { // check whether any records were returned
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    // Additionally, pollNoWakeup does not allow automatic commits to get triggered.
                    // For efficiency, send the next FetchRequest before processing the current
                    // records. The application's processing of this batch then overlaps with the
                    // network round trip and server-side handling of the next fetch, reducing
                    // the time spent waiting on network I/O.
                    fetcher.sendFetches(); // create and buffer the FetchRequest

                    // ConsumerNetworkClient.pollNoWakeup() sends the buffered FetchRequest.
                    // It does not block, cannot be interrupted, and does not run delayed tasks.
                    client.pollNoWakeup();

                    if (this.interceptors == null)
                        return new ConsumerRecords<>(records);
                    else
                        // pass the records through the ConsumerInterceptors
                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

                long elapsed = time.milliseconds() - start; // time spent so far
                remaining = timeout - elapsed;
            } while (remaining > 0);

            return ConsumerRecords.empty();
        } finally {
            release();
        }
    }
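The timeout bookkeeping in poll() can be isolated into a small runnable model. Here pollOnce is a caller-supplied stand-in, not the real method: the loop keeps polling until records arrive or the caller's time budget is exhausted.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongFunction;

// Model of poll()'s do/while time budget: call pollOnce with the remaining
// time, return as soon as records arrive, give up when the budget runs out.
public class PollLoopDemo {
    public static List<String> poll(long timeoutMs, LongFunction<List<String>> pollOnce) {
        long start = System.currentTimeMillis();
        long remaining = timeoutMs;
        do {
            List<String> records = pollOnce.apply(remaining);
            if (!records.isEmpty())
                return records;                // data arrived: return immediately
            remaining = timeoutMs - (System.currentTimeMillis() - start);
        } while (remaining > 0);               // otherwise retry until the budget is spent
        return Collections.emptyList();
    }

    public static void main(String[] args) {
        // The first round returns nothing, the second returns a record.
        Iterator<List<String>> rounds = Arrays.asList(
                Collections.<String>emptyList(),
                Collections.singletonList("msg")).iterator();
        System.out.println(poll(1000, remaining -> rounds.next())); // [msg]
    }
}
```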

After consuming records, the client still has to commit offsets: commitSync() performs a manual synchronous commit, commitAsync() a manual asynchronous commit, and automatic commits are driven by the AutoCommitTask delayed task.
pollOnce() first completes the rebalance through the interaction between ConsumerCoordinator and the GroupCoordinator, then fetches the most recently committed offset from the GroupCoordinator (or resets the position), and only then uses Fetcher to pull messages from Kafka. The pollOnce() method is shown below:
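Whichever commit path is used, the value committed follows one rule: the committed offset is the offset of the next record to consume, i.e. the last processed offset plus one. A minimal model of that semantics (class and method names here are illustrative, not the real client's):

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of commit semantics (not the real client): per partition, the
// committed value is the offset of the NEXT record the group should read.
class OffsetTracker {
    private final Map<String, Long> committed = new HashMap<>();

    // Called after the record at `lastProcessedOffset` has been fully handled.
    void commitProcessed(String partition, long lastProcessedOffset) {
        committed.put(partition, lastProcessedOffset + 1); // next offset to fetch
    }

    long committed(String partition) {
        return committed.getOrDefault(partition, -1L); // -1 = nothing committed yet
    }
}

public class CommitDemo {
    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.commitProcessed("demo-topic-0", 41L); // just processed offset 41
        // After a restart, consumption resumes at the committed offset: 42.
        System.out.println(tracker.committed("demo-topic-0")); // prints 42
    }
}
```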

    /**
     * Do one round of polling. In addition to checking for new data, this does any needed
     * heart-beating, auto-commits, and offset updates.
     * @param timeout The maximum time to block in the underlying poll
     * @return The fetched records (may be empty)
     */
    private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
        // ensure we have partitions assigned if we expect to

        // in the AUTO_TOPICS or AUTO_PATTERN subscription modes,
        if (subscriptions.partitionsAutoAssigned())
            coordinator.ensurePartitionAssignment(); // complete the rebalance

        // fetch positions if we have partitions we're subscribed to that we
        // don't know the offset for
        // restore the corresponding TopicPartitionState in SubscriptionState,
        // chiefly its committed and position fields
        if (!subscriptions.hasAllFetchPositions())
            updateFetchPositions(this.subscriptions.missingFetchPositions());

        long now = time.milliseconds();

        // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
        client.executeDelayedTasks(now); // run delayed tasks: HeartbeatTask and AutoCommitTask

        // init any new fetches (won't resend pending fetches)
        // try to parse records from the completedFetches buffer
        Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();

        // if data is available already, e.g. from a previous network client poll() call to commit,
        // then just return it immediately
        if (!records.isEmpty())
            return records;

        fetcher.sendFetches(); // create and buffer the FetchRequest
        client.poll(timeout, now); // send the FetchRequest
        return fetcher.fetchedRecords(); // parse records from the completedFetches buffer
    }