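This article walks through the core workflow of enterprise-grade real-time stream processing, an important milestone in learning big data. It is one of the most important articles in this series, so please read it carefully and work through the hands-on steps yourself.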
Before starting the integration work, we need an overall architecture diagram that shows our approach at a glance:

Step 1: Integrate log output with Flume
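Writing logs straight into Flume through a log4j appender couples the application too tightly to Flume, so instead I have Flume actively monitor the log directory and collect the logs from there.
I describe in detail how Flume monitors a directory in the article "How Flume monitors multiple dynamically changing log files".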
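The benefit of letting Flume watch the directory is that the application only appends to ordinary log files, while the collector remembers how far it has read in each file and picks up only the new lines. Flume's Taildir source, for example, keeps these offsets in a JSON position file. The following is a toy Python model of that idea, not Flume code; the function name `collect_new_lines` and the `*.log` pattern are illustrative assumptions.

```python
import glob
import os
import tempfile


def collect_new_lines(log_dir, offsets, pattern="*.log"):
    """Return complete lines appended since the last call (toy model, not Flume code).

    log_dir: directory the application writes its log files into.
    offsets: dict of file path -> byte offset already consumed; updated in place,
        playing the role of a position file.
    pattern: hypothetical glob selecting the files to follow.
    """
    new_lines = []
    for path in sorted(glob.glob(os.path.join(log_dir, pattern))):
        start = offsets.get(path, 0)        # a newly seen file is read from the beginning
        with open(path, "rb") as f:
            f.seek(start)
            data = f.read()
        end = data.rfind(b"\n") + 1         # consume only up to the last complete line
        for raw in data[:end].splitlines():
            new_lines.append(raw.decode("utf-8"))
        offsets[path] = start + end         # a half-written line is re-read next time
    return new_lines


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as log_dir:
        log_path = os.path.join(log_dir, "app.log")
        positions = {}
        with open(log_path, "w") as f:
            f.write("first\nsecond\nhalf-writ")
        print(collect_new_lines(log_dir, positions))  # ['first', 'second']
        with open(log_path, "a") as f:
            f.write("ten\n")
        print(collect_new_lines(log_dir, positions))  # ['half-written']
```

The sketch ignores log rotation and truncation, which a real collector must handle; the application itself never needs to know that anything is reading its files.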
Step 2: Integrate Flume with Kafka
# Agent test-avro-memory-hdfsAndkafka: writes the same data to two services (HDFS and Kafka). Sinks that share one channel compete for its events, so each sink gets its own channel and the source copies every event into both channels.
## Configure the source
test-avro-memory-hdfsAndkafka.sources=avro-source
## Send the same data to two target services
## Output to HDFS and Kafka
test-avro-memory-hdfsAndkafka.sinks=hdfs-sink kafka-sink
## Output to Kafka only (then also bind avro-source to kafka-channel alone)
#test-avro-memory-hdfsAndkafka.sinks=kafka-sink
## One memory channel per sink: memory-channel feeds hdfs-sink, kafka-channel feeds kafka-sink
test-avro-memory-hdfsAndkafka.channels=memory-channel kafka-channel
test-avro-memory-hdfsAndkafka.sources.avro-source.type=avro
test-avro-memory-hdfsAndkafka.sources.avro-source.bind=10.101.3.3
test-avro-memory-hdfsAndkafka.sources.avro-source.port=44444
# Describe the hdfs-sink
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.type=hdfs
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.path = hdfs://10.101.3.3:9000/xcx/test/%y-%m-%d/%H%M/
# File name prefix
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.filePrefix = xcx
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileSuffix =.txt
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.writeFormat=Text
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileType=DataStream
# Round the time used in hdfs.path down to 30-minute buckets
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.round = true
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundValue =30
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundUnit = minute
# Roll files at 1048576 bytes (1 MiB); rollCount=0 disables rolling by event count
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollSize=1048576
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollCount=0
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.minBlockReplicas=1
# Use the agent's local time instead of an event timestamp header to fill in the path
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
# Describe the kafka-sink
# (On Flume 1.7+, brokerList and topic are deprecated aliases of kafka.bootstrap.servers and kafka.topic)
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.type =org.apache.flume.sink.kafka.KafkaSink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.brokerList=10.101.3.3:9092
### Create the topic flume_kafka_streaming in Kafka beforehand; it is used by both Flume and the streaming job
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.topic=flume_kafka_streaming
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.requiredAcks=1
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.batchSize=5
# Use two channels that buffer events in memory, one per sink
test-avro-memory-hdfsAndkafka.channels.memory-channel.type = memory
test-avro-memory-hdfsAndkafka.channels.kafka-channel.type = memory
# Bind the source to both channels (the replicating selector copies each event into every channel) and each sink to its own channel
test-avro-memory-hdfsAndkafka.sources.avro-source.selector.type=replicating
test-avro-memory-hdfsAndkafka.sources.avro-source.channels=memory-channel kafka-channel
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.channel=memory-channel
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.channel=kafka-channel
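Why writing the same data to two services needs one channel per sink: in Flume, several sinks attached to the same channel drain it in parallel and each event is taken by only one of them, whereas a source bound to several channels with the replicating selector (Flume's default) puts a copy of every event into each channel. The toy Python model below contrasts the two layouts. It is not Flume code: it hands events to sinks on a shared channel in strict turns (real sink runners race, so the split is uneven), and `kafka-channel` is a hypothetical name for the second channel.

```python
from collections import deque


def deliver(events, source_channels, sinks_by_channel):
    """Toy model of one Flume agent (not Flume code).

    events: payloads produced by the source, in order.
    source_channels: channels the source is bound to; the replicating
        selector copies every event into each of them.
    sinks_by_channel: channel name -> sinks that take events from it.
    Returns a dict: sink name -> events that sink received.
    """
    queues = {name: deque() for name in source_channels}
    for event in events:
        for name in source_channels:
            queues[name].append(event)  # replicating selector: one copy per channel

    received = {sink: [] for sinks in sinks_by_channel.values() for sink in sinks}
    for name, sinks in sinks_by_channel.items():
        queue, turn = queues[name], 0
        while queue:
            # Sinks sharing a channel compete: each event is removed by exactly one sink.
            # Strict turns keep the toy deterministic; real sink runners race.
            sink = sinks[turn % len(sinks)]
            received[sink].append(queue.popleft())
            turn += 1
    return received


if __name__ == "__main__":
    events = [f"log line {i}" for i in range(6)]
    shared = deliver(events, ["memory-channel"],
                     {"memory-channel": ["hdfs-sink", "kafka-sink"]})
    print({sink: len(got) for sink, got in shared.items()})   # {'hdfs-sink': 3, 'kafka-sink': 3}
    fan_out = deliver(events, ["memory-channel", "kafka-channel"],
                      {"memory-channel": ["hdfs-sink"], "kafka-channel": ["kafka-sink"]})
    print({sink: len(got) for sink, got in fan_out.items()})  # {'hdfs-sink': 6, 'kafka-sink': 6}
```

With a single shared channel, HDFS and Kafka each see only part of the stream; with one channel per sink, both receive every event.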
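With `hdfs.round = true`, `hdfs.roundValue = 30` and `hdfs.roundUnit = minute`, the time used to expand the escape sequences in `hdfs.path` is rounded down to the previous 30-minute boundary, so all events from the same half hour land in one directory; because `hdfs.useLocalTimeStamp = true`, that time is the agent's local clock rather than an event header. A small Python sketch of how the `%y-%m-%d/%H%M` part of the path resolves (the helper `hdfs_bucket` is hypothetical and only mimics the path escaping; it is not Flume's code):

```python
from datetime import datetime


def hdfs_bucket(ts, round_minutes=30):
    """Directory suffix for %y-%m-%d/%H%M with minute rounding (hypothetical helper)."""
    # Round down to the highest multiple of round_minutes not after ts.
    rounded = ts.replace(minute=ts.minute - ts.minute % round_minutes,
                         second=0, microsecond=0)
    return rounded.strftime("%y-%m-%d/%H%M")


if __name__ == "__main__":
    base = "hdfs://10.101.3.3:9000/xcx/test/"
    for ts in (datetime(2019, 5, 20, 9, 5),
               datetime(2019, 5, 20, 9, 29, 59),
               datetime(2019, 5, 20, 9, 30)):
        print(base + hdfs_bucket(ts) + "/")
    # hdfs://10.101.3.3:9000/xcx/test/19-05-20/0900/
    # hdfs://10.101.3.3:9000/xcx/test/19-05-20/0900/
    # hdfs://10.101.3.3:9000/xcx/test/19-05-20/0930/
```

Inside each directory, files still roll independently: at `hdfs.rollSize = 1048576` bytes, never by event count (`hdfs.rollCount = 0`), and, since `hdfs.rollInterval` is not set, also at Flume's default interval of 30 seconds.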