前提
首先你需要了解MQ / Kafka相關(guān)的知識
本文目標(biāo)
了解 Kafka Connect 基本概念與功能
什么是Kafka Connect
Kafka Connect 是一款可擴(kuò)展并且可靠地在 Apache Kafka 和其他系統(tǒng)之間進(jìn)行數(shù)據(jù)傳輸?shù)墓ぞ摺?可以很簡單的定義 connectors(連接器) 將大量數(shù)據(jù)遷入、遷出Kafka。
例如我現(xiàn)在想要把數(shù)據(jù)從MySQL遷移到ElasticSearch,為了保證高效和數(shù)據(jù)不會丟失,我們選擇MQ作為中間件保存數(shù)據(jù)。這時候我們需要一個生產(chǎn)者線程,不斷的從MySQL中讀取數(shù)據(jù)并發(fā)送到MQ,還需要一個消費(fèi)者線程消費(fèi)MQ的數(shù)據(jù)寫到ElasticSearch,這件事情似乎很簡單,不需要任何框架。
但是如果我們想要保證生產(chǎn)者和消費(fèi)者服務(wù)的高可用性,例如重啟后生產(chǎn)者恢復(fù)到之前讀取的位置,分布式部署并且節(jié)點宕機(jī)后將任務(wù)轉(zhuǎn)移到其他節(jié)點。如果要加上這些的話,這件事就變得復(fù)雜起來了,而Kafka Connect 已經(jīng)為我們造好這些輪子。
Kafka Connect 如何工作?

Kafka Connect 特性如下:
- Kafka 連接器的通用框架:Kafka Connect 標(biāo)準(zhǔn)化了其他數(shù)據(jù)系統(tǒng)與Kafka的集成,從而簡化了連接器的開發(fā),部署和管理
- 支持分布式模式和單機(jī)模式部署
- Rest API:通過簡單的Rest API管理連接器
- 偏移量管理:針對Source和Sink都有相應(yīng)的偏移量(Offset)管理方案,程序員無須關(guān)心Offset 的提交
- 分布式模式可擴(kuò)展的,支持故障轉(zhuǎn)移
Kafka Connect Concepts
這里簡單介紹下Kafka Connect 的概念與組成
更多細(xì)節(jié)請參考 ?? https://docs.confluent.io/platform/current/connect/concepts.html
Connectors
連接器,分為兩種 Source(從源數(shù)據(jù)庫拉取數(shù)據(jù)寫入Kafka),Sink(從Kafka消費(fèi)數(shù)據(jù)寫入目標(biāo)數(shù)據(jù))
連接器其實并不參與實際的數(shù)據(jù)copy,連接器負(fù)責(zé)管理Task。連接器中定義了對應(yīng)Task的類型,對外提供配置選項(用戶創(chuàng)建連接器時需要提供對應(yīng)的配置信息)。并且連接器還可以決定啟動多少個Task線程。
用戶可以通過Rest API 啟停連接器,查看連接器狀態(tài)
Confluent 已經(jīng)提供了許多成熟的連接器,傳送門?? https://www.confluent.io/product/connectors/
Task
實際進(jìn)行數(shù)據(jù)傳輸?shù)膯卧?,和連接器一樣同樣分為 Source和Sink
Task的配置和狀態(tài)存儲在Kafka的Topic中,config.storage.topic和status.storage.topic。我們可以隨時啟動,停止任務(wù),以提供彈性、可擴(kuò)展的數(shù)據(jù)管道
Worker
剛剛我們講的Connectors 和Task 屬于邏輯單元,而Worker 是實際運(yùn)行邏輯單元的進(jìn)程,Worker 分為兩種模式,單機(jī)模式和分布式模式
單機(jī)模式:比較簡單,但是功能也受限,只有一些特殊的場景會使用到,例如收集主機(jī)的日志,通常來說更多的是使用分布式模式
分布式模式:為Kafka Connect提供了可擴(kuò)展和故障轉(zhuǎn)移。相同group.id的Worker,會自動組成集群。當(dāng)新增Worker,或者有Worker掛掉時,集群會自動協(xié)調(diào)分配所有的Connector 和 Task(這個過程稱為Rebalance)

當(dāng)使用Worker集群時,創(chuàng)建連接器,或者連接器Task數(shù)量變動時,都會觸發(fā)Rebalance 以保證集群各個Worker節(jié)點負(fù)載均衡。但是當(dāng)Task 進(jìn)入Fail狀態(tài)的時候并不會觸發(fā) Rebalance,只能通過Rest Api 對Task進(jìn)行重啟
Converters
Kafka Connect 通過 Converter 將數(shù)據(jù)在Kafka(字節(jié)數(shù)組)與Task(Object)之間進(jìn)行轉(zhuǎn)換
默認(rèn)支持以下Converter
-
AvroConverter
io.confluent.connect.avro.AvroConverter: 需要使用 Schema Registry -
ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry -
JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry -
JsonConverter
org.apache.kafka.connect.json.JsonConverter(無需 Schema Registry): 轉(zhuǎn)換為json結(jié)構(gòu) -
StringConverter
org.apache.kafka.connect.storage.StringConverter: 簡單的字符串格式 -
ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何轉(zhuǎn)換
Converters 與 Connector 是解耦的,下圖展示了在Kafka Connect中,Converter 在何時進(jìn)行數(shù)據(jù)轉(zhuǎn)換

