- State creation
- State cleanup (TTL/clear)
- State storage (distributed)
- State recovery
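Flink classifies state into two kinds:
- Keyed State (bound to a key; only available on a KeyedStream)
- Operator State (bound to a parallel operator instance, so it depends on the parallelism)
Flink stores state in two forms:
- Managed (held in Flink's built-in data structures)
- Raw (held in the operator's own data structures; at checkpoint time it is written out as a byte array, so Flink knows nothing about the original type)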
The number of Key Groups equals the configured maximum parallelism.
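The relationship between keys, key groups, and parallel subtasks can be sketched in plain Scala. This is only an illustration, not Flink's own code: Flink additionally applies a murmur hash to the key's `hashCode` before taking the modulus, and the names `KeyGroupSketch`, `keyGroupFor`, and `subtaskFor` are made up for this sketch.

```scala
object KeyGroupSketch {
  // Assumption: plain hashCode is used here; Flink hashes it once more with murmur hash.
  // There are exactly maxParallelism key groups, numbered 0 until maxParallelism.
  def keyGroupFor(key: Any, maxParallelism: Int): Int =
    Math.floorMod(key.hashCode, maxParallelism)

  // Each subtask owns a contiguous range of key groups.
  def subtaskFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism
}
```

Because a key always maps to the same key group, changing the parallelism only moves whole key groups between subtasks; it never splits the state of a single key.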
1. Keyed State
1.1 Managed Keyed State
| State type | Description | API |
|---|---|---|
| ValueState<T> | A single value | update, value |
| ListState<T> | A list of values | add, addAll, get, update |
| ReducingState<T> | A single value (the aggregate of all elements added to the state) | add(T), get |
| AggregatingState<IN, OUT> | A single value (like ReducingState, but the aggregate may have a different type from the input) | add(IN), get |
| MapState<UK, UV> | A set of key-value pairs | put, putAll, get, entries, keys, values |
All state types support the clear() operation.
State is accessed through the RuntimeContext, so it is only available in rich functions.
1.2 Implementing a window operation with state
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.apache.flink.streaming.api.scala._
import org.apache.flink.api.scala.createTypeInformation

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // per-key state holding (count, running sum)
  private var sum: ValueState[(Long, Long)] = _

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // access the state value
    val tmpCurrentSum = sum.value
    // If it hasn't been used before, it will be null
    val currentSum = if (tmpCurrentSum != null) {
      tmpCurrentSum
    } else {
      (0L, 0L)
    }
    // update the count
    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
    // update the state
    sum.update(newSum)
    // if the count reaches 2, emit the average and clear the state
    if (newSum._1 >= 2) {
      out.collect((input._1, newSum._2 / newSum._1))
      sum.clear()
    }
  }

  override def open(parameters: Configuration): Unit = {
    sum = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
    )
  }
}
object ExampleCountWindowAverage {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromCollection(List(
      (1L, 3L),
      (1L, 5L),
      (1L, 7L),
      (1L, 4L),
      (1L, 2L)
    )).keyBy(_._1)
      .flatMap(new CountWindowAverage())
      .print()
    // the printed output will be (1,4) and (1,5)
    env.execute("ExampleManagedState")
  }
}
// Alternative form of the same entry point using scala.App.
// It has the same name as the object with a main method, so keep only one of the two definitions.
object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)
  env.execute("ExampleManagedState")
}
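1.3 State TTL
- State can be cleaned up automatically according to its TTL setting.
- TTL is applied per element: each entry of a collection state (a list or a map) expires independently.
By default, expired data is removed when it is read.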
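| Cleanup strategy | Description |
|---|---|
| cleanupFullSnapshot | Drops expired entries when a full state snapshot is taken |
| cleanupInBackground | Enables the default background cleanup of the state backend in use |
| cleanupInRocksdbCompactFilter | Drops expired entries during RocksDB compaction (RocksDB state backend only) |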
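A minimal sketch of how a TTL and one of the cleanup strategies above might be attached to a state descriptor. It assumes the Flink 1.x `StateTtlConfig` builder, where the TTL is given as a `Time` (newer releases take a `java.time.Duration` instead). The object name `TtlConfigSketch`, the descriptor name `"last-seen"`, and the one-hour TTL are illustrative choices, not part of the notes.

```scala
import org.apache.flink.api.common.state.{StateTtlConfig, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time

object TtlConfigSketch {
  val ttlConfig: StateTtlConfig = StateTtlConfig
    .newBuilder(Time.hours(1))
    // the TTL timer restarts whenever the value is created or written
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    // an expired value is never returned, even if it has not been cleaned up yet
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    // additionally drop expired entries whenever a full snapshot is taken
    .cleanupFullSnapshot()
    .build()

  val descriptor = new ValueStateDescriptor[String]("last-seen", classOf[String])
  descriptor.enableTimeToLive(ttlConfig)
}
```

The descriptor is then passed to `getRuntimeContext.getState(...)` inside a rich function, in the same way as in the `CountWindowAverage` example.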
1.4 Using state in the DataStream API
| Stateless API | Stateful API | Managed Keyed State |
|---|---|---|
| map | mapWithState | ValueState |
| flatMap | flatMapWithState | ValueState |
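val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  // count is the sum stored for the current key, or None for the first element of that key
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      // emit the sum seen so far, then store the sum including the current value
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      // first element for this key: emit 0 and store the current value
      case None => ( (in._1, 0), Some(in._2) )
    })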
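To make the semantics of `mapWithState` concrete, the following plain-Scala sketch simulates it without Flink: a mutable map plays the role of the per-key ValueState, and returning `None` as the new state clears it. The helper `simulate` and the object `MapWithStateSketch` are hypothetical names used only for this illustration.

```scala
import scala.collection.mutable

object MapWithStateSketch {
  // Hypothetical helper: applies f to every element together with the state of its key.
  def simulate[I, K, O, S](input: Seq[I])(keyOf: I => K)(
      f: (I, Option[S]) => (O, Option[S])): Seq[O] = {
    val state = mutable.Map.empty[K, S] // stands in for the per-key ValueState
    input.map { in =>
      val key = keyOf(in)
      val (out, next) = f(in, state.get(key))
      next match {
        case Some(s) => state(key) = s
        case None    => state.remove(key) // None clears the state for this key
      }
      out
    }
  }

  // Same function as in the mapWithState example above.
  val demo: Seq[(String, Int)] =
    simulate(Seq(("a", 1), ("a", 2), ("b", 5), ("a", 3)))(_._1) {
      (in: (String, Int), count: Option[Int]) =>
        count match {
          case Some(c) => ((in._1, c), Some(c + in._2))
          case None    => ((in._1, 0), Some(in._2))
        }
    }
  // demo == Seq(("a", 0), ("a", 1), ("b", 0), ("a", 3))
}
```

Each output carries the sum accumulated before the current element, which is why the first element of every key yields 0.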
2. Operator State
2.0 Split and union redistribution of state
Even-split redistribution:
- Each operator returns a List of state elements
- The whole state is logically a concatenation of all lists
- On restore/redistribution, the list is evenly divided into as many sublists as there are parallel operators
Union redistribution:
- Each operator returns a List of state elements
- The whole state is logically a concatenation of all lists
- On restore/redistribution, each operator gets the complete list of state elements
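The two redistribution schemes can be sketched in plain Scala, assuming each old parallel instance contributes one list of state elements. Which element lands on which new instance is an illustrative choice here (contiguous, near-equal chunks); Flink's actual assignment may differ. The names `RedistributionSketch`, `evenSplit`, and `union` are made up for this sketch.

```scala
object RedistributionSketch {
  // Even-split: concatenate all lists, then cut the result into newParallelism
  // contiguous sublists whose sizes differ by at most one.
  def evenSplit[A](perInstance: Seq[Seq[A]], newParallelism: Int): Seq[Seq[A]] = {
    val all = perInstance.flatten
    val base = all.size / newParallelism
    val extra = all.size % newParallelism
    (0 until newParallelism).map { i =>
      // the first `extra` instances receive one additional element
      val start = i * base + math.min(i, extra)
      val size = base + (if (i < extra) 1 else 0)
      all.slice(start, start + size)
    }
  }

  // Union: every new instance receives the complete concatenated list.
  def union[A](perInstance: Seq[Seq[A]], newParallelism: Int): Seq[Seq[A]] = {
    val all = perInstance.flatten
    Seq.fill(newParallelism)(all)
  }
}
```

For example, rescaling two instances holding `Seq(1, 2, 3)` and `Seq(4, 5)` to three instances gives `Seq(1, 2)`, `Seq(3, 4)`, `Seq(5)` under even-split, while under union each of the three instances receives all five elements.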
2.1 Adding checkpoint hooks
Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart, initializeState(), is called every time the user-defined function is initialized, whether the function is being initialized for the first time or is recovering from an earlier checkpoint.
initializeState():
- is called when the parallel function instance is created during distributed execution
- is where the different types of state are initialized, and also where the state recovery logic belongs
2.1.1 The following example shows how to add checkpoint hooks to a sink:
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

class BufferingSink(threshold: Int = 0)
  extends SinkFunction[(String, Int)]
  with CheckpointedFunction {

  @transient
  private var checkpointedState: ListState[(String, Int)] = _

  private val bufferedElements = ListBuffer[(String, Int)]()

  override def invoke(value: (String, Int), context: SinkFunction.Context[_]): Unit = {
    bufferedElements += value
    // flush once the buffer has reached the threshold
    // (>= rather than ==, so that the default threshold of 0 also flushes)
    if (bufferedElements.size >= threshold) {
      for (element <- bufferedElements) {
        // send it to the sink
      }
      bufferedElements.clear()
    }
  }

  override def snapshotState(context: FunctionSnapshotContext): Unit = {
    // replace the previous snapshot with the elements currently buffered
    checkpointedState.clear()
    for (element <- bufferedElements) {
      checkpointedState.add(element)
    }
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val descriptor = new ListStateDescriptor[(String, Int)](
      "buffered-elements",
      TypeInformation.of(new TypeHint[(String, Int)]() {})
    )
    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
    if (context.isRestored) { // true when the function is recovering from a checkpoint
      // get() returns a java.lang.Iterable, so convert it before iterating
      for (element <- checkpointedState.get().asScala) {
        bufferedElements += element
      }
    }
  }
}
2.1.2 The following example shows how to add checkpoint hooks to a source:
The checkpoint lock must be held while updating state or emitting records.
import java.util
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}
import scala.collection.JavaConverters._

// ListCheckpointed requires a Serializable element type, hence java.lang.Long rather than scala.Long
class CounterSource
  extends RichParallelSourceFunction[Long]
  with ListCheckpointed[java.lang.Long] {

  @volatile
  private var isRunning = true

  private var offset = 0L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    val lock = ctx.getCheckpointLock
    while (isRunning) {
      // emitting the record and updating the state happen atomically under the lock
      lock.synchronized({
        ctx.collect(offset)
        offset += 1
      })
    }
  }

  override def cancel(): Unit = isRunning = false

  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[java.lang.Long] = {
    util.Collections.singletonList(Long.box(offset))
  }

  override def restoreState(state: util.List[java.lang.Long]): Unit = {
    // state is a java.util.List, so convert it before iterating
    for (s <- state.asScala) {
      offset = s.longValue
    }
  }
}