Hadoop

1. Hadoop簡介

Apache Hadoop 標(biāo)志

1.1 背景介紹

Hadoop是Apache旗下的開源的分布式計(jì)算平臺,它可以運(yùn)行在計(jì)算機(jī)集群之上,提供可靠的、可擴(kuò)展的分布式計(jì)算功能。
Hadoop與三遍論文密不可分:
① 2003年,谷歌發(fā)布的分布式文件系統(tǒng)GFS的論文,可以用于解決海量數(shù)據(jù)存儲的問題。
② 2004年,谷歌發(fā)布了MapReduce的論文,可以用于解決海量數(shù)據(jù)計(jì)算的問題。
③ 2006年,谷歌發(fā)布了BigTable的論文,它是以GFS為底層數(shù)據(jù)存儲的分布式存儲系統(tǒng)。

Hadoop中的HDFS是GFS的開源實(shí)現(xiàn);
Hadoop中的MapReduce是谷歌MapReduce的開源實(shí)現(xiàn);
Hadoop中的HBase是谷歌BigTable的開源實(shí)現(xiàn)。

1.2 Hadoop核心組件

Hadoop由四個主要模塊組成:
Hadoop Common: 一組工具和庫,可補(bǔ)充其他模塊并確保與用戶計(jì)算機(jī)系統(tǒng)的兼容性;
Hadoop Distributed File System (HDFS):提供可對應(yīng)用數(shù)據(jù)進(jìn)行高吞吐量訪問的分布式文件系統(tǒng)。;
Hadoop YARN(Yet Another Resource Negotiator):作業(yè)調(diào)度和集群資源管理框架;
Hadoop MapReduce:基于YARN的大數(shù)據(jù)集并行處理系統(tǒng),從數(shù)據(jù)庫讀取,轉(zhuǎn)換和分析數(shù)據(jù)。

Hadoop生態(tài)系統(tǒng)

Hadoop生態(tài)系統(tǒng)

基于Hadoop技術(shù)架構(gòu)數(shù)據(jù)倉庫搭建

在大數(shù)據(jù)處理當(dāng)中,最核心要解決的其實(shí)就是兩個問題,大數(shù)據(jù)存儲和大數(shù)據(jù)計(jì)算。在Hadoop生態(tài)當(dāng)中,解決大數(shù)據(jù)存儲,主要依靠就是HDFS,再配合數(shù)據(jù)庫去完成。而MapReduce為海量的數(shù)據(jù)提供了計(jì)算。

1.2.1 HDFS

HDFS是什么
HDFS即Hadoop distributed file system(hadoop分布式文件系統(tǒng)),在Hadoop當(dāng)中負(fù)責(zé)分布式存儲階段的任務(wù),它是一個分布式的文件系統(tǒng),也可以用來存放單個機(jī)器的數(shù)據(jù),只是大部分用來存儲分布式數(shù)據(jù)。HDFS跟window下的NTFS一樣可以通過目錄樹來查找數(shù)據(jù)。
HDFS有什么用
整個Hadoop框架,本質(zhì)上來說都是基于分布式實(shí)現(xiàn)的,隨著數(shù)據(jù)規(guī)模的越來越大,一臺機(jī)器無法存儲所有的數(shù)據(jù),所以需要多臺機(jī)器來存儲。而多臺機(jī)器存儲又不方便管理,所以需要一個分布式的系統(tǒng)來管理分布在不同機(jī)器中的數(shù)據(jù)。
事實(shí)上,HDFS的出現(xiàn),就是為了解決分布式框架下,數(shù)據(jù)存儲管理的問題。而HDFS只是分布式文件系統(tǒng)中的一種,依托于Hadoop生態(tài),去承擔(dān)大數(shù)據(jù)分布式存儲的管理任務(wù)。
HDFS的概念與架構(gòu)
HDFS是典型的主/從架構(gòu)。HDFS集群由一個NameNode組成,NameNode是一個主服務(wù)器,它管理文件系統(tǒng)名稱空間并控制客戶機(jī)對文件的訪問。此外,還有許多datanode,通常是集群中的每個節(jié)點(diǎn)一個datanode,它們管理附加到它們所運(yùn)行的節(jié)點(diǎn)上的存儲。

HDFS架構(gòu)

Namenode:

即master,有以下功能

*管理文件系統(tǒng)命名空間;

*控制client對數(shù)據(jù)的讀取和寫入請求;

*管理數(shù)據(jù)塊到datanode的映射關(guān)系;

*管理副本策略。

Datanode:

即slave,主要是存儲文件塊數(shù)據(jù),接受來自namenode的指令,并執(zhí)行指令對數(shù)據(jù)塊的創(chuàng)建,刪除,復(fù)制等操作。

Client:

即客戶端,有以下功能:

*對文件的切分,HDFS上傳數(shù)據(jù)時,client將文件切分成多個block再進(jìn)行上傳;

*與namenode交互,獲取文件的索引信息;

*與datanode交互,對數(shù)據(jù)的讀取和寫入;

*在客戶端中提供相關(guān)HDFS的命令,比如對HDFS的管理,格式化namenode,對HDFS對數(shù)據(jù)操作,比如上傳文件到HDFS等。

Secondary namenode:

并非namenode的熱備,當(dāng)namenode掛掉的時候,并不能立馬替換namenode并提供服務(wù),只是在定時的對namenode進(jìn)行備份,存在一定的時間誤差,secondary會備份namenode的Fsimage和Edits,在緊急情況下,可以適用secondarynamenode來恢復(fù)部分的namenode。

關(guān)于大數(shù)據(jù)學(xué)習(xí),Hadoop HDFS存儲入門,以上就為大家做了簡單的介紹了。在Hadoop大數(shù)據(jù)框架當(dāng)中,HDFS作為分布式文件系統(tǒng),始終是重要的核心組件,學(xué)習(xí)當(dāng)中也自然需要深入理解掌握。

參考:
http://www.itdecent.cn/p/fe77da35b653

1.2.2 YARN

Yarn 的全稱是 Yet Another Resource Negotiator,意思是“另一種資源調(diào)度器”


YARN架構(gòu)

YARN應(yīng)用示例

Container

容器(Container)這個東西是 Yarn 對資源做的一層抽象。就像我們平時開發(fā)過程中,經(jīng)常需要對底層一些東西進(jìn)行封裝,只提供給上層一個調(diào)用接口一樣,Yarn 對資源的管理也是用到了這種思想。

Container

如上所示,Yarn 將CPU核數(shù),內(nèi)存這些計(jì)算資源都封裝成為一個個的容器(Container)。需要注意兩點(diǎn):

  • 容器由 NodeManager 啟動和管理,并被它所監(jiān)控。
  • 容器被 ResourceManager 進(jìn)行調(diào)度。

ResourceManager(RM)

從名字上我們就能知道這個組件是負(fù)責(zé)資源管理的,整個系統(tǒng)有且只有一個 RM ,來負(fù)責(zé)資源的調(diào)度。它也包含了兩個主要的組件:定時調(diào)用器(Scheduler)以及應(yīng)用管理器(ApplicationManager)。

  1. 定時調(diào)度器(Scheduler):從本質(zhì)上來說,定時調(diào)度器就是一種策略,或者說一種算法。當(dāng) Client 提交一個任務(wù)的時候,它會根據(jù)所需要的資源以及當(dāng)前集群的資源狀況進(jìn)行分配。注意,它只負(fù)責(zé)向應(yīng)用程序分配資源,并不做監(jiān)控以及應(yīng)用程序的狀態(tài)跟蹤。
  2. 應(yīng)用管理器(ApplicationManager):應(yīng)用管理器就是負(fù)責(zé)管理 Client 用戶提交的應(yīng)用。定時調(diào)度器(Scheduler)不對用戶提交的程序監(jiān)控,監(jiān)控應(yīng)用的工作正是由應(yīng)用管理器(ApplicationManager)完成的。

ApplicationMaster

每當(dāng) Client 提交一個 Application 時候,就會新建一個 ApplicationMaster 。由這個 ApplicationMaster 去與 ResourceManager 申請容器資源,獲得資源后會將要運(yùn)行的程序發(fā)送到容器上啟動,然后進(jìn)行分布式計(jì)算。

為什么是把運(yùn)行程序發(fā)送到容器上去運(yùn)行?如果以傳統(tǒng)的思路來看,是程序運(yùn)行著不動,然后數(shù)據(jù)進(jìn)進(jìn)出出不停流轉(zhuǎn)。但當(dāng)數(shù)據(jù)量大的時候就沒法這么玩了,因?yàn)楹A繑?shù)據(jù)移動成本太大,時間太長。大數(shù)據(jù)分布式計(jì)算就是這種思想,既然大數(shù)據(jù)難以移動,那我就把容易移動的應(yīng)用程序發(fā)布到各個節(jié)點(diǎn)進(jìn)行計(jì)算唄,這就是大數(shù)據(jù)分布式計(jì)算的思路。

NodeManager

NodeManager 是 ResourceManager 在每臺機(jī)器的上代理,負(fù)責(zé)容器的管理,并監(jiān)控他們的資源使用情況(cpu,內(nèi)存,磁盤及網(wǎng)絡(luò)等),以及向 ResourceManager/Scheduler 提供這些資源使用報(bào)告。

提交一個 Application 到 Yarn 的流程

YARN如何運(yùn)行一個應(yīng)用程序

這張圖簡單地標(biāo)明了提交一個程序所經(jīng)歷的流程,接下來我們來具體說說每一步的過程。

  1. Client 向 Yarn 提交 Application,這里我們假設(shè)是一個 MapReduce 作業(yè)。
  2. ResourceManager 向 NodeManager 通信,為該 Application 分配第一個容器。并在這個容器中運(yùn)行這個應(yīng)用程序?qū)?yīng)的 ApplicationMaster。
  3. ApplicationMaster 啟動以后,對 作業(yè)(也就是 Application) 進(jìn)行拆分,拆分 task 出來,這些 task 可以運(yùn)行在一個或多個容器中。然后向 ResourceManager 申請要運(yùn)行程序的容器,并定時向 ResourceManager 發(fā)送心跳。
  4. 申請到容器后,ApplicationMaster 會去和容器對應(yīng)的 NodeManager 通信,而后將作業(yè)分發(fā)到對應(yīng)的 NodeManager 中的容器去運(yùn)行,這里會將拆分后的 MapReduce 進(jìn)行分發(fā),對應(yīng)容器中運(yùn)行的可能是 Map 任務(wù),也可能是 Reduce 任務(wù)。
  5. 容器中運(yùn)行的任務(wù)會向 ApplicationMaster 發(fā)送心跳,匯報(bào)自身情況。當(dāng)程序運(yùn)行完成后, ApplicationMaster 再向 ResourceManager 注銷并釋放容器資源。

參考:https://zhuanlan.zhihu.com/p/54192454

1.2.3 MapReduce

MapReduce是Hadoop的核心框架之一,主要負(fù)責(zé)分布式并行計(jì)算。MapReduce 既是計(jì)算框架,也是編程模型,主要基于Java語言來編程,這也是為什么Hadoop學(xué)習(xí)要求要有一定的Java基礎(chǔ)。當(dāng)然,在這幾年的發(fā)展當(dāng)中,MapReduce的計(jì)算性能受到詬病,取而代之受到重用的是Spark。
MapReduce運(yùn)行過程,通常涉及到input、split、map、shuffle、reduce、output幾個階段,其中shuffle過程包括sort、copy、combine操作,reduce之前有時涉及二次排序。

MapReduce編程,主要有三種方式:

1、Hadoop streaming執(zhí)行mapreduce

2、Hive執(zhí)行mapreduce

3、Java MR編程

①Hadoop streaming執(zhí)行MapReduce

優(yōu)點(diǎn):

可以用大多數(shù)語言開發(fā);

代碼量少,開發(fā)速度快;

方便本地調(diào)試。

不足:

只能通過參數(shù)控制MR框架,控制性較弱,比如定制partitioner、combiner;

支持的數(shù)據(jù)類型和數(shù)據(jù)結(jié)構(gòu)有限,不適合做復(fù)雜處理,處理字符型較多;

②Hive執(zhí)行MapReduce

將類SQL轉(zhuǎn)換成MapReduce,定位于數(shù)據(jù)倉庫。

優(yōu)點(diǎn):

開發(fā)速度快,易調(diào)試,易理解;

易于構(gòu)建數(shù)據(jù)倉庫模型;

內(nèi)置函數(shù)功能齊全,比如rownumber等窗口函數(shù);

可擴(kuò)展性好,比如自定義存儲格式、自定義函數(shù)UDF;

多接口,比如JDBC、Thrift、Rest等。

缺點(diǎn):

不能用于復(fù)雜計(jì)算,比如涉及時序處理的數(shù)據(jù);

可控制性較弱,比如partition、關(guān)聯(lián)等操作。

③Java MR編程

用Java編寫MR,可以說是最“原始”的一種方式,Java面向?qū)ο缶幊?,設(shè)計(jì)模式成熟,通用性好,并且Java方面第三方類庫非常豐富。

優(yōu)點(diǎn):

定制性強(qiáng),比如定制partitioner、定制combiner等;

數(shù)據(jù)類型和數(shù)據(jù)結(jié)構(gòu)豐富,隊(duì)列、堆棧、自定義類等使用方便;

控制性非常高,包括MR運(yùn)行過程的一些控制,Map端join等;

可以方便使用Hadoop組件類庫中的類或工具,比如HDFS文件操作類等。

缺點(diǎn):

