
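Spark Streaming supports two broad categories of data sources. The first is basic sources, which are available directly through the StreamingContext API (for example, file systems and socket connections).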

The second is advanced sources, such as Kafka, Flume and Kinesis, which require additional dependencies.





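Here is my sample, using the approach in which Flume pushes data to Spark Streaming:
Step 1: configure a Flume agent that monitors the folder /usr/local/datatest. The configuration file is as follows: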
# name the components on this agent
spooldir-memory-avro-streaming.sources = spooldir-source
spooldir-memory-avro-streaming.sinks = avro-sink
spooldir-memory-avro-streaming.channels = memory-channel
# Describe/configure the source
## Note: do not drop a file with the same name into the monitored directory more than once
## The spooldir source picks up new files placed in the directory
spooldir-memory-avro-streaming.sources.spooldir-source.type = spooldir
spooldir-memory-avro-streaming.sources.spooldir-source.spoolDir =/usr/local/datatest
spooldir-memory-avro-streaming.sources.spooldir-source.fileHeader = true
spooldir-memory-avro-streaming.sources.spooldir-source.deletePolicy=immediate
spooldir-memory-avro-streaming.sources.spooldir-source.ignorePattern=^(.)*\\.out$
# Describe the sink
spooldir-memory-avro-streaming.sinks.avro-sink.type = avro
spooldir-memory-avro-streaming.sinks.avro-sink.hostname=10.101.3.3
spooldir-memory-avro-streaming.sinks.avro-sink.port=44445
# Use a channel which buffers events in memory
## Memory-based channel
spooldir-memory-avro-streaming.channels.memory-channel.type = memory
# Bind the source and sink to the channel
spooldir-memory-avro-streaming.sources.spooldir-source.channels = memory-channel
spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel
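The ignorePattern line decides which files in the spool directory are skipped. Flume reads the .conf file as Java properties, so the doubled backslash in `^(.)*\\.out$` becomes the regex `^(.)*\.out$`. The sketch below shows which names that regex matches, assuming the source tests the whole file name with `Matcher.matches()`. The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.regex.Pattern;

// Hypothetical helper: which file names would the spooldir source skip?
// Assumption: Flume compiles ignorePattern with java.util.regex and matches the whole name.
class IgnorePatternDemo {
    // Java string "\\." is the regex \. (a literal dot), same as the .conf value after loading
    static final Pattern IGNORE = Pattern.compile("^(.)*\\.out$");

    static boolean isIgnored(String fileName) {
        // matches() requires the entire name to match, like the ^...$ anchors
        return IGNORE.matcher(fileName).matches();
    }

    public static void main(String[] args) {
        for (String name : Arrays.asList("words.txt", "job.out", "job.out.bak")) {
            System.out.println(name + " ignored=" + isIgnored(name));
        }
    }
}
```

Only names ending in `.out` are skipped; `job.out.bak` is still ingested because the pattern is anchored at the end.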
Step 2: write the Java code that integrates Spark Streaming with Flume.
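As a hedged sketch: in a push-based job the input stream would typically be created with `FlumeUtils.createStream(jssc, "10.101.3.3", 44445)` from the spark-streaming-flume artifact, using the same hostname and port as the avro sink. Each `SparkFlumeEvent` exposes its body as a `ByteBuffer` via `event().getBody()`. The standalone Java below leaves Spark out so it can run on its own. It only illustrates the per-batch word-count logic that a job like `Flume2StreamingWordCnt` presumably performs. The class and method names are hypothetical, and splitting on whitespace is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Spark-free sketch of the per-batch logic of a word-count job.
class FlumeWordCountSketch {
    // Decode an event body as UTF-8 without moving the caller's buffer position
    // (duplicate() shares the bytes but has its own position/limit).
    static String decodeBody(ByteBuffer body) {
        ByteBuffer copy = body.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Plain-Java equivalent of flatMap(split) -> mapToPair(word, 1) -> reduceByKey(sum)
    static Map<String, Integer> countWords(List<ByteBuffer> bodies) {
        Map<String, Integer> counts = new TreeMap<>();
        for (ByteBuffer body : bodies) {
            for (String word : decodeBody(body).split("\\s+")) {
                if (!word.isEmpty()) { // leading whitespace yields an empty first token
                    counts.merge(word, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<ByteBuffer> batch = Arrays.asList(
                ByteBuffer.wrap("hello spark".getBytes(StandardCharsets.UTF_8)),
                ByteBuffer.wrap("hello flume".getBytes(StandardCharsets.UTF_8)));
        System.out.println(countWords(batch)); // {flume=1, hello=2, spark=1}
    }
}
```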



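Step 3: submit the jar to Spark. Since this is only a sample, I run everything in the foreground; to run it in the background, prefix the command with nohup and append &.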
bin/spark-submit --class com.liushun.Flume2StreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar
Step 4: start Flume (also in the foreground; use nohup ... & to run it in the background).
./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streaming.conf --name spooldir-memory-avro-streaming -Dflume.root.logger=INFO,console
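Note: do not start Flume first. When Flume starts, it checks whether the listening port (the one opened by the Spark Streaming receiver) is up; if it is not, startup is aborted with an error.
Step 5: verify
Create a file and move it into the /usr/local/datatest directory, then check whether the log printed by the streaming job matches what we expect.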
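Because the start order matters here, a quick pre-flight check before launching Flume can help. The hypothetical Java helper below only tests whether something accepts TCP connections on host:port. It cannot tell whether the listener really is the Spark receiver, so treat a `true` result as necessary rather than sufficient. Run it on the Flume machine with the hostname and port from the avro sink configuration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical pre-flight check: is anything accepting TCP connections on host:port?
class PortCheck {
    static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;   // connection accepted
        } catch (IOException e) {
            return false;  // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Values from the avro sink config above; adjust them to your cluster.
        System.out.println("receiver port open: " + isListening("10.101.3.3", 44445, 2000));
    }
}
```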
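The Flume user guide warns that the spooling directory source reports an error if a file is modified after it lands in the directory, or if a file name is reused. A safe way to feed test data is to write each file somewhere else and then move it in under a unique name. The sketch below does that. The staging directory and class name are hypothetical. `ATOMIC_MOVE` assumes the staging directory is on the same file system as /usr/local/datatest; otherwise `Files.move` throws `AtomicMoveNotSupportedException`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.UUID;

// Hypothetical helper for feeding test data to a spooling-directory source.
class SpoolFeeder {
    // Write the file outside the spool dir first, then move it in atomically,
    // so the source never sees a half-written file. The UUID keeps names unique.
    static Path drop(Path stagingDir, Path spoolDir, String content) throws IOException {
        String name = "words-" + UUID.randomUUID() + ".txt";
        Path staged = stagingDir.resolve(name);
        Files.write(staged, content.getBytes(StandardCharsets.UTF_8));
        // ATOMIC_MOVE requires both directories on the same file system
        return Files.move(staged, spoolDir.resolve(name), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path staging = Paths.get("/usr/local/datatest-staging"); // hypothetical staging dir
        Path spool = Paths.get("/usr/local/datatest");
        Files.createDirectories(staging);
        System.out.println("dropped " + drop(staging, spool, "hello spark hello flume\n"));
    }
}
```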

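When running the program you may hit a ClassNotFound error. Don't panic: look at the class named in the error, find the jar that contains it in your Maven repository, and add that jar to Spark's jars directory.
That completes the approach in which Flume pushes data to Spark Streaming. Next we introduce the other approach, in which Spark Streaming pulls data from a custom Flume sink.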
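To confirm which jar is still missing after copying files into Spark's jars directory, it can help to check whether a class loads with a given classpath. The hypothetical diagnostic below does only that. The two default class names are examples from this integration: `FlumeUtils` ships in spark-streaming-flume and `AvroFlumeEvent` in flume-ng-sdk.

```java
// Hypothetical diagnostic: report whether each class can be loaded on the current classpath.
class ClasspathCheck {
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate and load the class, don't run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] names = args.length > 0 ? args : new String[] {
                "org.apache.spark.streaming.flume.FlumeUtils",
                "org.apache.flume.source.avro.AvroFlumeEvent"};
        for (String name : names) {
            System.out.println(name + " -> " + (isOnClasspath(name) ? "found" : "MISSING"));
        }
    }
}
```

For example, after compiling, `java -cp "$SPARK_HOME/jars/*:." ClasspathCheck` checks the default classes against Spark's jars directory.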

So what are the concrete steps for this approach? See below.

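Step 1: configure Flume
(1) Add the jars. According to the Spark Streaming + Flume integration guide, the custom sink needs spark-streaming-flume-sink_2.11, scala-library and commons-lang3 on Flume's classpath (for example, in Flume's lib directory).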


(2) Write the agent configuration file that uses the custom sink


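Step 2: write the Spark Streaming application
Below is my integration sample, for reference only.
Step 1: configure the Flume agent with the custom sink, monitoring the folder /usr/local/datatest. The configuration file is as follows: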
# name the components on this agent
spooldir-memory-avro-streamingfromSink.sources = spooldir-source
spooldir-memory-avro-streamingfromSink.sinks = spark-sink
spooldir-memory-avro-streamingfromSink.channels = memory-channel
# Describe/configure the source
## Note: do not drop a file with the same name into the monitored directory more than once
## The spooldir source picks up new files placed in the directory
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.type = spooldir
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.spoolDir =/usr/local/datatest
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.fileHeader = true
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.deletePolicy=immediate
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.ignorePattern=^(.)*\\.out$
# Describe the sink
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.hostname=10.101.3.3
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.port=44445
# Use a channel which buffers events in memory
## Memory-based channel
spooldir-memory-avro-streamingfromSink.channels.memory-channel.type = memory
# Bind the source and sink to the channel
spooldir-memory-avro-streamingfromSink.sources.spooldir-source.channels = memory-channel
spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel
Sample application code:
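A hedged sketch for the application side: in a pull-based job the stream would typically be created with `FlumeUtils.createPollingStream(jssc, "10.101.3.3", 44445)`, using the same hostname and port as the SparkSink above. The Java API also has overloads that take an `InetSocketAddress[]` plus a `StorageLevel`. To keep those two places from drifting apart, the standalone helper below reads the sink's hostname and port from the agent configuration, which Flume loads in Java properties format. The class and method names are hypothetical, and Spark itself is left out so the code runs on its own.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.Properties;

// Hypothetical helper: derive the polling address from the Flume agent config.
class SparkSinkAddress {
    static InetSocketAddress fromConfig(Reader conf, String agent, String sink) throws IOException {
        Properties props = new Properties();
        props.load(conf); // same key=value format (and # comments) as the .conf file
        String prefix = agent + ".sinks." + sink + ".";
        String host = props.getProperty(prefix + "hostname");
        String port = props.getProperty(prefix + "port");
        if (host == null || port == null) {
            throw new IllegalArgumentException("missing hostname/port for sink " + sink);
        }
        // trim() drops trailing spaces, which Properties keeps in values
        return new InetSocketAddress(host.trim(), Integer.parseInt(port.trim()));
    }

    public static void main(String[] args) throws IOException {
        String conf = "demo.sinks.spark-sink.hostname = 10.101.3.3\n"
                + "demo.sinks.spark-sink.port = 44445\n";
        System.out.println(fromConfig(new StringReader(conf), "demo", "spark-sink"));
    }
}
```

In a real job the `Reader` would wrap the agent's .conf file, with agent name `spooldir-memory-avro-streamingfromSink` and sink name `spark-sink`.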



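Step 3: start Flume (in the foreground; use nohup ... & to run it in the background). In this approach Flume starts first, because the SparkSink is the side that listens on the port and the Spark job connects to it.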
./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streamingfromSink.conf --name spooldir-memory-avro-streamingfromSink -Dflume.root.logger=INFO,console
Step 4: start the application (again in the foreground; use nohup ... & to run it in the background).
./spark-submit --class com.liushun.StreamingFromFlumeWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar
Verify the result:
