數(shù)據(jù)倉(cāng)庫(kù)之Hive快速入門 - 離線&實(shí)時(shí)數(shù)倉(cāng)架構(gòu)

數(shù)據(jù)倉(cāng)庫(kù)VS數(shù)據(jù)庫(kù)

數(shù)據(jù)倉(cāng)庫(kù)的定義:

  • 數(shù)據(jù)倉(cāng)庫(kù)是將多個(gè)數(shù)據(jù)源的數(shù)據(jù)經(jīng)過(guò)ETL(Extract(抽?。ransform(轉(zhuǎn)換)、Load(加載))理之后,按照一定的主題集成起來(lái)提供決策支持和聯(lián)機(jī)分析應(yīng)用的結(jié)構(gòu)化數(shù)據(jù)環(huán)境

數(shù)據(jù)倉(cāng)庫(kù)VS數(shù)據(jù)庫(kù):

  • 數(shù)據(jù)庫(kù)是面向事務(wù)的設(shè)計(jì),數(shù)據(jù)倉(cāng)庫(kù)是面向主題設(shè)計(jì)的
  • 數(shù)據(jù)庫(kù)一般存儲(chǔ)在線交易數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)存儲(chǔ)的一般是歷史數(shù)據(jù)
  • 數(shù)據(jù)庫(kù)設(shè)計(jì)是避免冗余,采用三范式的規(guī)則來(lái)設(shè)計(jì),數(shù)據(jù)倉(cāng)庫(kù)在設(shè)計(jì)是有意引入冗余,采用反范式的方式來(lái)設(shè)計(jì)

OLTP VS OLAP:

  • 聯(lián)機(jī)事務(wù)處理OLTP是傳統(tǒng)的關(guān)系型數(shù)據(jù)庫(kù)的主要應(yīng)用,主要是基本的、日常的事務(wù)處理,例如銀行交易
  • 聯(lián)機(jī)分析處理OLAP是數(shù)據(jù)倉(cāng)庫(kù)系統(tǒng)的主要應(yīng)用,支持復(fù)雜的分析操作,側(cè)重決策支持,并且提供直觀易懂的查詢結(jié)果

常規(guī)的數(shù)倉(cāng)架構(gòu):


image.png

為什么建設(shè)數(shù)據(jù)倉(cāng)庫(kù):

  • 各個(gè)業(yè)務(wù)數(shù)據(jù)存在不一致,數(shù)據(jù)關(guān)系混亂
  • 業(yè)務(wù)系統(tǒng)一般針對(duì)于OLTP,而數(shù)據(jù)倉(cāng)庫(kù)可以實(shí)現(xiàn)OLAP分析
  • 數(shù)據(jù)倉(cāng)庫(kù)是多源的復(fù)雜環(huán)境,可以對(duì)多個(gè)業(yè)務(wù)的數(shù)據(jù)進(jìn)行統(tǒng)一分析

數(shù)據(jù)倉(cāng)庫(kù)建設(shè)目標(biāo):

  • 集成多源數(shù)據(jù),數(shù)據(jù)來(lái)源和去向可追溯,梳理血緣關(guān)系
  • 減少重復(fù)開發(fā),保存通用型中間數(shù)據(jù),避免重復(fù)計(jì)算
  • 屏蔽底層業(yè)務(wù)邏輯,對(duì)外提供一致的、 結(jié)構(gòu)清晰的數(shù)據(jù)

如何實(shí)現(xiàn):

  • 實(shí)現(xiàn)通用型數(shù)據(jù)ETL工具
  • 根據(jù)業(yè)務(wù)建立合理的數(shù)據(jù)分層模型

數(shù)據(jù)倉(cāng)庫(kù)分層建設(shè)

數(shù)倉(cāng)建設(shè)背景:

  • 數(shù)據(jù)建設(shè)剛起步,大部分?jǐn)?shù)據(jù)經(jīng)過(guò)粗暴的數(shù)據(jù)接入后直接對(duì)接業(yè)務(wù)
  • 數(shù)據(jù)建設(shè)發(fā)展到一定階段,發(fā)現(xiàn)數(shù)據(jù)的使用雜亂無(wú)章,各種業(yè)務(wù)都是從原始數(shù)據(jù)直接計(jì)算而得。
  • 各種重復(fù)計(jì)算,嚴(yán)重浪費(fèi)了計(jì)算資源,需要優(yōu)化性能

為什么進(jìn)行數(shù)倉(cāng)分層:

  • 清晰數(shù)據(jù)結(jié)構(gòu):每個(gè)數(shù)據(jù)分層都有對(duì)應(yīng)的作用域
  • 數(shù)據(jù)血緣追蹤:對(duì)各層之間的數(shù)據(jù)表轉(zhuǎn)換進(jìn)行跟蹤,建立血緣關(guān)系
  • 減少重復(fù)開發(fā):規(guī)范數(shù)據(jù)分層,開發(fā)通用的中間層數(shù)據(jù)
  • 屏蔽原始數(shù)據(jù)的異常:通過(guò)數(shù)據(jù)分層管控?cái)?shù)據(jù)質(zhì)量
  • 屏蔽業(yè)務(wù)的影響:不必改一次業(yè)務(wù)就需要重新接入數(shù)據(jù)
  • 復(fù)雜問(wèn)題簡(jiǎn)單化:將復(fù)雜的數(shù)倉(cāng)架構(gòu)分解成多個(gè)數(shù)據(jù)層來(lái)完成

常見的分層含義:


image.png

STG層

  • 原始數(shù)據(jù)層:存儲(chǔ)原始數(shù)據(jù),數(shù)據(jù)結(jié)構(gòu)與采集數(shù)據(jù)一致
  • 存儲(chǔ)周期:保存全部數(shù)據(jù)
  • 表命名規(guī)范:stg_主題_表內(nèi)容_分表規(guī)則

ODS層

  • 數(shù)據(jù)操作層:對(duì)STG層數(shù)據(jù)進(jìn)行初步處理,如去除臟數(shù)據(jù),去除無(wú)用字段.
  • 存儲(chǔ)周期:默認(rèn)保留近30天數(shù)據(jù)
  • 表命名規(guī)范:ods_主題_表內(nèi)容_分表規(guī)則

DWD層

  • 數(shù)據(jù)明細(xì)層:數(shù)據(jù)處理后的寬表,目標(biāo)為滿足80%的業(yè)務(wù)需求
  • 存儲(chǔ)周期:保留歷史至今所有的數(shù)據(jù)
  • 表命名規(guī)范:dwd_業(yè)務(wù)描述時(shí)間粒度

DWS層

  • 數(shù)據(jù)匯總層:匯總數(shù)據(jù),解決數(shù)據(jù)匯總計(jì)算和數(shù)據(jù)完整度問(wèn)題
  • 存儲(chǔ)周期:保留歷史至今所有的數(shù)據(jù)
  • 表命名規(guī)范:dws_業(yè)務(wù)描述_時(shí)間粒度_sum

DIM層

  • 公共維度層:存儲(chǔ)公共的信息數(shù)據(jù),用于DWD、DWS的數(shù)據(jù)關(guān)聯(lián)
  • 存儲(chǔ)周期:按需存儲(chǔ),一般保留歷史至今所有的數(shù)據(jù)
  • 表命名規(guī)范:dim_維度描述

DM層

  • 數(shù)據(jù)集市層:用于BI、多維分析、標(biāo)簽、數(shù)據(jù)挖掘等
  • 存儲(chǔ)周期:按需存儲(chǔ),--般保留歷史至今所有的數(shù)據(jù)
  • 表命名規(guī)范:dm_主題_表內(nèi)容_分表規(guī)則

分層之間的數(shù)據(jù)流轉(zhuǎn):


image.png

Hive是什么

Hive簡(jiǎn)介:

  • Hive是基于Hadoop的數(shù)據(jù)倉(cāng)庫(kù)工具,提供類SQL語(yǔ)法(HiveQL)
  • 默認(rèn)以MR作為計(jì)算引擎(也支持其他計(jì)算引擎,例如tez)、HDFS 作為存儲(chǔ)系統(tǒng),提供超大數(shù)據(jù)集的計(jì)算/擴(kuò)展能力
  • Hive是將數(shù)據(jù)映射成數(shù)據(jù)庫(kù)和一張張的表,庫(kù)和表的元數(shù)據(jù)信息一般存在關(guān)系型數(shù)據(jù)庫(kù)

Hive的簡(jiǎn)單架構(gòu)圖:


image.png

Hive VS Hadoop:

  • Hive數(shù)據(jù)存儲(chǔ):Hive的數(shù)據(jù)是存儲(chǔ)在HDFS.上的,Hive的庫(kù)和表是對(duì)HDFS.上數(shù)據(jù)的映射
  • Hive元數(shù)據(jù)存儲(chǔ):元數(shù)據(jù)存儲(chǔ)一般在外部關(guān)系庫(kù)( Mysql )與Presto Impala等共享
  • Hive語(yǔ)句的執(zhí)行過(guò)程:將HQL轉(zhuǎn)換為MapReduce任務(wù)運(yùn)行

Hive與關(guān)系數(shù)據(jù)庫(kù)Mysql的區(qū)別

產(chǎn)品定位

Hive是數(shù)據(jù)倉(cāng)庫(kù),為海量數(shù)據(jù)的離線分析設(shè)計(jì)的,不支持OLTP(聯(lián)機(jī)事務(wù)處理所需的關(guān)鍵功能ACID,而更接近于OLAP(聯(lián)機(jī)分析技術(shù))),適給離線處理大數(shù)據(jù)集。而MySQL是關(guān)系型數(shù)據(jù)庫(kù),是為實(shí)時(shí)業(yè)務(wù)設(shè)計(jì)的。

可擴(kuò)展性

Hive中的數(shù)據(jù)存儲(chǔ)在HDFS(Hadoop的分布式文件系統(tǒng)),metastore元數(shù)據(jù)一 般存儲(chǔ)在獨(dú)立的關(guān)系型數(shù)據(jù)庫(kù)中,而MySQL則是服務(wù)器本地的文件系統(tǒng)。因此Hive具有良好的可擴(kuò)展性,數(shù)據(jù)庫(kù)由于ACID語(yǔ)義的嚴(yán)格限制,擴(kuò)展性十分有限。

讀寫模式

Hive為讀時(shí)模式,數(shù)據(jù)的驗(yàn)證則是在查詢時(shí)進(jìn)行的,這有利于大數(shù)據(jù)集的導(dǎo)入,讀時(shí)模式使數(shù)據(jù)的加載非常迅速,數(shù)據(jù)的加載僅是文件復(fù)制或移動(dòng)。MySQL為寫時(shí)模式,數(shù)據(jù)在寫入數(shù)據(jù)庫(kù)時(shí)對(duì)照模式檢查。寫時(shí)模式有利于提升查詢性能,因?yàn)閿?shù)據(jù)庫(kù)可以對(duì)列進(jìn)行索引。

數(shù)據(jù)更新

Hive是針對(duì)數(shù)據(jù)倉(cāng)庫(kù)應(yīng)用設(shè)計(jì)的,而數(shù)倉(cāng)的內(nèi)容是讀多寫少的,Hive中不支持對(duì)數(shù)據(jù)進(jìn)行改寫,所有數(shù)據(jù)都是在加載的時(shí)候確定好的。而數(shù)據(jù)庫(kù)中的數(shù)據(jù)通常是需要經(jīng)常進(jìn)行修改的。

索引

Hive支持索引,但是Hive的索引與關(guān)系型數(shù)據(jù)庫(kù)中的索引并不相同,比如,Hive不支持主鍵或者外鍵。Hive提供了有限的索引功能,可以為-些字段建立索引,一張表的索引數(shù)據(jù)存儲(chǔ)在另外一張表中。由于數(shù)據(jù)的訪問(wèn)延遲較高,Hive不適合在線數(shù)據(jù)查詢。數(shù)據(jù)庫(kù)在少星的特定條件的數(shù)據(jù)訪問(wèn)中,索引可以提供較低的延遲。

計(jì)算模型

Hive默認(rèn)使用的模型是MapReduce(也可以on spark、on tez),而MySQL使用的是自己設(shè)計(jì)的Executor計(jì)算模型

image.png

Hive安裝部署

參考:


Hive基本使用(上)Hive數(shù)據(jù)類型/分區(qū)/基礎(chǔ)語(yǔ)法

Hive數(shù)據(jù)類型:

  • 基本數(shù)據(jù)類型:int、 float、 double、 string、 boolean、 bigint等
  • 復(fù)雜類型:array、map、 struct

Hive分區(qū):

  • Hive將海量數(shù)據(jù)按某幾個(gè)字段進(jìn)行分區(qū),查詢時(shí)不必加載全部數(shù)據(jù)
  • 分區(qū)對(duì)應(yīng)到HDFS就是HDFS的目錄.
  • 分區(qū)分為靜態(tài)分區(qū)和動(dòng)態(tài)分區(qū)兩種

Hive常用基礎(chǔ)語(yǔ)法:

  • USE DATABASE_NAME
  • CREATE DATABASE IF NOT EXISTS DB NAME
  • DESC DATABASE DB NAME
  • CREATE TABLE TABLE_ NAME (..) ROW FORMAT DELIMITED FIELDS TERMINATED BY "\t" STORE AS TEXTFILE
  • SELECT * FROM TABLE NAME
  • ALTER TABLE TABLE_NAME RENAME TO NEW_TABLE_NAME

寫個(gè)Python腳本生成一些測(cè)試數(shù)據(jù):

import json
import random
import uuid

name = ('Tom', 'Jerry', 'Jim', 'Angela', 'Ann', 'Bella', 'Bonnie', 'Caroline')
hobby = ('reading', 'play', 'dancing', 'sing')
subject = ('math', 'chinese', 'english', 'computer')

data = []
for item in name:
    scores = {key: random.randint(60, 100) for key in subject}
    data.append("|".join([uuid.uuid4().hex, item, ','.join(
        random.sample(set(hobby), 2)), ','.join(["{0}:{1}".format(k, v) for k, v in scores.items()])]))

with open('test.csv', 'w') as f:
    f.write('\n'.join(data))

執(zhí)行該腳本,生成測(cè)試數(shù)據(jù)文件:

[root@hadoop01 ~/py-script]# python3 gen_data.py
[root@hadoop01 ~/py-script]# ll -h
...
-rw-r--r--. 1 root root  745 11月  9 11:09 test.csv
[root@hadoop01 ~/py-script]# 

我們可以看一下生成的數(shù)據(jù):

