kafka-connect簡析

kafka-connect 雖然代碼處于kafka中且占了很大的份量,但實際是Mq的一個應(yīng)用了。kafka-connect是一個提供了從其他數(shù)據(jù)源到kafka的SourceConnector,以及從kafka到其他數(shù)據(jù)源的SinkConnector的rest服務(wù)。下面介紹單機版的基本情況:

一、啟動方法

使用命令行bin/connect-standalone.sh config/connect-distributed.properties config/connect-file-source.properties 啟動服務(wù),配置參數(shù)中有多個文件路徑,第一個是服務(wù)的配置,后面跟著n個connector的配置。

服務(wù)配置示例:

#服務(wù)端口
rest.port=8083 
#插件搜索路徑
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#kafka集群地址
bootstrap.servers=localhost:9092,localhost:9093

二、rest接口

kafka-connect提供了許多api操作connector,可以從confluent官網(wǎng)獲取詳細(xì)信息。下面羅列一部分:

1.GET /
獲取kafka_cluster_id和版本信息

{
version: "unknown",
commit: "unknown",
kafka_cluster_id: "_MOY-UPhRC-Y-CwV59mm-g"
}

2.GET /connector-plugins
獲取服務(wù)所有加載的connector。

[
{
class: "org.apache.kafka.connect.file.FileStreamSinkConnector",
type: "sink",
version: "unknown"
},
{
class: "org.apache.kafka.connect.file.FileStreamSourceConnector",
type: "source",
version: "unknown"
}
]

PUT /connector-plugins/{connecttype}/config/validate

4.GET /connectors
獲取所有的connector實例,實際上是返回對象對應(yīng)ClusterConfigState#connectorConfigskeyset.

[
    "local-file-source"
]

5.GET /connectors/local-file-source
獲取某一個connector實例的詳細(xì)信息


{
    "name": "local-file-source",
    "config": {
        "connector.class": "FileStreamSource",
        "file": "test2.txt",
        "tasks.max": "1",
        "name": "local-file-source",
        "topic": "t2t"
    },
    "tasks": [
        {
            "connector": "local-file-source",
            "task": 0
        }
    ],
    "type": "source"
}

6.GET /connectors/{connector}/config
獲取connector實例的配置

{
connector.class: "FileStreamSource",
file: "t7.txt",
tasks.max: "1",
name: "local-file-source",
topic: "t2t"
}

7.PUT /connectors/{connector}/config
設(shè)置connector實例的配置
請求:

{
    "connector.class": "FileStreamSource",
    "file": "t7.txt",
    "tasks.max": "1",
    "name": "local-file-source",
    "topic": "t2t"
}

回復(fù):

{
connector.class: "FileStreamSource",
file: "t7.txt",
tasks.max: "1",
name: "local-file-source",
topic: "t2t"
}

8.GET /connectors/{connector}/status
獲取connector實例的運行狀態(tài)

{
name: "local-file-source",
connector: {
state: "RUNNING",
worker_id: "172.16.152.106:8083"
},
tasks: [
{
id: 0,
state: "RUNNING",
worker_id: "172.16.152.106:8083"
}
],
type: "source"
}

3.代碼結(jié)構(gòu)圖

image.png

4.簡單代碼分析

1.通過KafkaAdminClient從kafka中獲取kafkaClusterId,

2.加載plugins,讀取到plugins文件夾的所有connector類(包括source和sink)。注意每個plugin均配置了一個classLoader,這樣能保證各個插件的獨立性。

3.啟動rest服務(wù)

RestServer服務(wù)內(nèi)置了一個對象jetty的Server對象,并設(shè)置了一系列的handler(用了典型的裝飾器模式,用于身份認(rèn)證,數(shù)據(jù)統(tǒng)計,業(yè)務(wù)處理等),當(dāng)接受到客戶端請求后,按照路徑路由到相應(yīng)的RootResource,ConnectorsResource,ConnectorPluginsResource處理.api中定義了一個Herder接口,單機模式下的實現(xiàn)是StandaloneHerder,分布式版本下是DistributedHerder。路由到各個處理器的請求最后幾乎都是通過Herder來完成具體的業(yè)務(wù)邏輯,herder配置了一個worker,用于管理所有的connector實例,worker內(nèi)部有一個無疆隊列,每當(dāng)有一個task產(chǎn)生時,啟動一個線程來跑。

4.應(yīng)用的輸入?yún)?shù)有n個connector的配置,每個配置將生成一個connector實例,調(diào)用了herder的putConnectorConfig方法(和客戶端調(diào)用接口PUT /connectors/{connector}/config 是一樣的效果)。

1.connector實例配置變更接口的內(nèi)部運行機制

每次調(diào)用PUT /connectors/{connector}/config會把原來的connector實例刪掉,用新的配置在重新生成一個(這么干確實省事)。同理,如果掛在實例下面的task配置發(fā)生了改變(task的配置為ConnectorConfig#originals),那么還會將原來的task對象全部刪掉重新生成。

2.如何控制task的啟停

在task任務(wù)線程中檢測一個標(biāo)記位即可,暫停的時候進入睡眠即可。

3.單機版和分布式版有什么區(qū)別

4.connector實例的配置和其下的task的配置有什么關(guān)系

5.假設(shè)task poll上來的數(shù)據(jù)發(fā)送失敗,如何處理

WorkerSourceTask#toSend用于存儲poll上來的消息,每次都是將單次poll上來的數(shù)據(jù)發(fā)送成功了才會再次調(diào)用connector 的poll方法。

6.是否會有消息丟失或者消息重復(fù)

首先需要知道kafka-connect是如何儲存進度的,當(dāng)準(zhǔn)備發(fā)送消息時,WorkerSourceTask#outstandingMessages將會存儲到這些消息(在成功推給kafka后清除),同時將消息connector傳遞上來的消息進度(key是個map,value也是個map)存儲起來。當(dāng)執(zhí)行到后臺定時任務(wù),SourceTaskOffsetCommitter#commitExecutorService執(zhí)行了一個后臺任務(wù)最后調(diào)用FileOffsetBackingStore#set將生產(chǎn)進度刷新到磁盤(單機版默認(rèn)是本地目錄offset.storage.file.filename指定目錄,結(jié)構(gòu)是個hashmap,connector可以從SourceTask#context#offsetReader()#offsets()方法獲取,每次全量刷新,重啟時也全量載入內(nèi)存,這樣保證了歷史記錄是全的)
,刷新時機是outstandingMessages為空,這樣就保證了刷盤的時候,消息是一定發(fā)送到kafka了。那么又有一個問題,如果數(shù)據(jù)源源不斷的產(chǎn)生,會不會就不刷新了?這里kafka-connect也做了處理,當(dāng)后臺任務(wù)處于等待刷盤的時候,將消息存到WorkerSourceTask#outstandingMessagesBacklog用作備份。所以connector在實現(xiàn)的時候,消息必須有序,否則會產(chǎn)生消息重復(fù)。至于丟失,是不會的。那還有沒有消息重復(fù)的場景呢?有的,如果producer發(fā)送成功了,但是刷到磁盤的時候宕機了,那么會有消息重復(fù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容