kafka圖解源碼-1發(fā)送者流程

架構(gòu)

Client
生產(chǎn)發(fā)送流程
Server:
kafka 網(wǎng)絡(luò)架構(gòu)
kafka 數(shù)據(jù)存儲(chǔ)
kafka 副本同步
kafka 元數(shù)據(jù)管理

一個(gè)demo

1、 producer核心流程:初始化對(duì)象 05}

image.png

2、producer初始化過程 06} 0:25

1:00:00-1:18:12
\kafka-0.10.1.0-src\examples\src\main\java\kafka\examples\Producer.java
構(gòu)造函數(shù)
——》Producer()
//TODO 初始化kafkaProducer
producer = new KafkaProducer<>(props);

——》KafkaProducer構(gòu)造函數(shù)((ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)

//配置clientId
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
//設(shè)置分區(qū)器
this.partitioner = config.getConfiguredInstance(
//重試時(shí)間 retry.backoff.ms 默認(rèn)100ms
long retryBackoffMs = config.getLong
//設(shè)置序列化器
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(
//設(shè)置攔截器
//類似于一個(gè)過濾器
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>
//metadata.max.age.ms(默認(rèn)5分鐘)
//生產(chǎn)者每隔一段時(shí)間都要去更新一下集群的元數(shù)據(jù)。
this.metadata = new Metadata(retryBackoffMs,
//max.request.size 生產(chǎn)者往服務(wù)端發(fā)送消息的時(shí)候,規(guī)定一條消息最大多大?
//如果你超過了這個(gè)規(guī)定消息的大小,你的消息就不能發(fā)送過去。
//默認(rèn)是1M,這個(gè)值偏小,在生產(chǎn)環(huán)境中,我們需要修改這個(gè)值。
//經(jīng)驗(yàn)值是10M。但是大家也可以根據(jù)自己公司的情況來。
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//指的是緩存大小
//默認(rèn)值是32M,這個(gè)值一般是夠用,如果有特殊情況的時(shí)候,我們可以去修改這個(gè)值。
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
//kafka是支持壓縮數(shù)據(jù)的,這兒設(shè)置壓縮格式。
//提高你的系統(tǒng)的吞吐量,你可以設(shè)置壓縮格式。
//一次發(fā)送出去的消息就更多。生產(chǎn)者這兒會(huì)消耗更多的cpu.
this.compressionType = CompressionType.forName(

//TODO 初始化了一個(gè)重要的管理網(wǎng)路的組件。
/**
* (1)connections.max.idle.ms: 默認(rèn)值是9分鐘 一個(gè)網(wǎng)絡(luò)連接最多空閑多久,超過這個(gè)空閑就關(guān)閉這個(gè)網(wǎng)絡(luò)連接。
*(2)max.in.flight.requests.per.connection:默認(rèn)是5 producer向broker發(fā)送數(shù)據(jù)的時(shí)候,其實(shí)是有多個(gè)網(wǎng)絡(luò)連接。 每個(gè)網(wǎng)絡(luò)連接可以忍受 producer端發(fā)送給broker 消息然后沒有響應(yīng)的個(gè)數(shù)。

  • 因?yàn)閗afka有重試機(jī)制,所以有可能會(huì)造成數(shù)據(jù)亂序,如果想要保證有序,這個(gè)值要把設(shè)置為1.
    *(3)send.buffer.bytes:socket發(fā)送數(shù)據(jù)的緩沖區(qū)的大小,默認(rèn)值是128K
  • (4)receive.buffer.bytes:socket接受數(shù)據(jù)的緩沖區(qū)的大小,默認(rèn)值是32K。
    */
    NetworkClient client = new NetworkClient(

//這個(gè)就是一個(gè)發(fā)送線程
/*(1) retries:重試的次數(shù)
* (2) acks:
* 0:producer發(fā)送數(shù)據(jù)到broker后,就完了,沒有返回值,不管寫成功還是寫失敗都不管了。
* 1: producer發(fā)送數(shù)據(jù)到broker后,數(shù)據(jù)成功寫入leader partition以后返回響應(yīng)。

  • -1: producer發(fā)送數(shù)據(jù)到broker后,數(shù)據(jù)要寫入到leader partition里面,并且數(shù)據(jù)同步到所有的 follower partition里面以后,才返回響應(yīng)。

*/
this.sender = new Sender(client,
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),

         //創(chuàng)建了一個(gè)線程,然后里面?zhèn)鬟M(jìn)去了一個(gè)sender對(duì)象。
        //把業(yè)務(wù)的代碼和關(guān)于線程的代碼給隔離開來。
        //關(guān)于線程的這種代碼設(shè)計(jì)的方式,其實(shí)也值得大家積累的。
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        
      //啟動(dòng)線程。
        this.ioThread.start();

(2)元數(shù)據(jù)管理 07} 0:6

1:18:00一1:25:00
入口
——》KafkaProducer((ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer)

//生產(chǎn)者從服務(wù)端那兒拉取過來的kafka的元數(shù)據(jù)。
//生產(chǎn)者要想去拉取元數(shù)據(jù), 發(fā)送網(wǎng)絡(luò)請(qǐng)求,重試,
//metadata.max.age.ms(默認(rèn)5分鐘)
//生產(chǎn)者每隔一段時(shí)間都要去更新一下集群的元數(shù)據(jù)。
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);

         //去更新元數(shù)據(jù)
        //addresses 這個(gè)地址其實(shí)就是我們寫producer代碼的時(shí)候,傳參數(shù)的時(shí)候,傳進(jìn)去了一個(gè)broker的地址。
        //所以這段代碼看起來像是去服務(wù)端拉取元數(shù)據(jù),所以我們?nèi)ヲ?yàn)證一下,是否真的去拉取元數(shù)據(jù)。
          List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
     
        //TODO update方法初始化的時(shí)候并沒有去服務(wù)端拉取元數(shù)據(jù)。
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());

(2.1)元數(shù)據(jù)管理2 元數(shù)據(jù)結(jié)構(gòu) 07} 0:15

java/org/apache/kafka/clients/Metadata.java
java/org/apache/kafka/common/Cluster.java
java/org/apache/kafka/common/Node.java
java/org/apache/kafka/common/PartitionInfo.java

(3.1)KafkaProducer 核心流程:發(fā)送 08} 0:14

1:25:00—1:32:00
java/kafka/examples/Producer.java
//isAsync , kafka發(fā)送數(shù)據(jù)的時(shí)候,有兩種方式
//1: 異步發(fā)送
//2: 同步發(fā)送
//isAsync: true的時(shí)候是異步發(fā)送,false就是同步發(fā)送
if (isAsync) { // Send asynchronously
//異步發(fā)送,一直發(fā)送,消息響應(yīng)結(jié)果交給回調(diào)函數(shù)處理
//這樣的方式,性能比較好,我們生產(chǎn)代碼用的就是這種方式。
producer.send(new ProducerRecord<>(topic,
messageNo,
messageStr), new DemoCallBack(startTime, messageNo, messageStr));

主類 org/apache/kafka/clients/producer/KafkaProducer.java
——》KafkaProducer.send()
——》★ KafkaProducer.doSend()八大步驟

/**
* 步驟一:
* 同步等待拉取元數(shù)據(jù)。
* maxBlockTimeMs 最多能等待多久。
/
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(),
/
*
* 步驟二:
* 對(duì)消息的key和value進(jìn)行序列化。
/
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.key());
/
*
* 步驟三:
* 根據(jù)分區(qū)器選擇消息應(yīng)該發(fā)送的分區(qū)。
*
* 因?yàn)榍懊嫖覀円呀?jīng)獲取到了元數(shù)據(jù)
* 這兒我們就可以根據(jù)元數(shù)據(jù)的信息
* 計(jì)算一下,我們應(yīng)該要把這個(gè)數(shù)據(jù)發(fā)送到哪個(gè)分區(qū)上面。
*/
int partition = partition(record, serializedKey, serializedValue, cluster);

        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        /**
         * 步驟四:
         *  確認(rèn)一下消息的大小是否超過了最大值。
         *  KafkaProdcuer初始化的時(shí)候,指定了一個(gè)參數(shù),代表的是Producer這兒最大能發(fā)送的是一條消息能有多大
         *  默認(rèn)最大是1M,我們一般都回去修改它。
         */
        ensureValidRecordSize(serializedSize);
        /**
         * 步驟五:
         *  根據(jù)元數(shù)據(jù)信息,封裝分區(qū)對(duì)象
         */
        tp = new TopicPartition(record.topic(), partition);

/**
* 步驟六:
* 給每一條消息都綁定他的回調(diào)函數(shù)。因?yàn)槲覀兪褂玫氖钱惒降姆绞桨l(fā)送的消息。
/
Callback interceptCallback = this.interceptors == null ? callback :
/
*
* 步驟七:
* 把消息放入accumulator(32M的一個(gè)內(nèi)存)
* 然后有accumulator把消息封裝成為一個(gè)批次一個(gè)批次的去發(fā)送。
/
RecordAccumulator.RecordAppendResult result = accumulator.append(
/
*
* 步驟八:
* 喚醒sender線程。他才是真正發(fā)送數(shù)據(jù)的線程。
*/
this.sender.wakeup();

(3.2)KafkaProducer核心流程2:異常體系 08 02} 0:6

異常體系構(gòu)建 ,自定義異常+ 多層拋出

(4.1 )KafkaProducer 加載元數(shù)據(jù)1 09 01} 0:15

——》★ KafkaProducer.doSend()里面
ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata

——》KafkaProducer .waitOnMetadata()
/**
* TODO 這個(gè)步驟重要
* 我們發(fā)現(xiàn)這兒去喚醒sender線程。
* 其實(shí)是因?yàn)?,拉取有拉取元?shù)據(jù)這個(gè)操作是有sender線程去完成的。
* 這個(gè)地方把線程給喚醒了以后
* 我們知道sender線程肯定就開始進(jìn)行干活了 至于怎么我們后面在繼續(xù)分析。
* 很明顯,真正去獲取元數(shù)據(jù)是這個(gè)線程完成。
*/
sender.wakeup();

(4.2)KafkaProducer 加載元數(shù)據(jù)2 09 02} 0:7

