版權(quán)聲明:本文為博主原創(chuàng)文章,未經(jīng)博主允許不得轉(zhuǎn)載
本文是基于hadoop 2.7.1,以及kafka 0.11.0.0。kafka-connect是以單節(jié)點模式運行,即standalone。
一. 首先,先對kafka和kafka connect做一個簡單的介紹
kafka:Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者規(guī)模的網(wǎng)站中的所有動作流數(shù)據(jù)。比較直觀的解釋就是其有一個生產(chǎn)者(producer)和一個消費者(consumer)??梢詫afka想象成一個數(shù)據(jù)容器,生產(chǎn)者負責(zé)發(fā)送數(shù)據(jù)到這個容器中,而消費者從容器中取出數(shù)據(jù),在將數(shù)據(jù)做處理,如存儲到hdfs。
kafka connect:Kafka Connect是一種用于在Kafka和其他系統(tǒng)之間可擴展的、可靠的流式傳輸數(shù)據(jù)的工具。它使得能夠快速定義將大量數(shù)據(jù)集合移入和移出Kafka的連接器變得簡單。即適合批量數(shù)據(jù)導(dǎo)入導(dǎo)出操作。
二. 下面將介紹如何用kafka connect將數(shù)據(jù)寫入到hdfs中。包括在這個過程中可能碰到的一些問題說明。
首先啟動kafka-connect:
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
這個命令后面兩個參數(shù),
第一個是指定啟動的模式,有分布式和單節(jié)點兩種,這里是單節(jié)點。kafka自帶,放于config目錄下。
第二個參數(shù)指向描述connector的屬性的文件,可以有多個,這里只有一個connector用來寫入到hdfs。需要自己創(chuàng)建。
接下來看看connector1.properties的內(nèi)容,
name="test" #該connector的名字
#將自己按connect接口規(guī)范編寫的代碼打包后放在kafka/libs目錄下,再根據(jù)項目結(jié)構(gòu)引用對應(yīng)
connector connector.class=hdfs.HdfsSinkConnector
#Task是導(dǎo)入導(dǎo)出的具體實現(xiàn),這里是指定多少個task來并行運行導(dǎo)入導(dǎo)出作業(yè),由多線程實現(xiàn)。由于hdfs中一個文件每次只能又一個文件操作,所以這里只能是1
tasks.max=1
#指定從哪個topic讀取數(shù)據(jù),這些其實是用來在connector或者task的代碼中讀取的。 topics=test #指定key以那種方式轉(zhuǎn)換,需和Producer發(fā)送方指定的序列化方式一致 key.converter=org.apache.kafka.connect.converters.ByteArrayConverter value.converter=org.apache.kafka.connect.json.JsonConverter #同上
hdfs.url=hdfs://127.0.0.1:9000 #hdfs的url路徑,在Connector中會被讀取 hdfs.path=/test/file #hdfs文件路徑,同樣Connector中被讀取
key.converter.schemas.enable=true #稍后介紹,可以true也可以false,影響傳輸格式 value.converter.schemas.enable=true #稍后介紹,可以true也可以false
三. 接下來看代碼,connect主要是導(dǎo)入導(dǎo)出兩個概念,導(dǎo)入是source,導(dǎo)出時Sink。這里只使用Sink,不過Source和Sink的實現(xiàn)其實基本相同。
實現(xiàn)Sink其實不難,實現(xiàn)對應(yīng)的接口,即SinkConnector和SinkTask兩個接口,再打包放到kafka/libs目錄下即可。其中SinkConnector只有一個,而Task可以有多
先是Connector
publicclassHdfsSinkConnectorextends SinkConnector {
? ? //這兩項為配置hdfs的urlh和路徑的配置項,需要在connector1.properties中指定publicstaticfinalString HDFS_URL = "hdfs.url";
? ? publicstaticfinalString HDFS_PATH = "hdfs.path";
? ? privatestaticfinalConfigDef CONFIG_DEF =new ConfigDef()
? ? ? ? ? ? .define(HDFS_URL, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs url")
? ? ? ? ? ? .define(HDFS_PATH, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "hdfs path");
? ? private String hdfsUrl;
? ? private String hdfsPath;
? ? @Override
? ? public String version() {
? ? ? ? returnAppInfoParser.getVersion();? ? }
//start方法會再初始的時候執(zhí)行一次,這里主要用于配置? ? @Override
publicvoidstart(Map props) {
? ? ? ? hdfsUrl = props.get(HDFS_URL);
? ? ? ? hdfsPath = props.get(HDFS_PATH);
? ? }
//這里指定了Task的類
? ? @Override
? ? publicClass taskClass() {
? ? ? ? returnHdfsSinkTask.class;
? ? }
//用于配置Task的config,這些都是會在Task中用到
? ? @Override
? ? publicList> taskConfigs(int maxTasks) {
? ? ? ? ArrayList> configs =newArrayList<>();
? ? ? ? for(inti = 0; i < maxTasks; i++) {
? ? ? ? ? ? Map config =newHashMap<>();
? ? ? ? ? ? if(hdfsUrl !=null)
? ? ? ? ? ? ? ? config.put(HDFS_URL, hdfsUrl);
? ? ? ? ? ? if(hdfsPath !=null)
? ? ? ? ? ? ? ? config.put(HDFS_PATH, hdfsPath);
? ? ? ? ? ? configs.add(config);
? ? ? ? }
? ? ? ? return configs;
? ? }
//關(guān)閉時的操作,一般是關(guān)閉資源。
? ? @Override
? ? publicvoid stop() {
? ? ? ? // Nothing to do since FileStreamSinkConnector has no background monitoring.? ? }
? ? @Override
? ? public ConfigDef config() {
? ? ? ? return CONFIG_DEF;
? ? }
}
接下來是Task
publicclassHdfsSinkTaskextends SinkTask {
? ? privatestaticfinalLogger log = LoggerFactory.getLogger(HdfsSinkTask.class);
? ? private String filename;
? ? publicstatic String hdfsUrl;
? ? publicstatic String hdfsPath;
? ? private Configuration conf;
? ? private FSDataOutputStream os;
? ? private FileSystem hdfs;
? ? public HdfsSinkTask(){
? ? }
? ? @Override
? ? public String version() {
? ? ? ? returnnew HdfsSinkConnector().version();
? ? }
//Task開始會執(zhí)行的代碼,可能有多個Task,所以每個Task都會執(zhí)行一次
? ? @Override
? ? publicvoidstart(Map props) {
? ? ? ? hdfsUrl = props.get(HdfsSinkConnector.HDFS_URL);
? ? ? ? hdfsPath = props.get(HdfsSinkConnector.HDFS_PATH);
? ? ? ? System.out.println("----------------------------------- start--------------------------------");
? ? ? ? conf =new Configuration();conf.set("fs.defaultFS", hdfsUrl);
? ? ? ? //這兩個是與hdfs append相關(guān)的設(shè)置conf.setBoolean("dfs.support.append",true);
? ? ? ? conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
? ? ? ? try{
? ? ? ? ? ? hdfs = FileSystem.get(conf);//? ? ? ? ? ? connector.hdfs = new Path(HDFSPATH).getFileSystem(conf);os = hdfs.append(new Path(hdfsPath));
? ? ? ? }catch (IOException e){
? ? ? ? ? ? System.out.println(e.toString());
? ? ? ? }
? ? }
//核心操作,put就是將數(shù)據(jù)從kafka中取出,存放到其他地方去
? ? @Override
? ? publicvoidput(Collection sinkRecords) {
? ? ? ? for (SinkRecord record : sinkRecords) {
? ? ? ? ? ? log.trace("Writing line to {}: {}", logFilename(), record.value());
? ? ? ? ? ? try{
? ? ? ? ? ? ? ? System.out.println("write info------------------------" + record.value().toString() + "-----------------");
? ? ? ? ? ? ? ? os.write((record.value().toString()).getBytes("UTF-8"));
? ? ? ? ? ? ? ? os.hsync();
? ? ? ? ? ? }catch(Exception e){
? ? ? ? ? ? ? ? System.out.print(e.toString());
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? @Override
? ? publicvoidflush(Map offsets) {
? ? ? ? try{
? ? ? ? ? ? os.hsync();
? ? ? ? }catch(Exception e){? ? ? ? ? ? System.out.print(e.toString());? ? ? ? }? ? }
//同樣是結(jié)束時候所執(zhí)行的代碼,這里用于關(guān)閉hdfs資源? ? @Override
publicvoid stop() {
? ? ? ? try {
? ? ? ? ? ? os.close();
? ? ? ? }catch(IOException e){
? ? ? ? ? ? System.out.println(e.toString());
? ? ? ? }
? ? }
? ? private String logFilename() {
? ? ? ? returnfilename ==null? "stdout" : filename;
? ? }
}
這里重點提一下,因為在connector1.propertise中設(shè)置了key.converter=org.apache.kafka.connect.converters.ByteArrayConverter,所以不能用命令行形式的
producer發(fā)送數(shù)據(jù),而是要用程序的方式,并且在producer總也要設(shè)置key的序列化形式為org.apache.kafka.common.serialization.ByteArraySerializer。
編碼完成,先用idea以開發(fā)程序與依賴包分離的形式打包成jar包,然后將程序?qū)?yīng)的jar包(一般就是“項目名.jar”)放到kafka/libs目錄下面,這樣就能被找到。
四. 接下來對這個過程的問題做一個匯總。
1.在connector1.properties中的key.converter.schemas.enable=false和value.converter.schemas.enable=false的問題。
這個選項默認(rèn)在connect-standalone.properties中是true的,這個時候發(fā)送給topic的Json格式是需要使用avro格式,具體情況可以百度,這里給出一個樣例。
{
? ? "schema": {
? ? ? ? "type": "struct",
? ? ? ? "fields": [{
? ? ? ? ? ? "type": "int32",
? ? ? ? ? ? "optional": true,
? ? ? ? ? ? "field": "c1"
? ? ? ? }, {
? ? ? ? ? ? "type": "string",
? ? ? ? ? ? "optional": true,
? ? ? ? ? ? "field": "c2"
? ? ? ? }, {
? ? ? ? ? ? "type": "int64",
? ? ? ? ? ? "optional": false,
? ? ? ? ? ? "name": "org.apache.kafka.connect.data.Timestamp",
? ? ? ? ? ? "version": 1,
? ? ? ? ? ? "field": "create_ts"
? ? ? ? }, {
? ? ? ? ? ? "type": "int64",
? ? ? ? ? ? "optional": false,
? ? ? ? ? ? "name": "org.apache.kafka.connect.data.Timestamp",
? ? ? ? ? ? "version": 1,
? ? ? ? ? ? "field": "update_ts"
? ? ? ? }],
? ? ? ? "optional": false,
? ? ? ? "name": "foobar"
? ? },
? ? "payload": {
? ? ? ? "c1": 10000,
? ? ? ? "c2": "bar",
? ? ? ? "create_ts": 1501834166000,
? ? ? ? "update_ts": 1501834166000
? ? }
}?
主要就是schema和payload這兩個,不按照這個格式會報錯如下
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
? at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
如果想發(fā)送普通的json格式而不是avro格式的話,很簡單key.converter.schemas.enable和value.converter.schemas.enable設(shè)置為false就行。這樣就能發(fā)送普通的json格式數(shù)據(jù)。
2.在啟動的過程中出現(xiàn)各種各樣的java.lang.ClassNotFoundException。
在啟動connector的時候,一開始總是會報各個各樣的ClassNotFoundException,不是這個包就是那個包,查找問題一直說要么缺少包要么是包沖突。這個是什么原因呢?
其實歸根結(jié)底還是依賴沖突的問題,因為kafka程序自定義的類加載器加載類的目錄是在kafka/libs中,而寫到hdfs需要hadoop的包。
我一開始的做法是將hadoop下的包路徑添加到CLASSPATH中,這樣子問題就來了,因為kafka和hadoop的依賴包是有沖突的,比如hadoop是guava-11.0.2.jar,而kafka是guava-20.0.jar,兩個jar包版本不同,而我們是在kafka程序中調(diào)用hdfs,所以當(dāng)jar包沖突時應(yīng)該優(yōu)先調(diào)用kafka的。但是注意kafka用的是程序自定義的類加載器,其優(yōu)先級是低于CLASSPATH路徑下的類的,就是說加載類時會優(yōu)先加載CLASSPATH下的類。這樣子就有問題了。
我的解決方案時將kafka和hadoop加載的jar包路徑都添加到CLASSPATH中,并且kafka的路徑寫在hadoop前面,這樣就可以啟動connector成功。