論文概要
"Realtime Data Processing at Facebook"是Facebook 在2016年發(fā)表的論文。論文中介紹了Facebook是如何構(gòu)建分布式實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)的。論文的亮點(diǎn)在于:著重介紹了Facebook在設(shè)計(jì)實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)過程中,針對(duì)系統(tǒng)部分關(guān)鍵點(diǎn)的做了哪些設(shè)計(jì)、以及為什么這樣設(shè)計(jì),同時(shí)業(yè)界是怎么做的,這些設(shè)計(jì)點(diǎn)也是當(dāng)前實(shí)時(shí)計(jì)算領(lǐng)域關(guān)鍵設(shè)計(jì)點(diǎn)。
Facebook實(shí)時(shí)處理場景
實(shí)時(shí)數(shù)據(jù)聚合報(bào)表。
移動(dòng)應(yīng)用內(nèi)分析。
Facebook頁面數(shù)據(jù)統(tǒng)計(jì)。
Facebook實(shí)時(shí)系統(tǒng)現(xiàn)狀
秒級(jí)延遲,而非毫米級(jí)(秒級(jí)延遲幾乎能夠支撐內(nèi)部所有業(yè)務(wù),毫秒級(jí)沒有場景);
借助可持久化消息總線(persistent message bus)來進(jìn)行數(shù)據(jù)傳輸,這種傳輸機(jī)制為實(shí)現(xiàn)容錯(cuò)、擴(kuò)展性和多種正確語義打下了基礎(chǔ)。
每秒數(shù)百GB數(shù)據(jù)處理吞吐。
Facebook的消息總線就是一套分布式消息隊(duì)列,基本和kafka類似。
系統(tǒng)架構(gòu)
Facebook內(nèi)部的實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)實(shí)際是由多個(gè)系統(tǒng)組成
左側(cè)的mobile或web數(shù)據(jù)被送入到Scribe消息總線。
實(shí)時(shí)數(shù)據(jù)處理系統(tǒng)Puma、Stylus和Swift從Scribe中讀取數(shù)據(jù)進(jìn)行處理,然后將處理之后的數(shù)據(jù)在寫入到Scribe中。Puma、Stylus和Swift可以分別單獨(dú)使用,可以通過Scribe來來構(gòu)成一個(gè)復(fù)雜的DAG。
右側(cè)Laser、Suba和Hive是用于提供不同類型查詢的存儲(chǔ)系統(tǒng),他們的數(shù)據(jù)也是從Scribe中攝取。Laser還可以將數(shù)據(jù)提供給流處理系統(tǒng)和線上產(chǎn)品。

分布式數(shù)據(jù)通信Scribe
Scribe是一個(gè)秒級(jí)延遲、高吞吐、可持久化的分布式消息系統(tǒng)。Scribe中數(shù)據(jù)組織方式和Kafka類似,數(shù)據(jù)通過category(對(duì)應(yīng)kafka中的topic)組織,category內(nèi)部bucket(對(duì)應(yīng)kafka中的partition)是數(shù)據(jù)處理單位。Scribe將數(shù)據(jù)持久化到HDFS中,具備回讀、重復(fù)讀能力。
Scribe在Facebook也稱為分布式數(shù)據(jù)傳輸系統(tǒng),意思就是專門用于系統(tǒng)內(nèi)或系統(tǒng)間的數(shù)據(jù)傳輸解決方案。
流處理引擎Puma
Puma是一個(gè)流處理系統(tǒng),應(yīng)用程序使用類SQL語言編寫,UDF使用Java編寫。它的優(yōu)點(diǎn)就是能在很段的時(shí)間(1h內(nèi))完成application的編寫、測(cè)試和部署。
使用Puma編寫的應(yīng)用主要有兩個(gè)應(yīng)用目的:
- Stateful計(jì)算與服務(wù):為簡單的聚合查詢提供預(yù)計(jì)算,查詢延遲就等于聚合窗口的大小,查詢結(jié)果是通過Thrift API來查詢Puma,Puma的數(shù)據(jù)結(jié)果是存儲(chǔ)在HBase上的。(有點(diǎn)類似Queryable state,但是Queryable state不綁定窗口,可查詢窗口內(nèi)數(shù)據(jù)、并且數(shù)據(jù)可以存儲(chǔ)在本地)

計(jì)算top K 事件,5min時(shí)間窗口
- Stateless計(jì)算:Puma提供了Scribe流的過濾和處理能力。
Puma不具備ad-hoc查詢能力,因?yàn)樵诰幾g階段對(duì)查詢進(jìn)行了優(yōu)化,所以Puma應(yīng)用都是部署幾個(gè)月甚至幾年的應(yīng)用。
流處理引擎Swift
Swift是一個(gè)具備Checkpoint功能的基礎(chǔ)流處理引擎,它的使用非常簡單靈活,可以指定從Scribe中讀取固定字符串或者字節(jié)來作為一個(gè)Checkpoint。如果Swift application 掛掉后,可以從最后一次Checkpoint讀取數(shù)據(jù)。所以能夠保證數(shù)據(jù)至少被處理一次(at least once)。
Swift一般使用Python腳本語言來編寫流應(yīng)用處理程序。
流處理引擎Stylus
Stylus是一個(gè)使用C++編寫的low level的流處理框架,Processor是Stylus的基礎(chǔ)組件。Processor可以是Stateless或Stateful,Processor可以組成一個(gè)復(fù)雜的DAG。
Processor API基本和我們現(xiàn)在各個(gè)流處理引擎一致,Stylus也支持event time和wartmark。
高性能存儲(chǔ)服務(wù)Laser
Laser是一個(gè)在RocksDB之上構(gòu)建的,高查詢吞吐量、低延遲的ky存儲(chǔ)服務(wù)。Laser的數(shù)據(jù)來源于Scribe或者Hive(每天讀取一次)。Laser能夠被Facebook線上產(chǎn)品以及Puma和Stylus所訪問。
Laser主要有兩個(gè)用途:
- 能夠?qū)uma和Stylus處理結(jié)果(Scribe)應(yīng)用到Facebook的線上產(chǎn)品。
- 將Hive中復(fù)雜查詢結(jié)果和Scribe數(shù)據(jù)存儲(chǔ)起來,供Puma或Stylus使用。(有點(diǎn)緩存中間結(jié)果的意思)
OLAP Store Scuba
Scuba是一個(gè)快速切片的分析數(shù)據(jù)存儲(chǔ),Scuba具有每秒攝入數(shù)百萬行數(shù)據(jù)并插入到數(shù)千個(gè)表中的能力。Scuba中的數(shù)據(jù)是由線上產(chǎn)品輸出到Scribe,然后在攝入到Scuba中,這個(gè)過程大概有1min延遲。Scuba也支持Puma、Stylus和Swify中的輸出數(shù)據(jù)。
Scuba具備ad-hoc的查詢能力,查詢延遲一般在1s以內(nèi)。同時(shí)Scuba通過UI可以展示查詢結(jié)果(支持各種圖標(biāo))。
數(shù)據(jù)倉庫Hive
Hive在Facebook中是用于存儲(chǔ)EB級(jí)別的數(shù)據(jù)倉庫,每天會(huì)接收幾PB的數(shù)據(jù)寫入。Hive中的數(shù)據(jù),F(xiàn)acebook使用Presto查詢,Presto提供了完成ANSI SQL查詢語義,查詢結(jié)果可以存儲(chǔ)到Laser中,被線上產(chǎn)品或者其它流處理引擎所使用。
設(shè)計(jì)決策
Paper中提到了Facebook在做流處理系統(tǒng)過程中,在一些關(guān)鍵點(diǎn)上做了一些設(shè)計(jì)決策,這些設(shè)計(jì)決策對(duì)比了業(yè)界已有的方案,并且給出了這些決策對(duì)Streaming系統(tǒng)的影響。
Paper中首先說明了流處理系統(tǒng)的存在五方面的重要設(shè)計(jì):易用性(Easy of use)、性能(Performance)、容錯(cuò)(Fault-tolerance)、擴(kuò)展性(Scalability)和正確性(Correctness)。
易用性:處理是否復(fù)雜?SQL是否夠用?是否還需要通用語言(比如java或C++)?用于編寫、測(cè)試和部署的速度有多快。
性能:多少延遲是ok的,毫秒級(jí)、秒級(jí)還是分鐘級(jí)?需要多高的吞吐?
容錯(cuò):可以容忍什么類型的失敗?數(shù)據(jù)處理或者輸出的次數(shù)保證了什么語義?系統(tǒng)如何存儲(chǔ)和恢復(fù)內(nèi)存中的狀態(tài)數(shù)據(jù)。
擴(kuò)展性:數(shù)據(jù)能不能分片和重分片來并行處理?系統(tǒng)能否根據(jù)流量擴(kuò)縮容?
正確性:是否需要ACID?所有輸入數(shù)據(jù)是否都需要輸出?
這易用性、性能、容錯(cuò)、擴(kuò)展性和正確性基本是現(xiàn)在流系統(tǒng)的對(duì)比標(biāo)準(zhǔn)。
Paper分別從語言范式、數(shù)據(jù)傳輸、處理語義、狀態(tài)存儲(chǔ)機(jī)制和再處理五方面來說明Facebook在流系統(tǒng)設(shè)計(jì)之初的設(shè)計(jì)決策。