——》KafkaProducer .waitOnMetadata()
//TODO 等待元數(shù)據(jù)
//同步的等待
//等待這sender線程獲取到元數(shù)據(jù)。
metadata.awaitUpdate(version, remainingWaitMs);

(4.3)KafkaProducer 加載元數(shù)據(jù)3 09 03} 0:25

——》KafkaProducer .waitOnMetadata()
——》 sender.wakeup();

sender線程 在KafkaProducer 的 構(gòu)造函數(shù)里面
//創(chuàng)建了一個(gè)線程,然后里面?zhèn)鬟M(jìn)去了一個(gè)sender對(duì)象。
//把業(yè)務(wù)的代碼和關(guān)于線程的代碼給隔離開來。
//關(guān)于線程的這種代碼設(shè)計(jì)的方式,其實(shí)也值得大家積累的。
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
//啟動(dòng)線程。
this.ioThread.start();
——》 java/org/apache/kafka/clients/producer/internals/Sender.java
——》 run 方法
——》 void run(long now) {

//TODO 重點(diǎn)就是去看這個(gè)方法
//就是用這個(gè)方法拉取的元數(shù)據(jù)。

    /**
     * 步驟八:
     * 真正執(zhí)行網(wǎng)絡(luò)操作的都是這個(gè)NetWordClient這個(gè)組件
     * 包括:發(fā)送請(qǐng)求,接受響應(yīng)(處理響應(yīng))
     *
     * 拉取元數(shù)據(jù)信息,靠的就是這段代碼
     */
    //我們猜這兒可能就是去建立連接。
   this.client.poll(pollTimeout, now);
                              ↓

java/org/apache/kafka/clients/NetworkClient.java
—— ★》 NetworkClient.poll ()
//步驟一:封裝了一個(gè)要拉取元數(shù)據(jù)請(qǐng)求
long metadataTimeout = metadataUpdater.maybeUpdate(now);
//步驟二: 發(fā)送請(qǐng)求,進(jìn)行復(fù)雜的網(wǎng)絡(luò)操作
//但是我們目前還沒有學(xué)習(xí)到kafka的網(wǎng)絡(luò)
//所以這兒大家就只需要知道這兒會(huì)發(fā)送網(wǎng)絡(luò)請(qǐng)求。
//TODO 執(zhí)行網(wǎng)絡(luò)IO的操作。 NIO
this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
//步驟三:處理響應(yīng),響應(yīng)里面就會(huì)有我們需要的元數(shù)據(jù)。
/**
* 這個(gè)地方是我們?cè)诳瓷a(chǎn)者是如何獲取元數(shù)據(jù)的時(shí)候,看的。
* 其實(shí)Kafak獲取元數(shù)據(jù)的流程跟我們發(fā)送消息的流程是一模一樣。
* 獲取元數(shù)據(jù) -》 判斷網(wǎng)絡(luò)連接是否建立好 -》 建立網(wǎng)絡(luò)連接
* -》 發(fā)送請(qǐng)求(獲取元數(shù)據(jù)的請(qǐng)求) -》 服務(wù)端發(fā)送回來響應(yīng)(帶了集群的元數(shù)據(jù)信息)
*
*/
handleCompletedReceives(responses, updatedNow);

——》NetworkClient.DefaultMetadataUpdater.handleResponse()

(4.4)KafkaProducer 加載元數(shù)據(jù)4 流程圖 09 04} 0:7

