flume+kafka接收syslog實(shí)戰(zhàn)


1、環(huán)境配置:

平臺(tái)版本:CDH 5.15.1

服務(wù)器資源:VM虛擬機(jī)三臺(tái)(規(guī)格:CPU 2核,內(nèi)存8G,硬盤300G)

組件版本:flume使用但是5.15.1自帶的版本(1.6),kafka是CDH 5.15.1兼容版本。

部署方式:flume agent和kafka 部署在相同節(jié)點(diǎn)。


2、flume的配置

整個(gè)項(xiàng)目中之所以選用flume,是看中了flume強(qiáng)大靈活的日志收集功能。本文中CDH使用的是flume agent的部署方式,一個(gè)agent包含了source,channel和sink的三個(gè)部分組成(更加詳細(xì)的請(qǐng)參考官網(wǎng)文檔)。


flume agent基本模型

這里source我采用的是syslog,kafka作為sink,channel選用memory。

由于項(xiàng)目的開發(fā)環(huán)境使用時(shí)CDH 5.15.1,所以添加flume agent的配置很友好(其實(shí)還是vim修改配置文件比較順手)。此處注意項(xiàng):

代理名稱(也是就是agent名稱)需要和下面配置文件中的一致。


CDH環(huán)境的flume agent修改項(xiàng)



flume agent的詳細(xì)配置如下:

# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per

# agent name, in this case 'tier1'.

tier1.sources? = source1

tier1.channels = channel1

tier1.sinks? ? = sink1

# For each source, channel, and sink, set

# standard properties.

tier1.sources.source1.type? ? = syslogtcp

tier1.sources.source1.host? ? = cdh-3

tier1.sources.source1.port? ? = 5140

tier1.sources.source1.channels = channel1

#tier1.channels.channel1.type? = file

#tier1.channels.channel1.checkpointDir = /home/flume/channel/checkpoint

#tier1.channels.channel1.dataDirs = /home/flume/channel/data

tier1.channels.channel1.type? = memory

tier1.channels.channel1.capacity =10000

tier1.channels.channel1.transactionCapacity=10000

tier1.sinks.sink1.type? ? ? ? = org.apache.flume.sink.kafka.KafkaSink

tier1.sinks.sink1.kafka.topic = rawlog

tier1.sinks.sink1.kafka.bootstrap.servers = cdh-3:9092

tier1.sinks.sink1.kafka.flumeBatchSize = 2000

tier1.sinks.sink1.kafka.producer.acks = 1

tier1.sinks.sink1.channel? ? ? = channel1

此處tier1就是之前agent的名稱,需保持一致。kafka sink的topic需要提前創(chuàng)建,flume是不會(huì)自動(dòng)創(chuàng)建的。

3、功能測試

修改完flume agent的配置,重新啟動(dòng)下agent。agent會(huì)啟動(dòng)并監(jiān)聽5140端口,如果沒有請(qǐng)排查下日志,看是否有其他問題。

為了能準(zhǔn)確觀察到數(shù)據(jù)進(jìn)入kafka的狀態(tài),這里采用了KafkaOffsetMonitor來監(jiān)控狀態(tài)。KafkaOffsetMonitor是一個(gè)可以用于監(jiān)控Kafka的Topic及Consumer消費(fèi)狀況的工具,其配置和使用特別的方便。源項(xiàng)目Github地址為:https://github.com/quantifind/KafkaOffsetMonitor。?

我們采用了如下的啟動(dòng)參數(shù):

java -cp KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk 0.0.0.0:2181 --port 9999 --refresh 2.seconds --retain 2.days

如何發(fā)送syslog報(bào)文這里就不再描述了。進(jìn)行了下簡單測試,單節(jié)點(diǎn)agent采用如上的配置,基本上可以達(dá)到5k以上的入庫性能。具體數(shù)據(jù)如圖所示,注意我們只需觀察offset和時(shí)間軸就可粗略估算。發(fā)送了20k條數(shù)據(jù),4s后完全進(jìn)入kafka。


發(fā)送前kafka offset是906370
發(fā)送完后offset是926369
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容