[root@hadoop01 ~/py-script]# cat test.csv 
f4914b91c5284b01832149776ca53c8d|Tom|reading,dancing|math:91,chinese:86,english:67,computer:77

...
  • 數(shù)據(jù)以 | 符進(jìn)行分割,前兩個(gè)字段都是string類型,第三個(gè)字段是array類型,第四個(gè)字段是map類型

創(chuàng)建測(cè)試用的數(shù)據(jù)庫(kù):

0: jdbc:hive2://localhost:10000> create database hive_test;
No rows affected (0.051 seconds)
0: jdbc:hive2://localhost:10000> use hive_test;
No rows affected (0.06 seconds)
0: jdbc:hive2://localhost:10000> 

創(chuàng)建測(cè)試表:

CREATE TABLE test(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將本地?cái)?shù)據(jù)加載到Hive中:

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table test;
No rows affected (0.785 seconds)
0: jdbc:hive2://localhost:10000> 

查詢數(shù)據(jù):


image.png

Hive將HQL轉(zhuǎn)換為MapReduce的流程

了解了Hive中的SQL基本操作之后,我們來(lái)看看Hive是如何將SQL轉(zhuǎn)換為MapReduce任務(wù)的,整個(gè)轉(zhuǎn)換過(guò)程分為六個(gè)階段:

  1. Antr定義SQL的語(yǔ)法規(guī)則,完成SQL詞法,語(yǔ)法解析,將SQL 轉(zhuǎn)化為抽象語(yǔ)法樹AST Tree
  2. 遍歷AST Tree,抽象出查詢的基本組成單元QueryBlock
  3. 遍歷QueryBlock,翻譯為執(zhí)行操作樹OperatorTree
  4. 邏輯層優(yōu)化器進(jìn)行OperatorTree變換,合并不必要的ReduceSinkOperator,減少shufle數(shù)據(jù)量
  5. 遍歷OperatorTree,翻譯為MapReduce任務(wù)
  6. 物理層優(yōu)化器進(jìn)行MapReduce任務(wù)的變換,生成最終的執(zhí)行計(jì)劃
image.png

與普通SQL一樣,我們可以通過(guò)在HQL前面加上explain關(guān)鍵字查看HQL的執(zhí)行計(jì)劃:

explain select * from test where id > 10 limit 1000

Hive會(huì)將這條語(yǔ)句解析成一個(gè)個(gè)的Operator,Operator就是Hive解析之后的最小單元,每個(gè)Operator其實(shí)都是對(duì)應(yīng)一個(gè)MapReduce任務(wù)。例如,上面這條語(yǔ)句被Hive解析后,就是由如下Operator組成:


image.png

同時(shí),Hive實(shí)現(xiàn)了優(yōu)化器對(duì)這些Operator的順序進(jìn)行優(yōu)化,幫助我們提升查詢效率。Hive中的優(yōu)化器主要分為三類:

  • RBO(Rule-Based Optimizer):基于規(guī)則的優(yōu)化器
  • CBO(Cost-Based Optimizer):基于代價(jià)的優(yōu)化器,這是默認(rèn)的優(yōu)化器
  • 動(dòng)態(tài)CBO:在執(zhí)行計(jì)劃生成的過(guò)程中動(dòng)態(tài)優(yōu)化的方式

Hive基本使用(中)內(nèi)部表/外部表/分區(qū)表/分桶表

內(nèi)部表:

和傳統(tǒng)數(shù)據(jù)庫(kù)的Table概念類似,對(duì)應(yīng)HDFS上存儲(chǔ)目錄,刪除表時(shí),刪除元數(shù)據(jù)和表數(shù)據(jù)。內(nèi)部表的數(shù)據(jù),會(huì)存放在HDFS中的特定的位置中,可以通過(guò)配置文件指定。當(dāng)刪除表時(shí),數(shù)據(jù)文件也會(huì)一并刪除。適用于臨時(shí)創(chuàng)建的中間表。

外部表:

指向已經(jīng)存在的HDFS數(shù)據(jù),刪除時(shí)只刪除元數(shù)據(jù)信息。適用于想要在Hive之外使用表的數(shù)據(jù)的情況,當(dāng)你刪除External Table時(shí),只是刪除了表的元數(shù)據(jù),它的數(shù)據(jù)并沒有被刪除。適用于數(shù)據(jù)多部門共享。建表時(shí)使用create external table。指定external關(guān)鍵字即可。

分區(qū)表:

Partition對(duì)應(yīng)普通數(shù)據(jù)庫(kù)對(duì)Partition列的密集索引,將數(shù)據(jù)按照Partition列存儲(chǔ)到不同目錄,便于并行分析,減少數(shù)據(jù)量。分區(qū)表創(chuàng)建表的時(shí)候需要指定分區(qū)字段。

分區(qū)字段與普通字段的區(qū)別:分區(qū)字段會(huì)在HDFS表目錄下生成一個(gè)分區(qū)字段名稱的目錄,而普通字段則不會(huì),查詢的時(shí)候可以當(dāng)成普通字段來(lái)使用,一般不直接和業(yè)務(wù)直接相關(guān)。

分桶表:

對(duì)數(shù)據(jù)進(jìn)行hash,放到不同文件存儲(chǔ),方便抽樣和join查詢??梢詫?nèi)部表,外部表和分區(qū)表進(jìn)一步組織成桶表,可以將表的列通過(guò)Hash算法進(jìn)一步分解成不同的文件存儲(chǔ)。

對(duì)于內(nèi)部表和外部表的概念和應(yīng)用場(chǎng)景我們很容易理解,我們需要重點(diǎn)關(guān)注一下分區(qū)表和分桶表。 我們?yōu)槭裁匆⒎謪^(qū)表和分桶表呢?HQL通過(guò)where子句來(lái)限制條件提取數(shù)據(jù),那么與其遍歷一張大表,不如將這張大表拆分成多個(gè)小表,并通過(guò)合適的索引來(lái)掃描表中的一小部分,分區(qū)和分桶都是采用了這種理念。

分區(qū)會(huì)創(chuàng)建物理目錄,并且可以具有子目錄(通常會(huì)按照時(shí)間、地區(qū)分區(qū)),目錄名以 分區(qū)名=值 形式命名,例如:create_time=202011。分區(qū)名會(huì)作為表中的偽列,這樣通過(guò)where字句中加入分區(qū)的限制可以在僅掃描對(duì)應(yīng)子目錄下的數(shù)據(jù)。通過(guò) partitioned by (feld1 type, ...) 創(chuàng)建分區(qū)列。

分桶可以繼續(xù)在分區(qū)的基礎(chǔ)上再劃分小表,分桶根據(jù)哈希值來(lái)確定數(shù)據(jù)的分布(即MapReducer中的分區(qū)),比如分區(qū)下的一部分?jǐn)?shù)據(jù)可以根據(jù)分桶再分為多個(gè)桶,這樣在查詢時(shí)先計(jì)算對(duì)應(yīng)列的哈希值并計(jì)算桶號(hào),只需要掃描對(duì)應(yīng)桶中的數(shù)據(jù)即可。分桶通過(guò)clustered by(field) into n buckets創(chuàng)建。

接下來(lái)簡(jiǎn)單演示下這幾種表的操作,首先將上一小節(jié)生成的測(cè)試數(shù)據(jù)文件上傳到hdfs中:

[root@hadoop01 ~]# hdfs dfs -mkdir /test
[root@hadoop01 ~]# hdfs dfs -put py-script/test.csv /test
[root@hadoop01 ~]# hdfs dfs -ls /test
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /test/test.csv
[root@hadoop01 ~]# 

內(nèi)部表

建表SQL:

CREATE TABLE test_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將hdfs數(shù)據(jù)加載到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table test_table;
No rows affected (0.169 seconds)
0: jdbc:hive2://localhost:10000> 

查看創(chuàng)建的表存儲(chǔ)在hdfs的哪個(gè)目錄下:

0: jdbc:hive2://localhost:10000> show create table test_table;
+----------------------------------------------------+
|                   createtab_stmt                   |
+----------------------------------------------------+
| CREATE TABLE `test_table`(                         |
|   `user_id` string,                                |
|   `user_name` string,                              |
|   `hobby` array<string>,                           |
|   `scores` map<string,int>)                        |
| ROW FORMAT SERDE                                   |
|   'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'  |
| WITH SERDEPROPERTIES (                             |
|   'collection.delim'=',',                          |
|   'field.delim'='|',                               |
|   'line.delim'='\n',                               |
|   'mapkey.delim'=':',                              |
|   'serialization.format'='|')                      |
| STORED AS INPUTFORMAT                              |
|   'org.apache.hadoop.mapred.TextInputFormat'       |
| OUTPUTFORMAT                                       |
|   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' |
| LOCATION                                           |
|   'hdfs://hadoop01:8020/user/hive/warehouse/hive_test.db/test_table' |
| TBLPROPERTIES (                                    |
|   'bucketing_version'='2',                         |
|   'transient_lastDdlTime'='1604893190')            |
+----------------------------------------------------+
22 rows selected (0.115 seconds)
0: jdbc:hive2://localhost:10000> 

在hdfs中可以查看到數(shù)據(jù)文件:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/test_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:34 /user/hive/warehouse/hive_test.db/test_table/test.csv
[root@hadoop01 ~]# 

刪除表:

0: jdbc:hive2://localhost:10000> drop table test_table;
No rows affected (0.107 seconds)
0: jdbc:hive2://localhost:10000> 

查看hdfs會(huì)發(fā)現(xiàn)該表所對(duì)應(yīng)的存儲(chǔ)目錄也一并被刪除了:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table
drwxr-xr-x   - root supergroup          0 2020-11-09 11:23 /user/hive/warehouse/hive_test.db/test
[root@hadoop01 ~]# 

外部表

建表SQL,與內(nèi)部表的區(qū)別就在于external關(guān)鍵字:

CREATE external TABLE external_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將數(shù)據(jù)文件加載到Hive中:

0: jdbc:hive2://localhost:10000> load data inpath '/test/test.csv' overwrite into table external_table;
No rows affected (0.182 seconds)
0: jdbc:hive2://localhost:10000> 

此時(shí)會(huì)發(fā)現(xiàn)hdfs中的數(shù)據(jù)文件會(huì)被移動(dòng)到hive的目錄下:

[root@hadoop01 ~]# hdfs dfs -ls /test
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]# 

刪除表:

0: jdbc:hive2://localhost:10000> drop table external_table;
No rows affected (0.112 seconds)
0: jdbc:hive2://localhost:10000> 

查看hdfs會(huì)發(fā)現(xiàn)該表所對(duì)應(yīng)的存儲(chǔ)目錄仍然存在:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/external_table
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 11:52 /user/hive/warehouse/hive_test.db/external_table/test.csv
[root@hadoop01 ~]# 

分區(qū)表

建表語(yǔ)句:

CREATE TABLE partition_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
PARTITIONED BY (create_time string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

將數(shù)據(jù)文件加載到Hive中,并指定分區(qū):

0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202011');
No rows affected (0.747 seconds)
0: jdbc:hive2://localhost:10000> load data local inpath '/root/py-script/test.csv' overwrite into table partition_table partition (create_time='202012');
No rows affected (0.347 seconds)
0: jdbc:hive2://localhost:10000> 

執(zhí)行如下sql,可以從不同的分區(qū)統(tǒng)計(jì)結(jié)果:

0: jdbc:hive2://localhost:10000> select count(*) from partition_table;
+------+
| _c0  |
+------+
| 16   |
+------+
1 row selected (15.881 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202011';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (14.639 seconds)
0: jdbc:hive2://localhost:10000> select count(*) from partition_table where create_time='202012';
+------+
| _c0  |
+------+
| 8    |
+------+
1 row selected (15.555 seconds)
0: jdbc:hive2://localhost:10000> 

分區(qū)表在hdfs中的存儲(chǔ)結(jié)構(gòu):

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table
Found 2 items
drwxr-xr-x   - root supergroup          0 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
drwxr-xr-x   - root supergroup          0 2020-11-09 12:09 /user/hive/warehouse/hive_test.db/partition_table/create_time=202012
[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/partition_table/create_time=202011
Found 1 items
-rw-r--r--   1 root supergroup        745 2020-11-09 12:08 /user/hive/warehouse/hive_test.db/partition_table/create_time=202011/test.csv
[root@hadoop01 ~]# 

分桶表

建表語(yǔ)句:

CREATE TABLE bucket_table(
    user_id string,
    user_name string,
    hobby array<string>,
    scores map<string,integer>
)
clustered by (user_name) sorted by (user_name) into 2 buckets
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '|'
COLLECTION ITEMS TERMINATED BY ','
MAP KEYS TERMINATED BY ':'
LINES TERMINATED BY '\n';

test表中的數(shù)據(jù)插入到bucket_table中:

0: jdbc:hive2://localhost:10000> insert into bucket_table select * from test;
No rows affected (17.393 seconds)
0: jdbc:hive2://localhost:10000> 

抽樣查詢:


image.png

分桶表在hdfs的存儲(chǔ)目錄如下:

[root@hadoop01 ~]# hdfs dfs -ls /user/hive/warehouse/hive_test.db/bucket_table
Found 2 items
-rw-r--r--   1 root supergroup        465 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000000_0
-rw-r--r--   1 root supergroup        281 2020-11-09 13:54 /user/hive/warehouse/hive_test.db/bucket_table/000001_0
[root@hadoop01 ~]# 

Hive基本使用(下)內(nèi)置函數(shù)/自定義函數(shù)/實(shí)現(xiàn)UDF

Hive常見內(nèi)置函數(shù):

  • 字符串類型:concat、substr、 upper、 lower
  • 時(shí)間類型:year、month、 day
  • 復(fù)雜類型:size、 get_json_object

查詢引擎都自帶了一部分函數(shù)來(lái)幫助我們解決查詢過(guò)程當(dāng)中一些復(fù)雜的數(shù)據(jù)計(jì)算或者數(shù)據(jù)轉(zhuǎn)換操作,但是有時(shí)候自帶的函數(shù)功能不能滿足業(yè)務(wù)的需要。這時(shí)候就需要我們自己開發(fā)自定義的函數(shù)來(lái)輔助完成了,這就是所謂的用戶自定義函數(shù)UDF(User-Defined Functions)。Hive支持三類自定義函數(shù):

  • UDF:普通的用戶自定義函數(shù)。用來(lái)處理輸入一行,輸出一行的操作,類似Map操作。如轉(zhuǎn)換字符串大小寫,獲取字符串長(zhǎng)度等
  • UDAF:用戶自定義聚合函數(shù)(User-defined aggregate function),用來(lái)處理輸入多行,輸出一行的操作,類似Reduce操作。比如MAX、COUNT函數(shù)。
  • UDTF:用戶自定義表產(chǎn)生函數(shù)(User defined table-generating function),用來(lái)處理輸入一行,輸出多行(即一個(gè)表)的操作, 不是特別常用

UDF函數(shù)其實(shí)就是一段遵循一定接口規(guī)范的程序。在執(zhí)行過(guò)程中Hive將SQL轉(zhuǎn)換為MapReduce程序,在執(zhí)行過(guò)程當(dāng)中在執(zhí)行我們的UDF函數(shù)。

本小節(jié)簡(jiǎn)單演示下自定義UDF函數(shù),首先創(chuàng)建一個(gè)空的Maven項(xiàng)目,然后添加hive-exec依賴,版本與你安裝的Hive版本需對(duì)應(yīng)上。完整的pom文件內(nèi)容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>hive-udf-test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

首先創(chuàng)建一個(gè)繼承UDF的類,我們實(shí)現(xiàn)的這個(gè)自定義函數(shù)功能就是簡(jiǎn)單的獲取字段的長(zhǎng)度:

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class StrLen extends UDF {

    public int evaluate(final Text col) {
        return col.getLength();
    }
}

以上這種自定義函數(shù)只能支持處理普通類型的數(shù)據(jù),如果要對(duì)復(fù)雜類型的數(shù)據(jù)做處理則需要繼承GenericUDF,并實(shí)現(xiàn)其抽象方法。例如,我們實(shí)現(xiàn)一個(gè)對(duì)測(cè)試數(shù)據(jù)中的scores字段求平均值的函數(shù):

package com.example.hive.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.text.DecimalFormat;

public class AvgScore extends GenericUDF {

    /**
     * 函數(shù)的名稱
     */
    private static final String FUNC_NAME = "AVG_SCORE";

    /**
     * 函數(shù)所作用的字段類型,這里是map類型
     */
    private transient MapObjectInspector mapOi;

    /**
     * 控制精度只返回兩位小數(shù)
     */
    DecimalFormat df = new DecimalFormat("#.##");

    @Override
    public ObjectInspector initialize(ObjectInspector[] objectInspectors) throws UDFArgumentException {
        // 在此方法中可以做一些前置的校驗(yàn),例如檢測(cè)函數(shù)參數(shù)個(gè)數(shù)、檢測(cè)函數(shù)參數(shù)類型
        mapOi = (MapObjectInspector) objectInspectors[0];
        // 指定函數(shù)的輸出類型
        return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector;
    }

    @Override
    public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
        // 函數(shù)的核心邏輯,取出map中的value進(jìn)行求平均值,并返回一個(gè)Double類型的結(jié)果值
        Object o = deferredObjects[0].get();
        double v = mapOi.getMap(o).values().stream()
                .mapToDouble(a -> Double.parseDouble(a.toString()))
                .average()
                .orElse(0.0);

        return Double.parseDouble(df.format(v));
    }

    @Override
    public String getDisplayString(String[] strings) {
        return "func(map)";
    }
}

對(duì)項(xiàng)目進(jìn)行打包,并上傳到服務(wù)器中:

[root@hadoop01 ~/jars]# ls
hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]# 

將jar包上傳到hdfs中:

[root@hadoop01 ~/jars]# hdfs dfs -mkdir /udfs
[root@hadoop01 ~/jars]# hdfs dfs -put hive-udf-test-1.0-SNAPSHOT.jar /udfs
[root@hadoop01 ~/jars]# hdfs dfs -ls /udfs
Found 1 items
-rw-r--r--   1 root supergroup       4030 2020-11-09 14:25 /udfs/hive-udf-test-1.0-SNAPSHOT.jar
[root@hadoop01 ~/jars]# 

在Hive中添加該jar包:

0: jdbc:hive2://localhost:10000> add jar hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar;
No rows affected (0.022 seconds)
0: jdbc:hive2://localhost:10000> 

然后注冊(cè)臨時(shí)函數(shù),臨時(shí)函數(shù)只會(huì)在當(dāng)前的session中生效:

0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION strlen as "com.example.hive.udf.StrLen";
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> CREATE TEMPORARY FUNCTION avg_score as "com.example.hive.udf.AvgScore";
No rows affected (0.008 seconds)
0: jdbc:hive2://localhost:10000> 

使用自定義函數(shù)處理:

0: jdbc:hive2://localhost:10000> select user_name, strlen(user_name) as length, avg_score(scores) as avg_score from test;
+------------+---------+------------+
| user_name  | length  | avg_score  |
+------------+---------+------------+
| Tom        | 3       | 80.25      |
| Jerry      | 5       | 77.5       |
| Jim        | 3       | 83.75      |
| Angela     | 6       | 84.5       |
| Ann        | 3       | 90.0       |
| Bella      | 5       | 69.25      |
| Bonnie     | 6       | 76.5       |
| Caroline   | 8       | 84.5       |
+------------+---------+------------+
8 rows selected (0.083 seconds)
0: jdbc:hive2://localhost:10000> 

刪除已注冊(cè)的臨時(shí)函數(shù):

0: jdbc:hive2://localhost:10000> drop temporary function strlen;
No rows affected (0.01 seconds)
0: jdbc:hive2://localhost:10000> drop temporary function avg_score;
No rows affected (0.009 seconds)
0: jdbc:hive2://localhost:10000> 

臨時(shí)函數(shù)只會(huì)在當(dāng)前的session中生效,如果需要注冊(cè)成永久函數(shù)則只需要把TEMPORARY關(guān)鍵字給去掉即可。如下所示:

0: jdbc:hive2://localhost:10000> create function strlen as 'com.example.hive.udf.StrLen' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.049 seconds)
0: jdbc:hive2://localhost:10000> create function avg_score as 'com.example.hive.udf.AvgScore' using jar 'hdfs://hadoop01:8020/udfs/hive-udf-test-1.0-SNAPSHOT.jar';
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> 

刪除永久函數(shù)也是把TEMPORARY關(guān)鍵字給去掉即可。如下所示:

0: jdbc:hive2://localhost:10000> drop function strlen;
No rows affected (0.031 seconds)
0: jdbc:hive2://localhost:10000> drop function avg_score;
No rows affected (0.026 seconds)
0: jdbc:hive2://localhost:10000> 

Hive存儲(chǔ)結(jié)構(gòu) - OrcFile

Hive支持的存儲(chǔ)格式:


image.png
  • TextFile是默認(rèn)的存儲(chǔ)格式,通過(guò)簡(jiǎn)單的分隔符可以對(duì)csv等類型的文件進(jìn)行解析。但實(shí)際應(yīng)用中通常都是使用OrcFile格式,因?yàn)镺RCFile是列式存儲(chǔ)格式,更加適合大數(shù)據(jù)查詢的場(chǎng)景。

我們都知道關(guān)系型數(shù)據(jù)庫(kù)基本是使用行式存儲(chǔ)作為存儲(chǔ)格式,而大數(shù)據(jù)領(lǐng)域更多的是采用列式存儲(chǔ),因?yàn)榇髷?shù)據(jù)分析場(chǎng)景中通常需要讀取大量行,但是只需要少數(shù)的幾個(gè)列。這也是為什么通常使用OrcFile作為Hive的存儲(chǔ)格式的原因。由此可見,大數(shù)據(jù)的絕大部分應(yīng)用場(chǎng)景都是OLAP場(chǎng)景。

OLAP場(chǎng)景的特點(diǎn)

讀多于寫

不同于事務(wù)處理(OLTP)的場(chǎng)景,比如電商場(chǎng)景中加購(gòu)物車、下單、支付等需要在原地進(jìn)行大量insert、update、delete操作,數(shù)據(jù)分析(OLAP)場(chǎng)景通常是將數(shù)據(jù)批量導(dǎo)入后,進(jìn)行任意維度的靈活探索、BI工具洞察、報(bào)表制作等。

數(shù)據(jù)一次性寫入后,分析師需要嘗試從各個(gè)角度對(duì)數(shù)據(jù)做挖掘、分析,直到發(fā)現(xiàn)其中的商業(yè)價(jià)值、業(yè)務(wù)變化趨勢(shì)等信息。這是一個(gè)需要反復(fù)試錯(cuò)、不斷調(diào)整、持續(xù)優(yōu)化的過(guò)程,其中數(shù)據(jù)的讀取次數(shù)遠(yuǎn)多于寫入次數(shù)。這就要求底層數(shù)據(jù)庫(kù)為這個(gè)特點(diǎn)做專門設(shè)計(jì),而不是盲目采用傳統(tǒng)數(shù)據(jù)庫(kù)的技術(shù)架構(gòu)。

大寬表,讀大量行但是少量列,結(jié)果集較小

在OLAP場(chǎng)景中,通常存在一張或是幾張多列的大寬表,列數(shù)高達(dá)數(shù)百甚至數(shù)千列。對(duì)數(shù)據(jù)分析處理時(shí),選擇其中的少數(shù)幾列作為維度列、其他少數(shù)幾列作為指標(biāo)列,然后對(duì)全表或某一個(gè)較大范圍內(nèi)的數(shù)據(jù)做聚合計(jì)算。這個(gè)過(guò)程會(huì)掃描大量的行數(shù)據(jù),但是只用到了其中的少數(shù)列。而聚合計(jì)算的結(jié)果集相比于動(dòng)輒數(shù)十億的原始數(shù)據(jù),也明顯小得多。

數(shù)據(jù)批量寫入,且數(shù)據(jù)不更新或少更新

OLTP類業(yè)務(wù)對(duì)于延時(shí)(Latency)要求更高,要避免讓客戶等待造成業(yè)務(wù)損失;而OLAP類業(yè)務(wù),由于數(shù)據(jù)量非常大,通常更加關(guān)注寫入吞吐(Throughput),要求海量數(shù)據(jù)能夠盡快導(dǎo)入完成。一旦導(dǎo)入完成,歷史數(shù)據(jù)往往作為存檔,不會(huì)再做更新、刪除操作。

無(wú)需事務(wù),數(shù)據(jù)一致性要求低

OLAP類業(yè)務(wù)對(duì)于事務(wù)需求較少,通常是導(dǎo)入歷史日志數(shù)據(jù),或搭配一款事務(wù)型數(shù)據(jù)庫(kù)并實(shí)時(shí)從事務(wù)型數(shù)據(jù)庫(kù)中進(jìn)行數(shù)據(jù)同步。多數(shù)OLAP系統(tǒng)都支持最終一致性。

靈活多變,不適合預(yù)先建模

分析場(chǎng)景下,隨著業(yè)務(wù)變化要及時(shí)調(diào)整分析維度、挖掘方法,以盡快發(fā)現(xiàn)數(shù)據(jù)價(jià)值、更新業(yè)務(wù)指標(biāo)。而數(shù)據(jù)倉(cāng)庫(kù)中通常存儲(chǔ)著海量的歷史數(shù)據(jù),調(diào)整代價(jià)十分高昂。預(yù)先建模技術(shù)雖然可以在特定場(chǎng)景中加速計(jì)算,但是無(wú)法滿足業(yè)務(wù)靈活多變的發(fā)展需求,維護(hù)成本過(guò)高。

行式存儲(chǔ)和列式存儲(chǔ)

行式存儲(chǔ)和列式存儲(chǔ)的對(duì)比圖:


image.png

與行式存儲(chǔ)將每一行的數(shù)據(jù)連續(xù)存儲(chǔ)不同,列式存儲(chǔ)將每一列的數(shù)據(jù)連續(xù)存儲(chǔ)。相比于行式存儲(chǔ),列式存儲(chǔ)在分析場(chǎng)景下有著許多優(yōu)良的特性:

  1. 如前所述,分析場(chǎng)景中往往需要讀大量行但是少數(shù)幾個(gè)列。在行存模式下,數(shù)據(jù)按行連續(xù)存儲(chǔ),所有列的數(shù)據(jù)都存儲(chǔ)在一個(gè)block中,不參與計(jì)算的列在IO時(shí)也要全部讀出,讀取操作被嚴(yán)重放大。而列存模式下,只需要讀取參與計(jì)算的列即可,極大的減低了IO cost,加速了查詢。
  2. 同一列中的數(shù)據(jù)屬于同一類型,壓縮效果顯著。列存往往有著高達(dá)十倍甚至更高的壓縮比,節(jié)省了大量的存儲(chǔ)空間,降低了存儲(chǔ)成本。
  3. 更高的壓縮比意味著更小的data size,從磁盤中讀取相應(yīng)數(shù)據(jù)耗時(shí)更短。
  4. 自由的壓縮算法選擇。不同列的數(shù)據(jù)具有不同的數(shù)據(jù)類型,適用的壓縮算法也就不盡相同??梢葬槍?duì)不同列類型,選擇最合適的壓縮算法。
  5. 高壓縮比,意味著同等大小的內(nèi)存能夠存放更多數(shù)據(jù),系統(tǒng)cache效果更好。

OrcFile

OrcFile存儲(chǔ)格式:


image.png

Orc列式存儲(chǔ)優(yōu)點(diǎn):

  • 查詢時(shí)只需要讀取查詢所涉及的列,降低IO消耗,同時(shí)保存每一列統(tǒng)計(jì)信息,實(shí)現(xiàn)部分謂詞下推
  • 每列數(shù)據(jù)類型一致,可針對(duì)不同的數(shù)據(jù)類型采用其高效的壓縮算法
  • 列式存儲(chǔ)格式假設(shè)數(shù)據(jù)不會(huì)發(fā)生改變,支持分片、流式讀取,更好的適應(yīng)分布式文件存儲(chǔ)的特性

除了Orc外,Parquet也是常用的列式存儲(chǔ)格式。Orc VS Parquet:

  • OrcFile和Parquet都是Apache的頂級(jí)項(xiàng)目
  • Parquet不支持ACID、不支持更新,Orc支持有限的ACID和更新
  • Parquet的壓縮能力較高,Orc的查詢效率較高

離線數(shù)倉(cāng)VS實(shí)時(shí)數(shù)倉(cāng)

image.png

離線數(shù)倉(cāng):

  • 離線數(shù)據(jù)倉(cāng)庫(kù)主要基于Hive等技術(shù)來(lái)構(gòu)建T+1的離線數(shù)據(jù)
  • 通過(guò)定時(shí)任務(wù)每天拉取增量數(shù)據(jù)導(dǎo)入到Hive表中
  • 創(chuàng)建各個(gè)業(yè)務(wù)相關(guān)的主題維度數(shù)據(jù),對(duì)外提供T+1的數(shù)據(jù)查詢接口

