論文概要
"Realtime Data Processing at Facebook"是Facebook 在2016年發(fā)表的論文。論文中介紹了Facebook是如何構建分布式實時數(shù)據(jù)處理系統(tǒng)的。論文的亮點在于:著重介紹了Facebook在設計實時數(shù)據(jù)處理系統(tǒng)過程中,針對系統(tǒng)部分關鍵點的做了哪些設計、以及為什么這樣設計,同時業(yè)界是怎么做的,這些設計點也是當前實時計算領域關鍵設計點。
Facebook實時處理場景
實時數(shù)據(jù)聚合報表。
移動應用內(nèi)分析。
Facebook頁面數(shù)據(jù)統(tǒng)計。
Facebook實時系統(tǒng)現(xiàn)狀
秒級延遲,而非毫米級(秒級延遲幾乎能夠支撐內(nèi)部所有業(yè)務,毫秒級沒有場景);
借助可持久化消息總線(persistent message bus)來進行數(shù)據(jù)傳輸,這種傳輸機制為實現(xiàn)容錯、擴展性和多種正確語義打下了基礎。
每秒數(shù)百GB數(shù)據(jù)處理吞吐。
Facebook的消息總線就是一套分布式消息隊列,基本和kafka類似。
系統(tǒng)架構
Facebook內(nèi)部的實時數(shù)據(jù)處理系統(tǒng)實際是由多個系統(tǒng)組成
左側的mobile或web數(shù)據(jù)被送入到Scribe消息總線。
實時數(shù)據(jù)處理系統(tǒng)Puma、Stylus和Swift從Scribe中讀取數(shù)據(jù)進行處理,然后將處理之后的數(shù)據(jù)在寫入到Scribe中。Puma、Stylus和Swift可以分別單獨使用,可以通過Scribe來來構成一個復雜的DAG。
右側Laser、Suba和Hive是用于提供不同類型查詢的存儲系統(tǒng),他們的數(shù)據(jù)也是從Scribe中攝取。Laser還可以將數(shù)據(jù)提供給流處理系統(tǒng)和線上產(chǎn)品。

分布式數(shù)據(jù)通信Scribe
Scribe是一個秒級延遲、高吞吐、可持久化的分布式消息系統(tǒng)。Scribe中數(shù)據(jù)組織方式和Kafka類似,數(shù)據(jù)通過category(對應kafka中的topic)組織,category內(nèi)部bucket(對應kafka中的partition)是數(shù)據(jù)處理單位。Scribe將數(shù)據(jù)持久化到HDFS中,具備回讀、重復讀能力。
Scribe在Facebook也稱為分布式數(shù)據(jù)傳輸系統(tǒng),意思就是專門用于系統(tǒng)內(nèi)或系統(tǒng)間的數(shù)據(jù)傳輸解決方案。
流處理引擎Puma
Puma是一個流處理系統(tǒng),應用程序使用類SQL語言編寫,UDF使用Java編寫。它的優(yōu)點就是能在很段的時間(1h內(nèi))完成application的編寫、測試和部署。
使用Puma編寫的應用主要有兩個應用目的:
- Stateful計算與服務:為簡單的聚合查詢提供預計算,查詢延遲就等于聚合窗口的大小,查詢結果是通過Thrift API來查詢Puma,Puma的數(shù)據(jù)結果是存儲在HBase上的。(有點類似Queryable state,但是Queryable state不綁定窗口,可查詢窗口內(nèi)數(shù)據(jù)、并且數(shù)據(jù)可以存儲在本地)

計算top K 事件,5min時間窗口
- Stateless計算:Puma提供了Scribe流的過濾和處理能力。
Puma不具備ad-hoc查詢能力,因為在編譯階段對查詢進行了優(yōu)化,所以Puma應用都是部署幾個月甚至幾年的應用。
流處理引擎Swift
Swift是一個具備Checkpoint功能的基礎流處理引擎,它的使用非常簡單靈活,可以指定從Scribe中讀取固定字符串或者字節(jié)來作為一個Checkpoint。如果Swift application 掛掉后,可以從最后一次Checkpoint讀取數(shù)據(jù)。所以能夠保證數(shù)據(jù)至少被處理一次(at least once)。
Swift一般使用Python腳本語言來編寫流應用處理程序。
流處理引擎Stylus
Stylus是一個使用C++編寫的low level的流處理框架,Processor是Stylus的基礎組件。Processor可以是Stateless或Stateful,Processor可以組成一個復雜的DAG。
Processor API基本和我們現(xiàn)在各個流處理引擎一致,Stylus也支持event time和wartmark。
高性能存儲服務Laser
Laser是一個在RocksDB之上構建的,高查詢吞吐量、低延遲的ky存儲服務。Laser的數(shù)據(jù)來源于Scribe或者Hive(每天讀取一次)。Laser能夠被Facebook線上產(chǎn)品以及Puma和Stylus所訪問。
Laser主要有兩個用途:
- 能夠將Puma和Stylus處理結果(Scribe)應用到Facebook的線上產(chǎn)品。
- 將Hive中復雜查詢結果和Scribe數(shù)據(jù)存儲起來,供Puma或Stylus使用。(有點緩存中間結果的意思)
OLAP Store Scuba
Scuba是一個快速切片的分析數(shù)據(jù)存儲,Scuba具有每秒攝入數(shù)百萬行數(shù)據(jù)并插入到數(shù)千個表中的能力。Scuba中的數(shù)據(jù)是由線上產(chǎn)品輸出到Scribe,然后在攝入到Scuba中,這個過程大概有1min延遲。Scuba也支持Puma、Stylus和Swify中的輸出數(shù)據(jù)。
Scuba具備ad-hoc的查詢能力,查詢延遲一般在1s以內(nèi)。同時Scuba通過UI可以展示查詢結果(支持各種圖標)。
數(shù)據(jù)倉庫Hive
Hive在Facebook中是用于存儲EB級別的數(shù)據(jù)倉庫,每天會接收幾PB的數(shù)據(jù)寫入。Hive中的數(shù)據(jù),F(xiàn)acebook使用Presto查詢,Presto提供了完成ANSI SQL查詢語義,查詢結果可以存儲到Laser中,被線上產(chǎn)品或者其它流處理引擎所使用。
設計決策
Paper中提到了Facebook在做流處理系統(tǒng)過程中,在一些關鍵點上做了一些設計決策,這些設計決策對比了業(yè)界已有的方案,并且給出了這些決策對Streaming系統(tǒng)的影響。
Paper中首先說明了流處理系統(tǒng)的存在五方面的重要設計:易用性(Easy of use)、性能(Performance)、容錯(Fault-tolerance)、擴展性(Scalability)和正確性(Correctness)。
易用性:處理是否復雜?SQL是否夠用?是否還需要通用語言(比如java或C++)?用于編寫、測試和部署的速度有多快。
性能:多少延遲是ok的,毫秒級、秒級還是分鐘級?需要多高的吞吐?
容錯:可以容忍什么類型的失敗?數(shù)據(jù)處理或者輸出的次數(shù)保證了什么語義?系統(tǒng)如何存儲和恢復內(nèi)存中的狀態(tài)數(shù)據(jù)。
擴展性:數(shù)據(jù)能不能分片和重分片來并行處理?系統(tǒng)能否根據(jù)流量擴縮容?
正確性:是否需要ACID?所有輸入數(shù)據(jù)是否都需要輸出?
這易用性、性能、容錯、擴展性和正確性基本是現(xiàn)在流系統(tǒng)的對比標準。
Paper分別從語言范式、數(shù)據(jù)傳輸、處理語義、狀態(tài)存儲機制和再處理五方面來說明Facebook在流系統(tǒng)設計之初的設計決策。

語言范式會對系統(tǒng)易用性和性能有所影響。
數(shù)據(jù)傳輸會對易用性(比如debug)、性能、容錯和擴展性有影響。
處理語義會對容錯和正確性有影響。
狀態(tài)存儲機制會對易用性、性能、容錯、擴展性和正確性五方面都有所影響。
再處理對系統(tǒng)易用性、擴展性和正確性有所影響。
Paper對業(yè)界已有流處理引擎和Facebook流處理引擎在上面設計決策進行了對比。

