Kafka Connect用于在Kafka與其他系統(tǒng)間數(shù)據(jù)傳輸?shù)墓ぞ摺afka connect可以獲取整個(gè)數(shù)據(jù)庫(kù)或從所有應(yīng)用程序服務(wù)器收集指標(biāo)到Kafka;也可以從Kafka中topic數(shù)據(jù)傳輸至其他存儲(chǔ)或者查詢系統(tǒng)或者批處理系統(tǒng)進(jìn)行離線分析。
Kafka Connect功能
Kafka能用框架,提供統(tǒng)一的集成API
支持分布式模式(distributed)及單機(jī)模式(standalone)
REST接口,用于查看和管理Kafka connectors
自動(dòng)化offset管理,開發(fā)人員不必?fù)?dān)心錯(cuò)誤處理的影響
分布式,可擴(kuò)展
流/批處理集成
Kafka connect兩個(gè)核心組成 Source和Sink。
Source:負(fù)責(zé)導(dǎo)入數(shù)據(jù)到Kafka;
Sink :負(fù)責(zé)從Kafka導(dǎo)出數(shù)據(jù);
(如上二者都稱為 connector)

Kafka connect的幾個(gè)重要的概念包括:connectors、tasks、workers和converters。
Connectors-通過管理任務(wù)來細(xì)條數(shù)據(jù)流的高級(jí)抽象;
Tasks- 數(shù)據(jù)寫入kafka和數(shù)據(jù)從kafka讀出的實(shí)現(xiàn);
Workers-運(yùn)行connectors和tasks的進(jìn)程;
Converters- kafka connect和其他存儲(chǔ)系統(tǒng)直接發(fā)送或者接受數(shù)據(jù)之間轉(zhuǎn)換數(shù)據(jù);
distribute模式啟動(dòng):
需要先建三個(gè)broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic connect-configs
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 50 --topic connect-offsets
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic connect-status
啟動(dòng)
bin/connect-distributed.sh config/connect-distributed.properties
通過rest api管理connector
因?yàn)閗afka connect的意圖是以服務(wù)的方式去運(yùn)行,所以它提供了REST API去管理connectors,默認(rèn)的端口是8083,你也可以在啟動(dòng)kafka connect之前在配置文件中添加rest.port配置。
GET /connectors – 返回所有正在運(yùn)行的connector名
POST /connectors – 新建一個(gè)connector; 請(qǐng)求體必須是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必須包含你的connector的配置信息。
GET /connectors/{name} – 獲取指定connetor的信息
GET /connectors/{name}/config – 獲取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 獲取指定connector的狀態(tài),包括它是否在運(yùn)行、停止、或者失敗,如果發(fā)生錯(cuò)誤,還會(huì)列出錯(cuò)誤的具體信息。
GET /connectors/{name}/tasks – 獲取指定connector正在運(yùn)行的task。
GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態(tài)信息
PUT /connectors/{name}/pause – 暫停connector和它的task,停止數(shù)據(jù)處理知道它被恢復(fù)。
PUT /connectors/{name}/resume – 恢復(fù)一個(gè)被暫停的connector
POST /connectors/{name}/restart – 重啟一個(gè)connector,尤其是在一個(gè)connector運(yùn)行失敗的情況下比較常用
POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個(gè)task,一般是因?yàn)樗\(yùn)行失敗才這樣做。
DELETE /connectors/{name} – 刪除一個(gè)connector,停止它的所有task并刪除配置。
#standalone模式啟動(dòng)
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties