Flink流處理API

一、Environment

1.getExecutionEnvironment

創(chuàng)建一個(gè)執(zhí)行環(huán)境,表示當(dāng)前執(zhí)行程序的上下文。 如果程序是獨(dú)立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶端調(diào)用程序以提交到集群,則此方法返回此集群的執(zhí)行環(huán)境,也就是說,getExecutionEnvironment會(huì)根據(jù)查詢運(yùn)行的方式?jīng)Q定返回什么樣的運(yùn)行環(huán)境,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

2.createLocalEnvironment

返回本地執(zhí)行環(huán)境,需要在調(diào)用時(shí)指定默認(rèn)的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

3.createRemoteEnvironment

返回集群執(zhí)行環(huán)境,將Jar提交到遠(yuǎn)程服務(wù)器。需要在調(diào)用時(shí)指定JobManager的IP和端口號(hào),并指定要在集群中運(yùn)行的Jar包。

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanager-hostname", 6123,"C://jar//flink//wordcount.jar")

二、Source

創(chuàng)建一個(gè)kafka的工具類

object MyKafkaUtil {
  val prop = new Properties()

  prop.setProperty("bootstrap.servers","hadoop1:9092")
  prop.setProperty("group.id","test")

  def getConsumer(topic:String ):FlinkKafkaConsumer011[String]= {
     val myKafkaConsumer:FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), prop)
     myKafkaConsumer
  }
}

消費(fèi)kafka

object StartupApp {
def main(args: Array[String]): Unit = {
       val environment: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
       val kafkaConsumer  =MyKafkaUtil.getConsumer("GMALL_STARTUP")
       val dstream: DataStream[String] = environment.addSource(kafkaConsumer)
       dstream.print()
       environment.execute()
  }
}

Exactly-once two-phase commit
Flink通過checkpoint來保存數(shù)據(jù)是否處理完成的狀態(tài)
由JobManager協(xié)調(diào)各個(gè)TaskManager進(jìn)行checkpoint存儲(chǔ),checkpoint保存在 StateBackend中,默認(rèn)StateBackend是內(nèi)存級(jí)的,也可以改為文件級(jí)的進(jìn)行持久化保存。
執(zhí)行過程實(shí)際上是一個(gè)兩段式提交,每個(gè)算子執(zhí)行完成,會(huì)進(jìn)行“預(yù)提交”,直到執(zhí)行完sink操作,會(huì)發(fā)起“確認(rèn)提交”,如果執(zhí)行失敗,預(yù)提交會(huì)放棄掉。
如果宕機(jī)需要通過StateBackend進(jìn)行恢復(fù),只能恢復(fù)所有確認(rèn)提交的操作。

三、Transform

1.KeyBy和Reduce

spark中的reduceByKey在Flink中被分成兩個(gè)算子:KeyBy和Reduce
KeyBy:
DataStream → KeyedStream:輸入必須是Tuple類型,邏輯地將一個(gè)流拆分成不相交的分區(qū),每個(gè)分區(qū)包含具有相同key的元素,在內(nèi)部以hash的形式實(shí)現(xiàn)的,KeyedStream是有狀態(tài)的。
Reduce:
KeyedStream → DataStream:一個(gè)分組數(shù)據(jù)流的聚合操作,合并當(dāng)前的元素和上次聚合的結(jié)果,產(chǎn)生一個(gè)新的值,返回的流中包含每一次聚合的結(jié)果,而不是只返回最后一次聚合的最終結(jié)果。

2.Split 和 Select

Split:
類似于Flume中的選擇器,在一個(gè)DataStream頭部加上不同的戳拆分成多個(gè)DataStream。


Split.png

Select:
在splitStream中獲取一個(gè)或多個(gè)DataStream。


