紹圣--kafka之消費(fèi)者(五)

消費(fèi)者能發(fā)送拉取請求的前提條件是:1,消費(fèi)者已經(jīng)連接上了服務(wù)端協(xié)調(diào)者所在的節(jié)點(diǎn);2,消費(fèi)者必須獲取到服務(wù)端協(xié)調(diào)者分配給此消費(fèi)者的分區(qū)。

消費(fèi)者協(xié)調(diào)者和服務(wù)端的協(xié)調(diào)者之間是通過心跳來維持關(guān)系的:讓消費(fèi)者能聯(lián)系上協(xié)調(diào)者,讓協(xié)調(diào)者知道消費(fèi)者的存在。但是當(dāng)兩邊某一方出現(xiàn)問題的時(shí)候,會(huì)發(fā)生什么?

1,消費(fèi)者沒有發(fā)送心跳(消費(fèi)者發(fā)生故障),協(xié)調(diào)者應(yīng)該知道有消費(fèi)者離開了消費(fèi)組,需要對消費(fèi)組內(nèi)所有的消費(fèi)者重新分配分區(qū)。

2,服務(wù)端協(xié)調(diào)者發(fā)生故障,服務(wù)端會(huì)自己容錯(cuò)選出一個(gè)新的協(xié)調(diào)者節(jié)點(diǎn)來管理消費(fèi)組。消費(fèi)者必須等待一定的時(shí)間重新詢問服務(wù)端是否選擇出新的協(xié)調(diào)者,如果還沒有選出,就再等一段時(shí)間再詢問。如果已經(jīng)選出新的協(xié)調(diào)者節(jié)點(diǎn),消費(fèi)者必須重新與其建立連接,并向協(xié)調(diào)者發(fā)送獲取分配的分區(qū)信息請求。

消費(fèi)者為了獲取協(xié)調(diào)者分配的分區(qū),每個(gè)消費(fèi)者都要發(fā)送加入組請求給協(xié)調(diào)者。

消費(fèi)者加入消費(fèi)組

消費(fèi)者發(fā)送加入消費(fèi)組請求的方法在:AbstractCoordinator.ensureActiveGroup(),消費(fèi)者每次輪詢操作都會(huì)調(diào)用該方法,但是并不意味著每次輪詢都會(huì)發(fā)送加入組請求。因?yàn)楹罄m(xù)發(fā)送拉取請求必須有分區(qū),所以加入消費(fèi)組請求必須采用阻塞的輪詢等待異步請求完成。異步請求完成后將分配的分區(qū)結(jié)果設(shè)置到訂閱狀態(tài)中(SubscriptionState)。

AbstractCoordinator.ensureActiveGroup()

public void ensureActiveGroup() {

ensureCoordinatorReady(); // 確保連接上服務(wù)端協(xié)調(diào)者

startHeartbeatThreadIfNeeded(); // 啟動(dòng)心跳發(fā)送線程(啟動(dòng)并不一定立即發(fā)送心跳,滿足一定條件后才會(huì)發(fā)送心跳)

joinGroupIfNeeded(); // 發(fā)送加入組請求

}

void joinGroupIfNeeded() {

// 首先判斷是否需要重新加入消費(fèi)組

// 再看上一次的加入請求完成否:異步請求對象是否為空。不等于NULL表示異步請求未完成(異步請求完成后會(huì)對異步請求對象設(shè)置為空表示這次發(fā)送請求完成)

// while循環(huán)是為了確保一定要消費(fèi)者加入消費(fèi)組中:發(fā)送加入組請求是阻塞的,拿到異步請求的結(jié)果,如果不成功,就會(huì)進(jìn)行循環(huán),加入組成功后設(shè)置needRejoin()為false

while (needRejoin() || rejoinIncomplete()) {

ensureCoordinatorReady();

// 初始為true,執(zhí)行一次后更新為false,完成后又設(shè)置為true

if (needsJoinPrepare) {

// 如果是定義自動(dòng)提交偏移量,那么在發(fā)送加入組請求之前必須提交本地保存的最新的偏移量 onJoinPrepare(generation.generationId, generation.memberId);

needsJoinPrepare = false;

}

// 初始化JoinGroup請求,并發(fā)送該請求,future此異步請求是加入消費(fèi)組在組合模式中新創(chuàng)建的異步請求

RequestFuture<ByteBuffer> future = initiateJoinGroup();

// 阻塞式的客戶端輪詢確保異步請求完成后才會(huì)返回

// 完成的概念是等待異步請求結(jié)果回來并調(diào)用callback中的方法,才算是完成 client.poll(future);

// 重置“重新加入消費(fèi)組是否完成”對象為空

resetJoinGroupFuture();

if (future.succeeded()) { // 加入組請求完成,這一步時(shí),實(shí)際上同步組也已經(jīng)成功了 needsJoinPrepare = true;

// 完成加入

onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());

} else {

// 異步請求完成,但是有異常,重新發(fā)送加入組請求

// 有異常時(shí):while循環(huán)中needRejoin()返回true,rejoinIncomplete()返回false,繼續(xù)執(zhí)行。

// 在while循環(huán)中initiateJoinGroup()在rejoinIncomplete()返回false的情況下,會(huì)重新發(fā)送加入組請求

RuntimeException exception = future.exception();

if (exception instanceof UnknownMemberIdException || exception instanceof RebalanceInProgressException || exception instanceof IllegalGenerationException) continue;

else if (!future.isRetriable())

throw exception; time.sleep(retryBackoffMs);

}

}

}

