架構(gòu)
Client
生產(chǎn)發(fā)送流程
Server:
kafka 網(wǎng)絡(luò)架構(gòu)
kafka 數(shù)據(jù)存儲(chǔ)
kafka 副本同步
kafka 元數(shù)據(jù)管理
一個(gè)demo
1、 producer核心流程:初始化對(duì)象 05}

2、producer初始化過程 06} 0:25
1:00:00-1:18:12
\kafka-0.10.1.0-src\examples\src\main\java\kafka\examples\Producer.java
構(gòu)造函數(shù)
——》Producer()
//TODO 初始化kafkaProducer
producer = new KafkaProducer<>(props);
——》KafkaProducer構(gòu)造函數(shù)((ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)
▼
//配置clientId
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
//設(shè)置分區(qū)器
this.partitioner = config.getConfiguredInstance(
//重試時(shí)間 retry.backoff.ms 默認(rèn)100ms
long retryBackoffMs = config.getLong
//設(shè)置序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(
//設(shè)置攔截器
//類似于一個(gè)過濾器
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>
//metadata.max.age.ms(默認(rèn)5分鐘)
//生產(chǎn)者每隔一段時(shí)間都要去更新一下集群的元數(shù)據(jù)。
this.metadata = new Metadata(retryBackoffMs,
//max.request.size 生產(chǎn)者往服務(wù)端發(fā)送消息的時(shí)候,規(guī)定一條消息最大多大?
//如果你超過了這個(gè)規(guī)定消息的大小,你的消息就不能發(fā)送過去。
//默認(rèn)是1M,這個(gè)值偏小,在生產(chǎn)環(huán)境中,我們需要修改這個(gè)值。
//經(jīng)驗(yàn)值是10M。但是大家也可以根據(jù)自己公司的情況來。
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//指的是緩存大小
//默認(rèn)值是32M,這個(gè)值一般是夠用,如果有特殊情況的時(shí)候,我們可以去修改這個(gè)值。
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
//kafka是支持壓縮數(shù)據(jù)的,這兒設(shè)置壓縮格式。
//提高你的系統(tǒng)的吞吐量,你可以設(shè)置壓縮格式。
//一次發(fā)送出去的消息就更多。生產(chǎn)者這兒會(huì)消耗更多的cpu.
this.compressionType = CompressionType.forName(
//TODO 初始化了一個(gè)重要的管理網(wǎng)路的組件。
/**
* (1)connections.max.idle.ms: 默認(rèn)值是9分鐘 一個(gè)網(wǎng)絡(luò)連接最多空閑多久,超過這個(gè)空閑就關(guān)閉這個(gè)網(wǎng)絡(luò)連接。
*(2)max.in.flight.requests.per.connection:默認(rèn)是5 producer向broker發(fā)送數(shù)據(jù)的時(shí)候,其實(shí)是有多個(gè)網(wǎng)絡(luò)連接。 每個(gè)網(wǎng)絡(luò)連接可以忍受 producer端發(fā)送給broker 消息然后沒有響應(yīng)的個(gè)數(shù)。
- 因?yàn)閗afka有重試機(jī)制,所以有可能會(huì)造成數(shù)據(jù)亂序,如果想要保證有序,這個(gè)值要把設(shè)置為1.
*(3)send.buffer.bytes:socket發(fā)送數(shù)據(jù)的緩沖區(qū)的大小,默認(rèn)值是128K - (4)receive.buffer.bytes:socket接受數(shù)據(jù)的緩沖區(qū)的大小,默認(rèn)值是32K。
*/
NetworkClient client = new NetworkClient(
//這個(gè)就是一個(gè)發(fā)送線程
/*(1) retries:重試的次數(shù)
* (2) acks:
* 0:producer發(fā)送數(shù)據(jù)到broker后,就完了,沒有返回值,不管寫成功還是寫失敗都不管了。
* 1: producer發(fā)送數(shù)據(jù)到broker后,數(shù)據(jù)成功寫入leader partition以后返回響應(yīng)。
- -1: producer發(fā)送數(shù)據(jù)到broker后,數(shù)據(jù)要寫入到leader partition里面,并且數(shù)據(jù)同步到所有的 follower partition里面以后,才返回響應(yīng)。
*/
this.sender = new Sender(client,
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
//創(chuàng)建了一個(gè)線程,然后里面?zhèn)鬟M(jìn)去了一個(gè)sender對(duì)象。
//把業(yè)務(wù)的代碼和關(guān)于線程的代碼給隔離開來。
//關(guān)于線程的這種代碼設(shè)計(jì)的方式,其實(shí)也值得大家積累的。
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
//啟動(dòng)線程。
this.ioThread.start();
(2)元數(shù)據(jù)管理 07} 0:6
1:18:00一1:25:00
入口
——》KafkaProducer((ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)
▼
//生產(chǎn)者從服務(wù)端那兒拉取過來的kafka的元數(shù)據(jù)。
//生產(chǎn)者要想去拉取元數(shù)據(jù), 發(fā)送網(wǎng)絡(luò)請(qǐng)求,重試,
//metadata.max.age.ms(默認(rèn)5分鐘)
//生產(chǎn)者每隔一段時(shí)間都要去更新一下集群的元數(shù)據(jù)。
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
//去更新元數(shù)據(jù)
//addresses 這個(gè)地址其實(shí)就是我們寫producer代碼的時(shí)候,傳參數(shù)的時(shí)候,傳進(jìn)去了一個(gè)broker的地址。
//所以這段代碼看起來像是去服務(wù)端拉取元數(shù)據(jù),所以我們?nèi)ヲ?yàn)證一下,是否真的去拉取元數(shù)據(jù)。
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
//TODO update方法初始化的時(shí)候并沒有去服務(wù)端拉取元數(shù)據(jù)。
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
(2.1)元數(shù)據(jù)管理2 元數(shù)據(jù)結(jié)構(gòu) 07} 0:15
java/org/apache/kafka/clients/Metadata.java
java/org/apache/kafka/common/Cluster.java
java/org/apache/kafka/common/Node.java
java/org/apache/kafka/common/PartitionInfo.java
(3.1)KafkaProducer 核心流程:發(fā)送 08} 0:14
1:25:00—1:32:00
java/kafka/examples/Producer.java
//isAsync , kafka發(fā)送數(shù)據(jù)的時(shí)候,有兩種方式
//1: 異步發(fā)送
//2: 同步發(fā)送
//isAsync: true的時(shí)候是異步發(fā)送,false就是同步發(fā)送
if (isAsync) { // Send asynchronously
//異步發(fā)送,一直發(fā)送,消息響應(yīng)結(jié)果交給回調(diào)函數(shù)處理
//這樣的方式,性能比較好,我們生產(chǎn)代碼用的就是這種方式。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));
主類 org/apache/kafka/clients/producer/KafkaProducer.java
——》KafkaProducer.send()
——》★ KafkaProducer.doSend()八大步驟
▼
/**
* 步驟一:
* 同步等待拉取元數(shù)據(jù)。
* maxBlockTimeMs 最多能等待多久。
/
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(),
/*
* 步驟二:
* 對(duì)消息的key和value進(jìn)行序列化。
/
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
/*
* 步驟三:
* 根據(jù)分區(qū)器選擇消息應(yīng)該發(fā)送的分區(qū)。
*
* 因?yàn)榍懊嫖覀円呀?jīng)獲取到了元數(shù)據(jù)
* 這兒我們就可以根據(jù)元數(shù)據(jù)的信息
* 計(jì)算一下,我們應(yīng)該要把這個(gè)數(shù)據(jù)發(fā)送到哪個(gè)分區(qū)上面。
*/
int partition = partition(record, serializedKey, serializedValue, cluster);
int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
/**
* 步驟四:
* 確認(rèn)一下消息的大小是否超過了最大值。
* KafkaProdcuer初始化的時(shí)候,指定了一個(gè)參數(shù),代表的是Producer這兒最大能發(fā)送的是一條消息能有多大
* 默認(rèn)最大是1M,我們一般都回去修改它。
*/
ensureValidRecordSize(serializedSize);
/**
* 步驟五:
* 根據(jù)元數(shù)據(jù)信息,封裝分區(qū)對(duì)象
*/
tp = new TopicPartition(record.topic(), partition);
/**
* 步驟六:
* 給每一條消息都綁定他的回調(diào)函數(shù)。因?yàn)槲覀兪褂玫氖钱惒降姆绞桨l(fā)送的消息。
/
Callback interceptCallback = this.interceptors == null ? callback :
/*
* 步驟七:
* 把消息放入accumulator(32M的一個(gè)內(nèi)存)
* 然后有accumulator把消息封裝成為一個(gè)批次一個(gè)批次的去發(fā)送。
/
RecordAccumulator.RecordAppendResult result = accumulator.append(
/*
* 步驟八:
* 喚醒sender線程。他才是真正發(fā)送數(shù)據(jù)的線程。
*/
this.sender.wakeup();
(3.2)KafkaProducer核心流程2:異常體系 08 02} 0:6
異常體系構(gòu)建 ,自定義異常+ 多層拋出
(4.1 )KafkaProducer 加載元數(shù)據(jù)1 09 01} 0:15
——》★ KafkaProducer.doSend()里面
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata
——》KafkaProducer .waitOnMetadata()
/**
* TODO 這個(gè)步驟重要
* 我們發(fā)現(xiàn)這兒去喚醒sender線程。
* 其實(shí)是因?yàn)?,拉取有拉取元?shù)據(jù)這個(gè)操作是有sender線程去完成的。
* 這個(gè)地方把線程給喚醒了以后
* 我們知道sender線程肯定就開始進(jìn)行干活了 至于怎么我們后面在繼續(xù)分析。
* 很明顯,真正去獲取元數(shù)據(jù)是這個(gè)線程完成。
*/
sender.wakeup();
(4.2)KafkaProducer 加載元數(shù)據(jù)2 09 02} 0:7
——》KafkaProducer .waitOnMetadata()
//TODO 等待元數(shù)據(jù)
//同步的等待
//等待這sender線程獲取到元數(shù)據(jù)。
metadata.awaitUpdate(version, remainingWaitMs);
(4.3)KafkaProducer 加載元數(shù)據(jù)3 09 03} 0:25
——》KafkaProducer .waitOnMetadata()
——》 sender.wakeup();
sender線程 在KafkaProducer 的 構(gòu)造函數(shù)里面
//創(chuàng)建了一個(gè)線程,然后里面?zhèn)鬟M(jìn)去了一個(gè)sender對(duì)象。
//把業(yè)務(wù)的代碼和關(guān)于線程的代碼給隔離開來。
//關(guān)于線程的這種代碼設(shè)計(jì)的方式,其實(shí)也值得大家積累的。
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
//啟動(dòng)線程。
this.ioThread.start();
——》 java/org/apache/kafka/clients/producer/internals/Sender.java
——》 run 方法
——》 void run(long now) {
//TODO 重點(diǎn)就是去看這個(gè)方法
//就是用這個(gè)方法拉取的元數(shù)據(jù)。
/**
* 步驟八:
* 真正執(zhí)行網(wǎng)絡(luò)操作的都是這個(gè)NetWordClient這個(gè)組件
* 包括:發(fā)送請(qǐng)求,接受響應(yīng)(處理響應(yīng))
*
* 拉取元數(shù)據(jù)信息,靠的就是這段代碼
*/
//我們猜這兒可能就是去建立連接。
this.client.poll(pollTimeout, now);
↓
java/org/apache/kafka/clients/NetworkClient.java
—— ★》 NetworkClient.poll ()
//步驟一:封裝了一個(gè)要拉取元數(shù)據(jù)請(qǐng)求
long metadataTimeout = metadataUpdater.maybeUpdate(now);
//步驟二: 發(fā)送請(qǐng)求,進(jìn)行復(fù)雜的網(wǎng)絡(luò)操作
//但是我們目前還沒有學(xué)習(xí)到kafka的網(wǎng)絡(luò)
//所以這兒大家就只需要知道這兒會(huì)發(fā)送網(wǎng)絡(luò)請(qǐng)求。
//TODO 執(zhí)行網(wǎng)絡(luò)IO的操作。 NIO
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
//步驟三:處理響應(yīng),響應(yīng)里面就會(huì)有我們需要的元數(shù)據(jù)。
/**
* 這個(gè)地方是我們?cè)诳瓷a(chǎn)者是如何獲取元數(shù)據(jù)的時(shí)候,看的。
* 其實(shí)Kafak獲取元數(shù)據(jù)的流程跟我們發(fā)送消息的流程是一模一樣。
* 獲取元數(shù)據(jù) -》 判斷網(wǎng)絡(luò)連接是否建立好 -》 建立網(wǎng)絡(luò)連接
* -》 發(fā)送請(qǐng)求(獲取元數(shù)據(jù)的請(qǐng)求) -》 服務(wù)端發(fā)送回來響應(yīng)(帶了集群的元數(shù)據(jù)信息)
*
*/
handleCompletedReceives(responses, updatedNow);
——》NetworkClient.DefaultMetadataUpdater.handleResponse()
(4.4)KafkaProducer 加載元數(shù)據(jù)4 流程圖 09 04} 0:7

(5)分區(qū)選擇邏輯 10 } 0:10
2:11:00---2:21:22
—— ★》 KafkaProducer.doSend()里面 步驟三
▼
/**
* 步驟三:
* 根據(jù)分區(qū)器選擇消息應(yīng)該發(fā)送的分區(qū)。
* 因?yàn)榍懊嫖覀円呀?jīng)獲取到了元數(shù)據(jù)
* 這兒我們就可以根據(jù)元數(shù)據(jù)的信息
* 計(jì)算一下,我們應(yīng)該要把這個(gè)數(shù)據(jù)發(fā)送到哪個(gè)分區(qū)上面。
*/
int partition = partition(record, serializedKey, serializedValue, cluster);
——》KafkaProducer.partition()
//如果你的這個(gè)消息已經(jīng)分配了分區(qū)號(hào),那直接就用這個(gè)分區(qū)號(hào)就可以了
//但是正常情況下,消息是沒有分區(qū)號(hào)的。
//使用分區(qū)器進(jìn)行選擇合適的分區(qū)
partitioner.partition( record.topic(), record.key(),
↓
——》DefaultPartitioner.partition()
(6.1)封裝recordAccumulator 11.01 } 0:11
2:21:00---2:26:00