9 KafkaPrducer拉取元數(shù)據(jù)的流程.png

(5)分區(qū)選擇邏輯 10 } 0:10

2:11:00---2:21:22
—— ★》 KafkaProducer.doSend()里面 步驟三

/**
* 步驟三:
* 根據(jù)分區(qū)器選擇消息應(yīng)該發(fā)送的分區(qū)。
* 因?yàn)榍懊嫖覀円呀?jīng)獲取到了元數(shù)據(jù)
* 這兒我們就可以根據(jù)元數(shù)據(jù)的信息
* 計(jì)算一下,我們應(yīng)該要把這個(gè)數(shù)據(jù)發(fā)送到哪個(gè)分區(qū)上面。
*/
int partition = partition(record, serializedKey, serializedValue, cluster);
——》KafkaProducer.partition()
//如果你的這個(gè)消息已經(jīng)分配了分區(qū)號(hào),那直接就用這個(gè)分區(qū)號(hào)就可以了
//但是正常情況下,消息是沒有分區(qū)號(hào)的。
//使用分區(qū)器進(jìn)行選擇合適的分區(qū)
partitioner.partition( record.topic(), record.key(),

——》DefaultPartitioner.partition()

(6.1)封裝recordAccumulator 11.01 } 0:11

2:21:00---2:26:00


11 RecordAccumulator原理.png

—— ★》 KafkaProducer.doSend()里面 步驟四、五、六、七
/**
* 步驟四:
* 確認(rèn)一下消息的大小是否超過了最大值。
* KafkaProdcuer初始化的時(shí)候,指定了一個(gè)參數(shù),代表的是Producer這兒最大能發(fā)送的是一條消息能有多大
* 默認(rèn)最大是1M,我們一般都回去修改它。
*/
ensureValidRecordSize(serializedSize);

/**
* 步驟五:
* 根據(jù)元數(shù)據(jù)信息,封裝分區(qū)對(duì)象
/
tp = new TopicPartition(record.topic(), partition);
/
*
* 步驟六:
* 給每一條消息都綁定他的回調(diào)函數(shù)。因?yàn)槲覀兪褂玫氖钱惒降姆绞桨l(fā)送的消息。
/
Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
/
*
* 步驟七:
* 把消息放入accumulator(32M的一個(gè)內(nèi)存)
* 然后有accumulator把消息封裝成為一個(gè)批次一個(gè)批次的去發(fā)送。
*/
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
轉(zhuǎn)到
——》 RecordAccumulator.append()

(6.2)封裝recordAccumulator 2 11.02 } 0:15

——》 RecordAccumulator.append()詳解7大步驟
/**

  • 步驟一:先根據(jù)分區(qū)找到應(yīng)該插入到哪個(gè)隊(duì)列里面。
  • 如果有已經(jīng)存在的隊(duì)列,那么我們就使用存在隊(duì)列
  • 如果隊(duì)列不存在,那么我們新創(chuàng)建一個(gè)隊(duì)列
    Deque<RecordBatch> dq = getOrCreateDeque(tp);

/**
* 步驟二:
* 嘗試往隊(duì)列里面的批次里添加數(shù)據(jù)
* 一開始添加數(shù)據(jù)肯定是失敗的,我們目前只是以后了隊(duì)列
* 數(shù)據(jù)是需要存儲(chǔ)在批次對(duì)象里面(這個(gè)批次對(duì)象是需要分配內(nèi)存的)
* 我們目前還沒有分配內(nèi)存,所以如果按場(chǎng)景驅(qū)動(dòng)的方式,
* 代碼第一次運(yùn)行到這兒其實(shí)是不成功的。
*/
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);

/**
* 步驟三:計(jì)算一個(gè)批次的大小
* 在消息的大小和批次的大小之間取一個(gè)最大值,用這個(gè)值作為當(dāng)前這個(gè)批次的大小。
* 有可能我們的一個(gè)消息的大小比一個(gè)設(shè)定好的批次的大小還要大。
* 默認(rèn)一個(gè)批次的大小是16K。
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));

/**
* 步驟四:
* 根據(jù)批次的大小去分配內(nèi)存
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);

/**
* 步驟五:
* 嘗試把數(shù)據(jù)寫入到批次里面。
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);

/**
* 步驟六:
* 根據(jù)內(nèi)存大小封裝批次
* 線程一到這兒 會(huì)根據(jù)內(nèi)存封裝出來一個(gè)批次。
*/
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());

/**
* 步驟七:
* 把這個(gè)批次放入到這個(gè)隊(duì)列的隊(duì)尾
* 線程一把批次添加到隊(duì)尾
*/
dq.addLast(batch);
incomplete.add(batch);

(7) 讀寫分離 copyOnwrite數(shù)據(jù)結(jié)構(gòu)使用 12} 0:26

2:26:00---2:41:00
入口
——》 RecordAccumulator.append()
——》 getOrCreateDeque()
Deque<RecordBatch> d = this.batches.get(tp);
//把這個(gè)空的隊(duì)列存入batches 這個(gè)數(shù)據(jù)結(jié)構(gòu)里面
Deque<RecordBatch> previous = this.batches.putIfAbsent(tp, d);

batches 是 RecordAccumulator 的 成員變量
ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;

在RecordAccumulator構(gòu)造函數(shù)里面賦值
this.batches = new CopyOnWriteMap<>();

CopyOnWriteMap是自定義類

——》CopyOnWriteMap.putIfAbsent()

//如果我們傳進(jìn)來的這個(gè)key不存在
if (!containsKey(k))
//那么就調(diào)用里面內(nèi)部的put方法
return put(k, v);
else
//返回結(jié)果
return get(k);
}

//新的內(nèi)存空間
//讀寫分離
//往新的內(nèi)存空間里面插入
//讀,讀數(shù)據(jù)就老讀空間里面去
Map<K, V> copy = new HashMap<K, V>(this.map);
//插入數(shù)據(jù)
V prev = copy.put(k, v);
//賦值給map
this.map = Collections.unmodifiableMap(copy);

