Source API
以下scala代碼展示了幾種source類型:
package com.example.apitest
import org.apache.flink.streaming.api.scala._
/**
  * Case class describing one reading reported by a sensor.
  *
  * @param id          sensor identifier, e.g. "sensor_1"
  * @param timestamp   time of the reading (sample values look like epoch seconds — confirm)
  * @param temperature measured temperature value
  */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
  * Shows three ways of creating a DataStream source: from an in-memory
  * collection, from literal elements, and from a text file.
  * Only the file-backed stream is printed.
  */
object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. Source backed by an in-memory collection
    val readings = List(
      SensorReading("sensor_1", 1547718199L, 35.8),
      SensorReading("sensor_2", 1547718201L, 15.8),
      SensorReading("sensor_3", 1547718221L, 25.3)
    )
    val stream1 = env.fromCollection(readings)

    // 2. Source built directly from literal elements; the element type is
    //    spelled out as Any because the values mix Double, Int and String
    val stream2 = env.fromElements[Any](1.0, 35, "hello")

    // 3. Source that reads a text file
    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val stream3 = env.readTextFile(inputPath)
    stream3.print()

    // Submit the job
    env.execute("source test")
  }
}
Flink從Kafka獲取源數據
首先pom文件需要添加如下依賴:
<!-- Flink connector for Kafka 0.11, version 1.10.1; it provides the
     FlinkKafkaConsumer011 / FlinkKafkaProducer011 classes used in the examples.
     NOTE(review): the "_2.11" suffix is presumably the Scala binary version;
     confirm it matches the project's Scala version. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>1.10.1</version>
</dependency>
以下代碼為獲取Kafka數據源的流計算代碼:
package com.example.apitest
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import java.util.Properties
/**
  * Case class describing one reading reported by a sensor.
  *
  * @param id          sensor identifier
  * @param timestamp   time of the reading
  * @param temperature measured temperature value
  */
case class SensorReading(
  id: String,
  timestamp: Long,
  temperature: Double
)
/**
  * Consumes string records from the Kafka topic "firstkafka" and prints
  * each record.
  */
object sourceTest {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Kafka client configuration: only the broker address is set
    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "192.168.2.144:9092")
    // No consumer group is configured; uncomment to set one:
    // kafkaProps.setProperty("group.id", "consumer-group")

    // Consumer that deserializes every Kafka record as a plain string
    val kafkaConsumer =
      new FlinkKafkaConsumer011[String]("firstkafka", new SimpleStringSchema(), kafkaProps)

    val stream = env.addSource(kafkaConsumer)
    stream.print()

    // Submit the job
    env.execute("source test")
  }
}
上述代碼啟動以後,我們需要開啟Kafka對應topic的生產者,產生數據。
# Start a Kafka console producer that publishes to topic "firstkafka" on broker 192.168.2.144:9092
./kafka-console-producer.sh --broker-list 192.168.2.144:9092 --topic firstkafka
Sink API
寫到文件系統
package com.example.apitest.sinktest
import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
/**
  * Reads sensor readings from a text file, prints them, and writes them out
  * through a row-format StreamingFileSink.
  *
  * Each input line is expected to look like `id,timestamp,temperature`.
  */
object FileSink {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Convert each line into a SensorReading.
    // Blank lines are skipped because they carry no reading and would otherwise
    // fail with an index error. The numeric fields are trimmed because toLong
    // rejects surrounding whitespace, so a line such as
    // "sensor_1, 1547718199, 35.8" would otherwise abort the job.
    val dataStream = inputStream
      .filter(line => line.trim.nonEmpty)
      .map { line =>
        val fields = line.split(",")
        SensorReading(fields(0), fields(1).trim.toLong, fields(2).trim.toDouble)
      }

    dataStream.print()

    // Write every reading as one text line (its toString form) to the file system.
    // NOTE(review): StreamingFileSink treats this path as a base directory for its
    // part files, despite the ".txt" name. Per the Flink docs, part files are only
    // finalized on successful checkpoints, and checkpointing is not enabled here,
    // so confirm that in-progress part files are acceptable for this example.
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/out.txt"),
        new SimpleStringEncoder[SensorReading]()
      ).build()
    )

    // Submit the job
    env.execute("file sink test")
  }
}
寫到Kafka
package com.example.apitest.sinktest
import com.example.apitest.SensorReading
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
/**
  * Reads sensor readings from a text file, prints them, and publishes their
  * string form to the Kafka topic "firstkafka".
  *
  * Each input line is expected to look like `id,timestamp,temperature`.
  */
object KafkaSinkTest {
  def main(args: Array[String]): Unit = {
    // Create the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val inputPath = "/Users/wenhuan/IdeaProjects/FlinkTutorial/src/main/resources/sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Parse each line into a SensorReading and keep its toString form, since
    // the Kafka producer below is typed on String.
    // Blank lines are skipped because they carry no reading and would otherwise
    // fail with an index error. The numeric fields are trimmed because toLong
    // rejects surrounding whitespace, so a line such as
    // "sensor_1, 1547718199, 35.8" would otherwise abort the job.
    val dataStream = inputStream
      .filter(line => line.trim.nonEmpty)
      .map { line =>
        val fields = line.split(",")
        SensorReading(fields(0), fields(1).trim.toLong, fields(2).trim.toDouble).toString
      }

    dataStream.print()

    // Write to Kafka: broker 192.168.2.144:9092, topic "firstkafka"
    dataStream.addSink(
      new FlinkKafkaProducer011[String]("192.168.2.144:9092", "firstkafka", new SimpleStringSchema())
    )

    // Submit the job
    env.execute("Kafka sink test")
  }
}