Kafka 簡(jiǎn)述
本節(jié)僅會(huì)以?huà)呙な降哪J胶?jiǎn)述 kafka 的設(shè)計(jì), 不會(huì)深入分析細(xì)節(jié), 如果你已經(jīng)對(duì) kafka 有所了解, 建議跳至下一節(jié) Kafka Connect. 如果你想詳細(xì)了解 Kafka 的設(shè)計(jì), 推薦: Kafka設(shè)計(jì)解析(一) - Kafka背景及架構(gòu)介紹, 本文引用了其中圖片; 以及 Kafka 設(shè)計(jì)提案 (KIP).
上一節(jié)提到, Kafka 早期定位一個(gè)高吞吐的消息隊(duì)列, 而如今已經(jīng)發(fā)展成一個(gè)分布式的流處理平臺(tái). Kafka 最底層的設(shè)計(jì)十分簡(jiǎn)單可靠, 并在其基礎(chǔ)上擴(kuò)展了相當(dāng)多的特性. 這里我們主要使用 Kafka 作為數(shù)據(jù)傳輸?shù)墓艿? 重點(diǎn)關(guān)注 Kafka 的可靠性和吞吐量.
基本概念

簡(jiǎn)單說(shuō), Kafka 中分為 3 中角色, Producer, Broker, Consumer, 如果按照 C / S 模型來(lái)理解, Producer & Consumer 即 Client, Broker 即 Server. 早期的 Kafka 版本中, Consumer 還依賴(lài)于 Zookeeper, 新版的 Consumer 直接與 Broker 通信, 不再依賴(lài) Zookeeper.
Kafka 的數(shù)據(jù)模型基本沿用了 MQ 的通用概念, 將 Topic 作為數(shù)據(jù)傳輸隊(duì)列, 但加入了 Partition 的概念; 用 Record 表示單條消息, 每個(gè) Record 包含 Key 和 Value, 并且在 Header 中攜帶 Offset, Timestamp 等信息.
可靠性
作為數(shù)據(jù)存儲(chǔ)和數(shù)據(jù)傳輸系統(tǒng), 可靠性是至關(guān)重要的, Kafka 的可靠性主要通過(guò)多實(shí)例 & 多副本機(jī)制實(shí)現(xiàn).
首先, 多個(gè) Broker 采用多實(shí)例主從機(jī)制, 集群中的 Brokers 會(huì)向 Zookeeper 同一路徑搶注一個(gè)臨時(shí)節(jié)點(diǎn), 注冊(cè)成功的即為 Leader, 其余為 Follower.
其次, 對(duì)于某一 Topic, 可以配置 Replica 數(shù)目, 與 Broker 類(lèi)似, Topic 中的每個(gè) Partition 也會(huì)選出一個(gè) Leader, 實(shí)際的讀寫(xiě)都在 Leader 節(jié)點(diǎn)上進(jìn)行, Replica 僅僅負(fù)責(zé)同步, 若同步延遲低于預(yù)設(shè)值, 稱(chēng)為 ISR (In Sync Replica), 當(dāng) Leader 掛掉后, 即可從 ISR 中選出一個(gè)新的 Leader, 以保證消息的準(zhǔn)確性和服務(wù)的高可用性.
這里要說(shuō)明一點(diǎn), Kafka 的消息過(guò)期策略與該消息是否被消費(fèi)過(guò)無(wú)關(guān), 這點(diǎn)與常規(guī) MQ 的設(shè)計(jì)不同; 在這點(diǎn)上, Kafka 更像一個(gè)時(shí)序數(shù)據(jù)庫(kù), 可以配置基于時(shí)間和基于大小的過(guò)期策略, Kafka 會(huì)定時(shí)清理達(dá)到閾值的消息以釋放空間; 消息清理策略除了直接刪除之外, 還支持 compact 策略, compact 策略是指對(duì)于相同 key 的 message, 只保留最新的一份.
吞吐量
首先, 從 Producer 角度來(lái)看, Kafka 在磁盤(pán)中對(duì) message 的存儲(chǔ)采用 append 模式, 這種順序?qū)懭雽?duì)機(jī)械硬盤(pán)物理結(jié)構(gòu)十分友好, 能獲得極高的寫(xiě)入速度; 從 Consumer 角度來(lái)看, 順序的讀取對(duì)機(jī)械硬盤(pán)也極其友好; 額外的, 如果 message 沒(méi)有加密, Kafka 還支持零拷貝機(jī)制, 避免的內(nèi)核空間到用戶(hù)空間的數(shù)據(jù)拷貝;
其次, Kafka 將 Topic 做了水平切分, 將其切分為更細(xì)粒度的 Partition, 分散在多臺(tái) Broker 上以達(dá)到水平擴(kuò)張的能力, 但相應(yīng)的, 這種設(shè)計(jì)舍棄了 Topic 級(jí)別的消息嚴(yán)格順序性, 只能做到 Partition 級(jí)別的消息嚴(yán)格順序性.
Producer
對(duì)于 Producer 來(lái)說(shuō), 在往一個(gè) Topic 投遞消息的時(shí)候, 可以通過(guò)對(duì) message 進(jìn)行 hash(key) % n 來(lái)實(shí)現(xiàn)分區(qū)的負(fù)載均衡, 并且可以保證相同 key 的 message 被投遞到相同的 Partition, 以保證嚴(yán)格的順序性;
此外, Producer 還可以通過(guò)配置 ack 策略來(lái)實(shí)現(xiàn)可靠性和吞吐量的權(quán)衡:
- 0 Producer 向 partition leader 送出 message 即認(rèn)為投遞成功, 吞吐量最好, 可靠性最差;
- 1 Partition leader 收到 message 并在本地落盤(pán)成功, 向 producer 確認(rèn)消息投遞成功;
- -1 Partition leader 收到 message 并在本地落盤(pán)成功, 并且至少有
min.insync.replicas個(gè) replicas 也收到 message 并在本地落盤(pán)成功后, 才向 producer 確認(rèn)消息投遞成功, 可靠性最佳;
Consumer
Kafka 通過(guò) consumer group 概念來(lái)實(shí)現(xiàn) Consumer 邏輯上的分組和隔離; 不同 group id 的 consumer 是互不影響的, 這種設(shè)計(jì)使得可以非常簡(jiǎn)單的通過(guò)新建一個(gè) consumer group 來(lái)增加一組訂閱者, 而不會(huì)影響現(xiàn)有的消費(fèi)者.