(8) 數(shù)據(jù)寫入對(duì)應(yīng)批次(分段加鎖) 13} 0:21

2:41:42-----2:53:11

13 把數(shù)據(jù)寫入批次.png

入口
——》 RecordAccumulator.append()

    分段加鎖    步驟1~ 6   可以最大并發(fā) 

synchronized (dq) {

}

(9) 發(fā)送者內(nèi)存池設(shè)計(jì) 14} 0:28

14 內(nèi)存池.png

人口——》 RecordAccumulator.append()
* 步驟四:
* 根據(jù)批次的大小去分配內(nèi)存
——》BufferPool.allocate()
/**
*
* 總的分配的思路,可能一下子分配不了這么大的內(nèi)存,但是可以先有點(diǎn)分配一點(diǎn)。
*
*/
//如果分配的內(nèi)存的大小 還是沒有要申請(qǐng)的內(nèi)存大小大。
//內(nèi)存池就會(huì)一直分配的內(nèi)存,一點(diǎn)一點(diǎn)的去分配。
//等著別人會(huì)釋放內(nèi)存。

            //accumulated 5K+16K=21K 16K
            // size 32K
            while (accumulated < size) {

(9) 發(fā)送者線程sender詳解1 15} 0:15

入口 KafkaProducer 構(gòu)造函數(shù)

//這個(gè)就是一個(gè)線程
this.sender = new Sender(client,

——》Sender.run(long now) 方法 , 8個(gè)步驟

  • 步驟一:(上面章節(jié)講了)
  •  獲取元數(shù)據(jù)
    

Cluster cluster = metadata.fetch();

  • 步驟二:首先是判斷哪些partition有消息可以發(fā)送:
    * 獲取到這個(gè)partition的leader partition對(duì)應(yīng)的broker主機(jī)(根據(jù)元數(shù)據(jù)信息來就可以了)
    * 哪些broker上面需要我們?nèi)グl(fā)送消息?
    */
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

/**
* 步驟三:
* 標(biāo)識(shí)還沒有拉取到元數(shù)據(jù)的topic
*/
if (!result.unknownLeaderTopics.isEmpty()) {

       /**
          * 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
         */
            if (!this.client.ready(node, now)) { 
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }

/**
* 步驟五:
*
* 我們有可能要發(fā)送的partition有很多個(gè),
* 很有可能有一些partition的leader partition是在同一臺(tái)服務(wù)器上面。

  • 按照broker進(jìn)行分組,同一個(gè)broker的partition為同一組
    * 0:{p0,p1} -> 批次
    * 1:{p2}
    * 2:{p3}

/**
* 步驟六:
* 對(duì)超時(shí)的批次是如何處理的?
*
/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
/
*
* 步驟七:
* 創(chuàng)建發(fā)送消息的請(qǐng)求
* 我們往partition上面去發(fā)送消息的時(shí)候,有一些partition他們?cè)谕慌_(tái)服務(wù)器上面
* ,如果我們一分區(qū)一個(gè)分區(qū)的發(fā)送我們網(wǎng)絡(luò)請(qǐng)求,那網(wǎng)絡(luò)請(qǐng)求就會(huì)有一些頻繁
* 我們要知道,我們集群里面網(wǎng)絡(luò)資源是非常珍貴的。
* 會(huì)把發(fā)往同個(gè)broker上面partition的數(shù)據(jù) 組合成為一個(gè)請(qǐng)求。
* 然后統(tǒng)一一次發(fā)送過去,這樣子就減少了網(wǎng)絡(luò)請(qǐng)求。
*/
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
//如果網(wǎng)絡(luò)連接沒有建立好 batches其實(shí)是為空。
//也就說其實(shí)這段代碼也是不會(huì)執(zhí)行。

    /**
     * 步驟八:
     * 真正執(zhí)行網(wǎng)絡(luò)操作的都是這個(gè)NetWordClient這個(gè)組件
     * 包括:發(fā)送請(qǐng)求,接受響應(yīng)(處理響應(yīng))
     * 拉取元數(shù)據(jù)信息,靠的就是這段代碼
     */
    //我們猜這兒可能就是去建立連接。
    this.client.poll(pollTimeout, now);

(10) 發(fā)送者線程sender詳解2 batch什么條件下發(fā)送? 16} 0:25

——》Sender.run(long now) 方法 里面

 /**
     * 步驟二:
     *      首先是判斷哪些partition有消息可以發(fā)送:
     *        我們看一下一個(gè)批次可以發(fā)送出去的條件
     *      獲取到這個(gè)partition的leader partition對(duì)應(yīng)的broker主機(jī)(根據(jù)元數(shù)據(jù)信息來就可以了)
     *      哪些broker上面需要我們?nèi)グl(fā)送消息?
     */
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

——》RecordAccumulator.ready()

//waiters里面有數(shù)據(jù),說明我們的這個(gè)內(nèi)存池里面內(nèi)存不夠了。
//如果exhausted的值等于true,說明內(nèi)存池里面的內(nèi)存不夠用了。
boolean exhausted = this.free.queued() > 0;

//首先從隊(duì)列的隊(duì)頭獲取到批次
RecordBatch batch = deque.peekFirst();
//如果這個(gè)catch不null,我們判斷一下是否可以發(fā)送這個(gè)批次。
if (batch != null) {

(11) 發(fā)送者線程sender詳解3 篩選可以發(fā)送消息的broker(1) 17 1} 0:16

——》Sender.run(long now) 方法 里面

 /**
     * 步驟三:
     *      標(biāo)識(shí)還沒有拉取到元數(shù)據(jù)的topic
     */
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {

(12) 發(fā)送者線程sender詳解3 篩選可以發(fā)送消息的broker(2) 17 2} 0:16

17 篩選可以發(fā)送消息的broker.png

(13) 發(fā)送者 源碼之kafka網(wǎng)絡(luò)設(shè)計(jì) 18 1 2 3} 0:10 0:5 0:8

——》Sender.run(long now) 方法 里面