Select.png
val splitStream: SplitStream[StartUpLog] = startUplogDstream.split { startUplog =>
  var flags:List[String] =  null
  if ("appstore" == startUplog.ch) {
    flags = List(startUplog.ch)
  } else {
    flags = List("other" )
  }
  flags
}
val appStoreStream: DataStream[StartUpLog] = splitStream.select("appstore")
val otherStream: DataStream[StartUpLog] = splitStream.select("other")

3.Connect和 CoMap

Connect:
連接兩個(gè)數(shù)據(jù)流


Connect.png

CoMap:


CoMap.png
val connStream: ConnectedStreams[StartUpLog, StartUpLog] = appStoreStream.connect(otherStream)
val allStream: DataStream[String] = connStream.map(
  (log1: StartUpLog) => log1.ch,
  (log2: StartUpLog) => log2.ch
)

4.Union

對兩個(gè)或者兩個(gè)以上的DataStream進(jìn)行union操作,產(chǎn)生一個(gè)包含所有DataStream元素的新DataStream。

Connect與 Union 區(qū)別:
1.Union之前兩個(gè)流的類型必須是一樣,Connect可以不一樣,在之后的coMap中再去調(diào)整成為一樣的。
2.Connect只能操作兩個(gè)流,Union可以操作多個(gè).

Sink

1.Kafka

在kafka工具類中添加方法

def getProducer(topic:String): FlinkKafkaProducer011[String] ={
  new FlinkKafkaProducer011[String](brokerList,topic,new SimpleStringSchema())
}

主函數(shù)中添加

val myKafkaProducer: FlinkKafkaProducer011[String] = MyKafkaUtil.getProducer("channel_sum")

sumDstream.map( chCount=>chCount._1+":"+chCount._2 ).addSink(myKafkaProducer)

2.Redis

在Redis工具類中添加方法

def getRedisSink(): RedisSink[(String,String)] ={
    new RedisSink[(String,String)](conf,new MyRedisMapper)
  }

  class MyRedisMapper extends RedisMapper[(String,String)]{
    override def getCommandDescription: RedisCommandDescription = {
      new RedisCommandDescription(RedisCommand.HSET, "channel_count")
     // new RedisCommandDescription(RedisCommand.SET  )
    }
    override def getValueFromData(t: (String, String)): String = t._2
    override def getKeyFromData(t: (String, String)): String = t._1
  }

3.Elasticsearch

def  getElasticSearchSink(indexName:String):  ElasticsearchSink[String]  ={
    val esFunc = new ElasticsearchSinkFunction[String] {
      override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
        println("試圖保存:"+element)
        val jsonObj: JSONObject = JSON.parseObject(element)
        val indexRequest: IndexRequest = Requests.indexRequest().index(indexName).`type`("_doc").source(jsonObj)
        indexer.add(indexRequest)
        println("保存1條")
      }
    }

    val sinkBuilder = new ElasticsearchSink.Builder[String](httpHosts, esFunc)

    //刷新前緩沖的最大動(dòng)作量
    sinkBuilder.setBulkFlushMaxActions(10)
     sinkBuilder.build()
  }

4.JDBC 自定義sink

略略略

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • yayooo閱讀 161評論 0 0
  • 5.1 Flink 運(yùn)行模型 以上為 Flink 的運(yùn)行模型,F(xiàn)link 的程序主要由三部分構(gòu)成,分別為 Sour...
    __元昊__閱讀 442評論 1 0
  • Data Sources 源是程序讀取輸入數(shù)據(jù)的位置??梢允褂?StreamExecutionEnvironmen...
    Alex90閱讀 3,069評論 0 1
  • 首先需要編程應(yīng)用的四層抽象: 最底下的一層對用戶是不可見的, 通過ProcessFunction集成到DataSt...
    君劍閱讀 1,043評論 0 1
  • 親密關(guān)系的建立,要發(fā)自內(nèi)心的承諾,你才能和對方走的近。
    小笨魚王月閱讀 87評論 0 0

友情鏈接更多精彩內(nèi)容