Integrating Flume with Kafka

Environment: flume-1.6, kafka_2.11-0.9.0.0. The goal is to stream the log file /usr/local/nohup.out into Kafka in real time, so it can be processed further downstream: archived to HDFS, or fed into Storm, Spark, and similar systems for real-time log analysis.

1. Configure Kafka:

1) server.properties

host.name=localhost
log.dir=/usr/local/kafka-logs

2) zookeeper.properties

dataDir=/usr/local/zookeeper/data

3) Start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties
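Before starting Kafka, it can be worth confirming that ZooKeeper is actually serving requests. One option (assuming `nc` is installed and ZooKeeper is listening on the default port 2181) is the `ruok` four-letter command:

```shell
# Ask ZooKeeper whether it is serving requests; a healthy server replies "imok".
echo ruok | nc localhost 2181
```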

2. Start Kafka:

1) Start the server: bin/kafka-server-start.sh config/server.properties

2) Create the topic: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

3) Start a consumer: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
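Before wiring in Flume, the topic can be smoke-tested by hand with the console producer (assuming the broker is listening on the default port 9092). Anything sent here should appear in the consumer started above:

```shell
# Send one test message to the "test" topic; the console consumer
# started in the previous step should print it.
echo "hello kafka" | bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
```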

3. Configure Flume:

1) flume-conf.properties (note that org.apache.flume.plugins.KafkaSink below is a third-party plugin class, not the Kafka sink bundled with Flume 1.6, which is org.apache.flume.sink.kafka.KafkaSink; the plugin jar must be on Flume's classpath)

#agent section

producer.sources = s

producer.channels = c

producer.sinks = r

#source section

producer.sources.s.type = exec

producer.sources.s.command = tail -F /usr/local/nohup.out

producer.sources.s.channels = c

# Each sink's type must be defined

producer.sinks.r.type = org.apache.flume.plugins.KafkaSink

producer.sinks.r.custom.topic.name = test

producer.sinks.r.metadata.broker.list = 127.0.0.1:9092

producer.sinks.r.partition.key = 0

producer.sinks.r.partitioner.class = org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class = kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks = 1

producer.sinks.r.max.message.size = 1000000

#Specify the channel the sink should use

producer.sinks.r.channel = c

# Each channel's type is defined.

producer.channels.c.type = memory

producer.channels.c.capacity = 1000
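The exec source above simply runs `tail -F` and forwards each new line as an event. That behavior can be sketched locally without Flume (a minimal demonstration using a temporary file; `timeout` stands in for stopping the agent):

```shell
# Simulate the exec source: follow a log file and collect lines appended to it.
LOG=$(mktemp)
echo "line1" >> "$LOG"                   # pre-existing content
( sleep 0.2; echo "line2" >> "$LOG" ) &  # a writer appending later, like nohup.out
# tail -F keeps following across truncation and rotation, which is why the
# config uses it instead of plain tail -f.
OUT=$(timeout 2 tail -F "$LOG" 2>/dev/null || true)
echo "$OUT"
rm -f "$LOG"
```

Both lines end up in `$OUT`: the pre-existing content (tail prints the last lines of the file on startup) and the line appended while it was following.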

4. Start Flume:

1) bin/flume-ng agent --conf conf --conf-file conf/flume-conf.properties --name producer -Dflume.root.logger=INFO,console
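Once the agent is running, the whole pipeline can be verified end to end (assuming the console consumer from step 2 is still running): append a line to the tailed file and it should show up in the consumer's output.

```shell
# Append to the file tailed by the exec source; the line should flow
# source -> memory channel -> KafkaSink -> topic "test" -> console consumer.
echo "pipeline test $(date)" >> /usr/local/nohup.out
```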