/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {

——》NetworkClient.ready()
——》NetworkClient. initiateConnect()
//TODO 嘗試建立連接
selector.connect(nodeConnectionId

1 ——》Selector類 各成員變量 詳解

* TODO 這個(gè)selector是kafka主機(jī)封裝的一個(gè)selector
* 他是基于java NIO里面的selector去封裝的。
//這個(gè)對(duì)象就是javaNIO里面的Selector
//Selector是負(fù)責(zé)網(wǎng)絡(luò)的建立,發(fā)送網(wǎng)絡(luò)請(qǐng)求,處理實(shí)際的網(wǎng)絡(luò)IO。
//所以他是最最核心的這么樣的一個(gè)組件。
private final java.nio.channels.Selector nioSelector;
//broker 和 KafkaChannel(SocketChnnel)的映射
//這兒的kafkaChannel大家暫時(shí)可以理解為就是SocketChannel
//代表的就是一個(gè)網(wǎng)絡(luò)連接。
private final Map<String, KafkaChannel> channels;

//已經(jīng)完成發(fā)送的請(qǐng)求
private final List<Send> completedSends;
//已經(jīng)接收到的,并且處理完了的響應(yīng)。
private final List<NetworkReceive> completedReceives;
//已經(jīng)接收到了,但是還沒來得及處理的響應(yīng)。
//一個(gè)連接,對(duì)應(yīng)一個(gè)響應(yīng)隊(duì)列
private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
private final Set<SelectionKey> immediatelyConnectedKeys;
//沒有建立連接的主機(jī)
private final List<String> disconnected;
//完成建立連接的主機(jī)
private final List<String> connected;
//建立連接失敗的主機(jī)。
private final List<String> failedSends;

2 ——》KafkaChannel類 各成員變量 詳解

 //一個(gè)broker就對(duì)應(yīng)一個(gè)KafkaChannel
//這個(gè)id就是broker的id
private final String id;

//接收到的響應(yīng)
private NetworkReceive receive;
//發(fā)送出去的請(qǐng)求
private Send send;

//我們推測(cè)這個(gè)里面應(yīng)該會(huì)有SocketChannel
private final TransportLayer transportLayer;
                                      ▼
      * returns underlying socketChannel
     * 這個(gè)核心的組件,就是javaNIO里面的SocketChannel 
                      SocketChannel socketChannel();
18 Kafka的網(wǎng)絡(luò)設(shè)計(jì).png

(14) 發(fā)送者 源碼 Selector.connect 與broker建立連接 20 1 } 0:13

——》Sender.run(long now) 方法 里面

/**
* 步驟四:檢查與要發(fā)送數(shù)據(jù)的主機(jī)的網(wǎng)絡(luò)是否已經(jīng)建立好。
*/
if (!this.client.ready(node, now)) {

——》NetworkClient.ready()
——》NetworkClient. initiateConnect()
//TODO 嘗試建立連接
selector.connect(nodeConnectionId

    ——》Selector.connect()
                                        ▼

//獲取到SocketChannel
SocketChannel socketChannel = SocketChannel.open();
//設(shè)置為非阻塞的模式
socketChannel.configureBlocking(false);

//設(shè)置一些參數(shù)
//這些網(wǎng)絡(luò)的參數(shù),我們之前在分析Producer的時(shí)候給大家看過
//都有一些默認(rèn)值。
//這個(gè)的默認(rèn)值是false,代表要開啟Nagle的算法
//它會(huì)把網(wǎng)絡(luò)中的一些小的數(shù)據(jù)包收集起來,組合成一個(gè)大的數(shù)據(jù)包
//然后再發(fā)送出去。因?yàn)樗J(rèn)為如果網(wǎng)絡(luò)中有大量的小的數(shù)據(jù)包在傳輸
//其實(shí)是會(huì)影響網(wǎng)絡(luò)擁塞。

    //kafka一定不能把這兒設(shè)置為false,因?yàn)槲覀冇行r(shí)候可能有些數(shù)據(jù)包就是比較
    //小,他這兒就不幫我們發(fā)送了,顯然是不合理的。
    socket.setTcpNoDelay(true);

//嘗試去服務(wù)器去連接。
//因?yàn)檫@兒非阻塞的
//有可能就立馬連接成功,如果成功了就返回true
//也有可能需要很久才能連接成功,返回false。
connected = socketChannel.connect(address);

//SocketChannel往Selector上注冊(cè)了一個(gè)OP_CONNECT
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
//根據(jù)根據(jù)SocketChannel 封裝出來一個(gè)KafkaChannel
KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
//把key和KafkaChannel關(guān)聯(lián)起來
//后面使用起來會(huì)比較方便
//我們可以根據(jù)key就找到KafkaChannel
//也可以根據(jù)KafkaChannel找到key
key.attach(channel);
//緩存起來了
this.channels.put(id, channel);
//所以正常情況下,這兒網(wǎng)絡(luò)不能完成連接。
//如果這兒不能完成連接。大家猜一下
//kafka會(huì)在哪兒完成網(wǎng)絡(luò)最后的連接呢?

//如果里面就連接上了
if (connected) {
// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", channel.id());
immediatelyConnectedKeys.add(key);
// 取消前面注冊(cè) OP_CONNECT 事件。
key.interestOps(0);
}

(15) 發(fā)送者 源碼 Selector.connect 與broker建立連接2 完成最后的 連接 20 2 } 0:10

——》Sender.run(long now) 方法 里面

/**
* 步驟八:
* 真正執(zhí)行網(wǎng)絡(luò)操作的都是這個(gè)NetWordClient這個(gè)組件

  • 包括:發(fā)送請(qǐng)求,接受響應(yīng)(處理響應(yīng))
    * 拉取元數(shù)據(jù)信息,靠的就是這段代碼
    //我們猜這兒可能就是去建立連接。
    this.client.poll(pollTimeout, now);

    ——》NetworkClient.poll();

    //步驟二: 發(fā)送請(qǐng)求,進(jìn)行復(fù)雜的網(wǎng)絡(luò)操作
    //但是我們目前還沒有學(xué)習(xí)到kafka的網(wǎng)絡(luò)
    //所以這兒大家就只需要知道這兒會(huì)發(fā)送網(wǎng)絡(luò)請(qǐng)求。
    //TODO 執(zhí)行網(wǎng)絡(luò)IO的操作。 NIO
    this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));

    ——》Selector.poll()

    //從Selector上找到有多少個(gè)key注冊(cè)了
    int readyKeys = select(timeout);
    //因?yàn)槲覀冇脠?chǎng)景驅(qū)動(dòng)的方式
    //我們剛剛確實(shí)是注冊(cè)了一個(gè)key
    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
    //立馬就要對(duì)這個(gè)Selector上面的key要進(jìn)行處理。
    pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);

       ——》Selector.pollSelectionKeys()
                                          ▼
    

//遍歷所有的key
while (iterator.hasNext()) {
//根據(jù)key找到對(duì)應(yīng)的KafkaChannel
KafkaChannel channel = channel(key);

          /**
             *
             * 我們代碼第一次進(jìn)來應(yīng)該要走的是這兒分支,因?yàn)槲覀兦懊孀?cè)的是
             * SelectionKey key = socketChannel.register(nioSelector,
             * SelectionKey.OP_CONNECT);
             *
             */
            if (isImmediatelyConnected || key.isConnectable()) {

                  //TODO 核心的代碼
                //去最后完成網(wǎng)絡(luò)的連接
                //如果我們之前初始化的時(shí)候,沒有完成網(wǎng)絡(luò)連接的話,這兒一定會(huì)幫你
                //完成網(wǎng)絡(luò)的連接。
      1:》     if (channel.finishConnect()) {
                    //網(wǎng)絡(luò)連接已經(jīng)完成了以后,就把這個(gè)channel存儲(chǔ)到
                    this.connected.add(channel.id());

(16) 發(fā)送者 源碼 Selector.connect 與broker建立連接3 連接示意圖 20 3 } 0:17

image.png

(17) 發(fā)送者 源碼產(chǎn)生者 發(fā)送網(wǎng)絡(luò)請(qǐng)求了 21 1 } 0:20

接上期 1:》
//完成網(wǎng)絡(luò)的連接。
if (channel.finishConnect()) {
——》PlaintextTransportLayer.finishConnect()

//完成的最后的網(wǎng)絡(luò)的連接
boolean connected = socketChannel.finishConnect();
//如果連接完成了以后。
if (connected)
//取消了OP_CONNECT事件
//增加了OP_READ 事件 我們這兒的這個(gè)key對(duì)應(yīng)的KafkaChannel是不是就可以接受服務(wù)
//端發(fā)送回來的響應(yīng)了。
key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT | SelectionKey.OP_READ);

回到 ——》Sender.run(long now) 方法 里面 步驟4 ,現(xiàn)在網(wǎng)絡(luò)已經(jīng)建立連接
進(jìn)入下面
/**
* 步驟七:
* 創(chuàng)建發(fā)送消息的請(qǐng)求
* 我們往partition上面去發(fā)送消息的時(shí)候,有一些partition他們?cè)谕慌_(tái)服務(wù)器上面
* ,如果我們一分區(qū)一個(gè)分區(qū)的發(fā)送我們網(wǎng)絡(luò)請(qǐng)求,那網(wǎng)絡(luò)請(qǐng)求就會(huì)有一些頻繁
* 我們要知道,我們集群里面網(wǎng)絡(luò)資源是非常珍貴的。
* 會(huì)把發(fā)往同個(gè)broker上面partition的數(shù)據(jù) 組合成為一個(gè)請(qǐng)求。
* 然后統(tǒng)一一次發(fā)送過去,這樣子就減少了網(wǎng)絡(luò)請(qǐng)求。
*/
List<ClientRequest> requests = createProduceRequests(batches, now);
//TODO 發(fā)送請(qǐng)求的操作
for (ClientRequest request : requests)
//綁定 op_write
client.send(request, now);

——》 NetworkClient..send()
——》NetworkClient..doSend()

//這兒往inFlightRequests 組件里存 Request請(qǐng)求。
//存儲(chǔ)的就是還沒有收到響應(yīng)的請(qǐng)求。
//這個(gè)里面默認(rèn)最多能存5個(gè)請(qǐng)求。
//其實(shí)我們可以猜想一個(gè)事,如果我們的請(qǐng)求發(fā)送出去了
//然后也成功的接受到了響應(yīng),后面就會(huì)到這兒把這個(gè)請(qǐng)求移除。
this.inFlightRequests.add(request);
//TODO
selector.send(request.request());

——》Selector.send()
//獲取到一個(gè)KafakChannel
KafkaChannel channel = channelOrFail(send.destination());
try {
//TODO
channel.setSend(send);
——》KafkaChannel .setSend()

//關(guān)鍵的代碼來了
//這兒綁定了一個(gè)OP_WRITE事件。
//一旦綁定了這個(gè)事件以后,我們就可以往服務(wù)端發(fā)送請(qǐng)求了。
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);

selector事件.png

回到 ——》Sender.run(long now) 方法 里面 步驟8 ,
this.client.poll(pollTimeout, now);

——》NetworkClient.poll();

——》Selector.poll()

//因?yàn)槲覀冇脠?chǎng)景驅(qū)動(dòng)的方式
//我們剛剛確實(shí)是注冊(cè)了一個(gè)key
if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
//立馬就要對(duì)這個(gè)Selector上面的key要進(jìn)行處理。
pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);

——》Selector. pollSelectionKeys()
                                  ▼

//核心代碼,處理發(fā)送請(qǐng)求的事件
//selector 注冊(cè)了一個(gè)OP_WRITE
//selector 注冊(cè)了一個(gè)OP_READ
if (channel.ready() && key.isWritable()) {
//獲取到我們要發(fā)送的那個(gè)網(wǎng)絡(luò)請(qǐng)求。
//是這句代碼就是要往服務(wù)端發(fā)送數(shù)據(jù)了。
//TODO:服務(wù)端
//里面我們發(fā)現(xiàn)如果消息被發(fā)送出去了,然后移除OP_WRITE
Send send = channel.write();
//已經(jīng)完成響應(yīng)消息的發(fā)送
if (send != null) {
this.completedSends.add(send);
this.sensors.recordBytesSent(channel.id(), send.size());
}

(18) 發(fā)送者 源碼 產(chǎn)生者 發(fā)送網(wǎng)絡(luò)請(qǐng)求2 21 2 } 0:10

流程圖圖示如下:


21 終于發(fā)送網(wǎng)絡(luò)請(qǐng)求了!.png

(19) 發(fā)送者 源碼 producer是如何處理粘包問題的1 22 1} 0:5