—— ★》 KafkaProducer.doSend()里面 步驟四、五、六、七
/**
* 步驟四:
* 確認(rèn)一下消息的大小是否超過了最大值。
* KafkaProdcuer初始化的時(shí)候,指定了一個(gè)參數(shù),代表的是Producer這兒最大能發(fā)送的是一條消息能有多大
* 默認(rèn)最大是1M,我們一般都回去修改它。
*/
ensureValidRecordSize(serializedSize);
/**
* 步驟五:
* 根據(jù)元數(shù)據(jù)信息,封裝分區(qū)對(duì)象
/
tp = new TopicPartition(record.topic(), partition);
/*
* 步驟六:
* 給每一條消息都綁定他的回調(diào)函數(shù)。因?yàn)槲覀兪褂玫氖钱惒降姆绞桨l(fā)送的消息。
/
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
/*
* 步驟七:
* 把消息放入accumulator(32M的一個(gè)內(nèi)存)
* 然后有accumulator把消息封裝成為一個(gè)批次一個(gè)批次的去發(fā)送。
*/
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
轉(zhuǎn)到
——》 RecordAccumulator.append()
(6.2)封裝recordAccumulator 2 11.02 } 0:15
——》 RecordAccumulator.append()詳解7大步驟
/**
- 步驟一:先根據(jù)分區(qū)找到應(yīng)該插入到哪個(gè)隊(duì)列里面。
- 如果有已經(jīng)存在的隊(duì)列,那么我們就使用存在隊(duì)列
- 如果隊(duì)列不存在,那么我們新創(chuàng)建一個(gè)隊(duì)列
Deque<RecordBatch> dq = getOrCreateDeque(tp);
/**
* 步驟二:
* 嘗試往隊(duì)列里面的批次里添加數(shù)據(jù)
* 一開始添加數(shù)據(jù)肯定是失敗的,我們目前只是以后了隊(duì)列
* 數(shù)據(jù)是需要存儲(chǔ)在批次對(duì)象里面(這個(gè)批次對(duì)象是需要分配內(nèi)存的)
* 我們目前還沒有分配內(nèi)存,所以如果按場(chǎng)景驅(qū)動(dòng)的方式,
* 代碼第一次運(yùn)行到這兒其實(shí)是不成功的。
*/
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
/**
* 步驟三:計(jì)算一個(gè)批次的大小
* 在消息的大小和批次的大小之間取一個(gè)最大值,用這個(gè)值作為當(dāng)前這個(gè)批次的大小。
* 有可能我們的一個(gè)消息的大小比一個(gè)設(shè)定好的批次的大小還要大。
* 默認(rèn)一個(gè)批次的大小是16K。
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
/**
* 步驟四:
* 根據(jù)批次的大小去分配內(nèi)存
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
/**
* 步驟五:
* 嘗試把數(shù)據(jù)寫入到批次里面。
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
/**
* 步驟六:
* 根據(jù)內(nèi)存大小封裝批次
* 線程一到這兒 會(huì)根據(jù)內(nèi)存封裝出來一個(gè)批次。
*/
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
/**
* 步驟七:
* 把這個(gè)批次放入到這個(gè)隊(duì)列的隊(duì)尾
* 線程一把批次添加到隊(duì)尾
*/
dq.addLast(batch);
incomplete.add(batch);
(7) 讀寫分離 copyOnwrite數(shù)據(jù)結(jié)構(gòu)使用 12} 0:26
2:26:00---2:41:00
入口
——》 RecordAccumulator.append()
——》 getOrCreateDeque()
Deque<RecordBatch> d = this.batches.get(tp);
//把這個(gè)空的隊(duì)列存入batches 這個(gè)數(shù)據(jù)結(jié)構(gòu)里面
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);
batches 是 RecordAccumulator 的 成員變量
ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
在RecordAccumulator構(gòu)造函數(shù)里面賦值
this.batches = new CopyOnWriteMap<>();
CopyOnWriteMap是自定義類
——》CopyOnWriteMap.putIfAbsent()
▼
//如果我們傳進(jìn)來的這個(gè)key不存在
if (!containsKey(k))
//那么就調(diào)用里面內(nèi)部的put方法
return put(k, v);
else
//返回結(jié)果
return get(k);
}
//新的內(nèi)存空間
//讀寫分離
//往新的內(nèi)存空間里面插入
//讀,讀數(shù)據(jù)就老讀空間里面去
Map<K, V> copy = new HashMap<K, V>(this.map);
//插入數(shù)據(jù)
V prev = copy.put(k, v);
//賦值給map
this.map = Collections.unmodifiableMap(copy);
(8) 數(shù)據(jù)寫入對(duì)應(yīng)批次(分段加鎖) 13} 0:21
2:41:42-----2:53:11

入口
——》 RecordAccumulator.append()
分段加鎖 步驟1~ 6 可以最大并發(fā)
synchronized (dq) {
}
(9) 發(fā)送者內(nèi)存池設(shè)計(jì) 14} 0:28

人口——》 RecordAccumulator.append()
* 步驟四:
* 根據(jù)批次的大小去分配內(nèi)存
——》BufferPool.allocate()
/**
*
* 總的分配的思路,可能一下子分配不了這么大的內(nèi)存,但是可以先有點(diǎn)分配一點(diǎn)。
*
*/
//如果分配的內(nèi)存的大小 還是沒有要申請(qǐng)的內(nèi)存大小大。
//內(nèi)存池就會(huì)一直分配的內(nèi)存,一點(diǎn)一點(diǎn)的去分配。
//等著別人會(huì)釋放內(nèi)存。
//accumulated 5K+16K=21K 16K
// size 32K
while (accumulated < size) {
(9) 發(fā)送者線程sender詳解1 15} 0:15
入口 KafkaProducer 構(gòu)造函數(shù)
//這個(gè)就是一個(gè)線程
this.sender = new Sender(client,
——》Sender.run(long now) 方法 , 8個(gè)步驟
- 步驟一:(上面章節(jié)講了)
獲取元數(shù)據(jù)
Cluster cluster = metadata.fetch();
- 步驟二:首先是判斷哪些partition有消息可以發(fā)送:
* 獲取到這個(gè)partition的leader partition對(duì)應(yīng)的broker主機(jī)(根據(jù)元數(shù)據(jù)信息來就可以了)
* 哪些broker上面需要我們?nèi)グl(fā)送消息?
*/
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
/**
* 步驟三:
* 標(biāo)識(shí)還沒有拉取到元數(shù)據(jù)的topic
*/
if (!result.unknownLeaderTopics.isEmpty()) {
/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
/**
* 步驟五:
*
* 我們有可能要發(fā)送的partition有很多個(gè),
* 很有可能有一些partition的leader partition是在同一臺(tái)服務(wù)器上面。
- 按照broker進(jìn)行分組,同一個(gè)broker的partition為同一組
* 0:{p0,p1} -> 批次
* 1:{p2}
* 2:{p3}
/**
* 步驟六:
* 對(duì)超時(shí)的批次是如何處理的?
*
/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
/*
* 步驟七:
* 創(chuàng)建發(fā)送消息的請(qǐng)求
* 我們往partition上面去發(fā)送消息的時(shí)候,有一些partition他們?cè)谕慌_(tái)服務(wù)器上面
* ,如果我們一分區(qū)一個(gè)分區(qū)的發(fā)送我們網(wǎng)絡(luò)請(qǐng)求,那網(wǎng)絡(luò)請(qǐng)求就會(huì)有一些頻繁
* 我們要知道,我們集群里面網(wǎng)絡(luò)資源是非常珍貴的。
* 會(huì)把發(fā)往同個(gè)broker上面partition的數(shù)據(jù) 組合成為一個(gè)請(qǐng)求。
* 然后統(tǒng)一一次發(fā)送過去,這樣子就減少了網(wǎng)絡(luò)請(qǐng)求。
*/
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
//如果網(wǎng)絡(luò)連接沒有建立好 batches其實(shí)是為空。
//也就說其實(shí)這段代碼也是不會(huì)執(zhí)行。
/**
* 步驟八:
* 真正執(zhí)行網(wǎng)絡(luò)操作的都是這個(gè)NetWordClient這個(gè)組件
* 包括:發(fā)送請(qǐng)求,接受響應(yīng)(處理響應(yīng))
* 拉取元數(shù)據(jù)信息,靠的就是這段代碼
*/
//我們猜這兒可能就是去建立連接。
this.client.poll(pollTimeout, now);
(10) 發(fā)送者線程sender詳解2 batch什么條件下發(fā)送? 16} 0:25
——》Sender.run(long now) 方法 里面
/**
* 步驟二:
* 首先是判斷哪些partition有消息可以發(fā)送:
* 我們看一下一個(gè)批次可以發(fā)送出去的條件
* 獲取到這個(gè)partition的leader partition對(duì)應(yīng)的broker主機(jī)(根據(jù)元數(shù)據(jù)信息來就可以了)
* 哪些broker上面需要我們?nèi)グl(fā)送消息?
*/
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
——》RecordAccumulator.ready()
▼
//waiters里面有數(shù)據(jù),說明我們的這個(gè)內(nèi)存池里面內(nèi)存不夠了。
//如果exhausted的值等于true,說明內(nèi)存池里面的內(nèi)存不夠用了。
boolean exhausted = this.free.queued() > 0;
//首先從隊(duì)列的隊(duì)頭獲取到批次
RecordBatch batch = deque.peekFirst();
//如果這個(gè)catch不null,我們判斷一下是否可以發(fā)送這個(gè)批次。
if (batch != null) {
(11) 發(fā)送者線程sender詳解3 篩選可以發(fā)送消息的broker(1) 17 1} 0:16
——》Sender.run(long now) 方法 里面
/**
* 步驟三:
* 標(biāo)識(shí)還沒有拉取到元數(shù)據(jù)的topic
*/
if (!result.unknownLeaderTopics.isEmpty()) {
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {
(12) 發(fā)送者線程sender詳解3 篩選可以發(fā)送消息的broker(2) 17 2} 0:16

(13) 發(fā)送者 源碼之kafka網(wǎng)絡(luò)設(shè)計(jì) 18 1 2 3} 0:10 0:5 0:8
——》Sender.run(long now) 方法 里面
/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {
↓
——》NetworkClient.ready()
——》NetworkClient. initiateConnect()
//TODO 嘗試建立連接
selector.connect(nodeConnectionId
1 ——》Selector類 各成員變量 詳解
▼
* TODO 這個(gè)selector是kafka主機(jī)封裝的一個(gè)selector
* 他是基于java NIO里面的selector去封裝的。
//這個(gè)對(duì)象就是javaNIO里面的Selector
//Selector是負(fù)責(zé)網(wǎng)絡(luò)的建立,發(fā)送網(wǎng)絡(luò)請(qǐng)求,處理實(shí)際的網(wǎng)絡(luò)IO。
//所以他是最最核心的這么樣的一個(gè)組件。
private final java.nio.channels.Selector nioSelector;
//broker 和 KafkaChannel(SocketChnnel)的映射
//這兒的kafkaChannel大家暫時(shí)可以理解為就是SocketChannel
//代表的就是一個(gè)網(wǎng)絡(luò)連接。
private final Map<String, KafkaChannel> channels;
//已經(jīng)完成發(fā)送的請(qǐng)求
private final List<Send> completedSends;
//已經(jīng)接收到的,并且處理完了的響應(yīng)。
private final List<NetworkReceive> completedReceives;
//已經(jīng)接收到了,但是還沒來得及處理的響應(yīng)。
//一個(gè)連接,對(duì)應(yīng)一個(gè)響應(yīng)隊(duì)列
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
//沒有建立連接的主機(jī)
private final List<String> disconnected;
//完成建立連接的主機(jī)
private final List<String> connected;
//建立連接失敗的主機(jī)。
private final List<String> failedSends;
2 ——》KafkaChannel類 各成員變量 詳解
//一個(gè)broker就對(duì)應(yīng)一個(gè)KafkaChannel
//這個(gè)id就是broker的id
private final String id;
//接收到的響應(yīng)
private NetworkReceive receive;
//發(fā)送出去的請(qǐng)求
private Send send;
//我們推測(cè)這個(gè)里面應(yīng)該會(huì)有SocketChannel
private final TransportLayer transportLayer;
▼
* returns underlying socketChannel
* 這個(gè)核心的組件,就是javaNIO里面的SocketChannel
SocketChannel socketChannel();

(14) 發(fā)送者 源碼 Selector.connect 與broker建立連接 20 1 } 0:13
——》Sender.run(long now) 方法 里面
/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {
↓
——》NetworkClient.ready()
——》NetworkClient. initiateConnect()
//TODO 嘗試建立連接
selector.connect(nodeConnectionId
——》Selector.connect()
▼
//獲取到SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//設(shè)置為非阻塞的模式
socketChannel.configureBlocking(false);
//設(shè)置一些參數(shù)
//這些網(wǎng)絡(luò)的參數(shù),我們之前在分析Producer的時(shí)候給大家看過
//都有一些默認(rèn)值。
//這個(gè)的默認(rèn)值是false,代表要開啟Nagle的算法
//它會(huì)把網(wǎng)絡(luò)中的一些小的數(shù)據(jù)包收集起來,組合成一個(gè)大的數(shù)據(jù)包
//然后再發(fā)送出去。因?yàn)樗J(rèn)為如果網(wǎng)絡(luò)中有大量的小的數(shù)據(jù)包在傳輸
//其實(shí)是會(huì)影響網(wǎng)絡(luò)擁塞。
//kafka一定不能把這兒設(shè)置為false,因?yàn)槲覀冇行r(shí)候可能有些數(shù)據(jù)包就是比較
//小,他這兒就不幫我們發(fā)送了,顯然是不合理的。
socket.setTcpNoDelay(true);
//嘗試去服務(wù)器去連接。
//因?yàn)檫@兒非阻塞的
//有可能就立馬連接成功,如果成功了就返回true
//也有可能需要很久才能連接成功,返回false。
connected = socketChannel.connect(address);
//SocketChannel往Selector上注冊(cè)了一個(gè)OP_CONNECT
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
//根據(jù)根據(jù)SocketChannel 封裝出來一個(gè)KafkaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
//把key和KafkaChannel關(guān)聯(lián)起來
//后面使用起來會(huì)比較方便
//我們可以根據(jù)key就找到KafkaChannel
//也可以根據(jù)KafkaChannel找到key
key.attach(channel);
//緩存起來了
this.channels.put(id, channel);
//所以正常情況下,這兒網(wǎng)絡(luò)不能完成連接。
//如果這兒不能完成連接。大家猜一下
//kafka會(huì)在哪兒完成網(wǎng)絡(luò)最后的連接呢?
//如果里面就連接上了
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", channel.id());
immediatelyConnectedKeys.add(key);
// 取消前面注冊(cè) OP_CONNECT 事件。
key.interestOps(0);
}
(15) 發(fā)送者 源碼 Selector.connect 與broker建立連接2 完成最后的 連接 20 2 } 0:10
——》Sender.run(long now) 方法 里面
/**
* 步驟八:
* 真正執(zhí)行網(wǎng)絡(luò)操作的都是這個(gè)NetWordClient這個(gè)組件
-
包括:發(fā)送請(qǐng)求,接受響應(yīng)(處理響應(yīng))
* 拉取元數(shù)據(jù)信息,靠的就是這段代碼
//我們猜這兒可能就是去建立連接。
this.client.poll(pollTimeout, now);
↓
——》NetworkClient.poll();
▼
//步驟二: 發(fā)送請(qǐng)求,進(jìn)行復(fù)雜的網(wǎng)絡(luò)操作
//但是我們目前還沒有學(xué)習(xí)到kafka的網(wǎng)絡(luò)
//所以這兒大家就只需要知道這兒會(huì)發(fā)送網(wǎng)絡(luò)請(qǐng)求。
//TODO 執(zhí)行網(wǎng)絡(luò)IO的操作。 NIO
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
↓
——》Selector.poll()
▼
//從Selector上找到有多少個(gè)key注冊(cè)了
int readyKeys = select(timeout);
//因?yàn)槲覀冇脠?chǎng)景驅(qū)動(dòng)的方式
//我們剛剛確實(shí)是注冊(cè)了一個(gè)key
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//立馬就要對(duì)這個(gè)Selector上面的key要進(jìn)行處理。
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);——》Selector.pollSelectionKeys() ▼
//遍歷所有的key
while (iterator.hasNext()) {
//根據(jù)key找到對(duì)應(yīng)的KafkaChannel
KafkaChannel channel = channel(key);
/**
*
* 我們代碼第一次進(jìn)來應(yīng)該要走的是這兒分支,因?yàn)槲覀兦懊孀?cè)的是
* SelectionKey key = socketChannel.register(nioSelector,
* SelectionKey.OP_CONNECT);
*
*/
if (isImmediatelyConnected || key.isConnectable()) {
//TODO 核心的代碼
//去最后完成網(wǎng)絡(luò)的連接
//如果我們之前初始化的時(shí)候,沒有完成網(wǎng)絡(luò)連接的話,這兒一定會(huì)幫你
//完成網(wǎng)絡(luò)的連接。
1:》 if (channel.finishConnect()) {
//網(wǎng)絡(luò)連接已經(jīng)完成了以后,就把這個(gè)channel存儲(chǔ)到
this.connected.add(channel.id());
(16) 發(fā)送者 源碼 Selector.connect 與broker建立連接3 連接示意圖 20 3 } 0:17

(17) 發(fā)送者 源碼產(chǎn)生者 發(fā)送網(wǎng)絡(luò)請(qǐng)求了 21 1 } 0:20
接上期 1:》
//完成網(wǎng)絡(luò)的連接。
if (channel.finishConnect()) {
——》PlaintextTransportLayer.finishConnect()
▼
//完成的最后的網(wǎng)絡(luò)的連接
boolean connected = socketChannel.finishConnect();
//如果連接完成了以后。
if (connected)
//取消了OP_CONNECT事件
//增加了OP_READ 事件 我們這兒的這個(gè)key對(duì)應(yīng)的KafkaChannel是不是就可以接受服務(wù)
//端發(fā)送回來的響應(yīng)了。
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
▲
回到 ——》Sender.run(long now) 方法 里面 步驟4 ,現(xiàn)在網(wǎng)絡(luò)已經(jīng)建立連接
進(jìn)入下面
/**
* 步驟七:
* 創(chuàng)建發(fā)送消息的請(qǐng)求
* 我們往partition上面去發(fā)送消息的時(shí)候,有一些partition他們?cè)谕慌_(tái)服務(wù)器上面
* ,如果我們一分區(qū)一個(gè)分區(qū)的發(fā)送我們網(wǎng)絡(luò)請(qǐng)求,那網(wǎng)絡(luò)請(qǐng)求就會(huì)有一些頻繁
* 我們要知道,我們集群里面網(wǎng)絡(luò)資源是非常珍貴的。
* 會(huì)把發(fā)往同個(gè)broker上面partition的數(shù)據(jù) 組合成為一個(gè)請(qǐng)求。
* 然后統(tǒng)一一次發(fā)送過去,這樣子就減少了網(wǎng)絡(luò)請(qǐng)求。
*/
List<ClientRequest> requests = createProduceRequests(batches, now);
//TODO 發(fā)送請(qǐng)求的操作
for (ClientRequest request : requests)
//綁定 op_write
client.send(request, now);
↓
——》 NetworkClient..send()
——》NetworkClient..doSend()
▼
//這兒往inFlightRequests 組件里存 Request請(qǐng)求。
//存儲(chǔ)的就是還沒有收到響應(yīng)的請(qǐng)求。
//這個(gè)里面默認(rèn)最多能存5個(gè)請(qǐng)求。
//其實(shí)我們可以猜想一個(gè)事,如果我們的請(qǐng)求發(fā)送出去了
//然后也成功的接受到了響應(yīng),后面就會(huì)到這兒把這個(gè)請(qǐng)求移除。
this.inFlightRequests.add(request);
//TODO
selector.send(request.request());
↓
——》Selector.send()
//獲取到一個(gè)KafakChannel
KafkaChannel channel = channelOrFail(send.destination());
try {
//TODO
channel.setSend(send);
——》KafkaChannel .setSend()
▼
//關(guān)鍵的代碼來了
//這兒綁定了一個(gè)OP_WRITE事件。
//一旦綁定了這個(gè)事件以后,我們就可以往服務(wù)端發(fā)送請(qǐng)求了。
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);

▲
回到 ——》Sender.run(long now) 方法 里面 步驟8 ,
this.client.poll(pollTimeout, now);
↓
——》NetworkClient.poll();
↓
——》Selector.poll()
▼
//因?yàn)槲覀冇脠?chǎng)景驅(qū)動(dòng)的方式
//我們剛剛確實(shí)是注冊(cè)了一個(gè)key
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//立馬就要對(duì)這個(gè)Selector上面的key要進(jìn)行處理。
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
——》Selector. pollSelectionKeys()
▼
//核心代碼,處理發(fā)送請(qǐng)求的事件
//selector 注冊(cè)了一個(gè)OP_WRITE
//selector 注冊(cè)了一個(gè)OP_READ
if (channel.ready() && key.isWritable()) {
//獲取到我們要發(fā)送的那個(gè)網(wǎng)絡(luò)請(qǐng)求。
//是這句代碼就是要往服務(wù)端發(fā)送數(shù)據(jù)了。
//TODO:服務(wù)端
//里面我們發(fā)現(xiàn)如果消息被發(fā)送出去了,然后移除OP_WRITE
Send send = channel.write();
//已經(jīng)完成響應(yīng)消息的發(fā)送
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}
(18) 發(fā)送者 源碼 產(chǎn)生者 發(fā)送網(wǎng)絡(luò)請(qǐng)求2 21 2 } 0:10
流程圖圖示如下:

(19) 發(fā)送者 源碼 producer是如何處理粘包問題的1 22 1} 0:5
粘包: 一個(gè)請(qǐng)求里面有多個(gè)響應(yīng),多個(gè)消息粘在一起回來

