開發(fā)流式量測數(shù)據(jù)處理系統(tǒng)的基本模式

? 譯:Patterns for Streaming Measurement Data with Akka Streams

? 可用于關(guān)鍵系統(tǒng)worker分離集群的思路:分布式物聯(lián)網(wǎng),并使用MQTT 協(xié)議作為傳輸協(xié)議。Akka actor系統(tǒng)具有的輕量級(jí)的松耦合的actors,可以在去中心化的集群中具有強(qiáng)大的路由,分片和發(fā)布/訂閱功能。文章展示了基于這個(gè)系統(tǒng)的物聯(lián)網(wǎng)架構(gòu),并編寫了相關(guān)代碼來作演示

? akka MQTT物聯(lián)網(wǎng)

? 對(duì)于流式數(shù)據(jù)宜看作為一個(gè)個(gè)獨(dú)立數(shù)據(jù)消息組成的數(shù)據(jù)流,因?yàn)檫@樣符合計(jì)算機(jī)處理流式數(shù)據(jù)的本質(zhì)比如TCP(因?yàn)槭橇魉詡鬏攲拥腡CP并不知道數(shù)據(jù)截止到哪里就是一個(gè)完整消息,所以在TCP之上需要指定消息格式子協(xié)議,最常用的就是在頭部就指定本消息長度,其他方式比如短連接可以以客戶端斷開連接作為截止信號(hào)、對(duì)xml消息可以以根元素的結(jié)束元素為截止),總體上看流式數(shù)據(jù)最像時(shí)序數(shù)據(jù),按照時(shí)間維度有序流動(dòng)的數(shù)據(jù)消息,適宜用Seq上的時(shí)間窗口處理。

? actor是適合處理流式數(shù)據(jù)的,但你自己在actor之間實(shí)現(xiàn)穩(wěn)定的流式數(shù)據(jù)傳輸卻是冗長易錯(cuò)的,除了發(fā)送接收,還得注意限流防止壓垮緩存/郵箱,最重要的是,流處理已經(jīng)總結(jié)歸納為了諾干最佳實(shí)踐模式,基于actor去實(shí)現(xiàn)這些模式代碼已經(jīng)可以相對(duì)固定了,也就是可以形成高度抽象、高度通用的模板,這就是Akka Streams API。流式處理的最佳實(shí)踐模式在全行業(yè)的沉淀是RSocket

Chana

? Akka Streams為你提供直觀安全的模板化/程式化的流式處理開發(fā)模式,自帶基于資源的限流、反壓,消滅OutOfMemoryErrors,為此我們的流必須能指定使用緩沖大小、如果消費(fèi)者跟不上還需要能夠逐層反壓上游生產(chǎn)者。Akka Streams API是給最終用戶用的,響應(yīng)流宣言是描述不同流處理系統(tǒng)間如何交互的,這是他們的不同之處。

? 一些優(yōu)質(zhì)中文scala、AKKA資源:

1、掘金水滴

2、scala官網(wǎng)

3、推特

4、ScalaCool

5、shawdubie.com/

6、? http://letitcrash.com/post/30509298968/case-study-an-auto-updating-cache-using-actors

? 作者Colin Breck長期致力于工業(yè)監(jiān)控流式數(shù)據(jù)系統(tǒng),具備分布式數(shù)據(jù)采集系統(tǒng)、持久化隊(duì)列/發(fā)布訂閱的消息系統(tǒng)、高性能時(shí)序數(shù)據(jù)存儲(chǔ)系統(tǒng)經(jīng)驗(yàn);在Unix, Linux, Windows, Azure, and AWS上開發(fā)系統(tǒng)的經(jīng)驗(yàn);使用C, C++, C#, and Scala開發(fā)經(jīng)驗(yàn);開發(fā)過商業(yè)系統(tǒng)、開源解決方案;具備platform-as-a-service云組件使用經(jīng)驗(yàn)。不管采用何種開發(fā)語言和平臺(tái),它們都有同樣的流式量測數(shù)據(jù)處理模式,而Akka Streams API是作者使用過的最自然、強(qiáng)大、高效的開發(fā)此類系統(tǒng)的框架。


? Akka Streams是高效描述和執(zhí)行流式應(yīng)用的框架,使用它比actor模型以及系統(tǒng)語言c++、java等更加自然、強(qiáng)大。作為演示,客戶端(往往是移動(dòng)端)使用WebSocket連接上送流式量測數(shù)據(jù)到server,server做解析并存儲(chǔ)入庫。上送的頻度變化跨度很大,有峰值爆發(fā)也有低谷。

? 下面分類總結(jié)了流式量測數(shù)據(jù)處理模式、展示基于Akka Streams API如何可靠實(shí)現(xiàn)這些模式,流式量測消息來自MQ、broker或基于WebSocket的某種協(xié)議,比如從WebSocket接收消息、發(fā)給Database Actor去全異步解析、存儲(chǔ)。

分組消息

? 處理unbounded流式數(shù)據(jù)往往需要先對(duì)其分組,比如處理一批傳感器定時(shí)上報(bào)的量測數(shù)據(jù),當(dāng)將處理后的量測數(shù)據(jù)存入數(shù)據(jù)庫或MQ,一般要按批寫入而不是一次寫一條,這是為了減少請(qǐng)求次數(shù)、提高寫入性能(類似網(wǎng)絡(luò)通訊的合并緩發(fā),但需測試證實(shí)有效并量化效果,因?yàn)橐灿姓f法是每actor獨(dú)立存儲(chǔ)情況下,每次存儲(chǔ)很小的數(shù)據(jù)量,性能也不錯(cuò)。也許是因?yàn)楝F(xiàn)代數(shù)據(jù)庫連接池普遍有所優(yōu)化)?;?Akka Streams API分組消息數(shù)據(jù)很方便,只要增加調(diào)用grouped:

處理無限輸入流,一次向數(shù)據(jù)庫寫入1k條元素

? grouped是標(biāo)準(zhǔn)scala集合Iterable特質(zhì)的方法,可以將Iterable對(duì)象分成指定大小的組,除最后一組可能會(huì)小一些以外,每組大小相同,例:

? val t = Iterable(1 to 16: _*)

? val groups = t.grouped(5)

? groups仍為一個(gè)Iterable,它包含4個(gè)List,前三個(gè)都是5個(gè)元素最后一個(gè)只有一個(gè)16。類似Spark對(duì)scala集合類操作的擴(kuò)展,Stream采納這種泛化的函數(shù)式集合操作。