粘包: 一個(gè)請(qǐng)求里面有多個(gè)響應(yīng),多個(gè)消息粘在一起回來


粘包.png

轉(zhuǎn)到這里
——》NetworkClient..doSend()

//這兒往inFlightRequests 隊(duì)列組件里存 Request請(qǐng)求。
//存儲(chǔ)的就是還沒有收到響應(yīng)的請(qǐng)求。
//這個(gè)里面默認(rèn)最多能存5個(gè)請(qǐng)求。
//其實(shí)我們可以猜想一個(gè)事,如果我們的請(qǐng)求發(fā)送出去了
//然后也成功的接受到了響應(yīng),后面就會(huì)到這兒把這個(gè)請(qǐng)求移除。
this.inFlightRequests.add(request);

inFlightRequests 類里面有 Map 成員變量
private final Map<String, Deque<ClientRequest>> requests = new HashMap<>();

(20) 發(fā)送者 源碼 producer是如何處理粘包問題的2 22 2} 0:12

接上面:發(fā)送代碼
——》Selector. pollSelectionKeys()

//里面不斷的讀取數(shù)據(jù),讀取數(shù)據(jù)的代碼我們之前就已經(jīng)分析過
//里面還涉及到粘包和拆包的一些問題。
while ((networkReceive = channel.read()) != null)
addToStagedReceives(channel, networkReceive);

