Kafka--04Kafka connect

Kafka Connect用于在Kafka與其他系統(tǒng)間數(shù)據(jù)傳輸?shù)墓ぞ摺afka connect可以獲取整個(gè)數(shù)據(jù)庫(kù)或從所有應(yīng)用程序服務(wù)器收集指標(biāo)到Kafka;也可以從Kafka中topic數(shù)據(jù)傳輸至其他存儲(chǔ)或者查詢系統(tǒng)或者批處理系統(tǒng)進(jìn)行離線分析。

Kafka Connect功能
Kafka能用框架,提供統(tǒng)一的集成API
支持分布式模式(distributed)及單機(jī)模式(standalone)
REST接口,用于查看和管理Kafka connectors
自動(dòng)化offset管理,開發(fā)人員不必?fù)?dān)心錯(cuò)誤處理的影響
分布式,可擴(kuò)展
流/批處理集成

Kafka connect兩個(gè)核心組成 Source和Sink。
Source:負(fù)責(zé)導(dǎo)入數(shù)據(jù)到Kafka;
Sink :負(fù)責(zé)從Kafka導(dǎo)出數(shù)據(jù);
(如上二者都稱為 connector)


Kafka connect的幾個(gè)重要的概念包括:connectors、tasks、workers和converters。
Connectors-通過管理任務(wù)來細(xì)條數(shù)據(jù)流的高級(jí)抽象;
Tasks- 數(shù)據(jù)寫入kafka和數(shù)據(jù)從kafka讀出的實(shí)現(xiàn);
Workers-運(yùn)行connectors和tasks的進(jìn)程;
Converters- kafka connect和其他存儲(chǔ)系統(tǒng)直接發(fā)送或者接受數(shù)據(jù)之間轉(zhuǎn)換數(shù)據(jù);

distribute模式啟動(dòng):
需要先建三個(gè)broker
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic connect-configs
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 50 --topic connect-offsets
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 10 --topic connect-status
啟動(dòng)
bin/connect-distributed.sh config/connect-distributed.properties

通過rest api管理connector

因?yàn)閗afka connect的意圖是以服務(wù)的方式去運(yùn)行,所以它提供了REST API去管理connectors,默認(rèn)的端口是8083,你也可以在啟動(dòng)kafka connect之前在配置文件中添加rest.port配置。

GET /connectors – 返回所有正在運(yùn)行的connector名
POST /connectors – 新建一個(gè)connector; 請(qǐng)求體必須是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必須包含你的connector的配置信息。
GET /connectors/{name} – 獲取指定connetor的信息
GET /connectors/{name}/config – 獲取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 獲取指定connector的狀態(tài),包括它是否在運(yùn)行、停止、或者失敗,如果發(fā)生錯(cuò)誤,還會(huì)列出錯(cuò)誤的具體信息。
GET /connectors/{name}/tasks – 獲取指定connector正在運(yùn)行的task。
GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態(tài)信息
PUT /connectors/{name}/pause – 暫停connector和它的task,停止數(shù)據(jù)處理知道它被恢復(fù)。
PUT /connectors/{name}/resume – 恢復(fù)一個(gè)被暫停的connector
POST /connectors/{name}/restart – 重啟一個(gè)connector,尤其是在一個(gè)connector運(yùn)行失敗的情況下比較常用
POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個(gè)task,一般是因?yàn)樗\(yùn)行失敗才這樣做。
DELETE /connectors/{name} – 刪除一個(gè)connector,停止它的所有task并刪除配置。

#standalone模式啟動(dòng)
 bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,537評(píng)論 19 139
  • 版權(quán)聲明:本文為博主原創(chuàng)文章,未經(jīng)博主允許不得轉(zhuǎn)載 本文是基于hadoop 2.7.1,以及kafka 0.11....
    大數(shù)據(jù)_zzzzMing閱讀 4,731評(píng)論 5 5
  • 今晚老公在青島,下班后6點(diǎn)十幾分剛進(jìn)門,大寶說,媽媽,你可回來了,我們終于可以吃飯了,你不回來,奶奶不讓吃飯...
    軒萌媽閱讀 188評(píng)論 0 0
  • 個(gè)人主頁(yè) 主要由兩部分,左側(cè)是基本信息區(qū)域,右側(cè)是文章列表區(qū)域 基本信息區(qū)域 在基本信息包含以下內(nèi)容:修改和查看個(gè)...
    冰美式烏龍閱讀 308評(píng)論 0 1
  • (稻盛哲學(xué)學(xué)習(xí)會(huì))打卡第20天 姓名:柳琳 部門:開發(fā)部 組別:待定 【知~學(xué)習(xí)】 閱讀《活法》第二章節(jié) 【內(nèi)容感...
    木木_be35閱讀 142評(píng)論 0 0

友情鏈接更多精彩內(nèi)容