Spark Streaming整合flume

兩種整合方式一種方式是推送,另一種方式是拉取

Spark Streaming支持的數(shù)據(jù)源有兩大類,第一大類是基礎(chǔ)的數(shù)據(jù)源

第二大類是高級數(shù)據(jù)源

Spark Streaming和Flume的整合指南

下面是我寫的樣例:

第一步配置flume的agent,并啟動flume,監(jiān)控一個文件夾datatest/,配置文件如下

# name the components on this agent

spooldir-memory-avro-streaming.sources = spooldir-source

spooldir-memory-avro-streaming.sinks = avro-sink

spooldir-memory-avro-streaming.channels = memory-channel

# Describe/configure the source

##注意:不能往監(jiān)控目中重復(fù)丟同名文件

## 通過spooldir來監(jiān)控文件內(nèi)容的變化

spooldir-memory-avro-streaming.sources.spooldir-source.type = spooldir

spooldir-memory-avro-streaming.sources.spooldir-source.spoolDir =/usr/local/datatest

spooldir-memory-avro-streaming.sources.spooldir-source.fileHeader = true

spooldir-memory-avro-streaming.sources.spooldir-source.deletePolicy=immediate

spooldir-memory-avro-streaming.sources.spooldir-source.ignorePattern=^(.)*\\.out$

# Describe the sink

spooldir-memory-avro-streaming.sinks.avro-sink.type = avro

spooldir-memory-avro-streaming.sinks.avro-sink.hostname=10.101.3.3

spooldir-memory-avro-streaming.sinks.avro-sink.port=44445

spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel

# Use a channel which buffers events in memory

##使用內(nèi)存的方式

spooldir-memory-avro-streaming.channels.memory-channel.type = memory

# Bind the source and sink to the channel

spooldir-memory-avro-streaming.sources.spooldir-source.channels = memory-channel

spooldir-memory-avro-streaming.sinks.avro-sink.channel = memory-channel

第二步編寫streaming與flume整合的java代碼


第三步提交jar到spark上,由于是樣例,我都用會話方式啟動,如果想后臺啟動那就前加 nohup 后加 &

bin/spark-submit --class com.liushun.Flume2StreamingWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

第四步 啟動flume由于是樣例,我都用會話方式啟動,如果想后臺啟動那就前加 nohup 后加 &

./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streaming.conf --name spooldir-memory-avro-streaming -Dflume.root.logger=INFO,console

注意:一定不能先啟動flume,因為flume在啟動時會預(yù)先測試監(jiān)聽端口是否存在,如果存在才繼續(xù)啟動,否則就會中斷啟動,報錯。

第五步 驗證

創(chuàng)建文件移動到usr/local/datatest目錄下,觀察streaming打印的日志是否跟我們預(yù)想的一樣。


在運行程序時你可能會遇到ClassNotFound的錯誤,不用緊張,只需要根據(jù)錯誤,去你的maven倉庫種找到相應(yīng)的jar包添加到spark的jars目錄中即可。

截止目前位置:flume主動推送到Streaming的方式介紹完畢,接下來我們介紹另外一種。


這一種方案,相較于第一種推送的方式更加高可用,更加健壯

那么我們要使用這種方式,那么具體步驟是什么尼?看下面


第一步:配置flume

(1)添加jar包

(2)編寫運行的自定義的flume的agent配置文件


第二步編寫Spark Streaming應(yīng)用程序

以下是我的整合樣例,僅供參考

第一步、配置flume的agent的自定義Sink,并啟動flume,監(jiān)控一個文件夾datatest/,配置文件如下

# name the components on this agent

spooldir-memory-avro-streamingfromSink.sources = spooldir-source

spooldir-memory-avro-streamingfromSink.sinks = spark-sink

spooldir-memory-avro-streamingfromSink.channels = memory-channel

# Describe/configure the source

##注意:不能往監(jiān)控目中重復(fù)丟同名文件

## 通過spooldir來監(jiān)控文件內(nèi)容的變化

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.type = spooldir

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.spoolDir =/usr/local/datatest

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.fileHeader = true

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.deletePolicy=immediate

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.ignorePattern=^(.)*\\.out$

# Describe the sink

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.hostname=10.101.3.3

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.port=44445

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel

# Use a channel which buffers events in memory

##使用內(nèi)存的方式

spooldir-memory-avro-streamingfromSink.channels.memory-channel.type = memory

# Bind the source and sink to the channel

spooldir-memory-avro-streamingfromSink.sources.spooldir-source.channels = memory-channel

spooldir-memory-avro-streamingfromSink.sinks.spark-sink.channel = memory-channel

應(yīng)用程序編寫樣例:



第三步啟動flume的,由于是樣例,我都用會話方式啟動,如果想后臺啟動那就前加 nohup 后加 &

./flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/spooldir-memory-avro-streamingfromSink.conf --name spooldir-memory-avro-streamingfromSink -Dflume.root.logger=INFO,console

第四步啟動應(yīng)用程序,由于是樣例,我都用會話方式啟動,如果想后臺啟動那就前加 nohup 后加 &

./spark-submit --class com.liushun.StreamingFromFlumeWordCnt --master yarn /usr/local/spark-2.1.1-bin-hadoop2.6/bin/SparkStreamTest-1.1-SNAPSHOT.jar

驗證結(jié)果:


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容