? 流式數(shù)據(jù)處理系統(tǒng)的一個(gè)天生特點(diǎn):數(shù)據(jù)往往需要及時(shí)處理以及傳輸給其他系統(tǒng)。比如對(duì)于來自傳感器的量測數(shù)據(jù)處理重點(diǎn)關(guān)注的需求有:需要近實(shí)時(shí)的聚合、生成告警或在UI界面展示,而且,傳感數(shù)據(jù)都是秒級(jí)采樣上報(bào)的(頻率高,總數(shù)據(jù)量也不?。7纸Mgroup處理往往會(huì)帶來無法接受的延遲,針對(duì)這一點(diǎn)Akka Streams API定義了 groupedWithin 機(jī)制來分組 events(這就是合并緩發(fā)了,在網(wǎng)絡(luò)通訊中被證實(shí)有效,storm等多種框架采用):

but also emit events within a bounded time-frame, even if the maximum number of events has not been satisfied. This leads to the efficient batching of events, without the introduction of unacceptable latency.

分解消息

? 和分組處理消息同樣很常見的做法卻正相反:將一批消息的聚合分解為它的獨(dú)立元素再處理,比如JSON消息往往是聚合起來的集合,你收到的都是數(shù)組,數(shù)組的元素才是獨(dú)立消息,如下風(fēng)機(jī)上送的json消息

{ 資產(chǎn)編號(hào),信號(hào)采樣時(shí)間,量測值{電量,轉(zhuǎn)速,風(fēng)速} }

? 這樣的消息很常見,一般是按量測類型分解為獨(dú)立的采樣數(shù)據(jù)點(diǎn)、發(fā)送到下游做聚合計(jì)算、存儲(chǔ)入庫或者展示到用戶界面上,這樣一條消息分解之后的獨(dú)立采樣數(shù)據(jù)點(diǎn)使用case class可以表示為:

此處宜加final;signal是一個(gè)量測的名字,value是值

?Akka Streams新手想當(dāng)然的會(huì)想把消息按照signal分組為采樣數(shù)據(jù)點(diǎn)的seq、然后flatten這個(gè)seq、把每個(gè)元素作為一條消息發(fā)往下游,這實(shí)際上會(huì)制造一個(gè)unbounded JSON消息流:

處理單條消息的例子

? mapConcat可以進(jìn)一步簡化:

限流

? 對(duì)于流式系統(tǒng)來說,重要的一點(diǎn)是對(duì)發(fā)往其他服務(wù)的并發(fā)請(qǐng)求做限流,以免壓垮其他系統(tǒng)(壓垮別人可能很容易做到,你可以輕易寫一段并發(fā)代碼寫爆網(wǎng)卡緩存,總體上做到優(yōu)雅服務(wù)降級(jí),也就是負(fù)載均衡、保障service-level agreements,特別是數(shù)據(jù)流源源不斷、消息速率動(dòng)態(tài)變化情況下,對(duì)此Akka Streams提供了無縫支持,當(dāng)待處理的請(qǐng)求飽和,系統(tǒng)會(huì)進(jìn)行反壓、暫停請(qǐng)求,而不是任憑內(nèi)存耗盡:

對(duì)Sample異步、分批存儲(chǔ)入庫,并且限制了待處理消息上限是10個(gè)

節(jié)流

? 傳統(tǒng)多線程編程當(dāng)中,對(duì)消息進(jìn)行節(jié)流有一定挑戰(zhàn),特別是涉及到計(jì)數(shù)器、計(jì)時(shí)器、跨線程同步,節(jié)流消息可能對(duì)集成外部系統(tǒng)很重要。類似限流,在面對(duì)有意無意造成的消息洪流時(shí),節(jié)流消息在維持系統(tǒng)一致性方面(狀態(tài)保持一致,即數(shù)據(jù)及其處理保持完整)同樣重要,那些場景下一些客戶端會(huì)瘋狂發(fā)送消息(也許是bug也許是惡意denial-of-service攻擊),比如,如果預(yù)期一個(gè)客戶端每分鐘向服務(wù)端發(fā)送一條消息,那么服務(wù)端就沒有理由接受這個(gè)客戶端更高頻度的消息。

? 如果上游超出額定頻度,那么throttle元素可以熔斷流,或者通過反壓手段來規(guī)范shape流的行為:

限制上游僅能每秒發(fā)送一條消息

? 消息發(fā)送頻度并不總是一樣的,比如,一個(gè)客戶端在暫時(shí)丟失與服務(wù)端連接時(shí)可能會(huì)緩存消息數(shù)據(jù),一旦連接恢復(fù),就會(huì)補(bǔ)發(fā)所有數(shù)據(jù),這可能會(huì)造成消息洪流,maximumBurst參數(shù)用于允許客戶端爆發(fā)式補(bǔ)發(fā)數(shù)據(jù)的同時(shí)仍然保持節(jié)流消息不要超出上限,可以參考Akka Streams官方文檔了解token-bucket算法。

? 在填充/搬運(yùn)大量數(shù)據(jù)時(shí)節(jié)流也有用處,比如將一整年的量測數(shù)據(jù)從一個(gè)數(shù)據(jù)庫到另一個(gè)庫進(jìn)行遷移,可以對(duì)填充做限流以免壓垮數(shù)據(jù)庫或沖擊其他依賴該數(shù)據(jù)庫的服務(wù),使得填充可以在線進(jìn)行以免維護(hù)性下線服務(wù)。

并發(fā)

? 要構(gòu)建高效、可擴(kuò)展、低延遲的流式數(shù)據(jù)處理系統(tǒng),并發(fā)執(zhí)行任務(wù)是必須的,比如一個(gè)場景是程序從MQ中讀取消息,執(zhí)行一個(gè)CPU密集型任務(wù)、對(duì)消息進(jìn)行轉(zhuǎn)換、之后將轉(zhuǎn)換結(jié)果異步地存儲(chǔ)到數(shù)據(jù)庫。這個(gè)場景要最大化性能怎么設(shè)計(jì)?至少我們可以考慮:在一條線程向數(shù)據(jù)庫寫入結(jié)果的同時(shí),另一條線程可以從MQ讀取下一條待處理消息,也就是至少可以并發(fā)讀寫,更進(jìn)一步,甚至可以在第三個(gè)線程上并發(fā)地執(zhí)行cpu密集型的數(shù)據(jù)轉(zhuǎn)換,理論上這樣肯定能進(jìn)一步提升吞吐量性能(這個(gè)場景在ESB中相當(dāng)?shù)湫停篠EDA,多階段事件驅(qū)動(dòng)架構(gòu))

? Akka Streams默認(rèn)會(huì)順序地(單線程)執(zhí)行一個(gè)流,單線程對(duì)于許多流處理來說就夠了,比如只是執(zhí)行一系列數(shù)據(jù)map轉(zhuǎn)換,默認(rèn)單線程沒毛病,因?yàn)檫@樣可以避免異步傳遞消息的成本、并且最大化利用了代價(jià)最低的young-generation GC.。對(duì)于傳統(tǒng)的多線程編程,不同任務(wù)的并行處理可能非常復(fù)雜,涉及跨線程的數(shù)據(jù)同步,但是使用Akka Streams可以化繁為簡,只要在算子之間的異步邊界上用async描述一下就好。