注意:initiateJoinGroup()返回的加入組請求的異步請求(組合模式中新創(chuàng)建的異步請求對象)。

疑問:什么情況下會(huì)出現(xiàn)needRejoin()為false,rejoinIncomplete()為true的情況?

回答:在我看的kafka-0.10.1.0版本中,從代碼邏輯來看是不會(huì)出現(xiàn)以上情況的,因?yàn)閏lient.poll(future)會(huì)一直阻塞直到異步請求對象完成。所以都會(huì)執(zhí)行resetJoinGroupFuture()重置邏輯。

加入組請求對象:JoinGroupRequest

public class JoinGroupRequest {

private final String groupId; //?消費(fèi)組名稱

private final String memberId; //?消費(fèi)者成員編號(hào);消費(fèi)者初次加入消費(fèi)組時(shí);此值為:UNKNON_MEMBER,協(xié)調(diào)者在處理每個(gè)消費(fèi)者的加入組請求的時(shí)候,會(huì)為每個(gè)消費(fèi)者指定唯一的消費(fèi)者成員編號(hào),在加入組響應(yīng)中返回給消費(fèi)者;后面消費(fèi)者再次發(fā)送加入組請求的時(shí)候,memberId就是前面協(xié)調(diào)者分配的編號(hào)

private final String protocolType; //?協(xié)議類型 (消費(fèi)者:consumer,連接器:connect)

private final List<ProtocolMetadata> groupProtocols;

/** * 協(xié)議元數(shù)據(jù) * */

public static class ProtocolMetadata {

private final String name; //?分區(qū)分配器的類名(PartitionAssignor兩種分區(qū)方式:RoundRobinAssignor循環(huán),RangeAssignor范圍)

private final ByteBuffer metadata;? //?元數(shù)據(jù)內(nèi)容。協(xié)議類型為消費(fèi)者:那么元數(shù)據(jù)內(nèi)容是訂閱狀態(tài)對象其中包含:消費(fèi)訂閱的topic

}

}

分區(qū)分配算法有兩種:循環(huán)(RoundRobinAssignor)和范圍(RangeAssignor),在調(diào)用assign()方法執(zhí)行分配算法時(shí),必須要兩個(gè)參數(shù):partitionsPerTopic:Map<String, Integer>:有哪些topic這些topic有多少分區(qū)(<topic,3>),subscriptions:Map<String, List<String>>:消費(fèi)者成員編號(hào),訂閱的主題信息。

PartitionAssignor

public interface PartitionAssignor {

Subscription subscription(Set<String> topics); //?每個(gè)消費(fèi)者訂閱的主題列表

Map<String, Assignment> assign(Cluster metadata, Map<String, Subscription> subscriptions); //?只有主消費(fèi)者會(huì)調(diào)用assign(),其中subscriptions是所有消費(fèi)者的訂閱信息

void onAssignment(Assignment assignment); //?分配到結(jié)果后的回調(diào)處理

String name();

/** * 消費(fèi)者的訂閱信息,即訂閱了哪些主題 */

class Subscription { private final List<String> topics; private final ByteBuffer userData;}

/** * 消費(fèi)者的分配結(jié)果,即分配了哪些分區(qū) */

class Assignment { private final List<TopicPartition> partitions; private final ByteBuffer userData;}

}

分配方法返回值包含每個(gè)消費(fèi)者的分配結(jié)果,分配結(jié)果是一個(gè)主題分區(qū)集合,表示分配給消費(fèi)者的所有主題分區(qū)。

主消費(fèi)者

在協(xié)調(diào)者收集完所有的消費(fèi)者及其訂閱信息后,協(xié)調(diào)者并不執(zhí)行分區(qū)分配算法,而是交給其中一個(gè)消費(fèi)者來執(zhí)行分區(qū)分配,選出的這個(gè)消費(fèi)者叫主消費(fèi)者(通常協(xié)調(diào)者會(huì)把第一個(gè)發(fā)送加入組請求的消費(fèi)者選為主消費(fèi)者,當(dāng)主消費(fèi)者掛掉后,協(xié)調(diào)者再選擇下一個(gè)消費(fèi)者作為主消費(fèi)者)

使用主消費(fèi)者來執(zhí)行分區(qū)分配算法而不是協(xié)調(diào)者本身來執(zhí)行,這樣可以減少協(xié)調(diào)者的負(fù)擔(dān),但是也會(huì)增加消費(fèi)者和協(xié)調(diào)者之間的通信次數(shù):主消費(fèi)者完成分配后需要把結(jié)果同步回協(xié)調(diào)者,然后協(xié)調(diào)者再把分配的結(jié)果發(fā)送給消費(fèi)者包括主消費(fèi)者。這樣會(huì)出現(xiàn)以下問題:

1,協(xié)調(diào)者如何選擇主消費(fèi)者?

2,主消費(fèi)者失敗,協(xié)調(diào)者怎么處理?

3,協(xié)調(diào)者將所有的消費(fèi)者以及訂閱信息,通過加入組請求的響應(yīng)結(jié)果給主消費(fèi)者,那么其他的發(fā)送加入組請求的消費(fèi)者也應(yīng)該得到響應(yīng)結(jié)果,此時(shí)推送給其他消費(fèi)者的響應(yīng)結(jié)果是什么喃?

4,在收到響應(yīng)結(jié)果中其實(shí)并不包含分區(qū)分配的結(jié)果(因?yàn)檫@是主消費(fèi)者還沒有計(jì)算完成或者說還沒有把結(jié)果發(fā)送給協(xié)調(diào)者),這是消費(fèi)者怎么來獲取分區(qū)分配結(jié)果喃?

應(yīng)對:

1,協(xié)調(diào)者會(huì)選擇第一個(gè)發(fā)送加入組請求的消費(fèi)者作為主消費(fèi)者。

2,主消費(fèi)者實(shí)質(zhì)就是一個(gè)普通消費(fèi)者,所以主消費(fèi)者和協(xié)調(diào)者之間還是以心跳的方式來監(jiān)聽對方是否宕機(jī)。

3,協(xié)調(diào)者在收集完成所有的消費(fèi)者以及訂閱信息后,主消費(fèi)者收到的加入組響應(yīng)結(jié)果中會(huì)包含所有的消費(fèi)者和訂閱信息來執(zhí)行分區(qū)分配。而非主消費(fèi)者里面不包含這些。

4,每個(gè)消費(fèi)者收到加入組響應(yīng)后,都會(huì)發(fā)送同步組請求給協(xié)調(diào)者來獲取分區(qū)。主消費(fèi)者在執(zhí)行完分區(qū)分配任務(wù)后才會(huì)發(fā)送同步組請求,非主消費(fèi)者會(huì)立即發(fā)送同步組請求。但是這時(shí)主消費(fèi)者還沒有將分配的結(jié)果發(fā)送給協(xié)調(diào)者,這時(shí)非主消費(fèi)者的同步組請求在服務(wù)費(fèi)會(huì)被延遲處理。協(xié)調(diào)者收到主消費(fèi)者的同步請求后會(huì)將分配結(jié)果放在同步組請求響應(yīng)中,返回給所有的消費(fèi)者。

加入組請求和同步組請求

由于消費(fèi)者接收到的加入組請求響應(yīng)中沒有分區(qū)信息,所以不能直接完成加入組異步請求,這時(shí)要求客戶端要發(fā)送同步組請求,但是如果加入組請求有異常,就不需要繼續(xù)發(fā)送同步組請求了。消費(fèi)者這時(shí)需要重新發(fā)送加入組請求。非主消費(fèi)者在收到加入組請求的響應(yīng)后會(huì)立即發(fā)送同步組請求給協(xié)調(diào)者,主消費(fèi)者會(huì)執(zhí)行完分區(qū)分配后再發(fā)送同步組請求。

注意:收到加入組請求的響應(yīng),調(diào)用加入組響應(yīng)處理器的回調(diào)方法只是表示收到結(jié)果,并不代表加入組的異步請求完成(如果有異常的話,就完成加入組的異步請對象)。收到同步組請求響應(yīng)結(jié)果,調(diào)用同步組響應(yīng)處理器回調(diào)方法,這時(shí)同步組請求響應(yīng)結(jié)果包含了分配給消費(fèi)者的分區(qū)信息,這時(shí)就可以完成同步組請求的異步請求,并一起完成加入組異步請求。這時(shí)消費(fèi)者就可以從加入組的異步請求的結(jié)果中獲取分區(qū)分配結(jié)果(代碼中組合使用了組合模式和鏈?zhǔn)侥J剑?/p>