相比Hive、Hadoop streaming或Pyspark,開發(fā)代碼量較大,對開發(fā)環(huán)境要求高且不易調(diào)試;

通常每個操作都要寫一個MR類;

不如Spark執(zhí)行效率高。

參考:http://www.itdecent.cn/p/3c9aa4214dd3

1.3 Hadoop的特點(diǎn)

① 跨平臺性:
hadoop是基于java語言開發(fā)的,有很好的跨平臺性,可以運(yùn)行在Linux平臺上;
② 高可靠性:
hadoop中的HDFS是分布式文件系統(tǒng),可以將海量數(shù)據(jù)分布冗余存儲在不同的機(jī)器節(jié)點(diǎn)上,即使是某個機(jī)器副本上發(fā)生故障,其他的機(jī)器副本也能正常運(yùn)行;
③ 高容錯性:
HDFS把把文件分布存儲在很多不同的機(jī)器節(jié)點(diǎn)上,能實(shí)現(xiàn)自動保存多個副本,因此某個節(jié)點(diǎn)上的任務(wù)失敗后也能實(shí)現(xiàn)自動重新分配;
④ 高效性:
hadoop的核心組件HDFS和MapReduce,一個負(fù)責(zé)分布式存儲一個負(fù)責(zé)分布式處理,能夠處理PB級別的數(shù)據(jù);

⑤ 低成本與高擴(kuò)展:
hadoop在廉價的計(jì)算機(jī)集群上就可以運(yùn)行,因此成本比較低,并且可以擴(kuò)展到幾千個計(jì)算機(jī)節(jié)點(diǎn)上,完成海量數(shù)據(jù)的存儲和計(jì)算。

參考:
https://zhuanlan.zhihu.com/p/369486877

1.4 應(yīng)用實(shí)例

  • Adobe

    • We use Apache Hadoop and Apache HBase in several areas from social services to structured data storage and processing for internal use.
    • We currently have about 30 nodes running HDFS, Hadoop and HBase in clusters ranging from 5 to 14 nodes on both production and development. We plan a deployment on an 80 nodes cluster.
    • We constantly write data to Apache HBase and run MapReduce jobs to process then store it back to Apache HBase or external systems.
    • Our production cluster has been running since Oct 2008.
  • Alibaba

    • A 15-node cluster dedicated to processing sorts of business data dumped out of database and joining them together. These data will then be fed into iSearch, our vertical search engine.
    • Each node has 8 cores, 16G RAM and 1.4T storage.
  • Facebook
    • We use Apache Hadoop to store copies of internal log and dimension data sources and use it as a source for reporting/analytics and machine learning.
    • Currently we have 2 major clusters:
      • A 1100-machine cluster with 8800 cores and about 12 PB raw storage.
      • A 300-machine cluster with 2400 cores and about 3 PB raw storage.
      • Each (commodity) node has 8 cores and 12 TB of storage.
      • We are heavy users of both streaming as well as the Java APIs. We have built a higher level data warehousing framework using these features called Hive (see the http://hadoop.apache.org/hive/). We have also developed a FUSE implementation over HDFS.

參考:https://cwiki.apache.org/confluence/display/hadoop2/PoweredBy

2. Hadoop安裝

2.1 安裝前的準(zhǔn)備

2.1.1 支持的平臺

  • GNU/Linux是產(chǎn)品開發(fā)和運(yùn)行的平臺。 Hadoop已在有2000個節(jié)點(diǎn)的GNU/Linux主機(jī)組成的集群系統(tǒng)上得到驗(yàn)證。
    本文將以Ubuntu 18.04作為安裝平臺
  • Windows平臺同樣受官方支持??蓞⒖脊俜轿臋n或者自行百度谷歌https://cwiki.apache.org/confluence/display/HADOOP2/Hadoop2OnWindows

2.1.2 需要提前安裝的軟件

Java

官方現(xiàn)在推薦OpenJDK,其他版本(比如Oracle)也可以
Hadoop 3.3.x支持Java 8 和 Java 11,推薦Java 8
Hadoop 3.0.x 到 3.2.x n只支持 Java 8
Hadoop from 2.7.x 到 2.10.x 同時支持 Java 7 and 8
本文安裝Hadoop 3.2.2, OpenJDK版本為系統(tǒng)自帶的1.8.0

ssh

必須安裝并且保證 sshd一直運(yùn)行,以便用Hadoop 腳本管理遠(yuǎn)端Hadoop守護(hù)進(jìn)程
安裝ssh

 sudo apt-get install ssh

2.1.3 下載Hadoop

考慮到網(wǎng)速以及以后適配Spark的版本問題,我們選擇比較新的Hadoop 3.2.2穩(wěn)定發(fā)行版。香港可以直接從官網(wǎng)下載https://hadoop.apache.org/releases.html
內(nèi)地可以使用鏡像網(wǎng)站,比如清華鏡像
https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-3.2.2/

清華大學(xué)開源鏡像

2.2 安裝過程(單機(jī)版)

參考
http://www.itdecent.cn/p/cdae5bab030f
https://blog.csdn.net/weixin_42001089/article/details/81865101
https://blog.csdn.net/qq_44830040/article/details/104873759
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
集群配置
https://blog.csdn.net/qq_39785575/article/details/106300628

2.3 環(huán)境變量設(shè)置

1d0aee74363e019589680eff8e31891.jpg

2.4 Hadoop 啟動

格式化namenode

hadoop namenode -format
image.png

啟動hdfs

start-all.sh
image.png

查看相應(yīng)進(jìn)程

jps

瀏覽器訪問

localhost:9870
image.png

3. Hadoop 應(yīng)用

3.1 上傳文件到HDFS

3.1.1 Linux 命令行

(1)格式畫NameNode

hdfs namenode -format

(2) 啟動hdfs

start-all.sh

(3)查看hdfs狀態(tài)

hdfs dfsadmin -report

(4)上傳文件到hdfs

hdfs dfs -put 要上傳的文件路徑  目標(biāo)路徑
#舉例
hdfs dfs -put test.txt /

(5)查看hdfs路徑下的文件

hdfs dfs -ls /

(6)讀取hdfs文件內(nèi)容

hdfs dfs -cat /test.txt

(7)刪除hdfs文件

hdfs dfs -rm /test.txt

3.1.2 Java

3.2 MapReduce

3.2.1 MapReduce是什么

(1) MapReduce是一種分布式計(jì)算框架 ,以一種可靠的,具有容錯能力的方式并行地處理上TB級別的海量數(shù)據(jù)集。主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)的計(jì)算問題。
(2) MapReduce有兩個階段組成:Map和Reduce,用戶只需實(shí)現(xiàn)map()和reduce()兩個函數(shù),即可實(shí)現(xiàn)分布式計(jì)算。

3.2.2 MapReduce做什么

(1) MapReduce框架由Map和Reduce組成。
(2) Map()負(fù)責(zé)把一個大的block塊進(jìn)行切片并計(jì)算。
(3) Reduce() 負(fù)責(zé)把Map()切片的數(shù)據(jù)進(jìn)行匯總、計(jì)算。

3.2.3 MapReduce怎么做

image

(1) 第一步對輸入的數(shù)據(jù)進(jìn)行切片,每個切片分配一個map()任務(wù),map()對其中的數(shù)據(jù)進(jìn)行計(jì)算,對每個數(shù)據(jù)用鍵值對的形式記錄,然后輸出到環(huán)形緩沖區(qū)(圖中sort的位置)。
(2) map()中輸出的數(shù)據(jù)在環(huán)形緩沖區(qū)內(nèi)進(jìn)行快排,每個環(huán)形緩沖區(qū)默認(rèn)大小100M,當(dāng)數(shù)據(jù)達(dá)到80M時(默認(rèn)),把數(shù)據(jù)輸出到磁盤上。形成很多個內(nèi)部有序整體無序的小文件。
(3) 框架把磁盤中的小文件傳到Reduce()中來,然后進(jìn)行歸并排序,最終輸出。

3.2.4 要點(diǎn)是什么

MapReduce是六大過程,簡單來說:Input, Split, Map, Shuffle, Reduce, Finalize
(1) MapReduce將輸入的數(shù)據(jù)進(jìn)行邏輯切片,一片對應(yīng)一個Map任務(wù)
(2) Map以并行的方式處理切片
(3) 框架對Map輸出進(jìn)行排序,然后發(fā)給Reduce
(4) MapReduce的輸入輸出數(shù)據(jù)處于同一個文件系統(tǒng)(HDFS)
(5) 框架負(fù)責(zé)任務(wù)調(diào)度、任務(wù)監(jiān)控、失敗任務(wù)的重新執(zhí)行
(6) 框架會對鍵和值進(jìn)行序列化,因此鍵和值需要實(shí)現(xiàn)writable接口,框架會對鍵排序,因此必須實(shí)現(xiàn)writableComparable接口。

3.1.5 舉個例子

如何統(tǒng)計(jì)1TB或1PB文件里的單詞數(shù)呢

image.png