轉(zhuǎn)到這里
——》NetworkClient..doSend()
▼
//這兒往inFlightRequests 隊(duì)列組件里存 Request請(qǐng)求。
//存儲(chǔ)的就是還沒有收到響應(yīng)的請(qǐng)求。
//這個(gè)里面默認(rèn)最多能存5個(gè)請(qǐng)求。
//其實(shí)我們可以猜想一個(gè)事,如果我們的請(qǐng)求發(fā)送出去了
//然后也成功的接受到了響應(yīng),后面就會(huì)到這兒把這個(gè)請(qǐng)求移除。
this.inFlightRequests.add(request);
inFlightRequests 類里面有 Map 成員變量
private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();
(20) 發(fā)送者 源碼 producer是如何處理粘包問題的2 22 2} 0:12
接上面:發(fā)送代碼
——》Selector. pollSelectionKeys()
//里面不斷的讀取數(shù)據(jù),讀取數(shù)據(jù)的代碼我們之前就已經(jīng)分析過
//里面還涉及到粘包和拆包的一些問題。
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);
——》NetworkReceive.readFromReadableChannel()
▼
//先讀取4字節(jié)的數(shù)據(jù),(代表的意思就是后面跟著的消息體的大?。?br>
int bytesRead = channel.read(size);
//一直要讀取到當(dāng)這個(gè)size沒有剩余空間
//說明已經(jīng)讀取到了一個(gè)4字節(jié)的int類型的數(shù)了。
if (!size.hasRemaining()) {
//分配一個(gè)內(nèi)存空間,這個(gè)內(nèi)存空間的大小
//就是剛剛讀出來的那個(gè)4字節(jié)的int的大小。
//10
this.buffer = ByteBuffer.allocate(receiveSize);
if (buffer != null) {
//去讀取數(shù)據(jù)
int bytesRead = channel.read(buffer);
?? 如何判斷讀完了消息呢? 回到
while ((networkReceive = channel.read()) != null)
——》KafkaChannel.read()
▼
//一直在讀取數(shù)據(jù)。
receive(receive);
//是否讀完一個(gè)完整的響應(yīng)消息
if (receive.complete()) {
——》NetworkReceive.complete()
//size 沒有剩余空間(50) &&
return !size.hasRemaining() && !buffer.hasRemaining();
(21) 發(fā)送者 源碼 producer是如何處理拆包問題 23 } 0:5
拆包:

和粘包類似
——》NetworkReceive.readFromReadableChannel()
▼
if (buffer != null) {
//去讀取數(shù)據(jù)
int bytesRead = channel.read(buffer);
?? 如何判斷讀完了消息呢? 回到
while ((networkReceive = channel.read()) != null)
——》KafkaChannel.read()
▼
//一直在讀取數(shù)據(jù)。
receive(receive);
//是否讀完一個(gè)完整的響應(yīng)消息
if (receive.complete()) {
(22) 發(fā)送者 源碼 如何處理暫存狀態(tài)的響應(yīng)消息 24 } 0:32
——》Sender.run(long now) 方法入口
——》NetworkClient.poll()
——》NetworkClient.handleCompletedReceives()

(23) 發(fā)送者 源碼 響應(yīng)消息處理流程 25 } 0:25
——》Sender.run(long now) 方法入口
——》NetworkClient.poll()
▼
//步驟三:處理響應(yīng),響應(yīng)里面就會(huì)有我們需要的元數(shù)據(jù)。
/**
* 這個(gè)地方是我們?cè)诳瓷a(chǎn)者是如何獲取元數(shù)據(jù)的時(shí)候,看的。
* 其實(shí)Kafak獲取元數(shù)據(jù)的流程跟我們發(fā)送消息的流程是一模一樣。
* 獲取元數(shù)據(jù) -》 判斷網(wǎng)絡(luò)連接是否建立好 -》 建立網(wǎng)絡(luò)連接
* -》 發(fā)送請(qǐng)求(獲取元數(shù)據(jù)的請(qǐng)求) -》 服務(wù)端發(fā)送回來響應(yīng)(帶了集群的元數(shù)據(jù)信息)
*
*/
//調(diào)用的響應(yīng)的里面的我們之前發(fā)送出去的請(qǐng)求的回調(diào)函數(shù)
//看到了這兒,我們回頭再去看一下
//我們當(dāng)時(shí)發(fā)送請(qǐng)求的時(shí)候,是如何封裝這個(gè)請(qǐng)求。
//不過雖然目前我們還沒看到,但是我們可以大膽猜一下。
//當(dāng)時(shí)封裝網(wǎng)絡(luò)請(qǐng)求的時(shí)候,肯定是給他綁定了一個(gè)回調(diào)函數(shù)。
response.request().callback().onComplete(response);
回調(diào)函數(shù)在封裝時(shí)發(fā)送
——》Sender.produceRequest()
▼
public void onComplete(ClientResponse response) {
//回調(diào)函數(shù)要是被調(diào)用
//其實(shí)就是這個(gè)方法被執(zhí)行了。
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}
——》Sender.handleProduceResponse()回調(diào)函數(shù)
▼
//如果處理成功那就是成功了,但是如果服務(wù)端那兒處理失敗了
//是不是也要給我們發(fā)送回來異常的信息。
//error 這個(gè)里面存儲(chǔ)的就是服務(wù)端發(fā)送回來的異常碼
Errors error = Errors.forCode(partResp.errorCode);
//獲取到當(dāng)前分區(qū)的響應(yīng)。
RecordBatch batch = batches.get(tp);
//對(duì)響應(yīng)進(jìn)行處理
completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);
——》Sender.completeBatch()
▼
//如果響應(yīng)里面帶有異常 并且 這個(gè)請(qǐng)求是可以重試的
if (error != Errors.NONE && canRetry(batch, error)) {
// retry
} else {
//TODO 這兒過來的數(shù)據(jù):(1)帶有異常,但是不可以重試(1:壓根就不讓重試2:重試次數(shù)超了。
// (2)沒有異常
//TODO 核心代碼 把異常的信息也給帶過去了
//我們剛剛看的就是這兒的代碼
//里面調(diào)用了用戶傳進(jìn)來的回調(diào)函數(shù)
//回調(diào)函數(shù)調(diào)用了以后
//說明我們的一個(gè)完整的消息的發(fā)送流程就結(jié)束了。
batch.done(baseOffset, timestamp, exception);
//看起來這個(gè)代碼就是要回收資源的。
this.accumulator.deallocate(batch);
}
——》RecordBatch.done()
▼
//如果沒有異常
if(exception == null){
//調(diào)用我們發(fā)送的消息的回調(diào)函數(shù)
//大家還記不記得我們?cè)诎l(fā)送數(shù)據(jù)的時(shí)候
//還不是綁定了一個(gè)回調(diào)函數(shù)。
//這兒說的調(diào)用的回調(diào)函數(shù)
//就是我們開發(fā),生產(chǎn)者代碼的時(shí)候,我們用戶傳進(jìn)去的那個(gè)
//回調(diào)函數(shù)。
thunk.callback.onCompletion(metadata, null);//帶過去的就是沒有異常
//也就是說我們生產(chǎn)者那兒的代碼,捕獲異常的時(shí)候就是發(fā)現(xiàn)沒有異常。
} else {
//如果有異常就會(huì)把異常傳給回調(diào)函數(shù)。
//由我們用戶自己去捕獲這個(gè)異常。
//然后對(duì)這個(gè)異常進(jìn)行處理
//大家根據(jù)自己公司的業(yè)務(wù)規(guī)則進(jìn)行處理就可以了。
//如果走這個(gè)分支的話,我們的用戶的代碼是可以捕獲到timeoutexception
//這個(gè)異常,如果用戶捕獲到了,做對(duì)應(yīng)的處理就可以了。
thunk.callback.onCompletion(null, exception);
(24) 發(fā)送者 源碼 消息發(fā)送完了以后內(nèi)存如何處理 26} 0:6
接上
——》Sender.completeBatch()
▼
batch.done(baseOffset, timestamp, exception);
//看起來這個(gè)代碼就是要回收資源的。
this.accumulator.deallocate(batch);
——》RecordAccumulator.deallocate()
▼
//從某個(gè)數(shù)據(jù)結(jié)構(gòu)里面移除 已經(jīng)成功處理的批次
incomplete.remove(batch);
//釋放內(nèi)存,回收到內(nèi)存池里面
free.deallocate(batch.records.buffer(), batch.records.initialCapacity());
——》BufferPool.deallocate()
▼
//如果你還回來的內(nèi)存的大小 就等于一個(gè)批次的大小,
//我們的參數(shù)設(shè)置的內(nèi)存是16K,你計(jì)算出來一個(gè)批次的大小也是16,申請(qǐng)的內(nèi)存也是16k
//16K 32K
if (size == this.poolableSize && size == buffer.capacity()) {
//內(nèi)存里面的東西清空
buffer.clear();
//把內(nèi)存放入到內(nèi)存池
this.free.add(buffer);
} else {
//但是如果 我們釋放的內(nèi)存的大小
//不是一個(gè)批次的大小,那就把歸為可用內(nèi)存
//等著垃圾回收即可
this.availableMemory += size;
}
(25) 發(fā)送者 源碼 消息有異常是如何處理的 27} 0:6 ?
接上
——》Sender.completeBatch()
▼ 1、如果異??梢灾匦掳l(fā)
//重新把發(fā)送失敗等著批次 加入到隊(duì)列里面。
this.accumulator.reenqueue(batch, now);
——》RecordAccumulator.reenqueue()
synchronized (deque) {
//重新放入到隊(duì)列里面
//放入到隊(duì)頭
deque.addFirst(batch);
}
▼2、如果異常不可以重新發(fā)
exception = error.exception();
batch.done(baseOffset, timestamp, exception);
——》RecordBatch.done()
//如果走這個(gè)分支的話,我們的用戶的代碼是可以捕獲到
//這個(gè)異常,如果用戶捕獲到了,做對(duì)應(yīng)的處理就可以了。
thunk.callback.onCompletion(null, exception);
(26) 發(fā)送者 源碼 如何處理超時(shí)的批次 28} 0:12
回到
——》Sender.run()
/* * 步驟六:
* 對(duì)超時(shí)的批次是如何處理的?
*/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(
——》RecordAccumulator.abortExpiredBatches()
//迭代的看每個(gè)分區(qū)里面的每個(gè)批次
Iterator<RecordBatch> batchIterator = dq.iterator();
//判斷一下是否超時(shí)
if (batch.maybeExpire(requestTimeout,, retryBackoffMs, now, this.lingerMs, isFull))
——》RecordBatch.maybeExpire()
/**
* requestTimeoutMs:代表的是請(qǐng)求發(fā)送的超時(shí)的時(shí)間。默認(rèn)值是30.
* now:當(dāng)前時(shí)間
* lastAppendTime:批次的創(chuàng)建的時(shí)間(上一次重試的時(shí)間)
* now - this.lastAppendTime 大于30秒,說明批次超時(shí)了 還沒發(fā)送出去。
*/
//調(diào)用done方法
//方法里面?zhèn)鬟^去了一個(gè)TimeoutException的異常。(超時(shí)了)
//TODO 處理超時(shí)的批次
this.done(-1L, Record.NO_TIMESTAMP,
(27) 發(fā)送者 源碼 如何處理長(zhǎng)時(shí)間沒有接受到響應(yīng)的消息 29} 0:10
回到
—— ★》 NetworkClient.poll ()
//步驟三:處理響應(yīng),響應(yīng)里面就會(huì)有我們需要的元數(shù)據(jù)。
//TODO 處理長(zhǎng)時(shí)間沒有接受到響應(yīng)
handleTimedOutRequests(responses, updatedNow);
▼
——》InFlightRequests.getNodesWithTimedOutRequests()
//是否超時(shí),如果有主機(jī)超時(shí)了
if (timeSinceSend > requestTimeout)
//把超時(shí)的主機(jī)的信息加入到nodeIds里面
nodeIds.add(nodeId);
▲
for (String nodeId : nodeIds) {
// close connection to the node
//關(guān)閉請(qǐng)求超時(shí)的主機(jī)的連接
this.selector.close(nodeId);
processDisconnection(responses, nodeId, now);
//對(duì)這些請(qǐng)求進(jìn)行處理
//大家會(huì)看到一個(gè)比較有意思的事
//自己封裝了一個(gè)響應(yīng)。這個(gè)響應(yīng)里面沒有服務(wù)端響應(yīng)消息(服務(wù)端沒給響應(yīng))
//失去連接的狀態(tài)表標(biāo)識(shí)為true
responses.add(new ClientResponse(request, now, true, null));
—— 》 NetworkClient.processDisconnection()
//修改連接狀態(tài)
connectionStates.disconnected(nodeId, now);
—— 》 ClusterConnectionStates.disconnected()
(28) 發(fā)送者 源碼 生產(chǎn)者源碼精華總結(jié) 30} 0:10
【截圖】
