如何配置KafkaChannel

完整配置

# Define spooling source
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /tmp/flume
a1.sources.s1.channels = c1

a1.sources.s1.interceptors = i1
a1.sources.s1.interceptors.i1.type = regex_extractor
a1.sources.s1.interceptors.i1.regex =.*\\|(.*)\\|.*
a1.sources.s1.interceptors.i1.serializers = e1
a1.sources.s1.interceptors.i1.serializers.e1.name = key

# Define a kafka channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = host1:6667,host2:6667,host3:6667
a1.channels.c1.kafka.topic = test
a1.channels.c1.parseAsFlumeEvent = false

a1.channels = c1
a1.sources = s1
a1.sinks =k1

例如在/tmp/flume下面放置一個文件,內(nèi)容a|b|c
那么通過上面的配置,消費一下kafka的test,看一下結(jié)果

sh /usr/hdp/2.5.0.0-1245/kafka/bin/kafka-console-consumer.sh --zookeeper host1:2181 --topic test  --property print.key=true

b       a|b|c
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容