Transforms
連接器可以通過配置Transform 實現(xiàn)對單個消息(對應(yīng)代碼中的Record)的轉(zhuǎn)換和修改,可以配置多個Transform 組成一個鏈。例如讓所有消息的topic加一個前綴、sink無法消費(fèi)source 寫入的數(shù)據(jù)格式,這些場景都可以使用Transform 解決
Transform 如果配置在Source 則在Task之后執(zhí)行,如果配置在Sink 則在Task之前執(zhí)行
Dead Letter Queue
與其他MQ不同,Kafka 并沒有死信隊列這個功能。但是Kafka Connect提供了這一功能。
當(dāng)Sink Task遇到無法處理的消息,會根據(jù)errors.tolerance配置項決定如何處理,默認(rèn)情況下(errors.tolerance=none) Sink 遇到無法處理的記錄會直接拋出異常,Task進(jìn)入Fail 狀態(tài)。開發(fā)人員需要根據(jù)Worker的錯誤日志解決問題,然后重啟Task,才能繼續(xù)消費(fèi)數(shù)據(jù)
設(shè)置 errors.tolerance=all,Sink Task 會忽略所有的錯誤,繼續(xù)處理。Worker中不會有任何錯誤日志。可以通過配置errors.deadletterqueue.topic.name = <dead-letter-topic-name> 讓無法處理的消息路由到 Dead Letter Topic
快速上手
下面我來實戰(zhàn)一下,如何使用Kafka Connect,我們先定一個小目標(biāo) 將MySQL中的全量數(shù)據(jù)同步到Redis
- 新建文件 docker-compose.yaml
version: '3.7'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zk
ports:
- 2182:2181
kafka:
image: wurstmeister/kafka:2.13-2.7.0
container_name: kafka
ports:
- 9092:9092
environment:
KAFKA_BROKER_ID: 0
# 宿主機(jī)ip
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.3.21:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
depends_on:
- zookeeper
在終端上執(zhí)行 docker-compose -f docker-compose.yaml up -d 啟動docker容器
準(zhǔn)備連接器,這里我是自己寫了一個簡單的連接器??。下載地址:https://github.com/TavenYin/kafka-connect-example/blob/master/release/kafka-connector-example-bin.jar
# 將連接器上傳到kafka 容器中
docker cp kafka-connector-example-bin.jar kafka:/opt/connectors
- 修改配置并啟動Worker
#在配置文件末尾追加 plugin.path=/opt/connectors
vi /opt/kafka/config/connect-distributed.properties
# 啟動Worker
bin/connect-distributed.sh -daemon config/connect-distributed.properties
- 準(zhǔn)備MySQL
由于我宿主機(jī)里已經(jīng)安裝了MySQL,我就直接使用了,使用如下Sql創(chuàng)建表。創(chuàng)建之后隨便造幾條數(shù)據(jù)
CREATE TABLE `test_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ;
- 創(chuàng)建連接器
新建 source.json
{
"name" : "example-source",
"config" : {
"connector.class" : "com.github.taven.source.ExampleSourceConnector",
"tasks.max" : "1",
"database.url" : "jdbc:mysql://192.168.3.21:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true",
"database.username" : "root",
"database.password" : "root",
"database.tables" : "test_user"
}
}
向Worker 發(fā)送請求,創(chuàng)建連接器
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
source.json中,有一些屬性是Kafka Connect 提供的,例如上述文件中name,connector.class,tasks.max,剩下的屬性可以在開發(fā)Connector 時自定義。關(guān)于Kafka Connect Configuration 相關(guān)請閱讀這里 ?? https://docs.confluent.io/platform/current/installation/configuration/connect/index.html
- 確認(rèn)數(shù)據(jù)是否寫入Kafka
首先查看一下Worker中的運(yùn)行狀態(tài),如果Task的state = RUNNING,代表Task沒有拋出任何異常,平穩(wěn)運(yùn)行
bash-4.4# curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},
"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"source"}
查看kafka 中Topic 是否創(chuàng)建
bash-4.4# bin/kafka-topics.sh --list --zookeeper zookeeper:2181
__consumer_offsets
connect-configs
connect-offsets
connect-status
test_user
這些Topic 都存儲了什么?
- __consumer_offsets: 記錄所有Kafka Consumer Group的Offset
- connect-configs: 存儲連接器的配置,對應(yīng)Connect 配置文件中
config.storage.topic - connect-offsets: 存儲Source 的Offset,對應(yīng)Connect 配置文件中
offset.storage.topic - connect-status: 連接器與Task的狀態(tài),對應(yīng)Connect 配置文件中
status.storage.topic
查看topic中數(shù)據(jù),此時說明MySQL數(shù)據(jù)已經(jīng)成功寫入Kafka
bash-4.4# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":1,"name":"yyyyyy"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":2,"name":"qqqq"}}
{"schema":{"type":"struct","fields":[{"type":"int64","optional":false,"field":"id"},{"type":"string","optional":false,"field":"name"}],"optional":false,"name":"test_user"},"payload":{"id":3,"name":"eeee"}}
數(shù)據(jù)結(jié)構(gòu)為Json,可以回顧一下上面我們修改的connect-distributed.properties,默認(rèn)提供的Converter 為JsonConverter,所有的數(shù)據(jù)包含schema 和 payload 兩項是因為配置文件中默認(rèn)啟動了key.converter.schemas.enable=true和value.converter.schemas.enable=true兩個選項
- 啟動 Sink
新建sink.json
{
"name" : "example-sink",
"config" : {
"connector.class" : "com.github.taven.sink.ExampleSinkConnector",
"topics" : "test_user, test_order",
"tasks.max" : "1",
"redis.host" : "192.168.3.21",
"redis.port" : "6379",
"redis.password" : "",
"redis.database" : "0"
}
}
創(chuàng)建Sink Connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @sink.json
然后查看Sink Connector Status,這里我發(fā)現(xiàn)由于我的Redis端口只對localhost開發(fā),所以這里我的Task Fail了,修改了Redis配置之后,重啟Task curl -X POST localhost:8083/connectors/example-sink/tasks/0/restart
在確認(rèn)了Sink Status 為RUNNING 后,可以確認(rèn)下Redis中是否有數(shù)據(jù)
關(guān)于Kafka Connect Rest api 文檔,請參考??https://docs.confluent.io/platform/current/connect/references/restapi.html
- 如何查看Sink Offset消費(fèi)情況
使用命令
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink
下圖代表 test_user topic 三條數(shù)據(jù)已經(jīng)全部消費(fèi)

Kafka Connect 高級功能
我們的小目標(biāo)已經(jīng)達(dá)成了?,F(xiàn)在兩個Task無事可做,正好借此機(jī)會我們來體驗一下可擴(kuò)展和故障轉(zhuǎn)移
集群擴(kuò)展
我啟動了開發(fā)環(huán)境中的Kafka Connect Worker,根據(jù)官方文檔所示通過注冊同一個Kafka 并且使用相同的 group.id=connect-cluster 可以自動組成集群
啟動我開發(fā)環(huán)境中的Kafka Connect,之后檢查兩個連接器狀態(tài)
bash-4.4# curl -X GET localhost:8083/connectors/example-source/status
{"name":"example-source","connector":{"state":"RUNNING","worker_id":"172.23.176.1:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.23.176.1:8083"}],"type":"source"}bash-4.4#
bash-4.4# curl -X GET localhost:8083/connectors/example-sink/status
{"name":"example-sink","connector":{"state":"RUNNING","worker_id":"172.21.0.3:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.21.0.3:8083"}],"type":"sink"}
觀察worker_id 可以發(fā)現(xiàn),兩個Connectors 已經(jīng)分別運(yùn)行在兩個Worker上了
故障轉(zhuǎn)移
此時我們通過kill pid結(jié)束docker中的Worker進(jìn)程觀察是否宕機(jī)之后自動轉(zhuǎn)移,但是發(fā)現(xiàn)Task并沒有轉(zhuǎn)移到僅存的Worker中,Task 狀態(tài)變?yōu)閁NASSIGNED,這是為啥呢?難道是有什么操作錯了?
在網(wǎng)上查閱了一番得知,Kafka Connect 的集群擴(kuò)展與故障轉(zhuǎn)移機(jī)制是通過Kafka Rebalance 協(xié)議實現(xiàn)的(Consumer也是該協(xié)議),當(dāng)Worker節(jié)點宕機(jī)時間超過 scheduled.rebalance.max.delay.ms 時,Kafka才會將其踢出集群。踢出后將該節(jié)點的連接器和任務(wù)分配給其他Worker,scheduled.rebalance.max.delay.ms默認(rèn)值為五分鐘。
后來經(jīng)測試發(fā)現(xiàn),五分鐘之后查看連接器信息,已經(jīng)轉(zhuǎn)移到存活的Worker節(jié)點了
本來還計劃寫一下如何開發(fā)連接器和Kafka Rebalance,但是這篇已經(jīng)夠長了,所以計劃后續(xù)更新這兩篇文章