
parseOffsetSpec 根據(jù) time 配置解析出獲取 offset 的方式,earliest、lastest或者指定的時間戳
然后通過 KafkaAdminClient.listOffsets 方法獲取 offset,通過 getListOffsetsCalls 構(gòu)建了獲取 offset 的調(diào)用

getListOffsetsCalls 首先構(gòu)建 leader 節(jié)點和 topic 的關(guān)系

然后構(gòu)建了對于 leader broker 的 ListOffset 的調(diào)用

調(diào)用會被放到 AdminClientRunnable 做異步請求,在 processRequests 方法中被處理

生成 request 通過 NetworkClinet.send 發(fā)送到對應(yīng)的節(jié)點

訪問的 ApiKey 為 LIST_OFFSETS

調(diào)用 selector.send 根據(jù)目的地 id 打開對應(yīng)的 channel 發(fā)送請求


對于請求的處理在 KafkaApis.handleListOffsetRequest 中,0.x和1.x以上版本有不同的處理方式

對于 0.x 版本,調(diào)用 ReplicaManager.legacyFetchOffsetsForTimestamp 獲取 offset

調(diào)用 Partition.legacyFetchOffsetsForTimestamp,從 local log file 獲取小于制定時間的全部offset,然后把 大于 highWatermark 的 offset 丟棄后返回結(jié)果

對于 1.x 及以上版本,,調(diào)用 ReplicaManager.fetchOffsetForTimestamp 獲取 offset

最后調(diào)用 Partition.fetchOffsetForTimestamp,通過 logManager 獲取對應(yīng) partition 的 offset