語言范式會(huì)對(duì)系統(tǒng)易用性和性能有所影響。
數(shù)據(jù)傳輸會(huì)對(duì)易用性(比如debug)、性能、容錯(cuò)和擴(kuò)展性有影響。
處理語義會(huì)對(duì)容錯(cuò)和正確性有影響。
狀態(tài)存儲(chǔ)機(jī)制會(huì)對(duì)易用性、性能、容錯(cuò)、擴(kuò)展性和正確性五方面都有所影響。
再處理對(duì)系統(tǒng)易用性、擴(kuò)展性和正確性有所影響。
Paper對(duì)業(yè)界已有流處理引擎和Facebook流處理引擎在上面設(shè)計(jì)決策進(jìn)行了對(duì)比。

語言范式設(shè)計(jì)決策
語言范式是指用戶編寫流處理應(yīng)用時(shí)所使用的語言。對(duì)于語言范式的選擇決策,決定了應(yīng)用應(yīng)用程序編寫的難易以及編寫者對(duì)引擎性能的控制粒度。
在流處理場景,可選擇的通用語言范式有三種:
聲明式語言(declarative),主要以SQL為代表。它的優(yōu)勢(shì)在于簡單可快速上手,但是表達(dá)能力有限,許多系統(tǒng)需要額外增加一些UDF。
函數(shù)式(functional),函數(shù)式編程模型將應(yīng)用程序表示為一系列預(yù)定義的operator。它方便編寫,并且有更多操作(operator)可用,而且能夠控制這些operator的順序。
程序語言(procedural),像c++、java、python都是通用的程序語言,它們具備非常高的靈活性和性能。但是它們往往需要更長時(shí)間的編寫和測(cè)試。比如Storm、Heron、Samza等。
在Facebook內(nèi)部,由因?yàn)闆]有單一的語言范式能夠滿足所有場景,所以需要多種語言范式的流處理引擎(針對(duì)不同易用性和性能),這也是為什么他們內(nèi)部有三套流處理引擎。Puma使用Sql、Swift使用Python、Stylus使用C++來編寫應(yīng)用程序。對(duì)于函數(shù)式編程,F(xiàn)B內(nèi)部還沒有支持,他們?cè)谡{(diào)研Spark Streaming這些流處理引擎。
目前主流流計(jì)算引擎已經(jīng)能一套runtime支持多種level的api,比如Flink的SQL/Table API 、DataStream/DataSet API和ProcessFunction。
數(shù)據(jù)傳輸設(shè)計(jì)決策
流處理應(yīng)用通常是由多個(gè)節(jié)點(diǎn)組成的一個(gè)復(fù)雜的DAG,所以需要在節(jié)點(diǎn)間進(jìn)行數(shù)據(jù)傳輸。數(shù)據(jù)傳輸對(duì)于系統(tǒng)的容錯(cuò)、性能和擴(kuò)展性都是非常重要功能。由于應(yīng)用程序調(diào)試,所以對(duì)易用性也有影響。
通用的傳輸機(jī)制主要有以下三類:
直接消息傳輸,通常使用RPC或者內(nèi)存消息隊(duì)列來實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)傳輸,比如MillWheel、Spark、Flink等都采用RPC進(jìn)行通信,而Storm則使用ZeroMQ進(jìn)行通信。直接消息傳輸?shù)膬?yōu)點(diǎn)在于性能非常好,基本能保證端到端在10ms以內(nèi)。
基于代理的消息傳輸,通過單獨(dú)的代理節(jié)點(diǎn)來鏈接流處理節(jié)點(diǎn),并進(jìn)行消息的轉(zhuǎn)發(fā)。該模式會(huì)增加系統(tǒng)開銷,但系統(tǒng)會(huì)有很好的擴(kuò)展性。代理模式的消息傳輸,還可以將輸入流復(fù)用到多個(gè)輸出的Processor中,同時(shí)也具備背壓的能力。比如Heron所使用的stream manager就是該模式。
基于持久存儲(chǔ)的消息傳輸,streaming的processor鏈接持久化的消息總線(消息隊(duì)列),讀寫都直接操作該消息隊(duì)列。該模式是可靠性最強(qiáng)的傳輸方式,不僅具備多路復(fù)用的能力,數(shù)據(jù)寫入和讀取還能以不同速率處理。該模式也具備完整的單點(diǎn)故障恢復(fù)能力。比如Samza就是使用kafka來做數(shù)據(jù)傳輸。
我們上面說了Facebook使用Scribe來做數(shù)據(jù)傳輸,該模式大概會(huì)有1s左右的數(shù)據(jù)延遲(并且會(huì)落盤,所以也受限于磁盤和網(wǎng)絡(luò)io)。之所以Facebook會(huì)選用這種模式,主要考慮在當(dāng)時(shí)Facebook內(nèi)部大部分場景都能夠容忍這個(gè)延遲,并且該模式為容錯(cuò)、易用性、擴(kuò)展性和性能帶來很大便利。
處理語義
處理語義的選擇會(huì)影響流系統(tǒng)的容錯(cuò)和正確性能。
paper首先將流處理系統(tǒng)所做的事情總結(jié)為三部分。
處理輸入數(shù)據(jù),比如反序列化、查詢外部系統(tǒng)、更新內(nèi)存狀態(tài)等。
生成輸出,基于輸入數(shù)據(jù)和內(nèi)存狀態(tài),為下游系統(tǒng)生成輸出數(shù)據(jù)。
保存Checkpoint到數(shù)據(jù)庫,用于故障恢復(fù)。需要保存的內(nèi)容有三部分:
內(nèi)存中的狀態(tài)。
輸入流的offset。
輸出數(shù)據(jù)。
上面這三部分可以總結(jié)為兩種類型的處理語義:
- 狀態(tài)語義,每個(gè)輸入事件至少被處理一次(at-least-once)、最多處理一次(at-most-once)和只處理一次(exactly-once)。
- 輸出語義,給定的輸出值至少出現(xiàn)一次、最多出現(xiàn)一次和只出現(xiàn)一次。
狀態(tài)語義就是我們平時(shí)所討論的流處理引擎的數(shù)據(jù)處理語義;輸出語義就是我們所說的端到端的數(shù)據(jù)一致性語義。
對(duì)于Stateless處理節(jié)點(diǎn)只有輸出語義,對(duì)于Stateful處理節(jié)點(diǎn)兩種處理語義都存在。
狀態(tài)語義
狀態(tài)語義只取決于存儲(chǔ)offset和存儲(chǔ)內(nèi)存狀態(tài)的順序。
at-least-once:先存儲(chǔ)內(nèi)存中的狀態(tài),然后在存儲(chǔ)offset。
at-most-once:先存儲(chǔ)offset,然后存儲(chǔ)內(nèi)存中狀態(tài)。
exactly-once:內(nèi)存狀態(tài)和offset存儲(chǔ)是原子語義的,比如通過事務(wù)。
下圖是當(dāng)發(fā)生fo后,不同狀態(tài)語義所帶來的結(jié)果。

