Kafka Connect基本概念介紹
Kafka Connect是一個(gè)用于將數(shù)據(jù)流輸入和輸出Kafka的框架。Confluent平臺(tái)附帶了幾個(gè)內(nèi)置connector,可以使用這些connector進(jìn)行關(guān)系數(shù)據(jù)庫或HDFS等常用系統(tǒng)到Kafka的數(shù)據(jù)傳輸,也是用來構(gòu)建ETL的一種方案。
Kafka Connect基本概念:
- Kafka Connect實(shí)際上是Kafka流式計(jì)算的一部分
- Kafka Connect主要用來與其他中間件建立流式通道
- Kafka Connect支持流式和批處理集成
Kafka Connect的架構(gòu)如下圖所示:

Kafka Connect關(guān)鍵詞:
- Connectors:通過管理task來協(xié)調(diào)數(shù)據(jù)流的高級(jí)抽象
- Tasks:如何將數(shù)據(jù)復(fù)制到Kafka或從Kafka復(fù)制數(shù)據(jù)的實(shí)現(xiàn)
- Workers:執(zhí)行Connector和Task的運(yùn)行進(jìn)程
- Converters: 用于在Connect和外部系統(tǒng)發(fā)送或接收數(shù)據(jù)之間轉(zhuǎn)換數(shù)據(jù)的代碼
- Transforms:更改由連接器生成或發(fā)送到連接器的每個(gè)消息的簡(jiǎn)單邏輯
Connectors
Kafka Connect中的connector定義了數(shù)據(jù)應(yīng)該從哪里復(fù)制到哪里。connector實(shí)例是一種邏輯作業(yè),負(fù)責(zé)管理Kafka與另一個(gè)系統(tǒng)之間的數(shù)據(jù)復(fù)制。
我們?cè)诖蠖鄶?shù)情況下都是使用一些平臺(tái)提供的現(xiàn)成的connector。但是,也可以從頭編寫一個(gè)新的connector插件。在高層次上,希望編寫新連接器插件的開發(fā)人員遵循以下工作流:

Task
Task是Connect數(shù)據(jù)模型中的主要處理數(shù)據(jù)的角色,也就是真正干活的。每個(gè)connector實(shí)例協(xié)調(diào)一組實(shí)際復(fù)制數(shù)據(jù)的task。通過允許connector將單個(gè)作業(yè)分解為多個(gè)task,Kafka Connect提供了內(nèi)置的對(duì)并行性和可伸縮數(shù)據(jù)復(fù)制的支持,只需很少的配置。
這些任務(wù)沒有存儲(chǔ)任何狀態(tài)。任務(wù)狀態(tài)存儲(chǔ)在Kafka中的特殊主題config.storage.topic和status.storage.topic中。因此,可以在任何時(shí)候啟動(dòng)、停止或重新啟動(dòng)任務(wù),以提供彈性的、可伸縮的數(shù)據(jù)管道。

Workers
Workers是負(fù)責(zé)管理和執(zhí)行connector和task的,Workers有兩種模式,Standalone(單機(jī))和Distributed(分布式)。
Standalone Workers:
Standalone模式是最簡(jiǎn)單的模式,用單一進(jìn)程負(fù)責(zé)執(zhí)行所有connector和task
Distributed Workers:
Distributed模式為Kafka Connect提供了可擴(kuò)展性和自動(dòng)容錯(cuò)能力。在分布式模式下,你可以使用相同的組啟動(dòng)許多worker進(jìn)程。它們自動(dòng)協(xié)調(diào)以跨所有可用的worker調(diào)度connector和task的執(zhí)行。
如果你添加一個(gè)worker、關(guān)閉一個(gè)worker或某個(gè)worker意外失敗,那么其余的worker將檢測(cè)到這一點(diǎn),并自動(dòng)協(xié)調(diào),在可用的worker集重新分發(fā)connector和task。
image.png
Task Rebalance
當(dāng)connector首次提交到集群時(shí),workers會(huì)重新平衡集群中的所有connector及其tasks,以便每個(gè)worker的工作量大致相同。當(dāng)connector增加或減少它們所需的task數(shù)量,或者更改connector的配置時(shí),也會(huì)使用相同的重新平衡過程。
當(dāng)一個(gè)worker失敗時(shí),task在活動(dòng)的worker之間重新平衡。當(dāng)一個(gè)task失敗時(shí),不會(huì)觸發(fā)再平衡,因?yàn)閠ask失敗被認(rèn)為是一個(gè)例外情況。因此,失敗的task不會(huì)被框架自動(dòng)重新啟動(dòng),應(yīng)該通過REST API重新啟動(dòng)。

Converters
在向Kafka寫入或從Kafka讀取數(shù)據(jù)時(shí),Converter是使Kafka Connect支持特定數(shù)據(jù)格式所必需的。task使用Converters將數(shù)據(jù)格式從字節(jié)轉(zhuǎn)換為連接內(nèi)部數(shù)據(jù)格式,反之亦然。并且Converter與Connector本身是解耦的,以便在Connector之間自然地重用Converter。
默認(rèn)提供以下Converters:
- AvroConverter(建議):與Schema Registry一起使用
- JsonConverter:適合結(jié)構(gòu)數(shù)據(jù)
- StringConverter:簡(jiǎn)單的字符串格式
- ByteArrayConverter:提供不進(jìn)行轉(zhuǎn)換的“傳遞”選項(xiàng)
AvroConverter處理數(shù)據(jù)的流程圖:

Transforms
Connector可以配置Transforms,以便對(duì)單個(gè)消息進(jìn)行簡(jiǎn)單且輕量的修改。這對(duì)于小數(shù)據(jù)的調(diào)整和事件路由十分方便,且可以在connector配置中將多個(gè)Transforms連接在一起。然而,應(yīng)用于多個(gè)消息的更復(fù)雜的Transforms最好使用KSQL和Kafka Stream來實(shí)現(xiàn)。
Transforms是一個(gè)簡(jiǎn)單的函數(shù),輸入一條記錄,并輸出一條修改過的記錄。Kafka Connect提供許多Transforms,它們都執(zhí)行簡(jiǎn)單但有用的修改??梢允褂米约旱倪壿嫸ㄖ茖?shí)現(xiàn)轉(zhuǎn)換接口,將它們打包為Kafka Connect插件,將它們與connector一起使用。
當(dāng)Transforms與Source Connector一起使用時(shí),Kafka Connect通過第一個(gè)Transforms傳遞connector生成的每條源記錄,第一個(gè)Transforms對(duì)其進(jìn)行修改并輸出一個(gè)新的源記錄。將更新后的源記錄傳遞到鏈中的下一個(gè)Transforms,該Transforms再生成一個(gè)新的修改后的源記錄。最后更新的源記錄會(huì)被轉(zhuǎn)換為二進(jìn)制格式寫入到Kafka。Transforms也可以與Sink Connector一起使用。
以下為Confluent平臺(tái)提供的Transforms:
Kakfa Connect環(huán)境準(zhǔn)備
前面已經(jīng)鋪墊了Kakfa Connect的基本概念,接下來用一個(gè)簡(jiǎn)單的例子演示一下Kakfa Connect的使用方式,以便對(duì)其作用有一個(gè)直觀的了解。
在演示Kakfa Connect的使用之前我們需要先做一些準(zhǔn)備,因?yàn)橐蕾囈恍╊~外的集成。例如在本文中使用MySQL作為數(shù)據(jù)源的輸入和輸出,所以首先得在MySQL中創(chuàng)建兩張表(作為Data Source和Data Sink)。建表SQL如下:
CREATE TABLE `users_input` (
`uid` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL,
`age` int(11) NOT NULL,
PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `users_output` (
`uid` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL,
`age` int(11) NOT NULL,
PRIMARY KEY (`uid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
接下來就是考慮怎么實(shí)現(xiàn)Kafka Connect了,前面有提到過Kafka Connect中的connector定義了數(shù)據(jù)應(yīng)該從哪里復(fù)制到哪里。connector實(shí)例是一種邏輯作業(yè),負(fù)責(zé)管理Kafka與另一個(gè)系統(tǒng)之間的數(shù)據(jù)復(fù)制。
因此,如果要自己實(shí)現(xiàn)一個(gè)Connect的話還是稍微有些復(fù)雜的,好在Confluent平臺(tái)有些現(xiàn)成的Connect。例如Confluent平臺(tái)就有JDBC的Connect,下載地址如下:
我們需要到Kafka Server上進(jìn)行相應(yīng)的配置才能使用該Connect,所以復(fù)制下載鏈接到服務(wù)器上使用wget命令進(jìn)行下載:
[root@txy-server2 ~]# cd /usr/local/src
[root@txy-server2 /usr/local/src]# wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/5.5.0/confluentinc-kafka-connect-jdbc-5.5.0.zip
除此之外,由于要連接MySQL,所以還得去maven倉庫上復(fù)制mysql-connector驅(qū)動(dòng)包的下載鏈接,然后使用同樣命令進(jìn)行下載:
[root@txy-server2 /usr/local/src]# wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
解壓下載好的Connect壓縮包,創(chuàng)建一個(gè)存放目錄,將解壓后的文件移到到該目錄下,并將MySQL驅(qū)動(dòng)包移動(dòng)到kafka-connect-jdbc的lib目錄下:
[root@txy-server2 /usr/local/src]# unzip confluentinc-kafka-connect-jdbc-5.5.0.zip
[root@txy-server2 /usr/local/src]# mkdir -p /opt/kafka/plugins
[root@txy-server2 /usr/local/src]# mv confluentinc-kafka-connect-jdbc-5.5.0 /opt/kafka/plugins/kafka-connect-jdbc
[root@txy-server2 /usr/local/src]# mv mysql-connector-java-8.0.20.jar /opt/kafka/plugins/kafka-connect-jdbc/lib/
Connect包準(zhǔn)備好后,編輯connect-distributed.properties配置文件,修改如下配置項(xiàng):
[root@txy-server2 ~]# vim /usr/local/kafka/config/connect-distributed.properties
# Broker Server的訪問ip和端口號(hào)
bootstrap.servers=172.21.0.10:9092
# 指定集群id
group.id=connect-cluster
# 指定rest服務(wù)的端口號(hào)
rest.port=8083
# 指定Connect插件包的存放路徑
plugin.path=/opt/kafka/plugins
由于rest服務(wù)監(jiān)聽了8083端口號(hào),如果你的服務(wù)器開啟了防火墻就需要使用以下命令開放8083端口,否則外部無法訪問:
[root@txy-server2 ~]# firewall-cmd --zone=public --add-port=8083/tcp --permanent
[root@txy-server2 ~]# firewall-cmd --reload
完成前面的步驟后,我們就可以啟動(dòng)Kafka Connect了。有兩種啟動(dòng)方式,分別是:前臺(tái)啟動(dòng)和后臺(tái)啟動(dòng),前者用于開發(fā)調(diào)試,后者則通常用于正式環(huán)境。具體命令如下:
# 前臺(tái)啟動(dòng)
[root@txy-server2 ~]# connect-distributed.sh /usr/local/kafka/config/connect-distributed.properties
# 后臺(tái)啟動(dòng)
[root@txy-server2 ~]# connect-distributed.sh -daemon /usr/local/kafka/config/connect-distributed.properties
啟動(dòng)成功后,使用瀏覽器訪問http://{ip}:8083/connector-plugins,正常情況下會(huì)返回這樣一段JSON數(shù)據(jù):

到此為止,我們就已經(jīng)完成Kafka Connect的環(huán)境準(zhǔn)備了,接下來演示一下Source Connector與Sink Connector如何與MySQL做集成。
Kafka Connect Source和MySQL集成
首先我們要知道rest服務(wù)提供了一些API去操作connector,如下表:

使用瀏覽器訪問http://{ip}:8083/connectors,可以查看所有的connector,此時(shí)返回的是一個(gè)空數(shù)組,說明沒有任何的connector:

此時(shí)我們可以使用POST方式請(qǐng)求/connectors接口來新增一個(gè)connector,這里以curl命令為例,調(diào)用示例如下:
curl -X POST -H 'Content-Type: application/json' -i 'http://{ip}:8083/connectors' \
--data \
'{"name":"test-upload-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://{ip}:3306/kafka_store?user=root&password=123456a.",
"table.whitelist":"users_input",
"incrementing.column.name": "uid",
"mode":"incrementing",
"topic.prefix": "test-mysql-"}}'
參數(shù)說明:
-
name:指定新增的connector的名稱 -
config:指定該connector的配置信息 -
connector.class:指定使用哪個(gè)Connector類 -
connection.url:指定MySQL的連接url -
table.whitelist:指定需要加載哪些數(shù)據(jù)表 -
incrementing.column.name:指定表中自增列的名稱 -
mode:指定connector的模式,這里為增量模式 -
topic.prefix:Kafka會(huì)創(chuàng)建一個(gè)Topic,該配置項(xiàng)就是用于指定Topic名稱的前綴,后綴為數(shù)據(jù)表的名稱。例如在本例中將生成的Topic名稱為:test-mysql-users_input
調(diào)用成功后,會(huì)返回如下響應(yīng)數(shù)據(jù):
HTTP/1.1 201 Created
Date: Mon, 25 May 2020 13:48:16 GMT
Location: http://{ip}:8083/connectors/test-upload-mysql
Content-Type: application/json
Content-Length: 368
Server: Jetty(9.4.24.v20191120)
{"name":"test-upload-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://{ip}:3306/kafka_store?user=root&password=123456a.","table.whitelist":"users_input","incrementing.column.name":"uid","mode":"incrementing","topic.prefix":"test-mysql-","name":"test-upload-mysql"},"tasks":[],"type":"source"}
然后刷新瀏覽器頁面,可以看到test-upload-mysql這個(gè)connector已經(jīng)能被列出來了:

新增connector完成后,我們嘗試往數(shù)據(jù)表里添加一些數(shù)據(jù),具體的sql如下:
insert into users_input(`name`, `age`) values('小明', 15);
insert into users_input(`name`, `age`) values('小白', 13);
insert into users_input(`name`, `age`) values('小李', 17);
接著使用kafka-console-consumer.sh腳本命令去拉取test-mysql-users_input中的數(shù)據(jù):
[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-mysql-users_input --from-beginning
拉取出來的數(shù)據(jù)是JSON結(jié)構(gòu)的,其中的payload就是數(shù)據(jù)表中的數(shù)據(jù),如下:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"uid"},{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"age"}],"optional":false,"name":"users_input"},"payload":{"uid":1,"name":"小明","age":15}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"uid"},{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"age"}],"optional":false,"name":"users_input"},"payload":{"uid":2,"name":"小白","age":13}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"uid"},{"type":"string","optional":false,"field":"name"},{"type":"int32","optional":false,"field":"age"}],"optional":false,"name":"users_input"},"payload":{"uid":3,"name":"小李","age":17}}
能拉取到這樣的數(shù)據(jù)就代表已經(jīng)成功將MySQL數(shù)據(jù)表中的數(shù)據(jù)傳輸?shù)終afka Connect Source里了,也就是完成輸入端的工作了。
Kafka Connect Sink和MySQL集成
現(xiàn)在我們已經(jīng)能夠通過Kafka Connect將MySQL中的數(shù)據(jù)寫入到Kafka中了,接下來就是完成輸出端的工作,將Kafka里的數(shù)據(jù)輸出到MySQL中。
首先,我們需要調(diào)用Rest API新增一個(gè)Sink類型的connector。具體請(qǐng)求如下:
curl -X POST -H 'Content-Type: application/json' -i 'http://{ip}:8083/connectors' \
--data \
'{"name":"test-download-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://{ip}:3306/kafka_store?user=root&password=123456a.",
"topics":"test-mysql-users_input",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"uid",
"table.name.format": "users_output"}}'
參數(shù)說明:
-
name:指定新增的connector的名稱 -
config:指定該connector的配置信息 -
connector.class:指定使用哪個(gè)Connector類 -
connection.url:指定MySQL的連接url -
topics:指定從哪個(gè)Topic中讀取數(shù)據(jù) -
auto.create:是否自動(dòng)創(chuàng)建數(shù)據(jù)表 -
insert.mode:指定寫入模式,upsert表示可以更新及寫入 -
pk.mode:指定主鍵模式,record_value表示從消息的value中獲取數(shù)據(jù) -
pk.fields:指定主鍵字段的名稱 -
table.name.format:指定將數(shù)據(jù)輸出到哪張數(shù)據(jù)表上
調(diào)用成功后,會(huì)返回如下響應(yīng)數(shù)據(jù):
HTTP/1.1 201 Created
Date: Mon, 25 May 2020 14:37:41 GMT
Location: http://49.232.153.84:8083/connectors/test-download-mysql
Content-Type: application/json
Content-Length: 409
Server: Jetty(9.4.24.v20191120)
{"name":"test-download-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://47.106.206.51:3306/kafka_store?user=root&password=Zero-One1.","topics":"test-mysql-users_input","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"uid","table.name.format":"users_output","name":"test-download-mysql"},"tasks":[],"type":"sink"}
刷新瀏覽器頁面,此時(shí)就有兩個(gè)connector了:

該Sink類型的connector創(chuàng)建完成后,就會(huì)讀取Kafka里對(duì)應(yīng)Topic的數(shù)據(jù),并輸出到指定的數(shù)據(jù)表中。如下:

小結(jié)
回顧一下本文中的示例,可以直觀的看到Kafka Connect實(shí)際上就做了兩件事情:使用Source Connector從數(shù)據(jù)源(MySQL)中讀取數(shù)據(jù)寫入到Kafka Topic中,然后再通過Sink Connector讀取Kafka Topic中的數(shù)據(jù)輸出到另一端(MySQL)。
雖然本例中的Source端和Sink端都是MySQL,但是不要被此局限了,因?yàn)镾ource端和Sink端可以是不一樣的,這也是Kafka Connect的作用所在。它就像一個(gè)倒賣數(shù)據(jù)的中間商,將Source端的數(shù)據(jù)讀取出來寫到自己的Topic,這就像進(jìn)貨一樣,然后再將數(shù)據(jù)輸出給Sink端。至此,就完成了一個(gè)端到端的數(shù)據(jù)同步,其實(shí)會(huì)發(fā)現(xiàn)與ETL過程十分類似,這也是為啥Kafka Connect可以作為實(shí)現(xiàn)ETL方案的原因。
