kafka 協(xié)議分析 (一) 基礎(chǔ)篇
kafka 協(xié)議分析 (二) Produce API
kafka 協(xié)議分析 (三) Fetch API
上一篇文章介紹了如何生產(chǎn)消息,本篇將介紹一下如何從Kafka消費(fèi)消息
一個(gè)最簡單的消息消費(fèi)流程如下圖:

image.png
- 先檢查服務(wù)器支持的API版本,并從中選擇一個(gè)合適的使用。ApiVersions API
- 獲取目標(biāo)Topic的MetaData,主要包括Topic的partition數(shù)量和leader地址。Metadata API
- 獲取目標(biāo)Topic的Offset情況(第一個(gè)offset和最后一個(gè)offset在哪兒)ListOffsets API
- 從kafka獲取消息Fetch API
其中第1、2步已經(jīng)在上一篇文章詳細(xì)介紹過。
ListOffsets API (Key: 2)
ListOffsets API用來獲取Topic當(dāng)前的offset情況。按照kafka的設(shè)計(jì),拉取消息(Fetch API)需要提供拉取消息的目標(biāo)位置(offset)。所以真正拉取消息之前,需要事先知道topic的最后一個(gè)offset和第一個(gè)offset是什么。(kafka的消息有超時(shí)時(shí)間,默認(rèn)一周消息就會(huì)被清理,這意味著topic中現(xiàn)存消費(fèi)的offset不一定從0開始)

image.png
| 域 | 值 | 描述 |
|---|---|---|
| Size | 40 | 消息長度 |
| Api Key | 1 | Api |
| Api Version | 0 | Api版本 |
| Correlation ID | 1 | 請(qǐng)求ID,服務(wù)器會(huì)將這個(gè)ID原樣返回 |
| Replica Id | -1 | follower的broker id,通常設(shè)置-1 |
| Topic Len | 1 | Topic個(gè)數(shù) |
| Topic | w1 | Topic名 |
| Partition Len | 1 | Partiton個(gè)數(shù) |
| Partition | 0 | Partition號(hào) |
| Timestamp | -2 | 時(shí)間戳 (-1最后一個(gè);-2第一個(gè)) |
Fetch API (Key: 1):
Fetch API用于從Topic中拉取數(shù)據(jù),它需要提供一個(gè)起始o(jì)ffset

image.png
| 域 | 值 | 描述 |
|---|---|---|
| Size | 40 | 消息長度 |
| Api Key | 1 | Api |
| Api Version | 0 | Api版本 |
| Correlation ID | 1 | 請(qǐng)求ID,服務(wù)器會(huì)將這個(gè)ID原樣返回 |
| Replica Id | -1 | follower的broker id,通常設(shè)置-1 |
| Max wait Time | 5000 | 請(qǐng)求超時(shí)時(shí)間毫秒 |
| Min bytes | 1 | 最小返回大小 |
| Max bytes | 2^30 | 最大返回大小 |
| Topic Len | 1 | Topic個(gè)數(shù) |
| Topic | w1 | Topic名 |
| Partition Len | 1 | Partiton個(gè)數(shù) |
| Partition | 0 | Partition號(hào) |
| Offset | 0 | 時(shí)間戳 起始o(jì)ffset |