鍵State和操作State (Keyed State and Operator State)
Flink中有兩種基本的狀態(tài):鍵狀態(tài)(Keyed State)和操作狀態(tài)( Operator State)。
鍵狀態(tài)(Keyed State)
鍵狀態(tài)(Keyed State)是與鍵相關(guān)的,只能在KeyedStream的函數(shù)和操作符中使用。
可以把鍵狀態(tài)(Keyed State)理解成已經(jīng)分區(qū)的操作狀態(tài)(Operator State)或者分片,每個(gè)鍵只有一個(gè)狀態(tài)分區(qū)。每個(gè)鍵狀態(tài)(Keyed State)在邏輯上綁定到<parallel-operator-instance, key>的唯一組合,由于每個(gè)鍵是鍵操作符的一個(gè)并行實(shí)例,可以將其簡單地理解為<operator, key>。
鍵狀態(tài)(Keyed State)進(jìn)一步組合成所謂的鍵組(Key Groups) ,鍵組(Key Groups)是Flink重新分配鍵狀態(tài)(Keyed State)的原子單元;鍵組(Key Groups)的數(shù)量與定義的最大并行度完全相同。在執(zhí)行過程中,每個(gè)鍵操作符的并行實(shí)例都使用一個(gè)或多個(gè)鍵組的鍵。
操作符狀態(tài)(Operator State)
對(duì)于操作符狀態(tài)(Operator State),每個(gè)操作符狀態(tài)(Operator State)都綁定一個(gè)并行操作符實(shí)例。 Kafka Connector是在Flink中使用操作符狀態(tài)(Operator State)一個(gè)很好的例子。Kafka消費(fèi)者的每個(gè)并行實(shí)例都維護(hù)一個(gè)topic分區(qū)和偏移量(offset)的映射作為其操作符狀態(tài)(Operator State)。
當(dāng)并行度發(fā)生改變時(shí),操作符狀態(tài)(Operator State)接口支持在并行操作符實(shí)例之間重新分布狀態(tài)。
原始和管理狀態(tài)(Raw and Managed State)
鍵狀態(tài)(Keyed State)和操作符狀態(tài)(Operator State)有兩種形式:管理狀態(tài)和原始狀態(tài)。
管理狀態(tài)(Managed State)表示在Flink運(yùn)行時(shí)約束的數(shù)據(jù)結(jié)構(gòu),比如內(nèi)部的哈希表或者RocksDB。例如:ValueState, ListState。Flink在運(yùn)行時(shí)對(duì)狀態(tài)進(jìn)行編碼,并將其寫入檢查點(diǎn)(checkpoint)。
原始狀態(tài)(Raw State)是狀態(tài)操作符保存在自己的數(shù)據(jù)結(jié)構(gòu)中。當(dāng)觸發(fā)檢查點(diǎn)時(shí),它們只將字節(jié)序列寫入檢查點(diǎn)。Flink不知道狀態(tài)的數(shù)據(jù)結(jié)構(gòu),只看到原始字節(jié)。
所有datastream函數(shù)都可以使用管理狀態(tài),但是原始狀態(tài)接口只能在實(shí)現(xiàn)操作符時(shí)使用。 建議使用管理狀態(tài)(而不是原始狀態(tài)),因?yàn)槭褂霉芾頎顟B(tài),F(xiàn)link能夠在并行度改變時(shí)自動(dòng)重新分發(fā)狀態(tài),并且更好的內(nèi)存管理。
注意:如果您的管理狀態(tài)需要自定義序列化邏輯,請(qǐng)參閱 corresponding guide ,以確保未來的兼容性。Flink的默認(rèn)序列化不需要特殊處理。
使用管理鍵狀態(tài)(Using Managed Keyed State)
管理鍵狀態(tài)接口提供對(duì)不同類型狀態(tài)的訪問,這些狀態(tài)的作用域都是當(dāng)前輸入元素的鍵。這意味著這種類型的狀態(tài)只能在KeyedStream上使用,可以通過stream.keyBy(…)創(chuàng)建。
我們首先看一下Flink中不同類型的可用狀態(tài),然后了解如何在程序中使用它們??捎脿顟B(tài)為:
ValueState<T>: 保存一個(gè)可以修改和獲取的值(如前所述,該值的作用域?yàn)閕nput元素的key,因此操作的每個(gè)鍵可能都有一個(gè)值)。修改值可以使用
update(T),獲取值可以使用T value()。ListState<T>: 存儲(chǔ)一個(gè)元素列表??梢宰芳釉?,并且可以從當(dāng)前存儲(chǔ)的所有元素中獲取一個(gè)可迭代(Iterable)的元素。添加元素使用
add(T)或addAll(List<T>),獲取元素可以使用Iterable<T> get()。還可以使用update(list <T>)修改并覆蓋現(xiàn)有列表。ReducingState<T>: 存儲(chǔ)一個(gè)值,該值表示添加到該狀態(tài)所有值的聚合。類似于
ListState,添加元素使用add(T)通過ReduceFunction聚合。
-AggregatingState<IN, OUT>: 存儲(chǔ)一個(gè)值,該值表示添加到該狀態(tài)的所有值的聚合。 與ReducingState相反,聚合類型添加到該狀態(tài)的元素可以有不同類型。與ListState相同,但是使用add(IN)添加元素使用指定的AggregateFunction進(jìn)行聚合。
-
FoldingState<T, ACC>: 存儲(chǔ)一個(gè)值,該值表示添加到該狀態(tài)的所有值的聚合。與
ReducingState相反,聚合類型添加到該狀態(tài)的元素可以有不同類型。類似于ListState,但是使用add(T)添加元素使用指定的FoldFunction被折疊成一個(gè)聚合。
-
MapState<UK, UV>: 保存了一個(gè)映射列表??梢栽跔顟B(tài)中添加鍵-值對(duì),并可以從當(dāng)前存儲(chǔ)的所有map中獲取一個(gè)可迭代的元素。使用
put(UK, UV)或putAll(Map<UK, UV>)添加映射。獲取值可以使用get(UK)。獲取mappings, keys和values的可迭代數(shù)據(jù)可以分別使用entry()、keys()和values()。
所有類型的狀態(tài)都有一個(gè)clear()方法,用于清除當(dāng)前活動(dòng)鍵(即輸入元素的鍵)的狀態(tài)。
注意,FoldingState和FoldingStateDescriptor在Flink 1.4中已經(jīng)被棄用,將來會(huì)被完全刪除。請(qǐng)使用AggregatingState和AggregatingStateDescriptor。
注意,第一,這些狀態(tài)對(duì)象僅可以與狀態(tài)進(jìn)行交互。狀態(tài)不僅可以存儲(chǔ)在內(nèi)部,也可以存儲(chǔ)在磁盤或其他地方。第二,從狀態(tài)獲得的值取決于輸入元素的鍵。因此,如果鍵不同,那么在一次函數(shù)調(diào)用中獲得的值可能與另一次調(diào)用中的值不同。
要獲得狀態(tài)句柄,必須創(chuàng)建StateDescriptor。它保存了狀態(tài)的名稱(在下面可以看到可以創(chuàng)建多個(gè)狀態(tài),必須具有惟一的名稱,以便可以引用它們),狀態(tài)保存的值的類型,可能還有用戶指定的函數(shù),如ReduceFunction。根據(jù)要檢索的狀態(tài)類型,可以創(chuàng)建ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor或MapStateDescriptor。
使用RuntimeContext訪問狀態(tài),因此只能在rich函數(shù)中訪問。有關(guān)這方面的信息,請(qǐng)參閱這里,稍后將看到一個(gè)示例。在RichFunction中可用的RuntimeContext有以下幾種訪問狀態(tài)的方法:
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
這是一個(gè)FlatMapFunction的例子,展示了所有元素如何組合在一起:
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// access the state value
val tmpCurrentSum = sum.value
// If it hasn't been used before, it will be null
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// update the count
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
// update the state
sum.update(newSum)
// if the count reaches 2, emit the average and clear the state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.fromCollection(List(
(1L, 3L),
(1L, 5L),
(1L, 7L),
(1L, 4L),
(1L, 2L)
)).keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// the printed output will be (1,4) and (1,5)
env.execute("ExampleManagedState")
}
這個(gè)例子實(shí)現(xiàn)了一個(gè)簡單的計(jì)數(shù)窗口。我們按第一個(gè)字段鍵元組(在本例中所有的鍵都是1)。該函數(shù)在ValueState中存儲(chǔ)計(jì)數(shù)和一個(gè)正在運(yùn)行的和。一旦計(jì)數(shù)達(dá)到2,它將發(fā)出平均值并清除狀態(tài),以便我們從0開始。注意,如果在第一個(gè)字段中有不同值的元組,那么對(duì)于每個(gè)不同的輸入鍵,將保持不同的狀態(tài)值。
狀態(tài)生存時(shí)間(State Time-To-Live (TTL))
可以將生存時(shí)間(TTL)分配給任何類型的鍵狀態(tài)。如果已配置TTL且狀態(tài)值已過期,將以最佳方式清理存儲(chǔ)值,下面將對(duì)此進(jìn)行更詳細(xì)的討論。
所有狀態(tài)集合類型每個(gè)條目都支持TTLs。這意味著列表元素和映射項(xiàng)可以獨(dú)立過期。
為了使用狀態(tài)TTL,首先必須構(gòu)建一個(gè)StateTtlConfig配置對(duì)象。然后TTL可以通過傳遞配置在任何狀態(tài)描述符中啟用:
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
stateDescriptor.enableTimeToLive(ttlConfig)
配置有幾個(gè)選項(xiàng)需要考慮:
newBuilder方法的第一個(gè)參數(shù)是必選的,用于設(shè)置生存時(shí)長的值。
更新類型在狀態(tài)TTL刷新時(shí)配置(默認(rèn)情況下為OnCreateAndWrite):
- StateTtlConfig.UpdateType.OnCreateAndWrite - 只有在創(chuàng)建和寫入時(shí)訪問
- StateTtlConfig.UpdateType.OnReadAndWrite - 在讀取時(shí)訪問
狀態(tài)可見性配置如果未清除過期值,則在讀取訪問時(shí)是否返回過期值(默認(rèn)情況下,NeverReturnExpired):
- StateTtlConfig.StateVisibility.NeverReturnExpired - 過期的值永遠(yuǎn)不會(huì)返回
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 如果仍然可用返回
在NeverReturnExpired的情況下,過期狀態(tài)的行為就像不再存在一樣,即使仍然需要?jiǎng)h除。該選項(xiàng)對(duì)于數(shù)據(jù)在TTL之后對(duì)于不可讀訪問的用例非常有用,例如,處理敏感資料的應(yīng)用。
另一個(gè)選項(xiàng)ReturnExpiredIfNotCleanedUp允許在清理之前返回過期狀態(tài)。
說明
狀態(tài)后端存儲(chǔ)最后一次修改的時(shí)間戳和值,這意味著啟用該特性會(huì)增加狀態(tài)存儲(chǔ)的消耗。堆狀態(tài)后端(Heap state backend)使用Java對(duì)象的狀態(tài)對(duì)象引用和內(nèi)存中的原始long值。RocksDB狀態(tài)后端為每個(gè)存儲(chǔ)值、列表項(xiàng)或映射項(xiàng)添加8字節(jié)。
目前只支持處理時(shí)間的TTLs。
- 試圖恢復(fù)狀態(tài)(以前在沒有TTL的情況下配置的狀態(tài)),使用啟用TTL的描述符或反之,將導(dǎo)致兼容性失敗和statmigrationexception異常。
- 只有當(dāng)值序列化能夠處理空值時(shí),TTL的映射狀態(tài)當(dāng)前才支持空值。如果序列化器不支持null值,可以使用NullableSerializer對(duì)其進(jìn)行包裝,代價(jià)是在序列化形式中增加一個(gè)字節(jié)。
過期狀態(tài)的清理(Cleanup of Expired State)
目前,過期值只有在顯式讀取時(shí)才會(huì)被刪除,例如,通過調(diào)用valuestat .value()。
**注意: **這意味著默認(rèn)情況下,如果未讀取過期狀態(tài)就不會(huì)刪除它,這可能導(dǎo)致狀態(tài)不斷增長。這可能會(huì)在未來的版本中改變。
此外,可以在獲取完整狀態(tài)快照時(shí)激活清理,將減少其大小。當(dāng)前實(shí)現(xiàn)不會(huì)清理本地狀態(tài),但從上一個(gè)快照恢復(fù)時(shí),它不會(huì)包含已刪除的過期狀態(tài)??梢栽赟tateTtlConfig中配置:
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
此選項(xiàng)不適用于RocksDB狀態(tài)后端中的增量檢查點(diǎn)。
以后還會(huì)添加更多的策略在后臺(tái)自動(dòng)清理過期狀態(tài)。
在Scala DataStream API中聲明(State in the Scala DataStream API)
除了上面描述的接口之外,Scala API還為KeyedStream上具有單個(gè)ValueState的有狀態(tài)map()或flatMap()函數(shù)提供了快捷方式。用戶函數(shù)在一個(gè)選項(xiàng)中獲取ValueState的當(dāng)前值,并且必須返回一個(gè)更新后的值,該值將用于更新狀態(tài)。
val stream: DataStream[(String, Int)] = ...
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
使用管理操作符狀態(tài)(Using Managed Operator State)
要使用托管理操作符狀態(tài),有狀態(tài)函數(shù)可以實(shí)現(xiàn)更通用的CheckpointedFunction接口,也可以實(shí)現(xiàn)listcheckpoint <T extends Serializable>接口。
CheckpointedFunction
CheckpointedFunction接口通過不同的重新分配方案提供對(duì)非鍵狀態(tài)的訪問。它需要實(shí)現(xiàn)兩種方法:
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
每當(dāng)必須執(zhí)行檢查點(diǎn)時(shí),都會(huì)調(diào)用snapshotState()。對(duì)應(yīng)的initializeState()在每次初始化用戶定義的函數(shù)時(shí)調(diào)用,可以是在函數(shù)第一次初始化時(shí)調(diào)用,也可以是在函數(shù)實(shí)際從較早的檢查點(diǎn)恢復(fù)時(shí)調(diào)用。因此,initializeState()不僅是初始化不同類型狀態(tài)的地方,也包括狀態(tài)恢復(fù)邏輯。
目前,支持List樣式的管理操作符狀態(tài)。狀態(tài)是一個(gè)可序列化對(duì)象的列表,彼此獨(dú)立,因此在重新掃描時(shí)有資格進(jìn)行重新分發(fā)。換句話說,這些對(duì)象是可以重新分布非鍵狀態(tài)的最佳粒度。根據(jù)狀態(tài)訪問方法的不同,定義了以下重分發(fā)方案:
**Even-split redistribution: ** 每個(gè)操作符返回一個(gè)狀態(tài)元素列表。整個(gè)狀態(tài)在邏輯上是串聯(lián)所有列表。在恢復(fù)/重新分發(fā)時(shí),該列表被平均地分成盡可能多的并行操作符子列表。每個(gè)操作符獲取一個(gè)子列表,該子列表可以是空的,也可以包含一個(gè)或多個(gè)元素。例如,如果并行度為1,則操作符的檢查點(diǎn)狀態(tài)包含元素element1和element2。當(dāng)并行度增加到2時(shí),element1可能會(huì)出現(xiàn)在運(yùn)算符實(shí)例0中,而element2會(huì)出現(xiàn)在運(yùn)算符實(shí)例1中。
Union redistribution: 每個(gè)操作符返回一個(gè)狀態(tài)元素列表。整個(gè)狀態(tài)在邏輯上是串聯(lián)所有列表。在恢復(fù)/重新分發(fā)時(shí),每個(gè)操作符都獲得狀態(tài)元素的完整列表。
下面是一個(gè)有狀態(tài)SinkFunction的例子,它使用CheckpointedFunction將元素發(fā)送到外部之前緩沖它們。它演示了基本的均分重分發(fā)列表狀態(tài):
class BufferingSink(threshold: Int = 0)
extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int)): Unit = {
bufferedElements += value
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
if(context.isRestored) {
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
initializeState方法以FunctionInitializationContext作為參數(shù)。用于初始化非鍵狀態(tài)“containers”。這是ListState類型的容器,其中非鍵狀態(tài)對(duì)象將在檢查點(diǎn)上存儲(chǔ)。
注意狀態(tài)是如何初始化的,類似于鍵狀態(tài),使用一個(gè)StateDescriptor,其中包含狀態(tài)名和關(guān)于狀態(tài)持有的值的類型的信息:
val descriptor = new ListStateDescriptor[(String, Long)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Long)]() {})
)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
狀態(tài)訪問方法的命名約定包含其重新分布模式及其狀態(tài)結(jié)構(gòu)。例如,要在還原時(shí)使用具有union重分發(fā)方案的list state,使用getUnionListState(descriptor)訪問狀態(tài)。如果方法名不包含重分發(fā)模式,例如getListState(descriptor),它僅僅意味著將使用基本的均分重分發(fā)模式。
在初始化容器之后,我們使用上下文的isrestore()方法檢查失敗后是否正在恢復(fù)。如果是true,即正在恢復(fù),則應(yīng)用恢復(fù)邏輯。
如修改后的BufferingSink代碼所示,狀態(tài)初始化期間恢復(fù)的數(shù)據(jù)保存在一個(gè)ListState變量中,以備將來在snapshotState()中使用。在那里,ListState將清除前一個(gè)檢查點(diǎn)包含的所有對(duì)象,然后被我們想要檢查的新選項(xiàng)填滿。
另外,鍵狀態(tài)也可以在initializeState()方法中初始化??梢允褂?code>FunctionInitializationContext來完成。
ListCheckpointed
ListCheckpointed接口是CheckpointedFunction的一個(gè)更有限的變體,它只支持列表樣式的狀態(tài),在恢復(fù)時(shí)使用均分重分發(fā)方案。它還需要實(shí)現(xiàn)兩種方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
在snapshotState()上,操作應(yīng)該向檢查點(diǎn)返回一個(gè)對(duì)象列表,而restoreState()必須在恢復(fù)時(shí)處理這個(gè)列表。如果狀態(tài)不可重分區(qū),則始終可以在snapshotState()中返回Collections.singletonList(MY_STATE)。
有狀態(tài)的源函數(shù)(Stateful Source Functions)
與其他操作符相比,有狀態(tài)源需要更多的關(guān)注。為了更新狀態(tài)和輸出集合的原子性(用于故障/恢復(fù)上的精確一次語義),用戶需要從源上下文獲取一個(gè)鎖。
Scala
class CounterSource
extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
val lock = ctx.getCheckpointLock
while (isRunning) {
// output and state update are atomic
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}
當(dāng)Flink完全確認(rèn)檢查點(diǎn)時(shí),一些操作可能需要這些信息來與外部世界進(jìn)行通信。在本例中,請(qǐng)參見org.apache.flink.runtime.state.CheckpointListener接口。