輸出語義
輸出語義,除了依賴內(nèi)存中狀態(tài)和offset,還取決輸出值的保存。
at-least-once:將結(jié)果發(fā)射到輸出流,然后保存內(nèi)存狀態(tài)和offset。(性能高,無需等待檢查點(diǎn)保存)
at-most-once:為offset和內(nèi)存狀態(tài)保存檢查點(diǎn),然后發(fā)射輸出結(jié)果。(先保存Checkpoint,然后發(fā)射結(jié)果,需要緩存處理數(shù)據(jù))
exactly-once:為offset和內(nèi)存狀態(tài)保存檢查點(diǎn),并且發(fā)射輸出結(jié)果是一個(gè)事務(wù)中的原子操作。(processor依賴事務(wù),有性能損耗)

狀態(tài)語義與輸出語義的對(duì)應(yīng)關(guān)系:

在Facebook內(nèi)部,雖然對(duì)于各種語義需求都有,但是從論文看是非常重視at-least-once處理的。因?yàn)閍t-least-once一方面能提供極致的性能體驗(yàn),另一面exactly-once需要借助事務(wù)存儲(chǔ)系統(tǒng)。
狀態(tài)存儲(chǔ)機(jī)制
狀態(tài)存儲(chǔ)的目的是當(dāng)發(fā)生fo后,來恢復(fù)狀態(tài)。對(duì)于狀態(tài)的存儲(chǔ)和恢復(fù),有多種方式。
狀態(tài)節(jié)點(diǎn)多副本,通過啟動(dòng)多個(gè)狀態(tài)節(jié)點(diǎn),來達(dá)到狀態(tài)節(jié)點(diǎn)多副本的能力。該方式會(huì)帶來額外的硬件開銷。
本地狀態(tài)存儲(chǔ),類似Samza將狀態(tài)存儲(chǔ)到本地db,并同時(shí)將數(shù)據(jù)寫到kafka。
遠(yuǎn)端數(shù)據(jù)庫持久化,將Checkpoint數(shù)據(jù)存儲(chǔ)到遠(yuǎn)端數(shù)據(jù)庫中,比如HBase。
上游備份,事件緩存在上游節(jié)點(diǎn),當(dāng)當(dāng)前節(jié)點(diǎn)fo后,上游重放。
全局一致性快照,F(xiàn)link所使用的分布式快照算法,當(dāng)一個(gè)節(jié)點(diǎn)fo后,需要將多個(gè)節(jié)點(diǎn)恢復(fù)到一致(聯(lián)動(dòng)fo)。
在Facebook 內(nèi)部對(duì)流處理系統(tǒng)的容錯(cuò)有著不同的需求,Puma提供了有狀態(tài)聚合的容錯(cuò),Stylus提供了local database模式和remote database模式。