語言范式設計決策
語言范式是指用戶編寫流處理應用時所使用的語言。對于語言范式的選擇決策,決定了應用應用程序編寫的難易以及編寫者對引擎性能的控制粒度。
在流處理場景,可選擇的通用語言范式有三種:
聲明式語言(declarative),主要以SQL為代表。它的優(yōu)勢在于簡單可快速上手,但是表達能力有限,許多系統(tǒng)需要額外增加一些UDF。
函數(shù)式(functional),函數(shù)式編程模型將應用程序表示為一系列預定義的operator。它方便編寫,并且有更多操作(operator)可用,而且能夠控制這些operator的順序。
程序語言(procedural),像c++、java、python都是通用的程序語言,它們具備非常高的靈活性和性能。但是它們往往需要更長時間的編寫和測試。比如Storm、Heron、Samza等。
在Facebook內(nèi)部,由因為沒有單一的語言范式能夠滿足所有場景,所以需要多種語言范式的流處理引擎(針對不同易用性和性能),這也是為什么他們內(nèi)部有三套流處理引擎。Puma使用Sql、Swift使用Python、Stylus使用C++來編寫應用程序。對于函數(shù)式編程,F(xiàn)B內(nèi)部還沒有支持,他們在調(diào)研Spark Streaming這些流處理引擎。
目前主流流計算引擎已經(jīng)能一套runtime支持多種level的api,比如Flink的SQL/Table API 、DataStream/DataSet API和ProcessFunction。
數(shù)據(jù)傳輸設計決策
流處理應用通常是由多個節(jié)點組成的一個復雜的DAG,所以需要在節(jié)點間進行數(shù)據(jù)傳輸。數(shù)據(jù)傳輸對于系統(tǒng)的容錯、性能和擴展性都是非常重要功能。由于應用程序調(diào)試,所以對易用性也有影響。
通用的傳輸機制主要有以下三類:
直接消息傳輸,通常使用RPC或者內(nèi)存消息隊列來實現(xiàn)進程間數(shù)據(jù)傳輸,比如MillWheel、Spark、Flink等都采用RPC進行通信,而Storm則使用ZeroMQ進行通信。直接消息傳輸?shù)膬?yōu)點在于性能非常好,基本能保證端到端在10ms以內(nèi)。
基于代理的消息傳輸,通過單獨的代理節(jié)點來鏈接流處理節(jié)點,并進行消息的轉發(fā)。該模式會增加系統(tǒng)開銷,但系統(tǒng)會有很好的擴展性。代理模式的消息傳輸,還可以將輸入流復用到多個輸出的Processor中,同時也具備背壓的能力。比如Heron所使用的stream manager就是該模式。
基于持久存儲的消息傳輸,streaming的processor鏈接持久化的消息總線(消息隊列),讀寫都直接操作該消息隊列。該模式是可靠性最強的傳輸方式,不僅具備多路復用的能力,數(shù)據(jù)寫入和讀取還能以不同速率處理。該模式也具備完整的單點故障恢復能力。比如Samza就是使用kafka來做數(shù)據(jù)傳輸。
我們上面說了Facebook使用Scribe來做數(shù)據(jù)傳輸,該模式大概會有1s左右的數(shù)據(jù)延遲(并且會落盤,所以也受限于磁盤和網(wǎng)絡io)。之所以Facebook會選用這種模式,主要考慮在當時Facebook內(nèi)部大部分場景都能夠容忍這個延遲,并且該模式為容錯、易用性、擴展性和性能帶來很大便利。
處理語義
處理語義的選擇會影響流系統(tǒng)的容錯和正確性能。
paper首先將流處理系統(tǒng)所做的事情總結為三部分。
處理輸入數(shù)據(jù),比如反序列化、查詢外部系統(tǒng)、更新內(nèi)存狀態(tài)等。
生成輸出,基于輸入數(shù)據(jù)和內(nèi)存狀態(tài),為下游系統(tǒng)生成輸出數(shù)據(jù)。
保存Checkpoint到數(shù)據(jù)庫,用于故障恢復。需要保存的內(nèi)容有三部分:
內(nèi)存中的狀態(tài)。
輸入流的offset。
輸出數(shù)據(jù)。
上面這三部分可以總結為兩種類型的處理語義:
- 狀態(tài)語義,每個輸入事件至少被處理一次(at-least-once)、最多處理一次(at-most-once)和只處理一次(exactly-once)。
- 輸出語義,給定的輸出值至少出現(xiàn)一次、最多出現(xiàn)一次和只出現(xiàn)一次。
狀態(tài)語義就是我們平時所討論的流處理引擎的數(shù)據(jù)處理語義;輸出語義就是我們所說的端到端的數(shù)據(jù)一致性語義。
對于Stateless處理節(jié)點只有輸出語義,對于Stateful處理節(jié)點兩種處理語義都存在。
狀態(tài)語義
狀態(tài)語義只取決于存儲offset和存儲內(nèi)存狀態(tài)的順序。
at-least-once:先存儲內(nèi)存中的狀態(tài),然后在存儲offset。
at-most-once:先存儲offset,然后存儲內(nèi)存中狀態(tài)。
exactly-once:內(nèi)存狀態(tài)和offset存儲是原子語義的,比如通過事務。
下圖是當發(fā)生fo后,不同狀態(tài)語義所帶來的結果。