(1) Input: 我們輸入很多文檔,文檔的每一行有很多不同的單詞
(2) Split: 找不同的worker分配不同的任務(wù),就是Split過程,那怎么切分呢?每一行文檔分給一個worker
(3) Map: 就切分成單詞和它出現(xiàn)的次數(shù),寫成鍵值對,每次出現(xiàn)的次數(shù)是1,就寫1。
(4) Shuffle就是把不同的單詞繼續(xù)放到同樣的盒子里面,Bear放一起,Car放一起,這可以由Shuffle寫的時候算法來決定。當(dāng)然現(xiàn)在很多智能都不用做了,有時候還需要隨機(jī)采樣的方式來實(shí)現(xiàn)。
(5) Reduce,就是把相同的數(shù)據(jù)放一起,比如Car有3個就寫3,River是2個就寫2,最后再放到一起,這樣便于提供服務(wù),得到最終結(jié)果。
大家可以看到最后箭頭指過來是亂序的,也就是說這個執(zhí)行過程實(shí)際上是高度并行的,也不用等待每個都完成,所以說這是一個很好的優(yōu)化過程。

Hadoop Streaming是一種運(yùn)行作業(yè)的實(shí)用工具,它允許用戶創(chuàng)建和運(yùn)行任何可執(zhí)行程序 (例如:Java, Python, Shell工具)來做為mapper和reducer。

代碼形式:


image.png

Map輸入每一行的ID是key,value是這一行的單詞。有這個結(jié)果以后就可以統(tǒng)計(jì)每個單詞出現(xiàn)的次數(shù)。Reduce輸入的還是各個單詞,但后面跟的是一個串,是在里面出現(xiàn)的次數(shù),1,1,1,我們把1加到一起就是sum的過程,這就是MapReduce的整個過程。輸出我們的關(guān)鍵詞和它的出現(xiàn)次數(shù)

3.1.6 再舉個例子

參考:
https://zhuanlan.zhihu.com/p/55884610
https://www.cnblogs.com/duanzi6/p/11348960.html

將天氣數(shù)據(jù)每年整合成一個文件,存入hdfs

hdfs dfs -mkdir /ncdc
hdfs dfs -put ~/ncdc/raw/ /ncdc

hdfs 查看命令

hdfs dfs -ls -R 

以NCDC原始數(shù)據(jù)求每個年份最高氣溫為例:
首先原始數(shù)據(jù)如下(去除了一些列用省略號表示),這些數(shù)據(jù)就是map函數(shù)的輸入

0067011990999991950051507004...9999999N9+00001+99999999999999...
0043011990999991950051512004...9999999N9+00221+99999999999999...
0043011990999991950051518004...9999999N9-00111+99999999999999...
0043012650999991949032412004...0500001N9+01111+99999999999999...
0043012650999991949032418004...0500001N9+00781+99999999999999...

這些行以鍵值對的方式作為map函數(shù)的輸入:

(0,0067011990999991950051507004...9999999N9+00001+99999999999999...)
(106,0043011990999991950051512004...9999999N9+00221+99999999999999...)
(212,0043011990999991950051518004...9999999N9-00111+99999999999999...)
(318,0043012650999991949032412004...0500001N9+01111+99999999999999...)
(424,0043012650999991949032418004...0500001N9+00781+99999999999999...)

其中key是文件中的行偏移量,map函數(shù)會提取輸入中的年份和氣溫信息(粗體),并作為輸出:

(1950,0)
(1950,22)
(1950,-11)
(1949,111)
(1949,78)

map函數(shù)的輸出會經(jīng)過MapReduce計(jì)算框架基于鍵的排序和分組后發(fā)送到reduce函數(shù)作為輸入:

(1949,[111,78])
(1950,[0,22,-11])

reduce遍歷值列表找到最大溫度后返回:

(1949,111)
(1950,22)


整個MapReduce計(jì)算流程如下:

image

從本地模式到分布式集群計(jì)算

處理少量輸入數(shù)據(jù)并不能體現(xiàn)MapReduce計(jì)算框架的優(yōu)勢,當(dāng)有大量輸入的數(shù)據(jù)流時,我們需要分布式文件系統(tǒng)HDFS)和Hadoop資源管理系統(tǒng)YARN)實(shí)現(xiàn)集群分布式計(jì)算。

一、術(shù)語

先了解一下關(guān)于數(shù)據(jù)流的一些術(shù)語

