MapReduce:批處理的基石
MapReduce 的核心思想
Map 階段:將輸入數(shù)據(jù)拆分為多個(gè)分片(Split),并行處理生成中間鍵值對(duì)(Key-Value Pairs)。
Shuffle 階段:隱式階段,負(fù)責(zé)將 Map 輸出排序、分組并傳輸?shù)?Reduce 節(jié)點(diǎn)。
Reduce 階段:對(duì)中間結(jié)果按 Key 分組,進(jìn)行聚合或轉(zhuǎn)換,輸出最終結(jié)果。
-
編程模型
# Map 函數(shù):處理輸入,生成中間鍵值對(duì) def map(key, value): for item in process(value): yield (intermediate_key, intermediate_value) # Reduce 函數(shù):聚合相同 Key 的值 def reduce(key, values): result = aggregate(values) yield (key, result)
MapReduce 執(zhí)行流程
-
輸入分片(Input Splits)
- 數(shù)據(jù)存儲(chǔ)于分布式文件系統(tǒng)(如 HDFS),按固定大小(如 128MB)分片。
- 每個(gè)分片啟動(dòng)一個(gè) Map 任務(wù),由框架自動(dòng)調(diào)度到集群節(jié)點(diǎn)。
-
Map 階段
- 讀取輸入:從 HDFS 讀取分片數(shù)據(jù)(如文本文件的一行)。
- 執(zhí)行 Map 函數(shù):用戶自定義邏輯處理數(shù)據(jù),輸出中間鍵值對(duì)。
- 寫入本地磁盤:Map 結(jié)果按 Key 分區(qū)(Partition)后寫入節(jié)點(diǎn)本地磁盤(非 HDFS)。
-
Shuffle 與排序
- Fetch 階段:Reduce 任務(wù)從所有 Map 節(jié)點(diǎn)拉取對(duì)應(yīng)分區(qū)的數(shù)據(jù)。
- 排序與合并:拉取的數(shù)據(jù)按 Key 排序,合并相同 Key 的值列表。
-
Reduce 階段
- 執(zhí)行 Reduce 函數(shù):對(duì)排序后的鍵值對(duì)進(jìn)行聚合(如求和、去重)。
- 輸出結(jié)果:結(jié)果寫入 HDFS,每個(gè) Reduce 任務(wù)生成一個(gè)輸出文件。
MapReduce 的關(guān)鍵機(jī)制
-
容錯(cuò)與恢復(fù)
-
Task 失敗:
Map/Reduce 任務(wù)失敗時(shí),框架重新調(diào)度到其他節(jié)點(diǎn)執(zhí)行
已完成的 Map 任務(wù)需重新執(zhí)行(因中間結(jié)果存儲(chǔ)在失敗節(jié)點(diǎn)的本地磁盤)。
節(jié)點(diǎn)故障:通過心跳檢測(cè)(Heartbeat)發(fā)現(xiàn)宕機(jī)節(jié)點(diǎn),重新分配其任務(wù)。
-
-
數(shù)據(jù)本地性優(yōu)化
- 將 Map 任務(wù)調(diào)度到存儲(chǔ)輸入分片的節(jié)點(diǎn),避免跨網(wǎng)絡(luò)讀取數(shù)據(jù)。
- 若本地節(jié)點(diǎn)繁忙,選擇同一機(jī)架內(nèi)的節(jié)點(diǎn)(機(jī)架感知策略)。
-
Combiner 優(yōu)化
- 局部聚合:在 Map 階段后、Shuffle 前,對(duì)中間結(jié)果進(jìn)行預(yù)聚合(類似 Reduce)。
- 減少數(shù)據(jù)傳輸:例如 WordCount 中,Map 節(jié)點(diǎn)先合并相同單詞的計(jì)數(shù)。
現(xiàn)代批處理框架
Apache Spark
-
內(nèi)存計(jì)算:
- 將中間數(shù)據(jù)緩存到內(nèi)存(RDD),減少磁盤I/O。
- 相比MapReduce,迭代算法性能提升10-100倍。
-
DAG執(zhí)行引擎:
- 將作業(yè)拆分為有向無環(huán)圖(DAG),優(yōu)化任務(wù)調(diào)度。
- 支持多階段任務(wù)合并(如Map后直接Reduce,跳過Shuffle)。
統(tǒng)一API:支持批處理(Spark Core)、流處理(Spark Streaming)、機(jī)器學(xué)習(xí)(MLlib)。
執(zhí)行引擎優(yōu)化
- 向量化處理:使用SIMD指令批量處理數(shù)據(jù)(如Apache Arrow內(nèi)存格式)。
- 動(dòng)態(tài)資源分配:根據(jù)任務(wù)負(fù)載動(dòng)態(tài)調(diào)整CPU/內(nèi)存(如YARN或Kubernetes調(diào)度器)。
存儲(chǔ)優(yōu)化:列式存儲(chǔ)
-
Parquet/ORC格式:
- 按列存儲(chǔ),高壓縮率(同列數(shù)據(jù)類型一致)。
- 支持謂詞下推(Predicate Pushdown),僅讀取需要的列。
數(shù)據(jù)分區(qū)分桶:按時(shí)間或哈希分區(qū),加速過濾和聚合。
批處理系統(tǒng)架構(gòu)
數(shù)據(jù)存儲(chǔ)層
-
分布式文件系統(tǒng)
- HDFS(Hadoop Distributed File System) :分塊存儲(chǔ)(128MB/塊),多副本冗余;適合存儲(chǔ)大文件,如日志、原始數(shù)據(jù)。
- 云存儲(chǔ)(S3、GCS) :對(duì)象存儲(chǔ)服務(wù),彈性擴(kuò)展,按需付費(fèi)。
-
數(shù)據(jù)湖(Data Lake)
- 原始數(shù)據(jù)存儲(chǔ):支持結(jié)構(gòu)化(CSV)、半結(jié)構(gòu)化(JSON)、非結(jié)構(gòu)化(圖片)數(shù)據(jù)。
- 列式存儲(chǔ)格式:Parquet-高壓縮率,適合OLAP查詢;ORC-優(yōu)化Hive查詢性能,支持謂詞下推。
-
數(shù)據(jù)倉庫(Data Warehouse)
- 結(jié)構(gòu)化存儲(chǔ):基于星型/雪花模型,優(yōu)化聚合查詢。
- 代表系統(tǒng):Snowflake、Redshift(云原生數(shù)倉);Hive(基于HDFS的SQL查詢引擎)。
計(jì)算引擎層
-
MapReduce(Hadoop)
- 經(jīng)典批處理模型:分Map、Shuffle、Reduce三階段。
- 適用場(chǎng)景:簡(jiǎn)單ETL、離線分析。
-
Apache Spark
- 內(nèi)存計(jì)算:通過RDD(彈性分布式數(shù)據(jù)集)減少磁盤I/O。
- DAG調(diào)度:優(yōu)化任務(wù)依賴,支持多階段流水線執(zhí)行。
- 統(tǒng)一API:支持批處理(Spark SQL)、流處理(Structured Streaming)、機(jī)器學(xué)習(xí)(MLlib)。
-
Apache Flink(批處理模式)
- 流批一體:同一API處理批和流數(shù)據(jù)(DataSet API)。
- 增量計(jì)算:優(yōu)化迭代任務(wù)(如圖計(jì)算、機(jī)器學(xué)習(xí))。
-
Presto/Trino
- 分布式SQL引擎:直接查詢數(shù)據(jù)湖(如Hive表、S3文件)。
- 聯(lián)邦查詢:跨數(shù)據(jù)源聯(lián)合分析(如MySQL + Hive)。
資源管理層
-
Hadoop YARN
- 資源調(diào)度:將集群資源劃分為容器(Container),分配給MapReduce、Spark等任務(wù)。
- 隊(duì)列管理:支持多租戶資源隔離(如生產(chǎn)隊(duì)列 vs. 實(shí)驗(yàn)隊(duì)列)。
-
Kubernetes
容器化部署:將批處理作業(yè)封裝為Pod,動(dòng)態(tài)擴(kuò)縮容。
-
Operator模式:
Spark Operator:在K8s上原生運(yùn)行Spark作業(yè)。
Flink Kubernetes Session:管理Flink集群生命周期。
工作流調(diào)度層
-
Apache Airflow
- DAG定義:通過Python代碼定義任務(wù)依賴關(guān)系。
- 監(jiān)控與重試:可視化任務(wù)狀態(tài),自動(dòng)重試失敗任務(wù)。
-
Oozie(Hadoop生態(tài))
- XML配置:定義MapReduce、Hive、Sqoop任務(wù)流。
- 協(xié)調(diào)器(Coordinator) :定時(shí)觸發(fā)工作流(如每天凌晨執(zhí)行ETL)。
服務(wù)層
-
數(shù)據(jù)服務(wù)API
- RESTful API:通過HTTP接口暴露聚合結(jié)果(如統(tǒng)計(jì)報(bào)表)。
- GraphQL:靈活查詢數(shù)據(jù)湖中的多源數(shù)據(jù)。
-
BI與可視化工具
- Tableau/Power BI:連接數(shù)據(jù)倉庫,生成交互式儀表盤。
- Superset:開源BI工具,支持SQL Lab直接查詢。
-
機(jī)器學(xué)習(xí)模型服務(wù)
- 批量預(yù)測(cè):定期生成用戶推薦列表、風(fēng)險(xiǎn)評(píng)分。
- 模型更新:每天訓(xùn)練新模型并發(fā)布到生產(chǎn)環(huán)境。
批處理應(yīng)用場(chǎng)景
ETL(Extract, Transform, Load)
- 數(shù)據(jù)清洗:過濾無效記錄(如空值、異常值)、標(biāo)準(zhǔn)化格式(日期、貨幣單位)。
- 數(shù)據(jù)轉(zhuǎn)換:合并多源數(shù)據(jù)(如用戶信息與訂單記錄關(guān)聯(lián))、計(jì)算衍生字段(如用戶年齡=當(dāng)前日期-出生日期)。
- 數(shù)據(jù)加載:將處理后的數(shù)據(jù)寫入目標(biāo)存儲(chǔ)(如Hive表、Snowflake數(shù)倉)。
離線機(jī)器學(xué)習(xí)
- 特征工程:批量生成特征(如用戶過去30天的購買總額)。
- 模型訓(xùn)練:在分布式集群上訓(xùn)練分類、回歸或聚類模型。
- 離線評(píng)估:計(jì)算模型指標(biāo)(如準(zhǔn)確率、AUC)。
數(shù)據(jù)倉庫構(gòu)建
-
貼源層(ODS,Operational Data Store)
- 全量/增量同步:通過ETL工具從業(yè)務(wù)庫(MySQL、Oracle)實(shí)時(shí)或定期抽取數(shù)據(jù)。
- 數(shù)據(jù)原樣存儲(chǔ):保留原始格式(如JSON日志、數(shù)據(jù)庫表結(jié)構(gòu)),不做業(yè)務(wù)邏輯處理。
- 短期存儲(chǔ):通常保留最近7~30天的數(shù)據(jù),供問題回溯使用。
-
明細(xì)層(DWD,Data Warehouse Detail)
- 數(shù)據(jù)清洗:處理缺失值、去重、統(tǒng)一格式(如時(shí)間戳標(biāo)準(zhǔn)化)。
- 維度退化:將多張表關(guān)聯(lián)為寬表(如訂單表+商品表→訂單明細(xì)寬表)。
- 業(yè)務(wù)邏輯解耦:屏蔽底層業(yè)務(wù)系統(tǒng)的表結(jié)構(gòu)差異(如不同系統(tǒng)的用戶ID映射)。
-
匯總層(DWS,Data Warehouse Summary)
- 維度聚合:按時(shí)間、地域、產(chǎn)品等維度統(tǒng)計(jì)指標(biāo)(如每日銷售額、用戶活躍數(shù))。
- 主題域劃分:按業(yè)務(wù)主題(如交易、流量、用戶)組織數(shù)據(jù)。
- 中間結(jié)果復(fù)用:避免重復(fù)計(jì)算,加速上層應(yīng)用查詢。
-
應(yīng)用層(ADS,Application Data Service)
- 業(yè)務(wù)指標(biāo)封裝:生成報(bào)表、BI看板所需的數(shù)據(jù)(如GMV、轉(zhuǎn)化率)。
- 跨主題整合:融合多個(gè)主題數(shù)據(jù)(如用戶畫像+交易數(shù)據(jù)→高價(jià)值用戶列表)。
- 數(shù)據(jù)服務(wù)化:通過API或數(shù)據(jù)接口暴露給業(yè)務(wù)系統(tǒng)(如推薦系統(tǒng)、風(fēng)控系統(tǒng))。
-
維度層(DIM,Dimension)
- 維度一致性:統(tǒng)一各層的維度定義(如地區(qū)、時(shí)間、產(chǎn)品分類)。
- 緩慢變化維(SCD)管理:處理維度屬性變化(如用戶地址變更)。
- 字典表存儲(chǔ):碼值映射表(如狀態(tài)碼→中文描述)。
批處理系統(tǒng)優(yōu)化
計(jì)算性能優(yōu)化
| 優(yōu)化方向 | 具體措施 | 案例/工具 |
|---|---|---|
| 內(nèi)存計(jì)算 | 將中間數(shù)據(jù)緩存至內(nèi)存,減少磁盤I/O。 | Spark RDD緩存、Flink托管內(nèi)存 |
| 向量化執(zhí)行 | 使用SIMD指令批量處理數(shù)據(jù),提升CPU利用率。 | Apache Arrow、Presto向量化引擎 |
| 動(dòng)態(tài)代碼生成 | 運(yùn)行時(shí)生成優(yōu)化代碼,避免解釋執(zhí)行開銷。 | Spark Catalyst優(yōu)化器、LLVM編譯 |
| 數(shù)據(jù)本地性 | 調(diào)度任務(wù)到數(shù)據(jù)所在節(jié)點(diǎn),減少網(wǎng)絡(luò)傳輸。 | Hadoop機(jī)架感知、Kubernetes拓?fù)湔{(diào)度 |
數(shù)據(jù)傾斜處理
| 優(yōu)化方向 | 具體措施 | 適用場(chǎng)景 |
|---|---|---|
| Salting(加鹽) | 為Key添加隨機(jī)前綴,將熱點(diǎn)數(shù)據(jù)分散到多個(gè)分區(qū)。 | 大Key聚合(如用戶ID熱點(diǎn)) |
| 兩階段聚合 | 先局部聚合(Map端Combiner),再全局聚合(Reduce端)。 | 高基數(shù)Key統(tǒng)計(jì)(如PV計(jì)數(shù)) |
| 動(dòng)態(tài)分區(qū)調(diào)整 | 根據(jù)數(shù)據(jù)分布自動(dòng)調(diào)整分區(qū)策略(如范圍分區(qū)→哈希分區(qū))。 | Spark自適應(yīng)查詢(AQE) |
| 傾斜Key隔離 | 識(shí)別熱點(diǎn)Key單獨(dú)處理,其余正常計(jì)算。 | 電商大促期間頭部商品訂單分析 |
容錯(cuò)與一致性保障
| 優(yōu)化方向 | 具體措施 | 技術(shù)實(shí)現(xiàn) |
|---|---|---|
| 檢查點(diǎn)機(jī)制 | 定期保存任務(wù)狀態(tài),故障時(shí)從檢查點(diǎn)恢復(fù)。 | Spark Checkpoint、Flink Savepoints |
| 冪等性設(shè)計(jì) | 確保任務(wù)重試不會(huì)導(dǎo)致重復(fù)結(jié)果。 | 數(shù)據(jù)庫UPSERT、Kafka生產(chǎn)者冪等配置 |
| 輸出提交協(xié)議 | 僅在所有任務(wù)成功后提交最終結(jié)果,避免部分輸出。 | Hadoop _SUCCESS文件、S3一致性模型 |
| 副本冗余 | 關(guān)鍵數(shù)據(jù)多副本存儲(chǔ),預(yù)防節(jié)點(diǎn)故障。 | HDFS 3副本策略、RAID存儲(chǔ) |
資源效率提升
| 優(yōu)化方向 | 具體措施 | 工具/框架 |
|---|---|---|
| 動(dòng)態(tài)資源分配 | 根據(jù)負(fù)載自動(dòng)擴(kuò)縮容,空閑時(shí)釋放資源。 | Kubernetes HPA、YARN彈性調(diào)度 |
| 資源隔離 | 通過容器化(Docker)或隊(duì)列隔離(YARN隊(duì)列)避免作業(yè)間干擾。 | YARN Capacity Scheduler、Mesos資源隔離 |
| 流批資源復(fù)用 | 共享集群運(yùn)行批處理和流任務(wù),提高資源利用率。 | Flink統(tǒng)一運(yùn)行時(shí)、Spark Structured Streaming |
存儲(chǔ)與數(shù)據(jù)管理優(yōu)化
| 優(yōu)化方向 | 具體措施 | 技術(shù)方案 |
|---|---|---|
| 列式存儲(chǔ) | 按列壓縮存儲(chǔ),提升掃描效率。 | Parquet、ORC格式 |
| 數(shù)據(jù)分層存儲(chǔ) | 冷熱數(shù)據(jù)分離,熱數(shù)據(jù)存SSD,冷數(shù)據(jù)歸檔低成本存儲(chǔ) | HDFS分層存儲(chǔ)、S3 Intelligent-Tiering |
| 增量處理 | 僅處理新增數(shù)據(jù),避免全量重跑。 | Hudi/Iceberg增量更新、CDC(Change Data Capture) |
| 數(shù)據(jù)壓縮 | 使用高效壓縮算法減少存儲(chǔ)與傳輸開銷。 | Spark壓縮配置、Kafka消息壓縮 |
流批一體
Lambda 架構(gòu)
- 批處理層(Batch Layer):處理全量歷史數(shù)據(jù)(如Hadoop、Spark),生成精準(zhǔn)結(jié)果。
- 速度層(Speed Layer):處理實(shí)時(shí)數(shù)據(jù)(如Flink、Kafka Streams),生成近似結(jié)果。
- 服務(wù)層(Serving Layer):合并批和流的結(jié)果(如HBase、Redis),供查詢使用。
Kappa 架構(gòu)
- 僅保留流處理層:通過流處理引擎(如Flink)重放歷史數(shù)據(jù),替代批處理層。
- 依賴持久化事件日志:如Kafka長(zhǎng)期存儲(chǔ)原始數(shù)據(jù)(支持全量回放)。
流批一體的核心思想
-
統(tǒng)一的數(shù)據(jù)模型
- 批數(shù)據(jù):視為有界流(Bounded Stream),即流的一個(gè)有限子集。
- 流數(shù)據(jù):視為無界流(Unbounded Stream),持續(xù)追加。
- 統(tǒng)一處理語義:無論數(shù)據(jù)來自歷史還是實(shí)時(shí),均通過 事件時(shí)間(Event Time) 和 處理時(shí)間(Processing Time) 統(tǒng)一管理。
-
統(tǒng)一的 API
- 聲明式編程:通過同一套API(如SQL、DataStream/DataSet API)描述批和流任務(wù)。
- 代碼復(fù)用:業(yè)務(wù)邏輯(如聚合、過濾)無需為批和流分別實(shí)現(xiàn)。
-
統(tǒng)一的運(yùn)行時(shí)引擎
- 共享執(zhí)行引擎:批和流任務(wù)在同一運(yùn)行時(shí)中執(zhí)行(如Flink的流式運(yùn)行時(shí)支持批處理優(yōu)化)。
- 資源動(dòng)態(tài)分配:根據(jù)負(fù)載自動(dòng)調(diào)配資源(如夜間跑批時(shí)分配更多資源)。
流批一體的技術(shù)實(shí)現(xiàn)
-
Apache Flink
-
流式優(yōu)先,批是流的特例:
流處理:
DataStream API處理無界數(shù)據(jù),支持事件時(shí)間、狀態(tài)管理、精確一次語義。批處理:
DataSet API(舊版)或直接使用DataStream API處理有界數(shù)據(jù),自動(dòng)優(yōu)化執(zhí)行計(jì)劃。
- 批處理優(yōu)化:對(duì)有界數(shù)據(jù)關(guān)閉冗余容錯(cuò)機(jī)制(如Checkpoint),減少開銷。
- 動(dòng)態(tài)延遲調(diào)度:批任務(wù)優(yōu)先處理關(guān)鍵路徑數(shù)據(jù),縮短作業(yè)完成時(shí)間。
-
-
Apache Spark(Structured Streaming)
- 微批處理:將流數(shù)據(jù)切分為小批次(如1秒窗口),復(fù)用批處理引擎。
- 連續(xù)處理模式:實(shí)驗(yàn)性支持低至毫秒級(jí)的延遲(類似流處理)。