對(duì)一百萬JSON消息進(jìn)行g(shù)zip編碼解碼(both CPU-intensive operations);單線程
使用兩條線程分別并發(fā)執(zhí)行編碼和解碼,比上述單線程快40%;async就是用于定義異步邊界的,就像劃分SEDA中的Stage

? Akka Streams就是這樣定義異步邊界,但需要你對(duì)數(shù)據(jù)處理管道有深入的理解,比如:哪些異步操作是CPU-bound、哪些是memory-bound、哪些可以異步執(zhí)行.....? 但是畢竟它不像傳統(tǒng)多線程編程那么復(fù)雜,定義異步邊界的方式明顯簡潔直觀得多——因?yàn)樗侠怼⒏唵?,簡單到你可以反?fù)嘗試在何處定義異步邊界效果最好,這才是本來就該使用的自然方式,只要在合適的位置加入async元素。

? 這是流處理所特有的、從ESB繼承下來的優(yōu)勢,因?yàn)閿?shù)據(jù)管道就這一條,在這一條線上,你可以隨意定義異步邊界,這也幫助你最快找到最佳邊界,類似的在遵循SEDA多階段事件驅(qū)動(dòng)的ESB中比如spring integration或者mule,一個(gè)Stage叫做一個(gè)消息流flow,實(shí)際上就是一條線程的運(yùn)行邊界,具體結(jié)構(gòu)是一個(gè)前任務(wù)隊(duì)列后線程池組合,由你來劃分Stage,可粗可細(xì)。實(shí)際上微服務(wù)就是SOA2.0,要說SOA中只有一個(gè)東西有價(jià)值,那只能是ESB,而不是現(xiàn)在的主流聲音所鼓吹的ESB太重了,ESB也對(duì)流式數(shù)據(jù)處理有啟發(fā)意義。SOA敗在服務(wù)間交互協(xié)議,最初的soap基本全盤失敗已經(jīng)涼了、中間亂入的rest也不完全適用這個(gè)場景,因?yàn)樗鼈兌蓟谛阅芎桶姹径疾环€(wěn)定的Http,而Http根本不是設(shè)計(jì)用于服務(wù)間交互的,未來的方向是RSocket.? 微服務(wù)沒有多少新意,與SOA一脈相承,SOA做不好的微服務(wù)一樣:SOA的服務(wù)編排UDDI早早就涼了,微服務(wù)同樣也將無法解決這個(gè)問題;沒有服務(wù)編排的服務(wù)注冊(cè)中心也叫不響了,會(huì)降級(jí)為類似分布式發(fā)布訂閱的功能;SOA里最火的ESB,也就是現(xiàn)在微服務(wù)里最火的ServiceMesh