離線數(shù)倉(cāng)架構(gòu):

  • 數(shù)據(jù)源通過(guò)離線的方式導(dǎo)入到離線數(shù)倉(cāng)中
  • 數(shù)據(jù)分層架構(gòu):ODS、DWD、 DM
  • 下游應(yīng)用根據(jù)業(yè)務(wù)需求選擇直接讀取DM

實(shí)時(shí)數(shù)倉(cāng):

  • 實(shí)時(shí)數(shù)倉(cāng)基于數(shù)據(jù)采集工具,將原始數(shù)據(jù)寫入到Kafka等數(shù)據(jù)通道
  • 數(shù)據(jù)最終寫入到類似于HBase這樣支持快速讀寫的存儲(chǔ)系統(tǒng)
  • 對(duì)外提供分鐘級(jí)別、甚至秒級(jí)別的查詢方案

實(shí)時(shí)數(shù)倉(cāng)架構(gòu):

  • 業(yè)務(wù)實(shí)時(shí)性要求的不斷提高,實(shí)時(shí)處理從次要部分變成了主要部分
  • Lambda架構(gòu):在離線大數(shù)據(jù)架構(gòu)基礎(chǔ)上加了一個(gè)加速層,使用流處理技術(shù)完成實(shí)時(shí)性較高的指標(biāo)計(jì)算
  • Kappa架構(gòu):以實(shí)時(shí)事件處理為核心,統(tǒng)一數(shù)據(jù)處理

圖解Lambda架構(gòu)數(shù)據(jù)流程

Lambda 架構(gòu)(Lambda Architecture)是由 Twitter 工程師南森·馬茨(Nathan Marz)提出的大數(shù)據(jù)處理架構(gòu)。這一架構(gòu)的提出基于馬茨在 BackType 和 Twitter 上的分布式數(shù)據(jù)處理系統(tǒng)的經(jīng)驗(yàn)。

Lambda 架構(gòu)使開發(fā)人員能夠構(gòu)建大規(guī)模分布式數(shù)據(jù)處理系統(tǒng)。它具有很好的靈活性和可擴(kuò)展性,也對(duì)硬件故障和人為失誤有很好的容錯(cuò)性。

Lambda 架構(gòu)總共由三層系統(tǒng)組成:批處理層(Batch Layer),速度處理層(Speed Layer),以及用于響應(yīng)查詢的服務(wù)層(Serving Layer)。


image.png

在 Lambda 架構(gòu)中,每層都有自己所肩負(fù)的任務(wù)。批處理層存儲(chǔ)管理主數(shù)據(jù)集(不可變的數(shù)據(jù)集)和預(yù)先批處理計(jì)算好的視圖。批處理層使用可處理大量數(shù)據(jù)的分布式處理系統(tǒng)預(yù)先計(jì)算結(jié)果。它通過(guò)處理所有的已有歷史數(shù)據(jù)來(lái)實(shí)現(xiàn)數(shù)據(jù)的準(zhǔn)確性。這意味著它是基于完整的數(shù)據(jù)集來(lái)重新計(jì)算的,能夠修復(fù)任何錯(cuò)誤,然后更新現(xiàn)有的數(shù)據(jù)視圖。輸出通常存儲(chǔ)在只讀數(shù)據(jù)庫(kù)中,更新則完全取代現(xiàn)有的預(yù)先計(jì)算好的視圖。

速度處理層會(huì)實(shí)時(shí)處理新來(lái)的數(shù)據(jù)。速度層通過(guò)提供最新數(shù)據(jù)的實(shí)時(shí)視圖來(lái)最小化延遲。速度層所生成的數(shù)據(jù)視圖可能不如批處理層最終生成的視圖那樣準(zhǔn)確或完整,但它們幾乎在收到數(shù)據(jù)后立即可用。而當(dāng)同樣的數(shù)據(jù)在批處理層處理完成后,在速度層的數(shù)據(jù)就可以被替代掉了。

本質(zhì)上,速度層彌補(bǔ)了批處理層所導(dǎo)致的數(shù)據(jù)視圖滯后。比如說(shuō),批處理層的每個(gè)任務(wù)都需要 1 個(gè)小時(shí)才能完成,而在這 1 個(gè)小時(shí)里,我們是無(wú)法獲取批處理層中最新任務(wù)給出的數(shù)據(jù)視圖的。而速度層因?yàn)槟軌驅(qū)崟r(shí)處理數(shù)據(jù)給出結(jié)果,就彌補(bǔ)了這 1 個(gè)小時(shí)的滯后。

所有在批處理層和速度層處理完的結(jié)果都輸出存儲(chǔ)在服務(wù)層中,服務(wù)層通過(guò)返回預(yù)先計(jì)算的數(shù)據(jù)視圖或從速度層處理構(gòu)建好數(shù)據(jù)視圖來(lái)響應(yīng)查詢。

所有的新用戶行為數(shù)據(jù)都可以同時(shí)流入批處理層和速度層。批處理層會(huì)永久保存數(shù)據(jù)并且對(duì)數(shù)據(jù)進(jìn)行預(yù)處理,得到我們想要的用戶行為模型并寫入服務(wù)層。而速度層也同時(shí)對(duì)新用戶行為數(shù)據(jù)進(jìn)行處理,得到實(shí)時(shí)的用戶行為模型。

而當(dāng)“應(yīng)該對(duì)用戶投放什么樣的廣告”作為一個(gè)查詢(Query)來(lái)到時(shí),我們從服務(wù)層既查詢服務(wù)層中保存好的批處理輸出模型,也對(duì)速度層中處理的實(shí)時(shí)行為進(jìn)行查詢,這樣我們就可以得到一個(gè)完整的用戶行為歷史了。

一個(gè)查詢就如下圖所示,既通過(guò)批處理層兼顧了數(shù)據(jù)的完整性,也可以通過(guò)速度層彌補(bǔ)批處理層的高延時(shí)性,讓整個(gè)查詢具有實(shí)時(shí)性。


image.png

Kappa 架構(gòu) VS Lambda

Lambda 架構(gòu)的不足

雖然 Lambda 架構(gòu)使用起來(lái)十分靈活,并且可以適用于很多的應(yīng)用場(chǎng)景,但在實(shí)際應(yīng)用的時(shí)候,Lambda 架構(gòu)也存在著一些不足,主要表現(xiàn)在它的維護(hù)很復(fù)雜。

使用 Lambda 架構(gòu)時(shí),架構(gòu)師需要維護(hù)兩個(gè)復(fù)雜的分布式系統(tǒng),并且保證他們邏輯上產(chǎn)生相同的結(jié)果輸出到服務(wù)層中。舉個(gè)例子吧,我們?cè)诓渴?Lambda 架構(gòu)的時(shí)候,可以部署 Apache Hadoop 到批處理層上,同時(shí)部署 Apache Flink 到速度層上。

我們都知道,在分布式框架中進(jìn)行編程其實(shí)是十分復(fù)雜的,尤其是我們還會(huì)針對(duì)不同的框架進(jìn)行專門的優(yōu)化。所以幾乎每一個(gè)架構(gòu)師都認(rèn)同,Lambda 架構(gòu)在實(shí)戰(zhàn)中維護(hù)起來(lái)具有一定的復(fù)雜性。

