Flink State

  1. Creating state
  2. Cleaning up state (TTL / clear)
  3. Storing state (distributed)
  4. Recovering state

Flink classifies state into the following two kinds:

  1. Keyed State (bound to a key; only usable on a KeyedStream)
  2. Operator State (bound to a parallel operator instance, so it depends on the parallelism)

Flink stores state in two forms:

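  1. Managed (held in data structures controlled by the Flink runtime, such as ValueState or ListState)
  2. Raw (kept in the operator's own data structures; at checkpoint time it is written out as a byte array, so Flink only sees bytes and does not know the original type)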

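Keyed state is further organized into Key Groups. A key group is the unit in which keyed state is redistributed when the parallelism changes, and the number of key groups equals the configured maximum parallelism.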
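The key-group idea can be sketched without Flink: keys map to a fixed number of groups (maxParallelism), and each parallel instance owns a contiguous range of groups. This is a minimal sketch under stated assumptions. Real Flink (KeyGroupRangeAssignment) applies a murmur hash to `key.hashCode` before the modulo, so this sketch uses plain `Math.floorMod` and its group numbers will differ from Flink's. The group-to-instance formula `keyGroup * parallelism / maxParallelism` mirrors Flink's assignment as I understand it. `KeyGroupSketch` and its methods are hypothetical names.

```scala
object KeyGroupSketch {
  // Simplified key -> key-group mapping. ASSUMPTION: real Flink murmur-hashes
  // key.hashCode first, so actual group numbers differ from this sketch.
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Key group -> index of the parallel instance that owns it.
  def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  // Which key groups each parallel instance owns (contiguous ranges).
  def ownership(maxParallelism: Int, parallelism: Int): Map[Int, Seq[Int]] =
    (0 until maxParallelism)
      .groupBy(g => operatorIndexFor(g, maxParallelism, parallelism))
      .map { case (op, groups) => op -> groups.sorted.toList }

  def main(args: Array[String]): Unit = {
    println(ownership(maxParallelism = 8, parallelism = 3))
    println(ownership(maxParallelism = 8, parallelism = 4))
  }
}
```

The key-to-group mapping depends only on maxParallelism, not on the current parallelism. Rescaling from 3 to 4 instances therefore moves whole key groups between instances instead of re-hashing individual keys.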

1. Keyed State

1.1 Managed Keyed State

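State type | Description | Main API
ValueState<T> | a single value | update(T), value()
ListState<T> | a list of values | add(T), addAll(List<T>), get(), update(List<T>)
ReducingState<T> | a single value: the aggregate (via a ReduceFunction) of all values added to the state | add(T), get()
AggregatingState<IN, OUT> | a single value, like ReducingState, but the aggregate (via an AggregateFunction) may have a different type from the inputs | add(IN), get()
MapState<UK, UV> | a set of key/value mappings | put, putAll, get, entries, keys, values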

Every state type supports clear(), which removes the state for the current key.

State is accessed using the RuntimeContext, so it is only possible in rich functions
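The difference between ReducingState and AggregatingState lies in the types involved. ReducingState keeps an aggregate of the same type as its inputs, while AggregatingState routes inputs through an accumulator and can expose a result of another type. A dependency-free model of that contract follows. `SimpleReducing` and `SimpleAggregating` are hypothetical stand-ins, not Flink classes. In Flink the functions are supplied through a ReducingStateDescriptor (ReduceFunction) or an AggregatingStateDescriptor (AggregateFunction with createAccumulator / add / getResult / merge).

```scala
object StateSemanticsSketch {
  // Hypothetical model of ReducingState[T]: inputs and the stored aggregate share type T.
  // Flink returns null for empty state; Option is used here instead.
  final class SimpleReducing[T](reduce: (T, T) => T) {
    private var current: Option[T] = None
    def add(value: T): Unit = current = Some(current.fold(value)(acc => reduce(acc, value)))
    def get: Option[T] = current
    def clear(): Unit = current = None
  }

  // Hypothetical model of AggregatingState[IN, OUT]: an accumulator ACC sits between
  // the input type IN and the result type OUT (cf. Flink's AggregateFunction).
  final class SimpleAggregating[IN, ACC, OUT](
      createAccumulator: () => ACC,
      addToAcc: (IN, ACC) => ACC,
      getResult: ACC => OUT) {
    private var acc: Option[ACC] = None
    def add(value: IN): Unit = acc = Some(addToAcc(value, acc.getOrElse(createAccumulator())))
    def get: Option[OUT] = acc.map(getResult)
    def clear(): Unit = acc = None
  }

  def main(args: Array[String]): Unit = {
    val sum = new SimpleReducing[Long](_ + _) // Long in, Long out
    Seq(3L, 5L, 7L).foreach(sum.add)
    println(sum.get)

    // Long in, (sum, count) accumulator, Double out
    val avg = new SimpleAggregating[Long, (Long, Long), Double](
      () => (0L, 0L),
      (v, a) => (a._1 + v, a._2 + 1),
      a => a._1.toDouble / a._2)
    Seq(3L, 4L).foreach(avg.add)
    println(avg.get)
  }
}
```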

1.2 Implementing a count window with state

import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.createTypeInformation

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {

    // access the state value
    val tmpCurrentSum = sum.value

    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }

    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)

    // update the state
    sum.update(newSum)

    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}

object ExampleCountWindowAverage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromCollection(List(
      (1L, 3L),
      (1L, 5L),
      (1L, 7L),
      (1L, 4L),
      (1L, 2L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()
    // the printed output will be (1,4) and (1,5)

    env.execute("ExampleManagedState")
  }
}

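To check the expected output `(1,4)` and `(1,5)` without starting a Flink job, the same per-key logic can be replayed over an in-memory sequence. This is a minimal sketch, assuming that a mutable Map keyed by the stream key is an acceptable stand-in for per-key ValueState. It models no parallelism or checkpointing, and `CountWindowAverageSketch` is a hypothetical name.

```scala
import scala.collection.mutable

object CountWindowAverageSketch {
  // Same logic as CountWindowAverage, run over an in-memory sequence.
  // ASSUMPTION: a mutable Map keyed by the stream key stands in for the per-key
  // ValueState; there is no parallelism or checkpointing here.
  def run(input: Seq[(Long, Long)], windowSize: Long = 2): List[(Long, Long)] = {
    val state = mutable.Map.empty[Long, (Long, Long)] // key -> (count, sum)
    val out = mutable.ListBuffer.empty[(Long, Long)]
    for ((key, value) <- input) {
      // a missing entry plays the role of the null returned by ValueState.value
      val (count, sum) = state.getOrElse(key, (0L, 0L))
      val newCount = count + 1
      val newSum = sum + value
      if (newCount >= windowSize) {
        out += ((key, newSum / newCount)) // emit the (integer) average
        state.remove(key)                 // like sum.clear()
      } else {
        state(key) = (newCount, newSum)   // like sum.update(...)
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit =
    println(run(Seq((1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)))) // List((1,4), (1,5))
}
```

Interleaving two keys shows that the state is scoped per key. For example, `run(Seq((1L, 3L), (2L, 10L), (1L, 5L), (2L, 20L)))` averages each key independently.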

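1.3 State TTL

  1. State can be expired and cleaned up automatically according to its TTL (time-to-live) configuration.
  2. TTL works down to individual elements: for collection-type state (ListState, MapState), each list element or map entry expires independently.

By default, expired values are only removed when they are read, e.g. when ValueState.value() is called. State that is never read again stays in storage unless one of the cleanup strategies below is enabled.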

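Cleanup strategy (StateTtlConfig.Builder) | Description
cleanupFullSnapshot() | drops expired entries when a full state snapshot is taken, which shrinks the snapshot; the local state itself is not cleaned up
cleanupInBackground() | enables the state backend's default background cleanup
cleanupInRocksdbCompactFilter() | RocksDB backend only: expired entries are filtered out during RocksDB compaction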
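In Flink, TTL is configured by building a StateTtlConfig and attaching it to a state descriptor. A typical builder chain is `StateTtlConfig.newBuilder(Time.seconds(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired).build`, followed by `descriptor.enableTimeToLive(ttlConfig)`. To show the default "expire on read" behaviour without a Flink dependency, here is a small simulation. `TtlSketch.TtlValue` and its explicit `now` parameter are hypothetical and model only those two settings.

```scala
object TtlSketch {
  // Hypothetical model of a TTL'd ValueState with UpdateType.OnCreateAndWrite
  // (only writes refresh the timestamp) and StateVisibility.NeverReturnExpired.
  // Time is passed in explicitly instead of reading a clock.
  final class TtlValue[T](ttlMillis: Long) {
    private var stored: Option[(T, Long)] = None // value + last write timestamp

    def update(value: T, now: Long): Unit = stored = Some((value, now))

    // Expired entries are dropped lazily, at read time (the default behaviour).
    def value(now: Long): Option[T] = stored match {
      case Some((v, writtenAt)) if now - writtenAt < ttlMillis => Some(v)
      case Some(_) =>
        stored = None // cleanup happens here, on read
        None
      case None => None
    }

    // Whether anything is still physically held, expired or not.
    def isPhysicallyStored: Boolean = stored.isDefined
  }

  def main(args: Array[String]): Unit = {
    val s = new TtlValue[String](ttlMillis = 1000)
    s.update("x", now = 0)
    println(s.value(now = 500))   // still visible
    println(s.isPhysicallyStored) // expired-but-unread data would still be here
    println(s.value(now = 5000))  // expired: not returned, and removed
    println(s.isPhysicallyStored)
  }
}
```

The simulation shows why the background strategies exist. Until `value` is called after expiry, the expired entry still occupies storage.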

1.4 Using state in the Scala DataStream API

Stateful variants available on a Scala KeyedStream:

Stateless API | Stateful API | Managed keyed state used underneath
map | mapWithState | ValueState
flatMap | flatMapWithState | ValueState
val stream: DataStream[(String, Int)] = ...

val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })
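The contract of mapWithState is: the function receives the element and the current per-key state (`None` if absent), and returns the output together with the new state. Returning `None` clears the state. The re-implementation below replays that contract over a finite sequence. `MapWithStateSketch.mapWithState` is a hypothetical stand-in that only mirrors the Scala API's signature shape, not Flink's implementation.

```scala
import scala.collection.mutable

object MapWithStateSketch {
  // Hypothetical re-implementation of the mapWithState contract over a finite
  // sequence: f gets the current per-key state (None if absent) and returns
  // (output, new state); returning None as the new state clears it.
  def mapWithState[K, T, R, S](input: Seq[T])(keyOf: T => K)(
      f: (T, Option[S]) => (R, Option[S])): List[R] = {
    val state = mutable.Map.empty[K, S]
    input.toList.map { in =>
      val key = keyOf(in)
      val (out, next) = f(in, state.get(key))
      next match {
        case Some(s) => state(key) = s
        case None    => state.remove(key)
      }
      out
    }
  }

  def main(args: Array[String]): Unit = {
    // Same function body as the counts example above
    val counts = mapWithState[String, (String, Int), (String, Int), Int](
      Seq(("a", 1), ("a", 2), ("b", 5), ("a", 3)))(_._1) { (in, count) =>
      count match {
        case Some(c) => ((in._1, c), Some(c + in._2))
        case None    => ((in._1, 0), Some(in._2))
      }
    }
    println(counts)
  }
}
```

Note that the counts example emits the running sum accumulated *before* the current element. The state is updated to include the element, but the output still shows the old value.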

2. Operator State

2.0 Redistributing state: even-split and union

Even-split redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators

Union redistribution:

  1. Each operator returns a List of state elements
  2. The whole state is logically a concatenation of all lists
  3. On restore/redistribution, each operator gets the complete list of state elements
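The two redistribution schemes above can be compared on plain lists. This is a minimal sketch. It models only the "evenly divided" and "complete list" properties. Flink's actual assignment of individual elements to instances under even-split may differ from the contiguous slicing used here, and `RedistributionSketch` is a hypothetical name.

```scala
object RedistributionSketch {
  // Even-split: concatenate all per-instance lists, then cut the result into
  // `newParallelism` contiguous sublists whose sizes differ by at most one.
  // ASSUMPTION: contiguous slicing; Flink's concrete assignment may differ.
  def evenSplit[T](perInstance: Seq[Seq[T]], newParallelism: Int): Seq[Seq[T]] = {
    val all = perInstance.flatten
    val base = all.size / newParallelism
    val extra = all.size % newParallelism // the first `extra` instances get one more
    (0 until newParallelism).map { i =>
      val start = i * base + math.min(i, extra)
      val size = base + (if (i < extra) 1 else 0)
      all.slice(start, start + size)
    }
  }

  // Union: every new instance receives the complete concatenated list.
  def union[T](perInstance: Seq[Seq[T]], newParallelism: Int): Seq[Seq[T]] = {
    val all = perInstance.flatten
    Seq.fill(newParallelism)(all)
  }

  def main(args: Array[String]): Unit = {
    val old = Seq(Seq("a", "b"), Seq("c"), Seq("d", "e")) // state of 3 old instances
    println(evenSplit(old, 2))
    println(union(old, 2))
  }
}
```

Even-split keeps the total amount of state constant while rescaling. Union multiplies it by the new parallelism, so each instance must itself decide which entries to keep.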

2.1 Adding checkpoint hooks

  1. Whenever a checkpoint has to be performed, snapshotState() is called

  2. The counterpart, initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actually recovering from an earlier checkpoint

initializeState() is therefore:

  1. the place where state is initialized when the parallel function instance is created during distributed execution,
  2. but also where the state recovery logic lives.
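The lifecycle described above can be modelled in a few lines. initializeState runs on every start, snapshotState runs on every checkpoint, and a restart restores from the last snapshot. This model uses no Flink classes. `BufferingFunction` and the `Option` passed to `initializeState` are hypothetical stand-ins for a CheckpointedFunction and for `context.isRestored` plus the restored state.

```scala
object CheckpointHookSketch {
  // Hypothetical model of the CheckpointedFunction contract (no Flink classes):
  // initializeState runs on every start; it restores only when a snapshot exists,
  // which is what context.isRestored signals in Flink.
  final class BufferingFunction {
    private var buffer = List.empty[String]

    def initializeState(restored: Option[List[String]]): Unit =
      restored.foreach(saved => buffer = saved) // recovery logic lives here

    def process(value: String): Unit = buffer = buffer :+ value

    // Called for every checkpoint; the returned list is what would be persisted.
    def snapshotState(): List[String] = buffer

    def buffered: List[String] = buffer
  }

  def main(args: Array[String]): Unit = {
    val first = new BufferingFunction
    first.initializeState(None) // fresh start: nothing to restore
    first.process("a"); first.process("b")
    val checkpoint = first.snapshotState()
    first.process("c") // not in the snapshot; after a failure the source must replay it

    val recovered = new BufferingFunction
    recovered.initializeState(Some(checkpoint)) // recovery path
    println(recovered.buffered)
  }
}
```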

2.1.1 The following example shows how to add checkpoint hooks to a sink:

import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

  // operator (non-keyed) list state; re-created in initializeState, so not serialized
  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  // elements received but not yet flushed to the external system
  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    // flush once the buffer reaches the threshold
    if (bufferedElements.size >= threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  // called on every checkpoint: copy the not-yet-flushed buffer into operator state
  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  // called on first start and on recovery
  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )
    // getListState uses even-split redistribution; getUnionListState would use union
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)

    if (context.isRestored) { // true only when recovering from a checkpoint/savepoint
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}

2.1.2 The following example shows how to add checkpoint hooks to a source:

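A stateful source must hold the checkpoint lock (SourceContext.getCheckpointLock) whenever it emits records or updates its state. This makes emitting and updating state atomic with respect to checkpoints.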

import java.util

import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

import scala.collection.JavaConverters._

class CounterSource
  extends RichParallelSourceFunction[Long]
    with ListCheckpointed[Long] {

  @volatile
  private var isRunning = true

  // current position; this is the state that gets checkpointed
  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      // emit and advance the offset atomically with respect to checkpoints
      lock.synchronized {
        ctx.collect(offset)
        offset += 1
      }
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
    util.Collections.singletonList(offset)

  override def restoreState(state: util.List[Long]): Unit = {
    for (s <- state.asScala) {
      offset = s
    }
  }
}
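Why the lock matters can be shown with plain JVM threads. One thread "emits" and advances an offset, and another takes snapshots of the offset. Because both use the same lock, a snapshot never observes an emitted record without its matching offset update. This is a hypothetical simulation, not Flink code. The shared `lock` object plays the role of `ctx.getCheckpointLock`, and `CheckpointLockSketch` is an illustrative name.

```scala
import scala.collection.mutable

object CheckpointLockSketch {
  // Hypothetical simulation (plain JVM threads, no Flink): one thread emits records
  // and advances `offset`, another takes "snapshots" of `offset`. Both hold the same
  // lock, so each snapshot's offset equals the number of records emitted so far.
  def run(records: Int, snapshots: Int): List[(Long, Int)] = {
    val lock = new Object
    val emitted = mutable.ArrayBuffer.empty[Long]
    var offset = 0L
    val taken = mutable.ArrayBuffer.empty[(Long, Int)] // (snapshotted offset, records emitted)

    val emitter = new Thread(new Runnable {
      def run(): Unit = for (_ <- 0 until records) lock.synchronized {
        emitted += offset // stands in for ctx.collect(offset)
        offset += 1
      }
    })
    val checkpointer = new Thread(new Runnable {
      def run(): Unit = for (_ <- 0 until snapshots) lock.synchronized {
        taken += ((offset, emitted.size))
      }
    })
    emitter.start(); checkpointer.start()
    emitter.join(); checkpointer.join()
    taken.toList
  }

  def main(args: Array[String]): Unit = {
    val snaps = run(records = 100000, snapshots = 50)
    println(snaps.forall { case (off, n) => off == n.toLong })
  }
}
```

Without `lock.synchronized` around the two steps, a snapshot could fall between `collect` and `offset += 1`. After recovery, that record would then be emitted a second time.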

ref:

  1. https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html