——》NetworkReceive.readFromReadableChannel()

//先讀取4字節(jié)的數(shù)據(jù),(代表的意思就是后面跟著的消息體的大?。?br> int bytesRead = channel.read(size);

  //一直要讀取到當(dāng)這個(gè)size沒有剩余空間
        //說明已經(jīng)讀取到了一個(gè)4字節(jié)的int類型的數(shù)了。
        if (!size.hasRemaining()) {

//分配一個(gè)內(nèi)存空間,這個(gè)內(nèi)存空間的大小
//就是剛剛讀出來的那個(gè)4字節(jié)的int的大小。
//10
this.buffer = ByteBuffer.allocate(receiveSize);

if (buffer != null) {
//去讀取數(shù)據(jù)
int bytesRead = channel.read(buffer);

?? 如何判斷讀完了消息呢? 回到
while ((networkReceive = channel.read()) != null)
——》KafkaChannel.read()

//一直在讀取數(shù)據(jù)。
receive(receive);
//是否讀完一個(gè)完整的響應(yīng)消息
if (receive.complete()) {

——》NetworkReceive.complete()
//size 沒有剩余空間(50) &&
return !size.hasRemaining() && !buffer.hasRemaining();

(21) 發(fā)送者 源碼 producer是如何處理拆包問題 23 } 0:5

拆包:


拆包.png

和粘包類似
——》NetworkReceive.readFromReadableChannel()

if (buffer != null) {
//去讀取數(shù)據(jù)
int bytesRead = channel.read(buffer);

?? 如何判斷讀完了消息呢? 回到
while ((networkReceive = channel.read()) != null)
——》KafkaChannel.read()

//一直在讀取數(shù)據(jù)。
receive(receive);
//是否讀完一個(gè)完整的響應(yīng)消息
if (receive.complete()) {

(22) 發(fā)送者 源碼 如何處理暫存狀態(tài)的響應(yīng)消息 24 } 0:32

——》Sender.run(long now) 方法入口
——》NetworkClient.poll()
——》NetworkClient.handleCompletedReceives()

24 響應(yīng)消息的流轉(zhuǎn).png

(23) 發(fā)送者 源碼 響應(yīng)消息處理流程 25 } 0:25

——》Sender.run(long now) 方法入口
——》NetworkClient.poll()

//步驟三:處理響應(yīng),響應(yīng)里面就會(huì)有我們需要的元數(shù)據(jù)。
/**
* 這個(gè)地方是我們?cè)诳瓷a(chǎn)者是如何獲取元數(shù)據(jù)的時(shí)候,看的。
* 其實(shí)Kafak獲取元數(shù)據(jù)的流程跟我們發(fā)送消息的流程是一模一樣。
* 獲取元數(shù)據(jù) -》 判斷網(wǎng)絡(luò)連接是否建立好 -》 建立網(wǎng)絡(luò)連接
* -》 發(fā)送請(qǐng)求(獲取元數(shù)據(jù)的請(qǐng)求) -》 服務(wù)端發(fā)送回來響應(yīng)(帶了集群的元數(shù)據(jù)信息)
*
*/

//調(diào)用的響應(yīng)的里面的我們之前發(fā)送出去的請(qǐng)求的回調(diào)函數(shù)
//看到了這兒,我們回頭再去看一下
//我們當(dāng)時(shí)發(fā)送請(qǐng)求的時(shí)候,是如何封裝這個(gè)請(qǐng)求。
//不過雖然目前我們還沒看到,但是我們可以大膽猜一下。
//當(dāng)時(shí)封裝網(wǎng)絡(luò)請(qǐng)求的時(shí)候,肯定是給他綁定了一個(gè)回調(diào)函數(shù)。
response.request().callback().onComplete(response);

回調(diào)函數(shù)在封裝時(shí)發(fā)送
——》Sender.produceRequest()

public void onComplete(ClientResponse response) {
//回調(diào)函數(shù)要是被調(diào)用
//其實(shí)就是這個(gè)方法被執(zhí)行了。
handleProduceResponse(response, recordsByPartition, time.milliseconds());
}

   ——》Sender.handleProduceResponse()回調(diào)函數(shù)
                                          ▼ 

//如果處理成功那就是成功了,但是如果服務(wù)端那兒處理失敗了
//是不是也要給我們發(fā)送回來異常的信息。
//error 這個(gè)里面存儲(chǔ)的就是服務(wù)端發(fā)送回來的異常碼
Errors error = Errors.forCode(partResp.errorCode);
//獲取到當(dāng)前分區(qū)的響應(yīng)。
RecordBatch batch = batches.get(tp);
//對(duì)響應(yīng)進(jìn)行處理
completeBatch(batch, error, partResp.baseOffset, partResp.timestamp, correlationId, now);

 ——》Sender.completeBatch()
                                          ▼ 

//如果響應(yīng)里面帶有異常 并且 這個(gè)請(qǐng)求是可以重試的
if (error != Errors.NONE && canRetry(batch, error)) {
// retry
} else {
//TODO 這兒過來的數(shù)據(jù):(1)帶有異常,但是不可以重試(1:壓根就不讓重試2:重試次數(shù)超了。
// (2)沒有異常
//TODO 核心代碼 把異常的信息也給帶過去了
//我們剛剛看的就是這兒的代碼
//里面調(diào)用了用戶傳進(jìn)來的回調(diào)函數(shù)
//回調(diào)函數(shù)調(diào)用了以后
//說明我們的一個(gè)完整的消息的發(fā)送流程就結(jié)束了。
batch.done(baseOffset, timestamp, exception);
//看起來這個(gè)代碼就是要回收資源的。
this.accumulator.deallocate(batch);

——》RecordBatch.done()

//如果沒有異常
if(exception == null){
//調(diào)用我們發(fā)送的消息的回調(diào)函數(shù)
//大家還記不記得我們?cè)诎l(fā)送數(shù)據(jù)的時(shí)候
//還不是綁定了一個(gè)回調(diào)函數(shù)。
//這兒說的調(diào)用的回調(diào)函數(shù)
//就是我們開發(fā),生產(chǎn)者代碼的時(shí)候,我們用戶傳進(jìn)去的那個(gè)
//回調(diào)函數(shù)。
thunk.callback.onCompletion(metadata, null);//帶過去的就是沒有異常
//也就是說我們生產(chǎn)者那兒的代碼,捕獲異常的時(shí)候就是發(fā)現(xiàn)沒有異常。
} else {
//如果有異常就會(huì)把異常傳給回調(diào)函數(shù)。
//由我們用戶自己去捕獲這個(gè)異常。
//然后對(duì)這個(gè)異常進(jìn)行處理
//大家根據(jù)自己公司的業(yè)務(wù)規(guī)則進(jìn)行處理就可以了。
//如果走這個(gè)分支的話,我們的用戶的代碼是可以捕獲到timeoutexception
//這個(gè)異常,如果用戶捕獲到了,做對(duì)應(yīng)的處理就可以了。
thunk.callback.onCompletion(null, exception);

(24) 發(fā)送者 源碼 消息發(fā)送完了以后內(nèi)存如何處理 26} 0:6

接上
——》Sender.completeBatch()

batch.done(baseOffset, timestamp, exception);
//看起來這個(gè)代碼就是要回收資源的。
this.accumulator.deallocate(batch);
——》RecordAccumulator.deallocate()

//從某個(gè)數(shù)據(jù)結(jié)構(gòu)里面移除 已經(jīng)成功處理的批次
incomplete.remove(batch);
//釋放內(nèi)存,回收到內(nèi)存池里面
free.deallocate(batch.records.buffer(), batch.records.initialCapacity());

      ——》BufferPool.deallocate()
                                  ▼ 

//如果你還回來的內(nèi)存的大小 就等于一個(gè)批次的大小,
//我們的參數(shù)設(shè)置的內(nèi)存是16K,你計(jì)算出來一個(gè)批次的大小也是16,申請(qǐng)的內(nèi)存也是16k
//16K 32K
if (size == this.poolableSize && size == buffer.capacity()) {
//內(nèi)存里面的東西清空
buffer.clear();
//把內(nèi)存放入到內(nèi)存池
this.free.add(buffer);
} else {
//但是如果 我們釋放的內(nèi)存的大小
//不是一個(gè)批次的大小,那就把歸為可用內(nèi)存
//等著垃圾回收即可
this.availableMemory += size;
}

(25) 發(fā)送者 源碼 消息有異常是如何處理的 27} 0:6 ?

接上
——》Sender.completeBatch()
▼ 1、如果異??梢灾匦掳l(fā)
//重新把發(fā)送失敗等著批次 加入到隊(duì)列里面。
this.accumulator.reenqueue(batch, now);
——》RecordAccumulator.reenqueue()

         synchronized (deque) {
             //重新放入到隊(duì)列里面
               //放入到隊(duì)頭
            deque.addFirst(batch);
        }

▼2、如果異常不可以重新發(fā)
exception = error.exception();
batch.done(baseOffset, timestamp, exception);

——》RecordBatch.done()
//如果走這個(gè)分支的話,我們的用戶的代碼是可以捕獲到
//這個(gè)異常,如果用戶捕獲到了,做對(duì)應(yīng)的處理就可以了。
thunk.callback.onCompletion(null, exception);

(26) 發(fā)送者 源碼 如何處理超時(shí)的批次 28} 0:12

回到
——》Sender.run()
/* * 步驟六:
* 對(duì)超時(shí)的批次是如何處理的?
*/
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(

——》RecordAccumulator.abortExpiredBatches()

//迭代的看每個(gè)分區(qū)里面的每個(gè)批次
Iterator<RecordBatch> batchIterator = dq.iterator();
//判斷一下是否超時(shí)

                    if (batch.maybeExpire(requestTimeout,, retryBackoffMs, now, this.lingerMs, isFull))

——》RecordBatch.maybeExpire()
/**
* requestTimeoutMs:代表的是請(qǐng)求發(fā)送的超時(shí)的時(shí)間。默認(rèn)值是30.
* now:當(dāng)前時(shí)間
* lastAppendTime:批次的創(chuàng)建的時(shí)間(上一次重試的時(shí)間)
* now - this.lastAppendTime 大于30秒,說明批次超時(shí)了 還沒發(fā)送出去。
*/
//調(diào)用done方法
//方法里面?zhèn)鬟^去了一個(gè)TimeoutException的異常。(超時(shí)了)
//TODO 處理超時(shí)的批次
this.done(-1L, Record.NO_TIMESTAMP,

(27) 發(fā)送者 源碼 如何處理長(zhǎng)時(shí)間沒有接受到響應(yīng)的消息 29} 0:10

回到
—— ★》 NetworkClient.poll ()

//步驟三:處理響應(yīng),響應(yīng)里面就會(huì)有我們需要的元數(shù)據(jù)。
//TODO 處理長(zhǎng)時(shí)間沒有接受到響應(yīng)
handleTimedOutRequests(responses, updatedNow);

——》InFlightRequests.getNodesWithTimedOutRequests()
//是否超時(shí),如果有主機(jī)超時(shí)了
if (timeSinceSend > requestTimeout)
//把超時(shí)的主機(jī)的信息加入到nodeIds里面
nodeIds.add(nodeId);

for (String nodeId : nodeIds) {
// close connection to the node
//關(guān)閉請(qǐng)求超時(shí)的主機(jī)的連接
this.selector.close(nodeId);
processDisconnection(responses, nodeId, now);
//對(duì)這些請(qǐng)求進(jìn)行處理
//大家會(huì)看到一個(gè)比較有意思的事
//自己封裝了一個(gè)響應(yīng)。這個(gè)響應(yīng)里面沒有服務(wù)端響應(yīng)消息(服務(wù)端沒給響應(yīng))
//失去連接的狀態(tài)表標(biāo)識(shí)為true
responses.add(new ClientResponse(request, now, true, null));
—— 》 NetworkClient.processDisconnection()
//修改連接狀態(tài)
connectionStates.disconnected(nodeId, now);
—— 》 ClusterConnectionStates.disconnected()

(28) 發(fā)送者 源碼 生產(chǎn)者源碼精華總結(jié) 30} 0:10

【截圖】


image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容