加入組請求 同步組請求分解流程

發(fā)送加入組請求
收到加入組響應(yīng)的處理邏輯
收到同步組響應(yīng)結(jié)果的處理邏輯

疑問:

1,為什么說加入組異步請求成功完成時(shí),同步組異步請求也成功完成了?

2,加入組異步請求是在什么時(shí)候完成的?

通過以上的流程的分解可以知道,通過加入組請求異步請求的鏈接模式,將同步組異步請求的結(jié)果設(shè)置為加入組異步請求的結(jié)果,從而完成加入組的異步請求。即消費(fèi)者收到同步組響應(yīng)后會(huì)完成同步組的異步請求,再完成加入組的異步請求。

總結(jié)一下以上流程:

1,每個(gè)消費(fèi)者都向協(xié)調(diào)者發(fā)送加入組請求,申請加入消費(fèi)組。

2,協(xié)調(diào)者接收每個(gè)消費(fèi)者的加入組請求,收集消費(fèi)組的消費(fèi)者成員。

3,協(xié)調(diào)者選擇一個(gè)消費(fèi)者作為主消費(fèi)者(一般是第一個(gè)發(fā)送加入組請求的消費(fèi)者)。

4,協(xié)調(diào)者向發(fā)送加入組請求的每個(gè)消費(fèi)者返回響應(yīng)結(jié)果,包含消費(fèi)者成員信息和訂閱信息。

5,步驟三選出的主消費(fèi)者做分區(qū)分配算法,并在計(jì)算完成后發(fā)送同步組請求給協(xié)調(diào)者。

6,非主消費(fèi)者收到包含消費(fèi)者成員信息和訂閱信息的響應(yīng)結(jié)果后不做計(jì)算,立即發(fā)送同步組請求給協(xié)調(diào)者。

7,協(xié)調(diào)者收到主消費(fèi)者發(fā)送的同步組請求后(其中包含分區(qū)分配結(jié)果),向每一個(gè)發(fā)送同步組請求的消費(fèi)者組發(fā)生同步組響應(yīng)(包含每個(gè)消費(fèi)者的分區(qū)結(jié)果)。

加入組前的準(zhǔn)備工作

消費(fèi)者在加入組過程中會(huì)調(diào)用onJoinPrepare(),表示在加入組之前要預(yù)處理一些事情:

1,執(zhí)行一次同步提交偏移量:把本地消費(fèi)的最新偏移量提交到服務(wù)端,這樣再平衡完成后,消費(fèi)者從協(xié)調(diào)者得到的分區(qū)偏移量就是最新(該偏移量以前的消息都是已經(jīng)被消費(fèi)過的)。

2,觸發(fā)用戶自定義再平衡監(jiān)聽器:客戶端可以在再平衡發(fā)生時(shí)做一些額外的操作,比如把偏移量保存到數(shù)據(jù)庫中等操作。

參考資料:

Kafka技術(shù)內(nèi)幕:圖文詳解Kafka源碼設(shè)計(jì)與實(shí)現(xiàn)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 此篇開始進(jìn)入kafka的另外一側(cè):消費(fèi)者。kafka中的消費(fèi)者比生產(chǎn)者要復(fù)雜的多,里面涉及到的消費(fèi)組,偏移量等概念...
    紹圣閱讀 2,052評(píng)論 0 0
  • 消費(fèi)者網(wǎng)絡(luò)客戶端輪詢:ConsumerNetworkClient。ConsumerNetworkClient是對N...
    紹圣閱讀 842評(píng)論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,678評(píng)論 19 139
  • 消費(fèi)者輪詢通過拉取器(Fetcher)發(fā)送拉取請求,拉取器會(huì)調(diào)用消費(fèi)者網(wǎng)絡(luò)客戶端的發(fā)送方法(send)和網(wǎng)絡(luò)輪詢方...
    紹圣閱讀 796評(píng)論 0 0
  • 原文地址 當(dāng)kafka最初被創(chuàng)建的時(shí)候,它附帶一個(gè)Scala的生產(chǎn)者和消費(fèi)者客戶端。隨著時(shí)間的推移,我們逐漸意識(shí)到...
    堯字節(jié)閱讀 6,720評(píng)論 0 9

友情鏈接更多精彩內(nèi)容