Building a Basic Real-Time Stream Processing Framework with Spark Streaming, Flume, and Kafka

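This article walks through the core workflow of enterprise-grade real-time stream processing, an important milestone in learning big data. It is top-priority material, so please read it carefully and work through the hands-on exercises.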

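Before starting the integration work, we need an overall architecture diagram that shows the approach at a glance. As the steps below lay out, data flows from the application logs into Flume, from Flume into Kafka (and HDFS), and from Kafka into Spark Streaming: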


Step 1: Integrate log output with Flume

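Writing to Flume directly through a log4j appender couples the application too tightly to Flume, so I have Flume actively monitor the log directory and collect the logs from there instead.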

How Flume monitors a directory is covered in detail in my article "How Flume monitors multiple dynamically changing log files".
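
As a rough illustration of the directory-monitoring approach, here is a minimal sketch of a collecting agent. It assumes Flume 1.7 or later (for the TAILDIR source) and forwards events to the avro source configured in Step 2. The agent name log-collector, the log path /var/log/myapp and the position file path are hypothetical placeholders, not values from the original setup.

```properties
# Hypothetical collecting agent: tails log files and forwards them over Avro.
log-collector.sources = taildir-source
log-collector.channels = memory-channel
log-collector.sinks = avro-sink

# TAILDIR follows every file matching the pattern, including files created later.
log-collector.sources.taildir-source.type = TAILDIR
# Assumed location; Flume records the read offset of each file here.
log-collector.sources.taildir-source.positionFile = /var/flume/taildir_position.json
log-collector.sources.taildir-source.filegroups = app-logs
# Assumed log directory; the file name part is a regular expression.
log-collector.sources.taildir-source.filegroups.app-logs = /var/log/myapp/.*log

# Send events to the avro source of the agent described in Step 2.
log-collector.sinks.avro-sink.type = avro
log-collector.sinks.avro-sink.hostname = 10.101.3.3
log-collector.sinks.avro-sink.port = 44444

log-collector.channels.memory-channel.type = memory

log-collector.sources.taildir-source.channels = memory-channel
log-collector.sinks.avro-sink.channel = memory-channel
```

Because the application only writes ordinary log files, it needs no Flume dependency; changing the collection path or the downstream host only touches this config.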


Step 2: Integrate Flume with Kafka

# Agent test-avro-memory-hdfsAndkafka: writes the same data to two services, HDFS and Kafka.
# Each sink gets its own channel. Flume's default replicating channel selector copies every
# event from the source into all of the source's channels. Two sinks sharing a single channel
# would instead split the events between them, so each service would receive only part of the data.
# Full configuration follows.

## Source, sinks, and channels
test-avro-memory-hdfsAndkafka.sources = avro-source
## Same data, output to two target services: HDFS and Kafka
test-avro-memory-hdfsAndkafka.sinks = hdfs-sink kafka-sink
## To output to Kafka only, declare just kafka-sink (and just kafka-channel) instead
#test-avro-memory-hdfsAndkafka.sinks = kafka-sink
test-avro-memory-hdfsAndkafka.channels = hdfs-channel kafka-channel

# Describe the avro-source
test-avro-memory-hdfsAndkafka.sources.avro-source.type = avro
test-avro-memory-hdfsAndkafka.sources.avro-source.bind = 10.101.3.3
test-avro-memory-hdfsAndkafka.sources.avro-source.port = 44444

# Describe the hdfs-sink
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.type = hdfs
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.path = hdfs://10.101.3.3:9000/xcx/test/%y-%m-%d/%H%M/
# File name prefix and suffix
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.filePrefix = xcx
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileSuffix = .txt
# Write plain, uncompressed text
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.writeFormat = Text
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.fileType = DataStream
# Round the timestamp used in the path down to 30-minute buckets
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.round = true
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundValue = 30
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.roundUnit = minute
# Roll the file at 1 MB; 0 disables rolling by event count
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollSize = 1048576
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.rollCount = 0
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.minBlockReplicas = 1
# Use the agent's local time for the path escape sequences instead of an event header timestamp
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true

# Describe the kafka-sink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.brokerList = 10.101.3.3:9092
### Create the topic flume_kafka_streaming in Kafka beforehand; both Flume and Spark Streaming use it
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.topic = flume_kafka_streaming
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.requiredAcks = 1
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.batchSize = 5

# Use channels which buffer events in memory, one per sink
test-avro-memory-hdfsAndkafka.channels.hdfs-channel.type = memory
test-avro-memory-hdfsAndkafka.channels.kafka-channel.type = memory

# Bind the source to both channels and each sink to its own channel
test-avro-memory-hdfsAndkafka.sources.avro-source.channels = hdfs-channel kafka-channel
test-avro-memory-hdfsAndkafka.sinks.hdfs-sink.channel = hdfs-channel
test-avro-memory-hdfsAndkafka.sinks.kafka-sink.channel = kafka-channel

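Step 3: Integrate Kafka with Spark Streaming

See the article "Integrating Spark Streaming with Kafka".

Step 4: Process the received data with Spark Streaming

See the article "Integrating Spark Streaming with Kafka".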
