Kafka connect HDFS

? ? ? ?HDFS connector允許以各種格式將Kafka topic中的數(shù)據(jù)導(dǎo)出到HDFS文件中,并與Hive集成,使數(shù)據(jù)可以被HiveQL查詢。
? ? ? ?connector定期從Kafka輪詢數(shù)據(jù)并將其寫入HDFS。每個Kafka topic的數(shù)據(jù)由partitioner進行分區(qū)并劃分為塊。
? ? ? ?每個數(shù)據(jù)塊都表示為一個HDFS文件,其中包含topic、kafka partition,以及該數(shù)據(jù)塊的開始和結(jié)束偏移量。如果配置中沒有指定partitioner,則使用保留Kafka分區(qū)的默認partitioner。每個數(shù)據(jù)塊的大小由寫入HDFS的記錄數(shù)量、寫入時間以及schema兼容性決定。
? ? ? ?HDFS connector可以和Hive集成,當啟用時,connector自動為每個Kafka主題創(chuàng)建一個Hive外部分區(qū)表,并根據(jù)HDFS中可用數(shù)據(jù)更新表。

快速開始

1.從github下載kafka-connect-hdfs代碼:

git clone https://github.com/confluentinc/kafka-connect-hdfs.git

2.編譯

mvn clean install

3.進入target目錄,將jar包拷到kafka目錄的libs下

cp kafka-connect-hdfs-5.1.0.jar /opt/kafka_2.11-2.1.0/libs/

4.解決依賴問題,kafka-connect-hdfs-5.1.0.jar依賴別的jar包,這些jar包都在target目錄下的kafka-connect-hdfs-5.1.0-development/share/java/kafka-connect-hdfs里,將依賴到的jar包也拷貝到kafka目錄的libs下
5.啟動worker

bin/connect-distributed.sh config/hdfs/connect-distributed-hdfs.properties

connect-distributed-hdfs.properties文件內(nèi)容:
bootstrap.servers=master:9092

group.id=connect-cluster

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.topic=connect-offsets-hdfs
offset.storage.replication.factor=1
offset.storage.partitions=1

config.storage.topic=connect-configs-hdfs
config.storage.replication.factor=1
config.storage.partitions=1

status.storage.topic=connect-status-hdfs
status.storage.replication.factor=1
status.storage.partitions=1


offset.flush.interval.ms=10000

plugin.path=/opt/kafka_2.11-2.1.0/libs,/opt/kafka-connector/kafka-connect-hdfs/target/kafka-connect-hdfs-5.1.0-development/share/java/kafka-connect-hdfs

hive.integration=true
hive.metastore.uris=thrift://master:9083/default
schema.compatibility=BACKWARD

6.啟動connector

使用postman發(fā)送請求:
請求地址:http://slave1:8083/connectors
請求方式:POST
請求參數(shù):
{
  "name": "test-hive",
  "config": {
    "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
    "tasks.max": "1",
    "topics": "test_hive",
    "hdfs.url": "hdfs://master:9000",
    "flush.size": "3",
    "name": "test-hive"
  }
}

特性

HDFS connector提供以下特性:

  • Exactly once:connector使用wal機制確保每條記錄準確地導(dǎo)出到HDFS一次。此外,connector將Kafka的offsets保存在文件中來管理偏移量,以便在失敗和任務(wù)重啟時,可以從上次提交的偏移量開始。
  • 可擴展的數(shù)據(jù)格式:connector原生支持Avro和parquet格式。此外,可以通過擴展Format類使用其他格式。
  • Hive整合:connector支持開箱即用的Hive集成,啟用時,connector自動為導(dǎo)出到HDFS的每個topic創(chuàng)建一個Hive外部分區(qū)表。
  • Schema演化:connector支持schema演化以及不同的schema兼容性級別。當connector觀察到schema改變時,它根據(jù)schema.compatibility配置選定正確的schema。
  • 安全的HDFS和Hive Metastore支持:connector支持Kerberos身份驗證,因此可以使用安全的HDFS和Hive metastore。
  • 可插入的partitioner:connector支持默認的分區(qū)器、基于字段的分區(qū)器和基于事件的分區(qū)器??梢酝ㄟ^擴展Partitioner類來實現(xiàn)自己的分區(qū)器。此外,還可以通過擴展TimeBasedPartitioner類自定義基于時間的分區(qū)器。

Schema演化

? ? ? ?HDFS connector支持schema演化,并根據(jù)schema.compatibility的配置對schema的更改做出響應(yīng)。schema.compatibility支持四種模式:NONE,BACKWARD,FORWARD,ALL。

  • NONE:這種情況下,connector確保寫入HDFS的每個文件都具有適合的模式。當connector觀察到數(shù)據(jù)中的schema更改時,它為受影響的topic分區(qū)提交當前文件集,并在新文件中使用新模式寫入數(shù)據(jù)。
  • BACKWARD:我們總是可以用最后的schema統(tǒng)一查詢所有數(shù)據(jù)。例如,刪除字段是對schema的向后兼容,因為當我們遇到使用包含這些字段的舊的schema寫入的記錄時,我們可以忽略它們。添加具有默認值的字段也是向后兼容的。
    schema.compatibility設(shè)置為BACKWARD時,connector跟蹤HDFS寫入數(shù)據(jù)的最新schema,如果一個數(shù)據(jù)記錄schema版本比當前版本新,connector提交當前文件,并用新schema寫入文件。對于使用較早schema到達較晚的數(shù)據(jù)記錄,connector將數(shù)據(jù)記錄投射為最新的schema,然后再寫入HDFS的同一組文件。
  • FORWARD:如果schema.compatibility設(shè)置為FORWARD,我們總是使用最舊的schema統(tǒng)一查詢數(shù)據(jù)。刪除具有默認值的字段是向前兼容的,因為當字段丟失時,舊schema將使用默認值。
    如果schema.compatibility設(shè)置為FORWARD,connector在寫入HDFS中的同一組文件時,會將數(shù)據(jù)投射為最老的schema。
  • FULL:完全兼容意味著可以用舊schema讀取新數(shù)據(jù),也可以用新schema讀取舊數(shù)據(jù)。
    如果schema.compatibility設(shè)置為FULL,connector表現(xiàn)的動作同BACKWORD相同
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容