Flink API

Source API

The following Scala code shows several source types:

package com.example.apitest
import org.apache.flink.streaming.api.scala._

// Define the case class
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Read data from a collection
    val dataList = List(
      SensorReading("sensor_1",1547718199,35.8),
      SensorReading("sensor_2",1547718201,15.8),
      SensorReading("sensor_3",1547718221,25.3)
    )
    val stream1 = env.fromCollection(dataList)

    // 2. Read data from individual elements
    val stream2 = env.fromElements(1.0,35,"hello")

    // 3. Read data from a file
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val stream3 = env.readTextFile(inputPath)

    stream3.print()
    // Execute the job
    env.execute("source test")
  }
}

Reading source data from Kafka with Flink

First, add the following dependency to the pom file:

<dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
          <version>1.10.1</version>
</dependency>
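
For projects built with sbt rather than Maven, the same connector could be declared as below. This is a sketch that assumes an sbt build; the coordinates are copied from the Maven snippet above, and because the Scala 2.11 suffix is already part of the artifact name, a single % is used.

```scala
// build.sbt fragment (assumption: the project uses sbt instead of Maven)
libraryDependencies += "org.apache.flink" % "flink-connector-kafka-0.11_2.11" % "1.10.1"
```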

The following code is a streaming job that reads from a Kafka source:

package com.example.apitest
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

import java.util.Properties

// Define the case class
case class SensorReading(id:String,timestamp:Long,temperature:Double)

object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers","192.168.2.144:9092")
    //properties.setProperty("group.id","consumer-group")
    val stream = env.addSource(new FlinkKafkaConsumer011[String]("firstkafka",new SimpleStringSchema(),properties))

    stream.print()
    // Execute the job
    env.execute("source test")
  }
}
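
The commented-out group.id line in the job above hints that more consumer settings can be passed through the same Properties object. The following is a minimal sketch of a helper that assembles them. The helper name kafkaConsumerProps is hypothetical, the group name "consumer-group" comes from the commented-out line, and the auto.offset.reset value is an assumption, not something the original job sets.

```scala
import java.util.Properties

object KafkaProps {
  // Hypothetical helper: builds the consumer Properties handed to the Kafka source.
  def kafkaConsumerProps(brokers: String, groupId: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", brokers)
    props.setProperty("group.id", groupId)
    // Assumption: read from the latest offset when the group has no committed offset.
    props.setProperty("auto.offset.reset", "latest")
    props
  }
}
```

The result could be passed as the third constructor argument of FlinkKafkaConsumer011 in place of the inline properties value.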

Once the job above is running, start a producer for the same Kafka topic to generate data:

./kafka-console-producer.sh --broker-list 192.168.2.144:9092 --topic firstkafka

Sink API

Writing to the file system

package com.example.apitest.sinktest
import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._


object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert each line into the case class
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble)
      })

    // Write to the file system; StreamingFileSink treats the path below as a base directory for part files
    dataStream.print()
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/out.txt"),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )
    env.execute("file sink test")
  }
}
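
The map step in the job above assumes every input line has three comma-separated fields and throws on anything else. A more defensive variant might look like the sketch below. It assumes the id,timestamp,temperature layout implied by that map step; SensorParser and parse are hypothetical names, and the case class is repeated only to keep the snippet self-contained.

```scala
import scala.util.Try

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorParser {
  // Returns None unless the line has exactly three fields that convert cleanly.
  def parse(line: String): Option[SensorReading] =
    line.split(",", -1).map(_.trim) match {
      case Array(id, ts, temp) if id.nonEmpty =>
        // Try captures NumberFormatException from toLong / toDouble.
        Try(SensorReading(id, ts.toLong, temp.toDouble)).toOption
      case _ => None
    }
}
```

Returning an Option lets the caller decide whether to drop malformed lines or route them elsewhere, instead of failing the whole job.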

Writing to Kafka

package com.example.apitest.sinktest

import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011


object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert each line into the case class, then into a String
    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0),arr(1).toLong,arr(2).toDouble).toString
      })

    // Write to Kafka
    dataStream.print()
    dataStream.addSink(
      new FlinkKafkaProducer011[String]("192.168.2.144:9092","firstkafka",new SimpleStringSchema())
    )
    env.execute("Kafka sink test")

  }
}
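
The job above sends the case class's toString output to Kafka, so each record looks like SensorReading(sensor_1,1547718199,35.8). If a downstream consumer needs to split the record back into fields, a plain comma-separated layout may be easier to handle. The sketch below shows one way to produce it; SensorCsv and toCsv are hypothetical names, and the case class is repeated only to keep the snippet self-contained.

```scala
case class SensorReading(id: String, timestamp: Long, temperature: Double)

object SensorCsv {
  // Formats a reading as "id,timestamp,temperature", the layout the input lines use.
  def toCsv(r: SensorReading): String =
    s"${r.id},${r.timestamp},${r.temperature}"
}
```

In the map step, calling SensorCsv.toCsv on the parsed reading in place of .toString would emit this layout.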

Writing to MySQL