Job:MapReduce作業(yè),是客戶端需要執(zhí)行的一個工作單元:包括輸入數(shù)據(jù)、MapReduce程序和配置信息
Task:Hadoop會將作業(yè)job分成若干個任務(wù)(task)執(zhí)行,其中包括兩類任務(wù):map任務(wù)和Reduce任務(wù)
Input split:輸入分片,Hadoop會將MapReduce的輸入數(shù)據(jù)劃分成等長的小數(shù)據(jù)塊,稱為“分片”,Hadoop為每個分片構(gòu)建一個map任務(wù),并由該任務(wù)運(yùn)行用戶自定義的map函數(shù)從而處理分片中的每條記錄。

二、分片

1、分片的意義

處理單個分片的時間小于處理整個輸入數(shù)據(jù)花費(fèi)的時間,因此并行處理每個分片且每個分片數(shù)據(jù)比較小的話,則整個處理過程會獲得更好的負(fù)載平衡(因?yàn)橐慌_較快的計(jì)算機(jī)能夠處理的數(shù)據(jù)分片比一臺較慢的計(jì)算機(jī)更多,且成一定比例)。

2、分片的大小

盡管隨著分片切分得更細(xì),負(fù)載平衡的質(zhì)量也會更高。但是分片切分得太小的時候,管理分片的總時間和構(gòu)建map任務(wù)的總時間將決定整個作業(yè)的執(zhí)行時間。
對于大多數(shù)作業(yè)來說,一個合理的分片大小趨于HDFS一個塊的大小,默認(rèn)是128MB。

三、數(shù)據(jù)本地化優(yōu)化(map任務(wù))

Hadoop在存儲有輸入數(shù)據(jù)(HDFS中的數(shù)據(jù))的節(jié)點(diǎn)上運(yùn)行map任務(wù),可以獲得最佳性能(因?yàn)闊o需使用寶貴的集群帶寬資源),這就是“數(shù)據(jù)本地化優(yōu)化”(data locality optimization)。

1、本地?cái)?shù)據(jù)、本地機(jī)架與跨機(jī)架map任務(wù)

有時候存儲該分片的HDFS數(shù)據(jù)塊復(fù)本的所有節(jié)點(diǎn)可能正在運(yùn)行其他map任務(wù),此時作業(yè)調(diào)度需要從某一數(shù)據(jù)塊所在的機(jī)架中一個節(jié)點(diǎn)尋找一個空閑的map槽(slot)來運(yùn)行該map任務(wù)分片。特別偶然的情況下(幾乎不會發(fā)生)會使用其他機(jī)架中的節(jié)點(diǎn)運(yùn)行該map任務(wù),這將導(dǎo)致機(jī)架與機(jī)架之間的網(wǎng)絡(luò)傳輸。下圖顯示了這三種可能性。

image

2、數(shù)據(jù)本地化原則決定了最佳分片大小

數(shù)據(jù)本地化的原則解釋了為什么最佳分片大小應(yīng)該與HDFS塊大小相同:因?yàn)檫@是確??梢源鎯υ趩蝹€節(jié)點(diǎn)上最大輸入塊的大小。

3、reduce任務(wù)不具備數(shù)據(jù)本地化的優(yōu)勢

單個reduce任務(wù)的輸入通常來自于所有mapper的輸出。排過序的map輸出需通過網(wǎng)絡(luò)傳輸發(fā)送到運(yùn)行reduce任務(wù)的節(jié)點(diǎn),數(shù)據(jù)在reduce端合并并由用戶定義的reduce函數(shù)處理。

四、MapReduce任務(wù)數(shù)據(jù)流

reduce任務(wù)的數(shù)量并非由輸入數(shù)據(jù)的大小決定,而是獨(dú)立指定的。

真實(shí)的應(yīng)用中,幾乎所有作業(yè)都會把reducer的個數(shù)設(shè)置成較大的數(shù)字,否則由于所有中間數(shù)據(jù)都會放到一個reduce任務(wù)中,作業(yè)的處理效率就會及其低下。
增加reducer的數(shù)量能縮短reduce進(jìn)程;但是reducer數(shù)量過多又會導(dǎo)致小文件過多而不夠優(yōu)化。一條經(jīng)驗(yàn)法則是:目標(biāo)reducer保持每個運(yùn)行在5分鐘左右,且產(chǎn)生至少一個HDFS塊的輸出比較合適。

1、單個reduce任務(wù)的MapReduce數(shù)據(jù)流

image

虛線框表示節(jié)點(diǎn),虛線箭頭表示節(jié)點(diǎn)內(nèi)部的數(shù)據(jù)傳輸,實(shí)線箭頭表示不同節(jié)點(diǎn)之間的數(shù)據(jù)傳輸。

2、多個reduce任務(wù)的MapReduce數(shù)據(jù)流

image