local database 首先定時(shí)將內(nèi)存數(shù)據(jù)寫到local database,后端進(jìn)程在使用一個(gè)更大的時(shí)間間隔來將local database數(shù)據(jù)異步同步到HDFS。當(dāng)處理進(jìn)程fo時(shí)優(yōu)先使用本地?cái)?shù)據(jù)恢復(fù),如果恢復(fù)后的進(jìn)程不在之前的機(jī)器,則從HDFS并行拉取數(shù)據(jù)。

remote database( ZippyDB) 不會(huì)將狀態(tài)存儲(chǔ)在本地,使用remote database一個(gè)主要優(yōu)勢(shì)就是快速FO,因?yàn)椴恍枰葟倪h(yuǎn)端下載完整的狀態(tài)數(shù)據(jù)。
回填處理(Backfill processing)
回填處理也就是需要重新處理舊數(shù)據(jù)。
僅流處理,需要數(shù)據(jù)傳輸機(jī)制保存足夠長的數(shù)據(jù),以便當(dāng)需要回放時(shí)進(jìn)行重新處理。
雙系統(tǒng)(lambda架構(gòu)),流批雙系統(tǒng)進(jìn)行處理。
批處理環(huán)境可運(yùn)行流應(yīng)用的系統(tǒng),比如Spark Streaming、Flink等。
Facebook內(nèi)部使用MapReduce讀取Hive數(shù)據(jù)方式來進(jìn)行舊數(shù)據(jù)處理。
一些經(jīng)驗(yàn)教訓(xùn)
多系統(tǒng)能夠快速實(shí)現(xiàn)和迭代(分別針對(duì)易用性、容錯(cuò)、擴(kuò)展等)、流系統(tǒng)的易用性非常重要(編寫、調(diào)試、部署、監(jiān)控)。
總結(jié)
設(shè)計(jì)目標(biāo)是秒級(jí)延遲而不是毫秒級(jí)延遲,是一個(gè)重要的設(shè)計(jì)決策。在Facebook還有很多系統(tǒng)能夠提供毫秒級(jí)或微妙級(jí)延遲。秒級(jí)延遲允許使用消息隊(duì)列來做數(shù)據(jù)傳輸,這種傳輸機(jī)制方便系統(tǒng)實(shí)現(xiàn)容錯(cuò)、擴(kuò)展和多種語義的正確性。
易用性和其它特性一樣重要。在Facebook內(nèi)部的黑客文化中,快速實(shí)現(xiàn)是非常重要的,產(chǎn)品需要具備正確的學(xué)習(xí)曲線。簡單的調(diào)試、部署和監(jiān)控,能夠增加系統(tǒng)的使用率。
提供一個(gè)正確性的選擇范圍。并不是所有的case都需要ACID語義的,用戶根據(jù)自己需要選擇正確性語義。