Flink API

1. Introduction to the Flink API

Flink provides several levels of abstraction for developing streaming or batch applications.


Layered APIs
  • Stateful Stream Processing: the lowest-level abstraction is the stateful streaming interface. It is integrated into the
    DataStream API via the ProcessFunction. It lets users freely process events from one or more streams
    while using consistent, fault-tolerant state. In addition, users can register event-time and
    processing-time callbacks to implement complex computations.
  • DataStream/DataSet API: the core APIs provided by Flink. The DataSet API handles
    bounded data sets, while the DataStream API handles bounded or unbounded data streams. Data is transformed and
    computed with a rich set of operations (map / flatMap / window / keyBy / sum / max / min / avg / join, and so on).
  • Table API: provides operations such as select, project, join, group-by and aggregate, and is more
    concise to use. Tables can be converted back and forth seamlessly to DataStream/DataSet, and programs may mix the Table API
    with the DataStream and DataSet APIs.
  • SQL: the highest-level abstraction provided by Flink, similar to the Table API in syntax and expressiveness.
    The SQL abstraction interacts closely with the Table API, and SQL queries can be executed directly on tables defined with the Table API.

2. Dataflows

In Flink's world view, everything is a data stream; batch processing is simply a special case of stream processing.

A Flink dataflow consists of three parts: Source, Transformation, and Sink (a minimal example follows the list below).

  • Source: reads data from the data source
  • Transformation: processes and transforms the data with various operators
  • Sink: writes the final result to an external system (console, Kafka, Redis, a database, ...)
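As a concrete illustration, here is a minimal WordCount-style sketch that wires the three parts together (the host, port and job name are assumptions for the example):

import org.apache.flink.streaming.api.scala._

object WordCountDataflow {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: read text lines from a socket (hypothetical host/port)
    val lines = env.socketTextStream("node01", 8888)

    // Transformation: split into words and count per word
    val counts = lines
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // Sink: print the running counts to the console
    counts.print()

    env.execute("wordcount-dataflow")
  }
}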


[Figure: Dataflow (Source → Transformation → Sink)]

When the amount of source data is large or the computation logic is relatively complex, the parallelism needs to be increased, which leads to a parallel dataflow.

By setting the parallelism of the individual operators (for example, source parallelism 2, map parallelism 2, ...), multiple parallel threads are started to process the data.


[Figure: Parallelized view of the dataflow]

At runtime, a Flink program is mapped to a logical dataflow that contains these three parts. Each dataflow starts with one or more sources and ends with one or more sinks, and resembles an arbitrary directed acyclic graph (DAG). In most cases there is a one-to-one correspondence between the transformations in the program and the operators in the dataflow, but sometimes a single transformation may map to several operators.


3. Flink DataStream API

3.1 Environment

StreamExecutionEnvironment is the basis of every Flink program. You can obtain a StreamExecutionEnvironment with the following static methods:

# Creates an execution environment that represents the context of the current program.
# If the program is invoked standalone, this method returns a local execution environment;
# if the program is invoked from a command-line client to be submitted to a cluster, it returns that cluster's execution environment.
# In other words, getExecutionEnvironment decides which environment to return based on how the program is run; it is the most commonly used way to create an execution environment.
getExecutionEnvironment()

# Returns a local execution environment
createLocalEnvironment()

# Returns a cluster execution environment and submits the Jar to the remote server.
# The JobManager's IP and port must be specified in the call, along with the Jar(s) to run on the cluster.
createRemoteEnvironment(host: String, port: Int, jarFiles: String*)
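A minimal usage sketch (the parallelism value and job name are arbitrary choices for illustration):

import org.apache.flink.streaming.api.scala._

object EnvDemo {
  def main(args: Array[String]): Unit = {
    // Local environment when run from the IDE, cluster environment when submitted via the CLI
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)

    env.fromElements(1, 2, 3).map(_ * 10).print()

    // Nothing runs until execute() is called
    env.execute("env-demo")
  }
}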

3.2 Data Sources

Flink has built-in support for many data sources, such as HDFS, sockets, Kafka, and collections. Flink also provides addSource so that custom sources can be defined. This section covers Flink's built-in sources as well as the principles and APIs of custom sources.


3.2.1 File-based:

Create a source by reading a local or HDFS file.

env.readTextFile(path) 
env.readFile(fileInputFormat, path, watchType, interval, pathFilter)

readTextFile calls readFile under the hood; readFile is the lower-level method and is more flexible to use.
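As a hedged sketch of readFile continuously watching a directory (the directory path and scan interval below are assumptions):

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val path = "hdfs://node01:9000/flink/input"          // hypothetical directory
val format = new TextInputFormat(new Path(path))

// Re-scan the directory every 5 seconds and read newly appearing data
val stream = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L)
stream.print()
env.execute()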

3.2.2 Socket-based:

Receive data from a socket server.

env.socketTextStream("node09",8888)

3.2.3 Collection-based:

env.fromCollection(Seq)
env.fromCollection(Iterator)
env.fromElements(elements: _*)
env.fromParallelCollection(SplittableIterator)
env.generateSequence(from, to)
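A small sketch using one of these collection sources (the data is made up for illustration):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Build a bounded stream from an in-memory Scala collection
val stream = env.fromCollection(Seq(("a", 1), ("b", 2), ("c", 3)))
stream.map(t => (t._1, t._2 * 10)).print()
env.execute()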

3.2.4 Kafka Source

Flink's Kafka consumer class is FlinkKafkaConsumer. This is the universal Kafka connector and works with Kafka 0.10 and above (alternatively, FlinkKafkaConsumer011 targets Kafka 0.11.x and FlinkKafkaConsumer010 targets Kafka 0.10.x).
Example of using Kafka as a source:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));

The example above uses SimpleStringSchema, a deserialization schema that Flink already provides, but it only returns the Kafka value and no other information.
If you need the key, the value and the metadata of the Kafka message, implement the deserialize method of the KafkaDeserializationSchema interface.
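A minimal sketch of such a schema (the class name is hypothetical; a fuller variant appears again in the MySQL sink example later):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

// Keeps the key, the value and one piece of metadata (the offset) of each record
class KeyValueOffsetSchema extends KafkaDeserializationSchema[(String, String, Long)] {

  override def isEndOfStream(t: (String, String, Long)): Boolean = false

  override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String, Long) = {
    val key = if (record.key() == null) "" else new String(record.key(), "UTF-8")
    val value = new String(record.value(), "UTF-8")
    (key, value, record.offset())
  }

  override def getProducedType: TypeInformation[(String, String, Long)] =
    createTypeInformation[(String, String, Long)]
}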

The Kafka connector is quite important in Flink; it will be studied in more detail later.

3.2.5 Custom Source

We can implement Flink's SourceFunction to build a custom source with a single degree of parallelism (ParallelSourceFunction / RichParallelSourceFunction are used for parallel sources). It is attached as follows:

val stream = env.addSource( new MySensorSource() )

We want to generate random sensor readings; the implementation of MySensorSource looks like this:

import scala.util.Random
import org.apache.flink.streaming.api.functions.source.SourceFunction

// SensorReading(id: String, timestamp: Long, temperature: Double) is assumed to be defined elsewhere
class MySensorSource extends SourceFunction[SensorReading]{

    // flag: indicates whether the source is still running
    var running: Boolean = true

    override def cancel(): Unit = {
        running = false
    }

    override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
        // initialize a random number generator
        val rand = new Random()

        // initial readings for 10 sensors
        var curTemp = 1.to(10).map(
            i => ("sensor_" + i, 65 + rand.nextGaussian() * 20)
        )

        while (running) {
            // update the temperature values
            curTemp = curTemp.map(
                t => (t._1, t._2 + rand.nextGaussian())
            )
            // get the current timestamp
            val curTime = System.currentTimeMillis()

            curTemp.foreach(
                t => ctx.collect(SensorReading(t._1, curTime, t._2))
            )
            Thread.sleep(100)
        }
    }
}

3.3 Transformations

A transformation operator turns one or more data streams into a new data stream; by combining transformations you can build complex processing logic.


3.3.1 Simple DataStream operations


① Map

DataStream → DataStream

Visits every element of the stream and produces exactly one new element per input.

② FlatMap

DataStream → DataStream

Visits every element of the stream and produces N output elements per input, where N = 0, 1, 2, ...

③ Filter

DataStream → DataStream

Filter operator: evaluates a boolean for each element of the stream; true keeps the element, false discards it.
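A short sketch combining the three operators (the host and port are assumptions):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
val lines = env.socketTextStream("node01", 8888)   // hypothetical host/port

lines
  .flatMap(_.split(" "))    // FlatMap: one line -> N words
  .filter(_.nonEmpty)       // Filter: drop empty strings
  .map(word => (word, 1))   // Map: one word -> one (word, 1) pair
  .print()
env.execute()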


3.3.2 Operations on keyed streams (KeyedStream)


① KeyBy

DataStream → KeyedStream

Partitions the stream by the specified field; elements with the same key value are guaranteed to land in the same partition. Internally the partitioning is done by hashing the key.

There are three ways to specify the key field:
1. By index or field name (no longer recommended in recent versions)
2. With an anonymous function
3. By implementing the KeySelector interface

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.generateSequence(1, 100)
    stream
      .map(x => (x % 3, 1))
      //specify the key field by index
      //      .keyBy(0)
      //specify the key field with an anonymous function
      //      .keyBy(x=>x._1)
      //specify the key field by implementing the KeySelector interface
      .keyBy(new KeySelector[(Long, Int), Long] {
      override def getKey(value: (Long, Int)): Long = value._1
    })
      .sum(1)
      .print()
    env.execute()

② Reduce

KeyedStream → DataStream

Aggregates the stream per key: each new element is combined with the previously reduced value for that key (a complete sketch follows the snippet below).

Note: reduce operates on a keyed (partitioned) stream; a plain DataStream object cannot call reduce.

.reduce((v1,v2) => (v1._1,v1._2 + v2._2))
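Put into a complete, hedged word-count sketch, reduce keeps a running sum per key (host and port are assumptions):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("node01", 8888)               // hypothetical host/port
  .flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(_._1)                                     // reduce is only available on a KeyedStream
  .reduce((v1, v2) => (v1._1, v1._2 + v2._2))      // running sum per word
  .print()
env.execute()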

③ Fold

A rolling fold on a keyed data stream with an initial value: it combines the current element with the result of the previous fold and produces a new value. (Note: fold has been deprecated in newer Flink versions.)

With an input sequence (1,2,3,4,5), the fold function below emits: "start-1", "start-1-2", "start-1-2-3", ...

val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i })

④ Aggregations

KeyedStream → DataStream

Aggregations denotes a family of rolling aggregation operators. min/max only guarantee the minimum/maximum of the given field (the other fields of the emitted record may come from a different element), while minBy/maxBy emit the whole element that holds the minimum/maximum; a sketch follows the list below. The concrete operators are:

keyedStream.sum(0)
keyedStream.sum("key")
keyedStream.min(0)
keyedStream.min("key")
keyedStream.max(0)
keyedStream.max("key")
keyedStream.minBy(0)
keyedStream.minBy("key")
keyedStream.maxBy(0)
keyedStream.maxBy("key")
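A small sketch of the max/maxBy distinction described above (the readings are made up):

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// (sensorId, readingId, temperature) -- made-up data
val readings = env.fromElements(
  ("sensor_1", 1, 10), ("sensor_1", 2, 30), ("sensor_1", 3, 20)
)
val keyed = readings.keyBy(_._1)

keyed.max(2).print("max")      // field 2 is the running maximum; the other fields may lag behind
keyed.maxBy(2).print("maxBy")  // the full element that currently holds the maximum
env.execute()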

3.3.3 Combining streams (Union and Connect)


① Union

DataStream → DataStream

Merges two or more data streams into a new data stream that contains all elements of the merged streams.

Note: the element types of the merged streams must be identical.

val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds1 = env.fromCollection(List(("a",1),("b",2),("c",3)))
    val ds2 = env.fromCollection(List(("d",4),("e",5),("f",6)))
    val ds3 = env.fromCollection(List(("g",7),("h",8)))
//    val ds3 = env.fromCollection(List((1,1),(2,2)))
    val unionStream = ds1.union(ds2,ds3)
    unionStream.print()
    env.execute()

② Connect

DataStream,DataStream → ConnectedStreams

Merges two data streams while keeping their respective data types; the two streams can share state.

val ds1 = env.socketTextStream("node01", 8888)
val ds2 = env.socketTextStream("node01", 9999)
val wcStream1 = ds1.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val wcStream2 = ds2.flatMap(_.split(" ")).map((_, 1)).keyBy(0).sum(1)
val restStream: ConnectedStreams[(String, Int), (String, Int)] = wcStream2.connect(wcStream1)

A ConnectedStreams can then be turned back into a DataStream with the operators below; see the ConnectedStreams section.


[Figure: ConnectedStreams operators]

Differences between Connect and Union:

  1. For Union the two streams must have the same type; with Connect the types can differ and be reconciled later in a CoMap.
  2. Connect can only operate on two streams, while Union can operate on more than two.

3.3.4 Operations on connected streams (ConnectedStreams)

① CoMap, CoFlatMap

ConnectedStreams → DataStream
CoMap and CoFlatMap are not names of specific operators but of a class of operations:

Any map over a ConnectedStreams is called a CoMap.
Any flatMap over a ConnectedStreams is called a CoFlatMap.

CoMap, first implementation:

connectedStream.map(new CoMapFunction[(String,Int),(String,Int),(String,Int)] {
      //compute on the first stream
      override def map1(value: (String, Int)): (String, Int) = {
        (value._1+":first",value._2+100)
      }
      //compute on the second stream
      override def map2(value: (String, Int)): (String, Int) = {
        (value._1+":second",value._2*100)
      }
    }).print()

CoMap, second implementation:

connectedStream.map(
      //compute on the first stream
      x=>{(x._1+":first",x._2+100)}
      //compute on the second stream
      ,y=>{(y._1+":second",y._2*100)}
    ).print()

CoFlatMap, first implementation:

ds1.connect(ds2).flatMap((x,c:Collector[String])=>{
      //compute on the first stream
      x.split(" ").foreach(w=>{
        c.collect(w)
      })

    }
      //compute on the second stream
      ,(y,c:Collector[String])=>{
      y.split(" ").foreach(d=>{
        c.collect(d)
      })
    }).print

CoFlatMap, second implementation:

 ds1.connect(ds2).flatMap(
      //compute on the first stream
      x=>{
      x.split(" ")
    }
      //compute on the second stream
      ,y=>{
        y.split(" ")
      }).print()

CoFlatMap, third implementation:

ds1.connect(ds2).flatMap(new CoFlatMapFunction[String,String,(String,Int)] {
    //compute on the first stream
    override def flatMap1(value: String, out: Collector[(String, Int)]): Unit = {
        val words = value.split(" ")
        words.foreach(x=>{
          out.collect((x,1))
        })
      }

    //compute on the second stream
    override def flatMap2(value: String, out: Collector[(String, Int)]): Unit = {
        val words = value.split(" ")
        words.foreach(x=>{
          out.collect((x,1))
        })
      }
    }).print()

3.3.5 Splitting streams


① Split

DataStream → SplitStream

Splits one stream into two or more streams according to a condition.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,100)
val splitStream = stream.split(
    d => {
        d % 2 match {
            case 0 => List("even")
            case 1 => List("odd")
        }
    }
)
splitStream.select("even").print()
env.execute()
@deprecated Please use side output instead

② Select

SplitStream → DataStream

Selects one or more streams from a SplitStream.

splitStream.select("even").print()

③ Side outputs (side output)

During stream processing you may need to split a stream according to different conditions; doing this with several filter operators copies the data unnecessarily, which side outputs avoid.

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01",8888)
    val gtTag = new OutputTag[String]("gt")
    val processStream = stream.process(new ProcessFunction[String, String] {
      override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
        try {
          val longVar = value.toLong
          if (longVar > 100) {
            out.collect(value)
          } else {
            ctx.output(gtTag, value)
          }
        } catch {
          case e => e.getMessage
            ctx.output(gtTag, value)
        }
      }
    })
    val sideStream = processStream.getSideOutput(gtTag)
    sideStream.print("sideStream")
    processStream.print("mainStream")
    env.execute()

3.3.6 Iterate

DataStream → IterativeStream → DataStream

The iterate operator adds support for iterating over a data stream, which is particularly useful for algorithms that keep refining a model.

An iteration consists of two parts: the iteration body and the termination condition.

Elements that do not satisfy the termination condition are fed back into the stream for the next iteration.

Elements that satisfy the termination condition are forwarded downstream.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val initStream = env.socketTextStream("node01",8888)
val stream = initStream.map(_.toLong)
stream.iterate {
    iteration => {
        //define the iteration logic
        val iterationBody = iteration.map ( x => {
            println(x)
            if(x > 0) x - 1
            else x
        } )
        //values > 0 are fed back into the stream; values <= 0 are forwarded downstream
        (iterationBody.filter(_ > 0), iterationBody.filter(_ <= 0))
    }
}.print()
env.execute()

3.3.7 Physical partitioning strategies


① shuffle

Use case: increasing the number of partitions / the parallelism and mitigating data skew

DataStream → DataStream

Elements are distributed randomly and uniformly to the downstream partitions; the network overhead is relatively high.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(1)
println(stream.getParallelism)
stream.shuffle.print()
env.execute()

Console result: the upstream data is distributed to the downstream partitions more or less randomly.

② rebalance

Use case: increasing the number of partitions / the parallelism and mitigating data skew

DataStream → DataStream

Elements are distributed to the downstream partitions in a round-robin fashion, so every downstream partition receives a roughly even share of the data. This is very useful when data skew occurs; the network overhead is relatively high.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(3)
val stream = env.generateSequence(1,100)
val shuffleStream = stream.rebalance
shuffleStream.print()
env.execute()

Console result: the upstream data is distributed evenly to the downstream partitions.

③ rescale

Use case: reducing the number of partitions while avoiding heavy network transfer; no full repartitioning takes place

DataStream → DataStream

Elements are distributed round-robin from each upstream partition to a subset of the downstream partitions, rather than to all of them.

Note: rescale uses local data transfer instead of shipping data across the network, for example between the slots of one TaskManager. Put simply, an upstream partition only sends its data to downstream partitions in the same TaskManager.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.rescale.writeAsText("./data/stream2").setParallelism(4)
env.execute()

④ broadcast

Use case: scenarios that need a mapping table, especially one that changes frequently

DataStream → DataStream

Every upstream element is broadcast to every downstream partition.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.broadcast.writeAsText("./data/stream2").setParallelism(4)
env.execute()

⑤ global

Use case: reducing the parallelism to 1

DataStream → DataStream

The data of all upstream partitions is sent only to the first downstream partition.

val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.global.writeAsText("./data/stream2").setParallelism(4)
env.execute()

⑥ forward

Use case: one-to-one data forwarding; map, flatMap, filter and similar operators use this strategy by default

DataStream → DataStream

Each upstream partition sends its data to the corresponding downstream partition

partition1->partition1

partition2->partition2

Note: the upstream and downstream partition counts (parallelism) must match, otherwise you get an exception such as:

Forward partitioning does not allow change of parallelism
* Upstream operation: Source: Sequence Source-1 parallelism: 2,
* downstream operation: Sink: Unnamed-4 parallelism: 4
* stream.forward.writeAsText("./data/stream2").setParallelism(4)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.forward.writeAsText("./data/stream2").setParallelism(2)
env.execute()

⑦ keyBy

Use case: determined by the business logic (grouping by key)

DataStream → DataStream

The hash of each element's key, taken modulo the number of downstream partitions, determines which downstream partition the element is sent to:

MathUtils.murmurHash(keyHash) (hash of the element's key) % maxParallelism (number of downstream partitions)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = env.generateSequence(1,10).setParallelism(2)
stream.writeAsText("./data/stream1").setParallelism(2)
stream.keyBy(0).writeAsText("./data/stream2").setParallelism(2)
env.execute()

Elements are distributed to the downstream partitions according to the hash of their key.

⑧ PartitionCustom (custom partitioning)

DataStream → DataStream

A user-defined partitioner decides how elements are distributed from the upstream partitions to the downstream partitions.

object ShuffleOperator {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(2)
    val stream = env.generateSequence(1,10).map((_,1))
    stream.writeAsText("./data/stream1")
    stream.partitionCustom(new customPartitioner(),0)
      .writeAsText("./data/stream2").setParallelism(4)
    env.execute()
  }
  class customPartitioner extends Partitioner[Long]{
    override def partition(key: Long, numPartitions: Int): Int = {
      key.toInt % numPartitions
    }
  }
}

3.4 Sink

Flink ships with a large number of sinks that write processed data to HDFS, Kafka, Redis, Elasticsearch, MySQL and so on; beyond those, users implement their own sinks.
In practice, a job very often consumes data from Kafka and stores the results in Redis, HBase or MySQL.


3.4.1 Redis Sink

Data processed by Flink can be stored in Redis so it can be queried in real time.
Flink has a Redis connector; it is enough to add the Redis connector dependency:

        <dependency>
            <groupId>org.apache.bahir</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.0</version>
        </dependency>

Write the WordCount result to Redis, using the HSET data type.

The code is as follows:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01",8888)
    val result = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    //single-node Redis
    val config = new FlinkJedisPoolConfig.Builder().setDatabase(3).setHost("node01").setPort(6379).build()
    //for a Redis cluster:
    /*val addresses = new util.HashSet[InetSocketAddress]()
    addresses.add(new InetSocketAddress("node01",6379))
    addresses.add(new InetSocketAddress("node01",6379))
   val clusterConfig = new FlinkJedisClusterConfig.Builder().setNodes(addresses).build()*/

    result.addSink(new RedisSink[(String,Int)](config,new RedisMapper[(String,Int)] {

      override def getCommandDescription: RedisCommandDescription = {
        new RedisCommandDescription(RedisCommand.HSET,"wc")
      }

      override def getKeyFromData(t: (String, Int))  = {
        t._1
      }

      override def getValueFromData(t: (String, Int))  = {
        t._2 + ""
      }
    }))
    env.execute()

3.4.2 Kafka Sink

Flink also supports writing results to a Kafka topic out of the box. A connector dependency is required; it is the same dependency used for reading data from Kafka.

If it was added earlier, there is no need to add it again.

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
import java.lang
import java.util.Properties

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

object KafkaSink {
  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env.socketTextStream("node01",8888)
    val result = stream.flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    val props = new Properties()
    props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
//    props.setProperty("key.serializer",classOf[StringSerializer].getName)
//    props.setProperty("value.serializer",classOf[StringSerializer].getName)


    /**
     * FlinkKafkaProducer(defaultTopic: String,
     *                    serializationSchema: KafkaSerializationSchema[IN],
     *                    producerConfig: Properties,
     *                    semantic: FlinkKafkaProducer.Semantic)
     */
    result.addSink(new FlinkKafkaProducer[(String,Int)]("wc",new KafkaSerializationSchema[(String, Int)] {
      override def serialize(element: (String, Int), timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
        new ProducerRecord("wc",element._1.getBytes(),(element._2+"").getBytes())
      }
    },props,FlinkKafkaProducer.Semantic.EXACTLY_ONCE))

    env.execute()
  }
}

3.4.3 MySQL Sink (idempotent writes)

Writing Flink results to MySQL is not supported out of the box; the MySQL driver dependency has to be added.

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.44</version>
        </dependency>

Because there is no built-in MySQL sink, one is implemented on top of RichSinkFunction.

The example consumes data from Kafka, counts the traffic per monitoring point, and stores the result in MySQL.

Note: the writes must be deduplicated, i.e. the MySQL operations need to be idempotent.

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

object MySQLSink {

  case class CarInfo(monitorId: String, carId: String, eventTime: String, Speed: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Kafka connection properties
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    //first argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      //when to stop; the termination condition
      override def isEndOfStream(t: (String, String)): Boolean = false

      //deserialize the raw bytes
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }

      //specify the produced type using Flink's type information
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))

    stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        //t1: previous aggregation result, t2: current element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).addSink(new MySQLCustomSink)

    env.execute()
  }

  //idempotent writes to the external MySQL database
  class MySQLCustomSink extends RichSinkFunction[(String, Int)] {
    var conn: Connection = _
    var insertPst: PreparedStatement = _
    var updatePst: PreparedStatement = _

    //called once for every incoming element
    override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
      println(value)
      updatePst.setInt(1, value._2)
      updatePst.setString(2, value._1)
      updatePst.execute()
      println(updatePst.getUpdateCount)
      if(updatePst.getUpdateCount == 0){
        println("insert")
        insertPst.setString(1, value._1)
        insertPst.setInt(2, value._2)
        insertPst.execute()
      }
    }

    //executed once when the task (thread) is initialized
    override def open(parameters: Configuration): Unit = {
      conn = DriverManager.getConnection("jdbc:mysql://node01:3306/test", "root", "123123")
      insertPst = conn.prepareStatement("INSERT INTO car_flow(monitorId,count) VALUES(?,?)")
      updatePst = conn.prepareStatement("UPDATE car_flow SET count = ? WHERE monitorId = ?")
    }

    //executed once when the task (thread) shuts down
    override def close(): Unit = {
      insertPst.close()
      updatePst.close()
      conn.close()
    }
  }

}

3.4.4 Socket Sink

Send Flink results to a socket.

The sink is a custom implementation based on RichSinkFunction.

import java.io.PrintStream
import java.net.{InetAddress, Socket}
import java.util.Properties

import org.apache.flink.api.common.functions.ReduceFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTuple2TypeInformation, createTypeInformation}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringSerializer

//sink to a socket
object SocketSink {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Kafka connection properties
    val props = new Properties()
    //note: Spark Streaming + Kafka (pre-0.10) receiver mode used the ZooKeeper URL (metadata)
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    //first argument: the topic to consume
    val stream = env.addSource(new FlinkKafkaConsumer[(String, String)]("flink-kafka", new KafkaDeserializationSchema[(String, String)] {
      //when to stop; the termination condition
      override def isEndOfStream(t: (String, String)): Boolean = false

      //deserialize the raw bytes
      override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
        val key = new String(consumerRecord.key(), "UTF-8")
        val value = new String(consumerRecord.value(), "UTF-8")
        (key, value)
      }

      //specify the produced type using Flink's type information
      override def getProducedType: TypeInformation[(String, String)] = {
        createTuple2TypeInformation(createTypeInformation[String], createTypeInformation[String])
      }
    }, props))

    stream.map(data => {
      val value = data._2
      val splits = value.split("\t")
      val monitorId = splits(0)
      (monitorId, 1)
    }).keyBy(_._1)
      .reduce(new ReduceFunction[(String, Int)] {
        //t1: previous aggregation result, t2: current element
        override def reduce(t1: (String, Int), t2: (String, Int)): (String, Int) = {
          (t1._1, t1._2 + t2._2)
        }
      }).addSink(new SocketCustomSink("node01",8888))

    env.execute()
  }

  class SocketCustomSink(host:String,port:Int) extends RichSinkFunction[(String,Int)]{
    var socket: Socket  = _
    var writer:PrintStream = _

    override def open(parameters: Configuration): Unit = {
      socket = new Socket(InetAddress.getByName(host), port)
      writer = new PrintStream(socket.getOutputStream)
    }

    override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
      writer.println(value._1 + "\t" +value._2)
      writer.flush()
    }

    override def close(): Unit = {
      writer.close()
      socket.close()
    }
  }
}

3.4.5 HBase Sink

There are two ways to write the computed results to HBase:

  1. Writing inside a map operator, which creates HBase connections frequently
  2. Writing inside a process function, which is better suited to batching writes to HBase

Add the HBase dependencies:

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

Read data from Kafka, compute the traffic per monitoring point, and save it to HBase.

  1. Create the corresponding table in HBase
create 'car_flow',{NAME => 'count', VERSIONS => 1}
  2. Implementation code
import java.util.{Date, Properties}

import com.msb.stream.util.{DateUtils, HBaseUtil}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.common.serialization.StringSerializer


object HBaseSinkTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    //Kafka connection properties
    val props = new Properties()
    //note: Spark Streaming + Kafka (pre-0.10) receiver mode used the ZooKeeper URL (metadata)
    props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
    props.setProperty("group.id", "flink-kafka-001")
    props.setProperty("key.deserializer", classOf[StringSerializer].getName)
    props.setProperty("value.deserializer", classOf[StringSerializer].getName)

    val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))


    stream.map(row => {
      val arr = row.split("\t")
      (arr(0), 1)
    }).keyBy(_._1)
      .reduce((v1: (String, Int), v2: (String, Int)) => {
        (v1._1, v1._2 + v2._2)
      }).process(new ProcessFunction[(String, Int), (String, Int)] {

      var htab: HTable = _

      override def open(parameters: Configuration): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "node01:2181,node02:2181,node03:2181")
        val hbaseName = "car_flow"
        htab = new HTable(conf, hbaseName)
      }

      override def close(): Unit = {
        htab.close()
      }

      override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), (String, Int)]#Context, out: Collector[(String, Int)]): Unit = {
        // rowkey: monitorId, column qualifier: timestamp (minute), value: traffic count
        val min = DateUtils.getMin(new Date())
        val put = new Put(Bytes.toBytes(value._1))
        put.addColumn(Bytes.toBytes("count"), Bytes.toBytes(min), Bytes.toBytes(value._2))
        htab.put(put)
      }
    })
    env.execute()
  }
}

3.4.6 Elasticsearch Sink

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink-version}</version>
</dependency>

Call it in the main method:

val httpHosts = new util.ArrayList[HttpHost]()
httpHosts.add(new HttpHost("localhost", 9200))

val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts, new ElasticsearchSinkFunction[SensorReading] {
  override def process(t: SensorReading, runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    println("saving data: " + t)
    val json = new util.HashMap[String, String]()
    json.put("data", t.toString)
    val indexRequest = Requests.indexRequest().index("sensor").`type`("readingData").source(json)
    requestIndexer.add(indexRequest)
    println("saved successfully")
  }
} )
dataStream.addSink( esSinkBuilder.build() )

3.5 Supported data types

Flink streaming applications process event streams represented as data objects. Internally, Flink needs to handle these objects: they must be serialized and deserialized so they can be sent over the network or read from state backends, checkpoints and savepoints. To do this efficiently, Flink has to know exactly which data types the application processes. Flink uses the concept of type information to describe data types and to generate a dedicated serializer, deserializer and comparator for each type.
Flink also has a type extraction system that analyzes the input and return types of functions to obtain the type information, and with it the serializers and deserializers, automatically. In some cases, however, such as lambda functions or generic types, the type information has to be provided explicitly for the application to work or to perform well.
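As a small sketch, with the Scala API the type information is usually derived implicitly via the org.apache.flink.streaming.api.scala._ import; it can also be created explicitly when the extractor needs help:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._

// Explicitly created type information for a tuple type
val tupleInfo: TypeInformation[(String, Int)] = createTypeInformation[(String, Int)]

val env = StreamExecutionEnvironment.getExecutionEnvironment
// With the scala._ import in scope, the TypeInformation for (String, Int) is supplied implicitly
val stream: DataStream[(String, Int)] = env.fromElements(("a", 1), ("b", 2))
stream.print()
env.execute()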
Flink supports all the common data types of Java and Scala. The most widely used ones are the following.

3.5.1 Basic data types

Flink supports all Java and Scala basic data types: Int, Double, Long, String, ...

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)
numbers.map( n => n + 1 )

3.5.2 Java and Scala tuples

val persons: DataStream[(String, Integer)] = env.fromElements(
  ("Adam", 17),
  ("Sarah", 23))
persons.filter(p => p._2 > 18)

3.5.3 Scala case classes

case class Person(name: String, age: Int)
val persons: DataStream[Person] = env.fromElements(
  Person("Adam", 17),
  Person("Sarah", 23))
persons.filter(p => p.age > 18)

3.5.4 Java POJOs

public class Person {
    public String name;
    public int age;

    public Person() {}

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

DataStream<Person> persons = env.fromElements(
    new Person("Alex", 42),
    new Person("Wendy", 23));

3.5.5 Other types (Arrays, Lists, Maps, Enums, etc.)

Flink also supports some special-purpose types in Java and Scala, such as Java's ArrayList, HashMap and Enum types.
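A brief sketch, assuming Scala Lists as the element type:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// A stream whose elements are Scala Lists
val listStream: DataStream[List[Int]] = env.fromElements(List(1, 2, 3), List(4, 5))
listStream.map(_.sum).print()

env.execute()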

3.6 Implementing UDFs for finer-grained control

3.6.1 Function classes

Flink exposes interfaces (implemented as interfaces or abstract classes) for all user-defined functions, for example MapFunction, FilterFunction, ProcessFunction, and so on.
The following example implements the FilterFunction interface:

class FlinkFilter extends FilterFunction[String] {
      override def filter(value: String): Boolean = {
        value.contains("flink")
      }
}
val flinkTweets = tweets.filter(new FlinkFilter)

A function can also be implemented as an anonymous class:

val flinkTweets = tweets.filter(
  new RichFilterFunction[String] {
    override def filter(value: String): Boolean = {
      value.contains("flink")
    }
  }
)

The string "flink" we filter on can also be passed in as a parameter:

val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(new KeywordFilter("flink"))

class KeywordFilter(keyWord: String) extends FilterFunction[String] {
  override def filter(value: String): Boolean = {
    value.contains(keyWord)
  }
}

3.6.2 Lambda functions

val tweets: DataStream[String] = ...
val flinkTweets = tweets.filter(_.contains("flink"))

3.6.3 Rich functions

A "rich function" is an interface for function classes provided by the DataStream API; every Flink function class has a Rich variant. Unlike a regular function, it can access the runtime context and has lifecycle methods, which makes more complex functionality possible.

  • RichMapFunction
  • RichFlatMapFunction
  • RichFilterFunction
  • ...
    Rich functions have a notion of lifecycle. Typical lifecycle methods are:
  • open() is the initialization method of a rich function; it is called before an operator such as map or filter is invoked.
  • close() is the last lifecycle method to be called and is used for cleanup work.
  • getRuntimeContext() provides information from the function's RuntimeContext, such as the parallelism of the function, the task name, and the state.
class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {
  var subTaskIndex = 0

  override def open(configuration: Configuration): Unit = {
    subTaskIndex = getRuntimeContext.getIndexOfThisSubtask
    // do initialization work here, e.g. establish a connection to HDFS
  }

  override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {
    if (in % 2 == subTaskIndex) {
      out.collect((subTaskIndex, in))
    }
  }

  override def close(): Unit = {
    // do cleanup work here, e.g. close the HDFS connection
  }
}

Reference: Flink official documentation — Flink DataStream API
Reference: Flink official documentation — Apache Kafka Connector (1.11)
Reference: Flink official documentation — Apache Kafka Connector (1.10)
Reference: Flink official documentation — Operators
Reference: Flink official documentation — Connectors