map任務(wù)到reduce任務(wù)的數(shù)據(jù)流稱為shuffle(混洗,類似洗牌的意思),每個reduce任務(wù)的輸入都來自許多map任務(wù)。shuffle比圖示的更加復(fù)雜而且調(diào)整shuffle參數(shù)對作業(yè)總執(zhí)行時間的影響非常大。

3、無reduce任務(wù)

當(dāng)數(shù)據(jù)完全可以并行處理時可能會出現(xiàn)無reduce任務(wù)的情況,唯一的非本地節(jié)點(diǎn)數(shù)據(jù)傳輸是map任務(wù)將結(jié)果寫入HDFS。

image

五、combiner函數(shù)(減少map和reduce之間的數(shù)據(jù)傳輸)

由前面的描述我們知道數(shù)據(jù)傳輸會占用集群上的可用帶寬資源,從而限制了MapReduce作業(yè)的數(shù)量,因此我們應(yīng)該盡量避免map和reduce任務(wù)之間的數(shù)據(jù)傳輸。combiner作為一個中間函數(shù)簡化map任務(wù)的輸出從而減少了map任務(wù)和reduce任務(wù)之間的數(shù)據(jù)傳輸。


舉個例子
假設(shè)我們有一個計(jì)算每年最高氣溫的任務(wù),1950年的讀數(shù)由兩個map任務(wù)處理(因?yàn)樗鼈冊诓煌姆制校?。假設(shè)第一個map的輸出如下:

(1950,0)
(1950,20)
(1950,10)

第二個map的輸出如下:

(1950,25)
(1950,15)

reduce函數(shù)調(diào)用時,輸入如下:

(1950,[0,20,10,25,15)

reduce的輸出如下:

(1950,25)

為了優(yōu)化reduce函數(shù)的輸入,我們可以使用combiner找到每個map任務(wù)輸出結(jié)果中的最高氣溫。那么可以將reduce函數(shù)調(diào)用的輸入更改為:

(1950,[20,25])
其中20和25為每個map函數(shù)輸出結(jié)果中的最大值

我們可以用下面表達(dá)式來說明尋找最高氣溫的MapReduce過程中combiner的作用

max(0,20,10,25,15) = max(max(0,20,10),max(25,15)) = max(20,25)=25


很遺憾,并非所有函數(shù)都具有該屬性,比如計(jì)算平均值時就不能用mean作為combiner函數(shù),因?yàn)椋?/p>

mean(0,20,10,25,15) = 14
mean(mean(0,20,10),mean(25,15)) = mean(10,20) = 15

另外,combiner函數(shù)并不能取代reduce函數(shù)的位置,因?yàn)槲覀內(nèi)匀恍枰猺educe函數(shù)來處理不同map輸出中具有相同鍵的記錄。

寫MapReduce程序(Hadoop streaming)

Hadoop Streaming使用Unix標(biāo)準(zhǔn)流作為Hadoop和應(yīng)用程序之間的接口,允許程序員用多種語言寫MapReduce程序。
Streaming天生適合于文本處理。map的輸入數(shù)據(jù)通過標(biāo)準(zhǔn)輸入傳遞給map函數(shù),并且是一行一行地傳輸,并且將結(jié)果行寫到標(biāo)準(zhǔn)輸出。map輸出的鍵-值對是以一個制表符分隔的行,reduce輸入格式與之相同。reduce函數(shù)從標(biāo)準(zhǔn)輸入流中讀取輸入行,該輸入已經(jīng)由Hadoop框架根據(jù)鍵排過序,最后將結(jié)果寫入標(biāo)準(zhǔn)輸出。
接下里我們用Streaming重寫按年份查找最高氣溫的MapReduce程序。
Map函數(shù)

import re
import sys

for line in sys.stdin:
    val = line.strip()
    (year,temp,q) = (val[15:19],val[87:92],val[92:93])
    if (temp != "+9999" and re.match("[01459]",q)
        print ("%s\t%s" % (year,temp))

Reduce函數(shù)



import sys 
(last_key,max_val) = (None,-sys.maxint) 
for line in sys.stdin: 
    (key,val) = line.strip().split("\t") 
    if last_key and last_key != key: 
        print ("%s\t%s" % (last_key,max_val))
        (last_key,max_val) = (key,int(val)) 
    else: 
        (last_key,max_val = (key,max(max_val,int(val))) 
if last_key: 
    print("%s\t%s" % (last_key,max_val))


示例3:
https://zhuanlan.zhihu.com/p/34903460

參考
https://zhuanlan.zhihu.com/p/62135686
https://zhuanlan.zhihu.com/p/32172999
https://zhuanlan.zhihu.com/p/55884610
https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容