Flume實(shí)踐

Flume簡介

flume 作為 cloudera 開發(fā)的實(shí)時(shí)日志收集系統(tǒng),受到了業(yè)界的認(rèn)可與廣泛應(yīng)用。Flume 初始的發(fā)行版本目前被統(tǒng)稱為 Flume OG(original generation),屬于 cloudera。但隨著 FLume 功能的擴(kuò)展,F(xiàn)lume OG 代碼工程臃腫、核心組件設(shè)計(jì)不合理、核心配置不標(biāo)準(zhǔn)等缺點(diǎn)暴露出來,尤其是在 Flume OG 的最后一個(gè)發(fā)行版本 0.9.4. 中,日志傳輸不穩(wěn)定的現(xiàn)象尤為嚴(yán)重,為了解決這些問題,2011 年 10 月 22 號(hào),cloudera 完成了 Flume-728,對 Flume 進(jìn)行了里程碑式的改動(dòng):重構(gòu)核心組件、核心配置以及代碼架構(gòu),重構(gòu)后的版本統(tǒng)稱為 Flume NG(next generation);改動(dòng)的另一原因是將 Flume 納入 apache 旗下,cloudera Flume 改名為 Apache Flume。

Flume的核心組件

image
  • Source
    Source 負(fù)責(zé)接收 event 或通過特殊機(jī)制產(chǎn)生 event,并將 events 批量的放到一個(gè)或多個(gè)Channel, Flume提供了各種source的實(shí)現(xiàn),包括Avro Source、 Exce Source、 SpoolingDirectory Source、 NetCat Source、 Syslog Source、 Syslog TCP Source、Syslog UDP Source、 HTTP Source、 HDFS Source, etc

  • Channel
    Flume Channel主要提供一個(gè)隊(duì)列的功能,對source提供中的數(shù)據(jù)進(jìn)行簡單的緩存。 Flume對于Channel, 則提供了Memory Channel、 JDBC Chanel、 File Channel,etc

  • Sink
    Flume Sink取出Channel中的數(shù)據(jù),進(jìn)行相應(yīng)的存儲(chǔ)文件系統(tǒng),數(shù)據(jù)庫,或者提交到遠(yuǎn)程服務(wù)器,包括HDFS sink、 Logger sink、 Avro sink、 File Roll sink、 Null sink、 HBasesink、 etc

消費(fèi)kafka推送到HDFS

  • 配置文件

        agent.sources = source_from_kafka
        # channels alias
        agent.channels = mem_channel
        # sink alias
        agent.sinks = hdfs_sink
    
        # define kafka source
        agent.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
        agent.sources.source_from_kafka.channels = mem_channel
        agent.sources.source_from_kafka.batchSize = 1000
        agent.sources.source_from_kafka.kafka.consumer.auto.offset.reset = latest
        # set kafka broker address  
        agent.sources.source_from_kafka.kafka.bootstrap.servers = 127.0.0.1:9092
        # set kafka topic
        agent.sources.source_from_kafka.kafka.topics = intelligence-building
        # set kafka groupid
        agent.sources.source_from_kafka.kafka.consumer.group.id = building-group
                
        # defind hdfs sink
        agent.sinks.hdfs_sink.type = hdfs         
        # specify the channel the sink should use  
        agent.sinks.hdfs_sink.channel = mem_channel
        # set store hdfs path
        agent.sinks.hdfs_sink.hdfs.path = hdfs://127.0.0.1:9000/data/flume/kafka/%Y%m%d   
            
        # set file size to trigger roll
        agent.sinks.hdfs_sink.hdfs.rollSize = 0 
        agent.sinks.hdfs_sink.hdfs.rollCount = 0  
        agent.sinks.hdfs_sink.hdfs.rollInterval = 3600  
        agent.sinks.hdfs_sink.hdfs.threadsPoolSize = 16
        agent.sinks.hdfs_sink.hdfs.fileType = DataStream
        agent.sinks.hdfs_sink.hdfs.writeFormat = Text
        agent.sinks.hdfs_sink.hdfs.callTimeout = 3600000
        agent.sinks.hdfs_sink.hdfs.useLocalTimeStamp = true    
        agent.sinks.hdfs_sink.hdfs.filePrefix = FlumeData_%H
        #agent.sinks.hdfs_sink.hdfs.fileSuffix = .log
        # define channel from kafka source to hdfs sink 
        agent.channels.mem_channel.type = memory
        # channel store size
        agent.channels.mem_channel.capacity = 100000
        # transaction size
        agent.channels.mem_channel.transactionCapacity = 10000
        agent.channels.mem_channel.keep-alive = 60
        agent.channels.mem_channel.capacity = 1000000
    
    
  • 啟動(dòng)腳本

        ./bin/flume-ng agent --conf ./conf --name agent --conf-file ./conf/flume-hdfs.example -Dflume.root.logger=INFO,console >log.log 2>&1 &
    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容