? 上述async元素是最好最直觀的劃分異步邊界的方式,類似spring integration中的channel,或者mule中的flow,微觀結(jié)構(gòu)都是前任務(wù)隊(duì)列后線程池。

Idle Timeouts

?我發(fā)現(xiàn)idle timeouts空閑超時(shí)在兩種場景下特別有用(兩個(gè)場景都是關(guān)于不受控行為的):一個(gè)是當(dāng)流空閑時(shí)回收資源,這在 WebSocket server上很常見,或者是一個(gè)數(shù)據(jù)聚合管道,停止從上游組件接收數(shù)據(jù)(組件臨時(shí)下線或者被換掉,移動(dòng)客戶端或者其他服務(wù)組件都是不受控的)

一分鐘之后流會(huì)拋出超時(shí)異常,導(dǎo)致流處理中斷并記錄錯(cuò)誤日志,日志會(huì)記錄該設(shè)備ID

? 空閑超時(shí)用令流失效的簡單粗暴方式來回收資源,但體現(xiàn)了更多的問題,比如說為什么連接仍然正常情況下,客戶端在長達(dá)十多分鐘時(shí)間當(dāng)中未與服務(wù)端通訊?,它能保證每分鐘都發(fā)送消息嗎?,如果沒有強(qiáng)制空閑超時(shí),這些可能根本就注意不到,因此,問題并沒有得到解決.,靠個(gè)體很難解決。

? 第二個(gè)強(qiáng)制空閑超時(shí)有用之處是為流式數(shù)據(jù)處理系統(tǒng)開發(fā)功能測試,我經(jīng)常發(fā)現(xiàn)自己在寫驗(yàn)證流輸出的測試,這些流可能會(huì)做數(shù)據(jù)轉(zhuǎn)換、寫數(shù)據(jù)庫或者是接著交給下一個(gè)流處理,測試主要是測業(yè)務(wù)邏輯和輸出正確與否,但是得有數(shù)據(jù)在預(yù)定時(shí)間內(nèi)通過流來處理,在具備復(fù)雜業(yè)務(wù)邏輯的分布式系統(tǒng)中,就用得上空閑超時(shí)來應(yīng)對(duì)消息延遲。


周期性事件

? 流式經(jīng)常使用周期性消息,比如心跳、推送狀態(tài)或者通訊流量水位,可以用于觸發(fā)依賴事件

簡書抽風(fēng),寫了半天全丟了
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1 基本流處理 讓我們首先看看使用akka-stream處理流的真正含義。圖1展示了在某個(gè)處理節(jié)點(diǎn)上,元素是一個(gè)個(gè)...
    樂言筆記閱讀 2,773評(píng)論 1 1
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,547評(píng)論 19 139
  • 目前為止,已經(jīng)討論了機(jī)器學(xué)習(xí)和批處理模式的數(shù)據(jù)挖掘?,F(xiàn)在審視持續(xù)處理流數(shù)據(jù),實(shí)時(shí)檢測其中的事實(shí)和模式,好像從湖泊來...
    abel_cao閱讀 9,217評(píng)論 1 20
  • 與Actor集成 為了將流的元素作為消息傳遞給一個(gè)普通的actor,你可以在mapAsync里使用ask或者使用S...
    樂言筆記閱讀 4,130評(píng)論 0 1
  • Akka幫助您構(gòu)建可靠的應(yīng)用程序在一臺(tái)機(jī)器上使用多個(gè)處理器核心(“擴(kuò)大”)或分布在計(jì)算機(jī)網(wǎng)絡(luò)(“擴(kuò)張”)。關(guān)鍵的抽...
    兒哥欠三百首閱讀 2,810評(píng)論 0 0

友情鏈接更多精彩內(nèi)容