Flume Pull方式整合Spark-Streaming

Approach: Pull-based Approach using a Custom Sink

Flume的sink不直接連接Spark組件,而是存到一個(gè)Customer sink中存在buffer中
Spark Streaming進(jìn)行分批次拉取數(shù)據(jù)。每一次操作只有當(dāng)數(shù)據(jù)到達(dá)并且以副本的形式復(fù)制成功以后才算成功,因此該方式提高了容錯(cuò)性。
Flume配置文件 flume_pull_streaming.conf

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

該配置中指定了agent的sink類型為org.apache.spark.streaming.flume.sink.SparkSink
并且指定了該sink對(duì)應(yīng)的地址和端口

SparkStreaming 端代碼:

object flumePull {
  def main(args: Array[String]): Unit = {
    if(args.length != 2){
      System.err.println("Usage:flumePull <hostname> <port>")
      System.exit(1)
    }
    val conf: SparkConf = new SparkConf().setAppName("flumePull").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(3))

    val flumeEvent: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(ssc,args(0),args(1).toInt)
    //將SparkFlumeEvent轉(zhuǎn)換為String
    val lines: DStream[String] = flumeEvent.map(fe => new String(fe.event.getBody.array()).trim)
    val res: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

其中指定的地址和端口號(hào)是SparkSink對(duì)應(yīng)的地址和端口號(hào)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容