那要怎么解決這個(gè)問(wèn)題呢?我們先來(lái)思考一下,造成這個(gè)架構(gòu)維護(hù)起來(lái)如此復(fù)雜的根本原因是什么呢?

維護(hù) Lambda 架構(gòu)的復(fù)雜性在于我們要同時(shí)維護(hù)兩套系統(tǒng)架構(gòu):批處理層和速度層。我們已經(jīng)說(shuō)過(guò)了,在架構(gòu)中加入批處理層是因?yàn)閺呐幚韺拥玫降慕Y(jié)果具有高準(zhǔn)確性,而加入速度層是因?yàn)樗谔幚泶笠?guī)模數(shù)據(jù)時(shí)具有低延時(shí)性。

那我們能不能改進(jìn)其中某一層的架構(gòu),讓它具有另外一層架構(gòu)的特性呢?例如,改進(jìn)批處理層的系統(tǒng)讓它具有更低的延時(shí)性,又或者是改進(jìn)速度層的系統(tǒng),讓它產(chǎn)生的數(shù)據(jù)視圖更具準(zhǔn)確性和更加接近歷史數(shù)據(jù)呢?

另外一種在大規(guī)模數(shù)據(jù)處理中常用的架構(gòu)——Kappa 架構(gòu)(Kappa Architecture),便是在這樣的思考下誕生的。

Kappa 架構(gòu)

Kappa 架構(gòu)是由 LinkedIn 的前首席工程師杰伊·克雷普斯(Jay Kreps)提出的一種架構(gòu)思想。克雷普斯是幾個(gè)著名開源項(xiàng)目(包括 Apache Kafka 和 Apache Samza 這樣的流處理系統(tǒng))的作者之一,也是現(xiàn)在 Confluent 大數(shù)據(jù)公司的 CEO。

克雷普斯提出了一個(gè)改進(jìn) Lambda 架構(gòu)的觀點(diǎn):

我們能不能改進(jìn) Lambda 架構(gòu)中速度層的系統(tǒng)性能,使得它也可以處理好數(shù)據(jù)的完整性和準(zhǔn)確性問(wèn)題呢?我們能不能改進(jìn) Lambda 架構(gòu)中的速度層,使它既能夠進(jìn)行實(shí)時(shí)數(shù)據(jù)處理,同時(shí)也有能力在業(yè)務(wù)邏輯更新的情況下重新處理以前處理過(guò)的歷史數(shù)據(jù)呢?

他根據(jù)自身多年的架構(gòu)經(jīng)驗(yàn)發(fā)現(xiàn),我們是可以做到這樣的改進(jìn)的。我們知道像 Apache Kafka 這樣的流處理平臺(tái)是具有永久保存數(shù)據(jù)日志的功能的。通過(guò)Kafka的這一特性,我們可以重新處理部署于速度層架構(gòu)中的歷史數(shù)據(jù)。

下面我就以 Kafka 為例來(lái)介紹整個(gè)全新架構(gòu)的過(guò)程。

第一步,部署 Kafka,并設(shè)置數(shù)據(jù)日志的保留期(Retention Period)。

這里的保留期指的是你希望能夠重新處理的歷史數(shù)據(jù)的時(shí)間區(qū)間。例如,如果你希望重新處理最多一年的歷史數(shù)據(jù),那就可以把 Apache Kafka 中的保留期設(shè)置為 365 天。如果你希望能夠處理所有的歷史數(shù)據(jù),那就可以把 Apache Kafka 中的保留期設(shè)置為“永久(Forever)”。

第二步,如果我們需要改進(jìn)現(xiàn)有的邏輯算法,那就表示我們需要對(duì)歷史數(shù)據(jù)進(jìn)行重新處理。我們需要做的就是重新啟動(dòng)一個(gè) Kafka 作業(yè)實(shí)例(Instance)。這個(gè)作業(yè)實(shí)例將重頭開始,重新計(jì)算保留好的歷史數(shù)據(jù),并將結(jié)果輸出到一個(gè)新的數(shù)據(jù)視圖中。

我們知道 Kafka 的底層是使用 Log Offset 來(lái)判斷現(xiàn)在已經(jīng)處理到哪個(gè)數(shù)據(jù)塊了,所以只需要將 Log Offset 設(shè)置為 0,新的作業(yè)實(shí)例就會(huì)重頭開始處理歷史數(shù)據(jù)。

第三步,當(dāng)這個(gè)新的數(shù)據(jù)視圖處理過(guò)的數(shù)據(jù)進(jìn)度趕上了舊的數(shù)據(jù)視圖時(shí),我們的應(yīng)用便可以切換到從新的數(shù)據(jù)視圖中讀取。

第四步,停止舊版本的作業(yè)實(shí)例,并刪除舊的數(shù)據(jù)視圖。

這個(gè)架構(gòu)就如同下圖所示。


image.png

與 Lambda 架構(gòu)不同的是,Kappa 架構(gòu)去掉了批處理層這一體系結(jié)構(gòu),而只保留了速度層。你只需要在業(yè)務(wù)邏輯改變又或者是代碼更改的時(shí)候進(jìn)行數(shù)據(jù)的重新處理。Kappa 架構(gòu)統(tǒng)一了數(shù)據(jù)的處理方式,不再維護(hù)離線和實(shí)時(shí)兩套代碼邏輯。

Kappa 架構(gòu)的不足

Kappa 架構(gòu)也是有著它自身的不足的。因?yàn)?Kappa 架構(gòu)只保留了速度層而缺少批處理層,在速度層上處理大規(guī)模數(shù)據(jù)可能會(huì)有數(shù)據(jù)更新出錯(cuò)的情況發(fā)生,這就需要我們花費(fèi)更多的時(shí)間在處理這些錯(cuò)誤異常上面。如果需求發(fā)生變化或歷史數(shù)據(jù)需要重新處理都得通過(guò)上游重放來(lái)完成。并且重新處理歷史的吞吐能力會(huì)低于批處理。

還有一點(diǎn),Kappa 架構(gòu)的批處理和流處理都放在了速度層上,這導(dǎo)致了這種架構(gòu)是使用同一套代碼來(lái)處理算法邏輯的。所以 Kappa 架構(gòu)并不適用于批處理和流處理代碼邏輯不一致的場(chǎng)景。

Lambda VS Kappa

image.png

主流大公司的實(shí)時(shí)數(shù)倉(cāng)架構(gòu)

阿里菜鳥實(shí)時(shí)數(shù)倉(cāng)

image.png

image.png

美團(tuán)實(shí)時(shí)數(shù)倉(cāng)

image.png

實(shí)時(shí)數(shù)倉(cāng)建設(shè)特征

  • 整體架構(gòu)設(shè)計(jì)通過(guò)分層設(shè)計(jì)為OLAP查詢分擔(dān)壓力
  • 復(fù)雜的計(jì)算統(tǒng)一在實(shí)時(shí)計(jì)算層做,避免給OLAP查詢帶來(lái)過(guò)大的壓力
  • 匯總計(jì)算通過(guò)OLAP數(shù)據(jù)查詢引擎進(jìn)行
  • 整個(gè)架構(gòu)中實(shí)時(shí)計(jì)算一般 是Spark+Flink配合
  • 消息隊(duì)列Kafka一家獨(dú)大,配合HBase、ES、 Mysq|進(jìn)行數(shù)據(jù)落盤
  • OLAP領(lǐng)域Presto、Druid、 Clickhouse、 Greenplum等等層出不窮
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容