Rescaling Stateful Applications in Production

一、Overview

Data locality is a key principle in Flink and strongly affects how state is stored and accessed. All state in Flink is local state. The points below explain why local state is a fundamental primitive in stream processing:

  • 1、Apache Flink is a massively parallel distributed system that allows stateful stream processing at large scale. For scalability, a Flink job is logically decomposed into a graph of operators, and the execution of each operator is physically decomposed into multiple parallel operator instances. Conceptually, each parallel operator instance in Flink is an independent task that can be scheduled on its own machine in a network-connected cluster of shared-nothing machines.

  • 2、For high throughput and low latency in this setting, network communications among tasks must be minimized. In Flink, network communication for stream processing only happens along the logical edges in the job’s operator graph (vertically), so that the stream data can be transferred from upstream to downstream operators.

  • 3、However, there is no communication between the parallel instances of an operator (horizontally). To avoid such network communication, data locality is a key principle in Flink and strongly affects how state is stored and accessed.

二、Rescaling Stateful Stream Processing Jobs

image.png

三、Reassigning Operator State When Rescaling

image.png

Redistributing operator state when rescaling is very flexible. Three redistribution schemes are currently provided, described below:

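  • 1、ListState: when the parallelism changes, the list held by each parallel instance is collected, the lists are merged into one new list, and the elements are then distributed evenly, by element count, across the new tasks.
  • 2、UnionListState: more flexible than ListState, because it leaves the partitioning to the user. When the parallelism changes, the original lists are concatenated and the complete list is handed to the user without being split.
  • 3、BroadcastState: for example, when joining a large table with a small table, the small table can be broadcast to every partition of the large table. The data on every parallel instance is identical, and so are the updates applied to it. When the parallelism changes, the data is simply copied to the new tasks.

1、Operator state has only one data structure, ListState<T>, and it is global: each subtask of the operator contributes some of the T items to the ListState<T>. It is precisely because the state is a list that it can be redistributed when the operator is rescaled. A single T would be a black box to Flink, and Flink would have no way to split it.
2、Why does operator state provide only the one data structure, ListState<T>? Because of the rescaling problem for operator state.
  • 1、Unlike keyed state, operator state cannot be treated as one global state. Keyed state can (per key, that is; transformations also run independently key by key), because however the parallelism changes, all records with the same key land on one task. At most the records for a key move to a different task; each key is only ever handled by one parallel instance.
  • 2、For operator state, when the parallelism changes, the records arriving from upstream are redistributed across the subtasks, so each subtask effectively sees a different set of records. This is why each item T in the ListState should be an atomic, independently re-distributable part of the operator state.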
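
The three redistribution schemes described above can be sketched in a few lines of Python. This is a simulation only, not Flink code: the function names are hypothetical, and the even split into contiguous chunks is an assumption about how "evenly" is realized; the exact element order Flink produces may differ.

```python
from typing import List, TypeVar

T = TypeVar("T")


def redistribute_even_split(old_lists: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """ListState style: merge all lists, then split the result as evenly as possible."""
    merged = [item for sub in old_lists for item in sub]
    base, extra = divmod(len(merged), new_parallelism)
    result, start = [], 0
    for i in range(new_parallelism):
        # The first `extra` tasks receive one additional element.
        size = base + (1 if i < extra else 0)
        result.append(merged[start:start + size])
        start += size
    return result


def redistribute_union(old_lists: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """UnionListState style: every new task gets the full concatenated list."""
    merged = [item for sub in old_lists for item in sub]
    return [list(merged) for _ in range(new_parallelism)]


def redistribute_broadcast(old_lists: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """BroadcastState style: all old instances hold the same data, so copy one of them."""
    return [list(old_lists[0]) for _ in range(new_parallelism)]


old = [["a", "b"], ["c"], ["d", "e"]]          # state of 3 old subtasks
print(redistribute_even_split(old, 2))          # [['a', 'b', 'c'], ['d', 'e']]
print(redistribute_union(old, 2))               # both tasks see all five items
print(redistribute_broadcast([["rule1"]] * 3, 4))
```

With union redistribution each task receives every item, so the user code is responsible for keeping only the items it actually needs.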

Illustration

  • As a generalized approach to solve this black box problem, we slightly modified the checkpointing interface, called ListCheckpointed. Figure 2B shows the new checkpointing interface, which returns and receives a list of state partitions. Introducing a list instead of a single object makes the meaningful partitioning of state explicit: each item in the list still remains a black box to Flink, but is considered an atomic, independently re-distributable part of the operator state.
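
To make the shape of such an interface concrete, here is a minimal Python analogy. It is not the Java ListCheckpointed interface itself: the class `BufferingSink`, its method names, and the simple stride-based assignment of items to new instances are all assumptions made for illustration.

```python
from typing import List


class BufferingSink:
    """Hypothetical operator that buffers elements before writing them out."""

    def __init__(self) -> None:
        self.buffer: List[str] = []

    def process(self, element: str) -> None:
        self.buffer.append(element)

    def snapshot_state(self) -> List[str]:
        # Return a list: each buffered element is one atomic,
        # independently re-distributable item of the operator state.
        return list(self.buffer)

    def restore_state(self, items: List[str]) -> None:
        # Receive whichever items were assigned to this instance.
        self.buffer = list(items)


# Two old instances take a snapshot.
s0, s1 = BufferingSink(), BufferingSink()
for e in ["e1", "e2", "e3"]:
    s0.process(e)
s1.process("e4")
snapshots = [s0.snapshot_state(), s1.snapshot_state()]

# Rescale to three instances: the runtime can move items freely
# because it only needs to treat each list item as opaque.
merged = [item for snap in snapshots for item in snap]
new_sinks = [BufferingSink() for _ in range(3)]
for i, sink in enumerate(new_sinks):
    sink.restore_state(merged[i::3])

print([s.buffer for s in new_sinks])  # [['e1', 'e4'], ['e2'], ['e3']]
```

Had `snapshot_state` returned the whole buffer as a single opaque object, the runtime would have had no safe way to divide it among three instances.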

image.png

四、Reassigning Keyed State When Rescaling

1、Question:

  • 1、While partitioning state by key automatically solves the problem of logically remapping the state to subtasks after rescaling, there is one more practical problem left to solve: how can we efficiently transfer the state to the subtasks' local backends?

2、Answer

  • 1、Naive Approach
    A naive approach might be to read all the previous subtask state from the checkpoint in all sub-tasks and filter out the matching keys for each sub-task. While this approach can benefit from a sequential read pattern, each subtask potentially reads a large fraction of irrelevant state data, and the distributed file system receives a huge number of parallel read requests.

  • 2、Index
    Another approach could be to build an index that tracks the location of the state for each key in the checkpoint. With this approach, all subtasks could locate and read the matching keys very selectively. This approach would avoid reading irrelevant data, but it has two major downsides. First, a materialized index for all keys, i.e. a key-to-read-offset mapping, can potentially grow very large. Second, this approach can introduce a huge amount of random I/O when seeking to the data for individual keys (see Figure 3A), which typically entails very bad performance in distributed file systems.

image.png
  • 3、Key-groups (the atomic unit of state assignment)

Note that the new parallelism can be at most the previously configured max-parallelism. Once a job has been started, the max-parallelism is baked into the savepoints and can no longer be changed, unless you discard all state and start over as a new job.

  • 1、Flink’s approach sits in between those two extremes by introducing key-groups as the atomic unit of state assignment. How does this work? The number of key-groups must be determined before the job is started and (currently) cannot be changed after the fact. As key-groups are the atomic unit of state assignment, this also means that the number of key-groups is the upper limit for parallelism. In a nutshell, key-groups give us a way to trade between flexibility in rescaling (by setting an upper limit for parallelism) and the maximum overhead involved in indexing and restoring the state.

  • 2、We assign key-groups to subtasks as ranges. This makes the reads on restore not only sequential within each key-group, but often also across multiple key-groups. An additional benefit: this also keeps the metadata of key-group-to-subtask assignments very small. We do not maintain explicit lists of key-groups because it is sufficient to track the range boundaries.

  • 3、We have illustrated rescaling from parallelism 3 to 4 with 10 key-groups in Figure 3B. As we can see, introducing key-groups and assigning them as ranges greatly improves the access pattern over the naive approach. Equations 2 and 3 in Figure 3B also detail how we compute key-groups and the range assignment.
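
The key-group idea can be sketched as follows for the same scenario of 10 key-groups rescaled from parallelism 3 to 4. This is a sketch under stated assumptions, not a reproduction of the equations in Figure 3B: the function names are hypothetical, `zlib.crc32` merely stands in for whatever hash function the runtime applies to a key, and the range formula is one common way to split key-groups into contiguous, near-equal ranges.

```python
import zlib


def key_group_for(key: str, max_parallelism: int) -> int:
    # Assumption: crc32 is a stand-in for the real hash function.
    return zlib.crc32(key.encode("utf-8")) % max_parallelism


def key_group_range(subtask: int, parallelism: int, max_parallelism: int) -> range:
    """Contiguous range of key-groups owned by one subtask."""
    start = (subtask * max_parallelism + parallelism - 1) // parallelism
    end = ((subtask + 1) * max_parallelism - 1) // parallelism  # inclusive
    return range(start, end + 1)


def subtask_for_key_group(key_group: int, parallelism: int, max_parallelism: int) -> int:
    """Inverse lookup: which subtask owns a given key-group."""
    return key_group * parallelism // max_parallelism


MAX_PARALLELISM = 10  # number of key-groups, fixed before the job starts

for parallelism in (3, 4):
    ranges = [list(key_group_range(i, parallelism, MAX_PARALLELISM))
              for i in range(parallelism)]
    print(parallelism, ranges)
# 3 [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 4 [[0, 1, 2], [3, 4], [5, 6, 7], [8, 9]]

# A key always maps to the same key-group; only the owner of that group changes.
kg = key_group_for("user-42", MAX_PARALLELISM)
print(kg, subtask_for_key_group(kg, 3, MAX_PARALLELISM),
      subtask_for_key_group(kg, 4, MAX_PARALLELISM))
```

Because each subtask owns a contiguous range, only the range boundaries need to be stored, and a restoring subtask reads the key-groups it needs as a few sequential runs rather than seeking per key.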

image.png