在每個(gè) consumer group 內(nèi), 每個(gè) partition 至多只能被一個(gè) consumer 訂閱, 即 partition 與 consumer 是多對(duì)一的關(guān)系, 那就意味著, 如果 partition 數(shù)量 < consumer 數(shù)量, 將會(huì)有 consumer 閑置.
在確定 partition 與 consumer 的綁定關(guān)系時(shí), 即 assign 策略, kafka 內(nèi)置了 3 種算法, RangeAssignor, RoundRobinAssignor, StickyAssignor, 不過(guò)多詳述.
當(dāng) partition 或 consumer 數(shù)量發(fā)生變化時(shí), 目前版本的 kafka 使用了一種簡(jiǎn)單粗暴的 rebalance 方案, 即對(duì)于每個(gè) consumer group, 任何 partition 或 consumer 的變化都會(huì)觸發(fā)所有 consumer 的 stop, start, assign 過(guò)程, 這種策略對(duì)于滾動(dòng)發(fā)布或者網(wǎng)絡(luò)抖動(dòng)十分不友好. 現(xiàn)在有一個(gè)設(shè)計(jì)提案 KIP-429 正在對(duì)此問(wèn)題進(jìn)行改進(jìn). Kafka Connect 的調(diào)度也存在類(lèi)似的 rebalance 問(wèn)題, 下文會(huì)提到.

Kafka 使用 offset 來(lái)表示 consumer 的消費(fèi)進(jìn)度, 在一個(gè) consumer group 內(nèi), 很顯然每個(gè) partition 的 offset 是各自獨(dú)立的, 用戶(hù)對(duì) offset 有完全的掌控權(quán), 可以選擇使用 high level consumer API, 將 offset 托管給 broker 維護(hù) (早期版本是放在 zookeeper 中, 新版本是存儲(chǔ)在 __consumer_offset topic 中), 也可以選擇手動(dòng)維護(hù) offset, 將其存儲(chǔ)在外部存儲(chǔ) (如 redis) 中, 每次 poll 時(shí)顯式指定 offset 參數(shù). 此外, 自動(dòng)模式下, 還可以選擇 auto commit 或 manual commit, 靈活的組合可以實(shí)現(xiàn)至少一次, 至多一次, 嚴(yán)格一次 (需配合其他特性, 下文會(huì)提到) 消費(fèi)語(yǔ)義.
基于 partition 與 consumer 多對(duì)一關(guān)系的設(shè)計(jì), 使得 consumer commit 機(jī)制在實(shí)現(xiàn)上比較容易, 不做過(guò)多詳述.
Kafka Connect
上一節(jié)簡(jiǎn)述了 kafka 最基本的設(shè)計(jì) (對(duì)應(yīng)下圖中的 core), 基于這樣一個(gè)簡(jiǎn)單可靠的底層系統(tǒng), conflent 公司以及社區(qū)構(gòu)建了大量?jī)?yōu)秀組件, 營(yíng)造了一個(gè)繁榮的生態(tài).
下圖中是 Confluent 公司以 Kafka 為核心的發(fā)行版, 本數(shù)據(jù)同步系統(tǒng)即使用了圖中 Connect API 和 Connectors 組件.

簡(jiǎn)單的說(shuō), Kafka Connect 基于 kafka client 的 Producer API 和 Consumer API 進(jìn)行封裝, 抽象出了 Source Plugin 和 Sink Plugin 編程接口, 并保證了 Exactly Once At lease Once 傳輸語(yǔ)義, 大大簡(jiǎn)化了數(shù)據(jù)傳輸系統(tǒng)的開(kāi)發(fā)成本. Source Plugin 和 Sink Plugin 在運(yùn)行時(shí)會(huì)被實(shí)例化成 Connector 和 Task 并由 Kafka Connect 集群調(diào)度.
注: Kafka Connect 支持單機(jī)模式和集群模式, 并且 debezium 項(xiàng)目中還有一個(gè)嵌入式版本的實(shí)現(xiàn), 本文只討論集群模式.

Exactly Once
這是一個(gè)非常嚴(yán)謹(jǐn)?shù)脑?huà)題, 本節(jié)依然只是簡(jiǎn)述設(shè)計(jì), 如果你想深入了解設(shè)計(jì)和實(shí)現(xiàn)細(xì)節(jié), 推薦查閱 Kafka 設(shè)計(jì)提案: Kafka Exactly Once Semantics, 以及這篇博客文章: Kafka設(shè)計(jì)解析(八) - Exactly Once語(yǔ)義與事務(wù)機(jī)制原理.
首先解釋一下 Exactly Once 語(yǔ)義, 是指一組對(duì) Kafka Partitions 的讀寫(xiě)操作, 要么全部成功, 要么全部失敗, 并且每個(gè)操作嚴(yán)格只執(zhí)行一次. 也就是說(shuō), 如果操作涉及了第三方系統(tǒng), 比如從 Kafka 消費(fèi)數(shù)據(jù)落入 MySQL, 這已經(jīng)超出了 Kafka Exactly Once 語(yǔ)義涉及的范疇, 需要額外的工作 (比如 Kafka 至少一次消費(fèi)語(yǔ)義 + MySQL 冪等寫(xiě)入) 來(lái)保證系統(tǒng)整體的正確性.
明確了 Exactly Once 語(yǔ)義, 可以將其按照如下三個(gè)場(chǎng)景分別討論:

- 單個(gè) Producer 的寫(xiě)入操作
// todo - 單個(gè) Consumer 的消費(fèi)操作
// todo - 多個(gè)操作, 可能既有 Consumer 的消費(fèi)又有 Producer 的寫(xiě)入操作
/// todo
Source Plugin & Sink Plugin
基礎(chǔ)聊完了, 現(xiàn)在我們來(lái)看一看 Kafka Connect 到底給我們抽象出來(lái)了什么樣的編程接口. 這里我只列舉出了個(gè)別的 API, 省略了很多生命周期相關(guān)的 API, 如果你想詳細(xì)了解, 可以參考 kafka 源碼, 其中還包括一個(gè)簡(jiǎn)易的 FileConnector 的實(shí)現(xiàn).
首先, Kafka Connect 抽象出來(lái)了兩種調(diào)度對(duì)象: Connector 和 Task. 從 API 定義上很容易看出, 由 Connector 負(fù)責(zé)初始化 Task, 一個(gè) Connector 可以生成多個(gè)相同類(lèi)型的 Task.
public abstract class Connector implements Versioned {
...
public abstract Class<? extends Task> taskClass();
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
...
}
public abstract class SourceConnector extends Connector {
...
}
public abstract class SinkConnector extends Connector {
...
}
顯而易見(jiàn), Task 接口就是一個(gè)生產(chǎn)者消費(fèi)者接口, 不過(guò)這里的 API 名字要以 Kafka Connector 集群的視角看待, 從 Source 端 poll Record 送入 Kafka, 從 Kafka 讀取 Record put 到 Sink 端 . 當(dāng)然, 為了效率, 這里的接口是批傳輸?shù)?
public interface Task {
...
void start(Map<String, String> props);
...
}
public abstract class SourceTask implements Task {
...
public abstract List<SourceRecord> poll() throws InterruptedException;
...
}
public abstract class SinkTask implements Task {
...
public abstract void put(Collection<SinkRecord> records);
...
}
再來(lái)關(guān)注一下 Record 攜帶了哪些信息. 這里說(shuō)明一下, keySchema 和 valueSchema 是可選的 , key 和 value 的具體序列化策略也可以自定義實(shí)現(xiàn), 常用的有支持 Jackson 和 Avro 方式.
從我的使用經(jīng)驗(yàn)看, 如果是在系統(tǒng)驗(yàn)證環(huán)節(jié), 或者業(yè)務(wù)上吞吐量不是瓶頸時(shí), 建議使用 Jackson 序列化模式, 并開(kāi)啟 schema, 這會(huì)大大簡(jiǎn)化問(wèn)題的排查難度. 若比較側(cè)重于吞吐量, 可以選擇使用 Avro, 并將 schema 注冊(cè)到 Schema Registry (你可以在上文的 Confluent Platform 圖中找到該組件). 若追求更極致的性能, 可以自定義實(shí)現(xiàn)序列化器, 比如使用 protobuf 等.
timestamp 標(biāo)識(shí)了該條消息的時(shí)間戳, 可以是消息本身產(chǎn)生的時(shí)間, 也可以是消息投遞到 kafka 的時(shí)間. 這里推薦使用消息投遞到 kafka 的時(shí)間, 將消息本身產(chǎn)生的時(shí)間作為消息中的一個(gè)字段存儲(chǔ), 從而更完整的標(biāo)識(shí)消息在各系統(tǒng)中的傳輸延遲.
public abstract class ConnectRecord<R extends ConnectRecord<R>> {
private final String topic;
private final Integer kafkaPartition;
private final Schema keySchema;
private final Object key;
private final Schema valueSchema;
private final Object value;
private final Long timestamp;
private final Headers headers;
...
}
public class SourceRecord extends ConnectRecord<SourceRecord> {
private final Map<String, ?> sourcePartition;
private final Map<String, ?> sourceOffset;
...
}
public class SinkRecord extends ConnectRecord<SinkRecord> {
private final long kafkaOffset;
private final TimestampType timestampType;
...
}
下面考慮一個(gè)問(wèn)題, 如果 Task 重啟了, 怎么恢復(fù)之前的狀態(tài)?
對(duì)于 Source Task 來(lái)說(shuō), 需要額外的狀態(tài)存儲(chǔ), 即 SourceRecord 中的 sourcePartition 和 sourceOffset 字段, 注意, 這兩個(gè)字段與 kafka 的 partition 和 offset 觀念不是一碼事兒, 只是取了相似的名字, 由 Source Connector 的實(shí)現(xiàn)者自行決定存儲(chǔ)邏輯, Kafka Connect 會(huì)確保持久化的記錄 Source Task 的狀態(tài)信息, 以確保在重啟后 Source Task 能夠恢復(fù)狀態(tài).
對(duì)于 Sink Task 來(lái)說(shuō), 只要使用由 broker 維護(hù)的 consumer 的 offset 信息就可以了.
除此之外, Kafka Connect 還要存儲(chǔ)每個(gè) Connector 的配置信息, 以及狀態(tài)信息 (比如 Task 失敗后的異常堆棧等), 這些信息存在哪里? 自然是存在 Kafka 本身的 Topic 最可靠, 所以 Kafka Connect 會(huì)要求指定三個(gè) Topic connect-configs, connect-offsets, connect-status 分別保存這些信息.
可以看到, Kafka Connect 通過(guò)封裝向我們提供了一個(gè)極其簡(jiǎn)易的編程接口, 使我們無(wú)須過(guò)多的關(guān)注消息的丟失和重試機(jī)制, Kafka Connect 會(huì)確保 List<SourceRecord> poll() 生產(chǎn)的數(shù)據(jù)被嚴(yán)格一次的送往 kafka, 也會(huì)保證嚴(yán)格一次的通過(guò) void put(Collection<SinkRecord> records) 向消費(fèi)者投遞. 也即實(shí)現(xiàn)了 Kafka 層面的 exactly once 傳輸語(yǔ)義.
當(dāng)然, 如前所述, 兩側(cè)的系統(tǒng)屬于外部系統(tǒng), 邏輯的正確性要靠開(kāi)發(fā)者自行設(shè)計(jì)保證. 如果兩個(gè)接口拋出異常, 則會(huì)導(dǎo)致 Task 失敗, 需要排查原因后手動(dòng)重啟 Task.
Rebalance
前面在介紹 Consumer Group 時(shí)已經(jīng)提到了 rebalance, Kafka Connect Cluster 也有類(lèi)似的機(jī)制. 每當(dāng)往 cluster 提交一個(gè)新的 Connector / Task, 或者更新一個(gè) Connector / Task, 刪除一個(gè) Connector / Task, 都會(huì)觸發(fā) Cluster 的 rebalance, 進(jìn)而導(dǎo)致所有 Connector / Task 的重新分配和重啟, 如果 Task 的狀態(tài)恢復(fù)代價(jià)比較大 (其實(shí) debezium 恢復(fù)代價(jià)就很大, 后續(xù)的篇章中會(huì)提到), 那么在滾動(dòng)發(fā)布, 任務(wù)提交, 節(jié)點(diǎn)擴(kuò)張或收縮時(shí), 是極不友好的.
幸運(yùn)的是, Kafka 2.3 已經(jīng)實(shí)現(xiàn)了一個(gè)漸進(jìn)式的 Rebalance 機(jī)制, 用于解決這些問(wèn)題, 可參考 KIP-415.
這里順便提一下, 既然提到 Kafka Connect 集群模式, 以及 Connector / Task 的分配, 必然涉及到 worker 節(jié)點(diǎn)的協(xié)調(diào)機(jī)制. Kafka Connect 已經(jīng)將所有信息持久化到 Kakfa Topic 中, worker 節(jié)點(diǎn)本身已經(jīng)無(wú)狀態(tài)了, 然后 Kafka Connect 會(huì)從所有節(jié)點(diǎn)中選出一個(gè) Leader, 負(fù)責(zé) Connector / Task 的分配, 這個(gè)選舉協(xié)議是由 Kafka Connect 直接實(shí)現(xiàn)的, 并未引入類(lèi)似 Zookeeper 這種專(zhuān)門(mén)的協(xié)調(diào)系統(tǒng).
Restful 管理接口
Restful 接口似乎已經(jīng)成為了一種時(shí)尚, 目前只要提到 http 接口或者 web 接口, 幾乎都會(huì)想到 restful. 但是, 就我的使用體驗(yàn)而言, restful 強(qiáng)調(diào)對(duì)資源的語(yǔ)義化操作, 強(qiáng)行照搬 restful 的規(guī)范來(lái)設(shè)計(jì)業(yè)務(wù)系統(tǒng)的接口總感覺(jué)有一種被 ??光的感覺(jué), 總擔(dān)心會(huì)暴露出內(nèi)部的敏感信息; 而對(duì)于中間件或者說(shuō)這種基礎(chǔ)技術(shù)設(shè)施的管理接口而言, restful 則顯得十分自然.
GET /connectors
POST /connectors
GET /connectors/{name}
GET /connectors/{name}/config
PUT /connectors/{name}/config
PUT /connectors/{name}/pause
POST /connectors/{name}/restart
POST /connectors/{name}/tasks/{taskId}/restart
DELETE /connectors/{name}
Kafka Connect 主要的管理接口即對(duì) Connector / Task 的增刪改查, 這里我截取了部分 restful 接口, 即便沒(méi)有說(shuō)明, 語(yǔ)義也已經(jīng)十分明確了. 完整的接口列表請(qǐng)查閱 官方文檔.
OK, 能看到這里, 相信你大概已經(jīng)知道 Kafka 和 Kafka Connect 是什么東西了, 然而很遺憾, 我們使用的 CDH 發(fā)行版并不支持 Kafka Connect, 并且我們需要對(duì)開(kāi)源版的 Kafka Connect 做一些功能增強(qiáng)和 bug 修復(fù), 下一篇就是實(shí)操環(huán)節(jié)了, 將展示如何獨(dú)立部署 Kafka Connect Cluster.