Introduction to Flume
Flume is a real-time log collection system developed by Cloudera that has earned wide recognition and adoption in industry. Its initial releases are collectively known as Flume OG (original generation) and belonged to Cloudera. As Flume's feature set grew, the weaknesses of Flume OG became apparent: a bloated codebase, poorly designed core components, and non-standard core configuration. Log delivery was especially unstable in 0.9.4, the last Flume OG release. To address these problems, Cloudera completed FLUME-728 on October 22, 2011, a milestone change that refactored Flume's core components, core configuration, and code architecture; the refactored versions are collectively known as Flume NG (next generation). Another motivation for the change was bringing Flume under the Apache umbrella, and Cloudera Flume was renamed Apache Flume.
Core Components of Flume
Source
A Source receives events (or produces them through special mechanisms) and places them, in batches, into one or more Channels. Flume provides a variety of source implementations, including Avro Source, Exec Source, Spooling Directory Source, NetCat Source, Syslog Source, Syslog TCP Source, Syslog UDP Source, HTTP Source, etc.
Channel
A Flume Channel provides a queue that buffers the data delivered by a source. For channels, Flume provides Memory Channel, JDBC Channel, File Channel, etc.
Sink
A Flume Sink takes data out of a Channel and stores it in a filesystem or database, or forwards it to a remote server. Implementations include HDFS Sink, Logger Sink, Avro Sink, File Roll Sink, Null Sink, HBase Sink, etc.
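The three components above are wired together in a single properties file. As a minimal sketch (the agent and component names a1, r1, c1, k1 are arbitrary placeholders), a NetCat source feeding a memory channel drained by a logger sink looks like:

```properties
# name the components of agent a1
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# NetCat source: listens on a TCP port and turns each line into an event
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1

# memory channel buffering events between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# logger sink: writes events to the agent's log at INFO level
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
```

Note that a source is configured with `channels` (plural, since it can fan out to several channels), while a sink drains exactly one `channel`.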
Consuming from Kafka and Writing to HDFS
Configuration file
# source alias
agent.sources = source_from_kafka
# channels alias
agent.channels = mem_channel
# sink alias
agent.sinks = hdfs_sink

# define kafka source
agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.source_from_kafka.channels = mem_channel
agent.sources.source_from_kafka.batchSize = 1000
agent.sources.source_from_kafka.kafka.consumer.auto.offset.reset = latest
# set kafka broker address
agent.sources.source_from_kafka.kafka.bootstrap.servers = 127.0.0.1:9092
# set kafka topic
agent.sources.source_from_kafka.kafka.topics = intelligence-building
# set kafka group id
agent.sources.source_from_kafka.kafka.consumer.group.id = building-group

# define hdfs sink
agent.sinks.hdfs_sink.type = hdfs
# specify the channel the sink should use
agent.sinks.hdfs_sink.channel = mem_channel
# set hdfs store path
agent.sinks.hdfs_sink.hdfs.path = hdfs://127.0.0.1:9000/data/flume/kafka/%Y%m%d
# roll on a time interval only: 0 disables size- and count-based rolling
agent.sinks.hdfs_sink.hdfs.rollSize = 0
agent.sinks.hdfs_sink.hdfs.rollCount = 0
agent.sinks.hdfs_sink.hdfs.rollInterval = 3600
agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 16
agent.sinks.hdfs_sink.hdfs.fileType = DataStream
agent.sinks.hdfs_sink.hdfs.writeFormat = Text
agent.sinks.hdfs_sink.hdfs.callTimeout = 3600000
agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true
agent.sinks.hdfs_sink.hdfs.filePrefix = FlumeData_%H
#agent.sinks.hdfs_sink.hdfs.fileSuffix = .log

# define channel from kafka source to hdfs sink
agent.channels.mem_channel.type = memory
# channel store size
agent.channels.mem_channel.capacity = 1000000
# transaction size
agent.channels.mem_channel.transactionCapacity = 10000
agent.channels.mem_channel.keep-alive = 60
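The escape sequences in hdfs.path and hdfs.filePrefix (%Y, %m, %d, %H) are expanded from the event timestamp; with useLocalTimeStamp = true, the agent's local clock is used instead of a timestamp event header. To preview which directory and file prefix a given moment maps to, one can apply the same strftime-style substitution (render_hdfs_path below is a hypothetical helper for illustration, not part of Flume):

```python
from datetime import datetime

def render_hdfs_path(path_pattern: str, prefix_pattern: str, ts: datetime) -> str:
    # Expand strftime-style escapes (%Y, %m, %d, %H, ...) in the sink's
    # path and file-prefix patterns, as the HDFS sink does per event batch.
    directory = ts.strftime(path_pattern)
    prefix = ts.strftime(prefix_pattern)
    return f"{directory}/{prefix}"

example = render_hdfs_path(
    "hdfs://127.0.0.1:9000/data/flume/kafka/%Y%m%d",
    "FlumeData_%H",
    datetime(2021, 5, 1, 14, 30),
)
print(example)  # hdfs://127.0.0.1:9000/data/flume/kafka/20210501/FlumeData_14
```

With rollInterval = 3600 and an hourly %H prefix, each day's directory ends up holding roughly one file per hour per sink thread.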
Startup script
./bin/flume-ng agent --conf ./conf --name agent --conf-file ./conf/flume-hdfs.example -Dflume.root.logger=INFO,console >log.log 2>&1 &