輸出語義
輸出語義,除了依賴內(nèi)存中狀態(tài)和offset,還取決輸出值的保存。
at-least-once:將結果發(fā)射到輸出流,然后保存內(nèi)存狀態(tài)和offset。(性能高,無需等待檢查點保存)
at-most-once:為offset和內(nèi)存狀態(tài)保存檢查點,然后發(fā)射輸出結果。(先保存Checkpoint,然后發(fā)射結果,需要緩存處理數(shù)據(jù))
exactly-once:為offset和內(nèi)存狀態(tài)保存檢查點,并且發(fā)射輸出結果是一個事務中的原子操作。(processor依賴事務,有性能損耗)

狀態(tài)語義與輸出語義的對應關系:

在Facebook內(nèi)部,雖然對于各種語義需求都有,但是從論文看是非常重視at-least-once處理的。因為at-least-once一方面能提供極致的性能體驗,另一面exactly-once需要借助事務存儲系統(tǒng)。
狀態(tài)存儲機制
狀態(tài)存儲的目的是當發(fā)生fo后,來恢復狀態(tài)。對于狀態(tài)的存儲和恢復,有多種方式。
狀態(tài)節(jié)點多副本,通過啟動多個狀態(tài)節(jié)點,來達到狀態(tài)節(jié)點多副本的能力。該方式會帶來額外的硬件開銷。
本地狀態(tài)存儲,類似Samza將狀態(tài)存儲到本地db,并同時將數(shù)據(jù)寫到kafka。
遠端數(shù)據(jù)庫持久化,將Checkpoint數(shù)據(jù)存儲到遠端數(shù)據(jù)庫中,比如HBase。
上游備份,事件緩存在上游節(jié)點,當當前節(jié)點fo后,上游重放。
全局一致性快照,F(xiàn)link所使用的分布式快照算法,當一個節(jié)點fo后,需要將多個節(jié)點恢復到一致(聯(lián)動fo)。
在Facebook 內(nèi)部對流處理系統(tǒng)的容錯有著不同的需求,Puma提供了有狀態(tài)聚合的容錯,Stylus提供了local database模式和remote database模式。

local database 首先定時將內(nèi)存數(shù)據(jù)寫到local database,后端進程在使用一個更大的時間間隔來將local database數(shù)據(jù)異步同步到HDFS。當處理進程fo時優(yōu)先使用本地數(shù)據(jù)恢復,如果恢復后的進程不在之前的機器,則從HDFS并行拉取數(shù)據(jù)。

remote database( ZippyDB) 不會將狀態(tài)存儲在本地,使用remote database一個主要優(yōu)勢就是快速FO,因為不需要等從遠端下載完整的狀態(tài)數(shù)據(jù)。
回填處理(Backfill processing)
回填處理也就是需要重新處理舊數(shù)據(jù)。
僅流處理,需要數(shù)據(jù)傳輸機制保存足夠長的數(shù)據(jù),以便當需要回放時進行重新處理。
雙系統(tǒng)(lambda架構),流批雙系統(tǒng)進行處理。
批處理環(huán)境可運行流應用的系統(tǒng),比如Spark Streaming、Flink等。
Facebook內(nèi)部使用MapReduce讀取Hive數(shù)據(jù)方式來進行舊數(shù)據(jù)處理。
一些經(jīng)驗教訓
多系統(tǒng)能夠快速實現(xiàn)和迭代(分別針對易用性、容錯、擴展等)、流系統(tǒng)的易用性非常重要(編寫、調(diào)試、部署、監(jiān)控)。
總結
設計目標是秒級延遲而不是毫秒級延遲,是一個重要的設計決策。在Facebook還有很多系統(tǒng)能夠提供毫秒級或微妙級延遲。秒級延遲允許使用消息隊列來做數(shù)據(jù)傳輸,這種傳輸機制方便系統(tǒng)實現(xiàn)容錯、擴展和多種語義的正確性。
易用性和其它特性一樣重要。在Facebook內(nèi)部的黑客文化中,快速實現(xiàn)是非常重要的,產(chǎn)品需要具備正確的學習曲線。簡單的調(diào)試、部署和監(jiān)控,能夠增加系統(tǒng)的使用率。
提供一個正確性的選擇范圍。并不是所有的case都需要ACID語義的,用戶根據(jù)自己需要選擇正確性語義。