1. Hadoop簡介

1.1 背景介紹
Hadoop是Apache旗下的開源的分布式計(jì)算平臺,它可以運(yùn)行在計(jì)算機(jī)集群之上,提供可靠的、可擴(kuò)展的分布式計(jì)算功能。
Hadoop與三遍論文密不可分:
① 2003年,谷歌發(fā)布的分布式文件系統(tǒng)GFS的論文,可以用于解決海量數(shù)據(jù)存儲的問題。
② 2004年,谷歌發(fā)布了MapReduce的論文,可以用于解決海量數(shù)據(jù)計(jì)算的問題。
③ 2006年,谷歌發(fā)布了BigTable的論文,它是以GFS為底層數(shù)據(jù)存儲的分布式存儲系統(tǒng)。
Hadoop中的HDFS是GFS的開源實(shí)現(xiàn);
Hadoop中的MapReduce是谷歌MapReduce的開源實(shí)現(xiàn);
Hadoop中的HBase是谷歌BigTable的開源實(shí)現(xiàn)。
1.2 Hadoop核心組件
Hadoop由四個主要模塊組成:
Hadoop Common: 一組工具和庫,可補(bǔ)充其他模塊并確保與用戶計(jì)算機(jī)系統(tǒng)的兼容性;
Hadoop Distributed File System (HDFS):提供可對應(yīng)用數(shù)據(jù)進(jìn)行高吞吐量訪問的分布式文件系統(tǒng)。;
Hadoop YARN(Yet Another Resource Negotiator):作業(yè)調(diào)度和集群資源管理框架;
Hadoop MapReduce:基于YARN的大數(shù)據(jù)集并行處理系統(tǒng),從數(shù)據(jù)庫讀取,轉(zhuǎn)換和分析數(shù)據(jù)。
Hadoop生態(tài)系統(tǒng)


在大數(shù)據(jù)處理當(dāng)中,最核心要解決的其實(shí)就是兩個問題,大數(shù)據(jù)存儲和大數(shù)據(jù)計(jì)算。在Hadoop生態(tài)當(dāng)中,解決大數(shù)據(jù)存儲,主要依靠就是HDFS,再配合數(shù)據(jù)庫去完成。而MapReduce為海量的數(shù)據(jù)提供了計(jì)算。
1.2.1 HDFS
HDFS是什么
HDFS即Hadoop distributed file system(hadoop分布式文件系統(tǒng)),在Hadoop當(dāng)中負(fù)責(zé)分布式存儲階段的任務(wù),它是一個分布式的文件系統(tǒng),也可以用來存放單個機(jī)器的數(shù)據(jù),只是大部分用來存儲分布式數(shù)據(jù)。HDFS跟window下的NTFS一樣可以通過目錄樹來查找數(shù)據(jù)。
HDFS有什么用
整個Hadoop框架,本質(zhì)上來說都是基于分布式實(shí)現(xiàn)的,隨著數(shù)據(jù)規(guī)模的越來越大,一臺機(jī)器無法存儲所有的數(shù)據(jù),所以需要多臺機(jī)器來存儲。而多臺機(jī)器存儲又不方便管理,所以需要一個分布式的系統(tǒng)來管理分布在不同機(jī)器中的數(shù)據(jù)。
事實(shí)上,HDFS的出現(xiàn),就是為了解決分布式框架下,數(shù)據(jù)存儲管理的問題。而HDFS只是分布式文件系統(tǒng)中的一種,依托于Hadoop生態(tài),去承擔(dān)大數(shù)據(jù)分布式存儲的管理任務(wù)。
HDFS的概念與架構(gòu)
HDFS是典型的主/從架構(gòu)。HDFS集群由一個NameNode組成,NameNode是一個主服務(wù)器,它管理文件系統(tǒng)名稱空間并控制客戶機(jī)對文件的訪問。此外,還有許多datanode,通常是集群中的每個節(jié)點(diǎn)一個datanode,它們管理附加到它們所運(yùn)行的節(jié)點(diǎn)上的存儲。

Namenode:
即master,有以下功能
*管理文件系統(tǒng)命名空間;
*控制client對數(shù)據(jù)的讀取和寫入請求;
*管理數(shù)據(jù)塊到datanode的映射關(guān)系;
*管理副本策略。
Datanode:
即slave,主要是存儲文件塊數(shù)據(jù),接受來自namenode的指令,并執(zhí)行指令對數(shù)據(jù)塊的創(chuàng)建,刪除,復(fù)制等操作。
Client:
即客戶端,有以下功能:
*對文件的切分,HDFS上傳數(shù)據(jù)時,client將文件切分成多個block再進(jìn)行上傳;
*與namenode交互,獲取文件的索引信息;
*與datanode交互,對數(shù)據(jù)的讀取和寫入;
*在客戶端中提供相關(guān)HDFS的命令,比如對HDFS的管理,格式化namenode,對HDFS對數(shù)據(jù)操作,比如上傳文件到HDFS等。
Secondary namenode:
并非namenode的熱備,當(dāng)namenode掛掉的時候,并不能立馬替換namenode并提供服務(wù),只是在定時的對namenode進(jìn)行備份,存在一定的時間誤差,secondary會備份namenode的Fsimage和Edits,在緊急情況下,可以適用secondarynamenode來恢復(fù)部分的namenode。
關(guān)于大數(shù)據(jù)學(xué)習(xí),Hadoop HDFS存儲入門,以上就為大家做了簡單的介紹了。在Hadoop大數(shù)據(jù)框架當(dāng)中,HDFS作為分布式文件系統(tǒng),始終是重要的核心組件,學(xué)習(xí)當(dāng)中也自然需要深入理解掌握。
參考:
http://www.itdecent.cn/p/fe77da35b653
1.2.2 YARN
Yarn 的全稱是 Yet Another Resource Negotiator,意思是“另一種資源調(diào)度器”


Container
容器(Container)這個東西是 Yarn 對資源做的一層抽象。就像我們平時開發(fā)過程中,經(jīng)常需要對底層一些東西進(jìn)行封裝,只提供給上層一個調(diào)用接口一樣,Yarn 對資源的管理也是用到了這種思想。

如上所示,Yarn 將CPU核數(shù),內(nèi)存這些計(jì)算資源都封裝成為一個個的容器(Container)。需要注意兩點(diǎn):
- 容器由 NodeManager 啟動和管理,并被它所監(jiān)控。
- 容器被 ResourceManager 進(jìn)行調(diào)度。
ResourceManager(RM)
從名字上我們就能知道這個組件是負(fù)責(zé)資源管理的,整個系統(tǒng)有且只有一個 RM ,來負(fù)責(zé)資源的調(diào)度。它也包含了兩個主要的組件:定時調(diào)用器(Scheduler)以及應(yīng)用管理器(ApplicationManager)。
- 定時調(diào)度器(Scheduler):從本質(zhì)上來說,定時調(diào)度器就是一種策略,或者說一種算法。當(dāng) Client 提交一個任務(wù)的時候,它會根據(jù)所需要的資源以及當(dāng)前集群的資源狀況進(jìn)行分配。注意,它只負(fù)責(zé)向應(yīng)用程序分配資源,并不做監(jiān)控以及應(yīng)用程序的狀態(tài)跟蹤。
- 應(yīng)用管理器(ApplicationManager):應(yīng)用管理器就是負(fù)責(zé)管理 Client 用戶提交的應(yīng)用。定時調(diào)度器(Scheduler)不對用戶提交的程序監(jiān)控,監(jiān)控應(yīng)用的工作正是由應(yīng)用管理器(ApplicationManager)完成的。
ApplicationMaster
每當(dāng) Client 提交一個 Application 時候,就會新建一個 ApplicationMaster 。由這個 ApplicationMaster 去與 ResourceManager 申請容器資源,獲得資源后會將要運(yùn)行的程序發(fā)送到容器上啟動,然后進(jìn)行分布式計(jì)算。
為什么是把運(yùn)行程序發(fā)送到容器上去運(yùn)行?如果以傳統(tǒng)的思路來看,是程序運(yùn)行著不動,然后數(shù)據(jù)進(jìn)進(jìn)出出不停流轉(zhuǎn)。但當(dāng)數(shù)據(jù)量大的時候就沒法這么玩了,因?yàn)楹A繑?shù)據(jù)移動成本太大,時間太長。大數(shù)據(jù)分布式計(jì)算就是這種思想,既然大數(shù)據(jù)難以移動,那我就把容易移動的應(yīng)用程序發(fā)布到各個節(jié)點(diǎn)進(jìn)行計(jì)算唄,這就是大數(shù)據(jù)分布式計(jì)算的思路。
NodeManager
NodeManager 是 ResourceManager 在每臺機(jī)器的上代理,負(fù)責(zé)容器的管理,并監(jiān)控他們的資源使用情況(cpu,內(nèi)存,磁盤及網(wǎng)絡(luò)等),以及向 ResourceManager/Scheduler 提供這些資源使用報(bào)告。
提交一個 Application 到 Yarn 的流程

這張圖簡單地標(biāo)明了提交一個程序所經(jīng)歷的流程,接下來我們來具體說說每一步的過程。
- Client 向 Yarn 提交 Application,這里我們假設(shè)是一個 MapReduce 作業(yè)。
- ResourceManager 向 NodeManager 通信,為該 Application 分配第一個容器。并在這個容器中運(yùn)行這個應(yīng)用程序?qū)?yīng)的 ApplicationMaster。
- ApplicationMaster 啟動以后,對 作業(yè)(也就是 Application) 進(jìn)行拆分,拆分 task 出來,這些 task 可以運(yùn)行在一個或多個容器中。然后向 ResourceManager 申請要運(yùn)行程序的容器,并定時向 ResourceManager 發(fā)送心跳。
- 申請到容器后,ApplicationMaster 會去和容器對應(yīng)的 NodeManager 通信,而后將作業(yè)分發(fā)到對應(yīng)的 NodeManager 中的容器去運(yùn)行,這里會將拆分后的 MapReduce 進(jìn)行分發(fā),對應(yīng)容器中運(yùn)行的可能是 Map 任務(wù),也可能是 Reduce 任務(wù)。
- 容器中運(yùn)行的任務(wù)會向 ApplicationMaster 發(fā)送心跳,匯報(bào)自身情況。當(dāng)程序運(yùn)行完成后, ApplicationMaster 再向 ResourceManager 注銷并釋放容器資源。
參考:https://zhuanlan.zhihu.com/p/54192454
1.2.3 MapReduce
MapReduce是Hadoop的核心框架之一,主要負(fù)責(zé)分布式并行計(jì)算。MapReduce 既是計(jì)算框架,也是編程模型,主要基于Java語言來編程,這也是為什么Hadoop學(xué)習(xí)要求要有一定的Java基礎(chǔ)。當(dāng)然,在這幾年的發(fā)展當(dāng)中,MapReduce的計(jì)算性能受到詬病,取而代之受到重用的是Spark。
MapReduce運(yùn)行過程,通常涉及到input、split、map、shuffle、reduce、output幾個階段,其中shuffle過程包括sort、copy、combine操作,reduce之前有時涉及二次排序。
MapReduce編程,主要有三種方式:
1、Hadoop streaming執(zhí)行mapreduce
2、Hive執(zhí)行mapreduce
3、Java MR編程
①Hadoop streaming執(zhí)行MapReduce
優(yōu)點(diǎn):
可以用大多數(shù)語言開發(fā);
代碼量少,開發(fā)速度快;
方便本地調(diào)試。
不足:
只能通過參數(shù)控制MR框架,控制性較弱,比如定制partitioner、combiner;
支持的數(shù)據(jù)類型和數(shù)據(jù)結(jié)構(gòu)有限,不適合做復(fù)雜處理,處理字符型較多;
②Hive執(zhí)行MapReduce
將類SQL轉(zhuǎn)換成MapReduce,定位于數(shù)據(jù)倉庫。
優(yōu)點(diǎn):
開發(fā)速度快,易調(diào)試,易理解;
易于構(gòu)建數(shù)據(jù)倉庫模型;
內(nèi)置函數(shù)功能齊全,比如rownumber等窗口函數(shù);
可擴(kuò)展性好,比如自定義存儲格式、自定義函數(shù)UDF;
多接口,比如JDBC、Thrift、Rest等。
缺點(diǎn):
不能用于復(fù)雜計(jì)算,比如涉及時序處理的數(shù)據(jù);
可控制性較弱,比如partition、關(guān)聯(lián)等操作。
③Java MR編程
用Java編寫MR,可以說是最“原始”的一種方式,Java面向?qū)ο缶幊?,設(shè)計(jì)模式成熟,通用性好,并且Java方面第三方類庫非常豐富。
優(yōu)點(diǎn):
定制性強(qiáng),比如定制partitioner、定制combiner等;
數(shù)據(jù)類型和數(shù)據(jù)結(jié)構(gòu)豐富,隊(duì)列、堆棧、自定義類等使用方便;
控制性非常高,包括MR運(yùn)行過程的一些控制,Map端join等;
可以方便使用Hadoop組件類庫中的類或工具,比如HDFS文件操作類等。
缺點(diǎn):
相比Hive、Hadoop streaming或Pyspark,開發(fā)代碼量較大,對開發(fā)環(huán)境要求高且不易調(diào)試;
通常每個操作都要寫一個MR類;
不如Spark執(zhí)行效率高。
參考:http://www.itdecent.cn/p/3c9aa4214dd3
1.3 Hadoop的特點(diǎn)
① 跨平臺性:
hadoop是基于java語言開發(fā)的,有很好的跨平臺性,可以運(yùn)行在Linux平臺上;
② 高可靠性:
hadoop中的HDFS是分布式文件系統(tǒng),可以將海量數(shù)據(jù)分布冗余存儲在不同的機(jī)器節(jié)點(diǎn)上,即使是某個機(jī)器副本上發(fā)生故障,其他的機(jī)器副本也能正常運(yùn)行;
③ 高容錯性:
HDFS把把文件分布存儲在很多不同的機(jī)器節(jié)點(diǎn)上,能實(shí)現(xiàn)自動保存多個副本,因此某個節(jié)點(diǎn)上的任務(wù)失敗后也能實(shí)現(xiàn)自動重新分配;
④ 高效性:
hadoop的核心組件HDFS和MapReduce,一個負(fù)責(zé)分布式存儲一個負(fù)責(zé)分布式處理,能夠處理PB級別的數(shù)據(jù);
⑤ 低成本與高擴(kuò)展:
hadoop在廉價的計(jì)算機(jī)集群上就可以運(yùn)行,因此成本比較低,并且可以擴(kuò)展到幾千個計(jì)算機(jī)節(jié)點(diǎn)上,完成海量數(shù)據(jù)的存儲和計(jì)算。
參考:
https://zhuanlan.zhihu.com/p/369486877
1.4 應(yīng)用實(shí)例
-
- We use Apache Hadoop and Apache HBase in several areas from social services to structured data storage and processing for internal use.
- We currently have about 30 nodes running HDFS, Hadoop and HBase in clusters ranging from 5 to 14 nodes on both production and development. We plan a deployment on an 80 nodes cluster.
- We constantly write data to Apache HBase and run MapReduce jobs to process then store it back to Apache HBase or external systems.
- Our production cluster has been running since Oct 2008.
-
- A 15-node cluster dedicated to processing sorts of business data dumped out of database and joining them together. These data will then be fed into iSearch, our vertical search engine.
- Each node has 8 cores, 16G RAM and 1.4T storage.
-
Facebook
- We use Apache Hadoop to store copies of internal log and dimension data sources and use it as a source for reporting/analytics and machine learning.
-
Currently we have 2 major clusters:
- A 1100-machine cluster with 8800 cores and about 12 PB raw storage.
- A 300-machine cluster with 2400 cores and about 3 PB raw storage.
- Each (commodity) node has 8 cores and 12 TB of storage.
- We are heavy users of both streaming as well as the Java APIs. We have built a higher level data warehousing framework using these features called Hive (see the http://hadoop.apache.org/hive/). We have also developed a FUSE implementation over HDFS.
參考:https://cwiki.apache.org/confluence/display/hadoop2/PoweredBy
2. Hadoop安裝
2.1 安裝前的準(zhǔn)備
2.1.1 支持的平臺
-
GNU/Linux是產(chǎn)品開發(fā)和運(yùn)行的平臺。 Hadoop已在有2000個節(jié)點(diǎn)的GNU/Linux主機(jī)組成的集群系統(tǒng)上得到驗(yàn)證。
本文將以Ubuntu 18.04作為安裝平臺 - Windows平臺同樣受官方支持??蓞⒖脊俜轿臋n或者自行百度谷歌https://cwiki.apache.org/confluence/display/HADOOP2/Hadoop2OnWindows
2.1.2 需要提前安裝的軟件
Java
官方現(xiàn)在推薦OpenJDK,其他版本(比如Oracle)也可以
Hadoop 3.3.x支持Java 8 和 Java 11,推薦Java 8
Hadoop 3.0.x 到 3.2.x n只支持 Java 8
Hadoop from 2.7.x 到 2.10.x 同時支持 Java 7 and 8
本文安裝Hadoop 3.2.2, OpenJDK版本為系統(tǒng)自帶的1.8.0
ssh
必須安裝并且保證 sshd一直運(yùn)行,以便用Hadoop 腳本管理遠(yuǎn)端Hadoop守護(hù)進(jìn)程
安裝ssh
sudo apt-get install ssh
2.1.3 下載Hadoop
考慮到網(wǎng)速以及以后適配Spark的版本問題,我們選擇比較新的Hadoop 3.2.2穩(wěn)定發(fā)行版。香港可以直接從官網(wǎng)下載https://hadoop.apache.org/releases.html
內(nèi)地可以使用鏡像網(wǎng)站,比如清華鏡像
https://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-3.2.2/

2.2 安裝過程(單機(jī)版)
參考
http://www.itdecent.cn/p/cdae5bab030f
https://blog.csdn.net/weixin_42001089/article/details/81865101
https://blog.csdn.net/qq_44830040/article/details/104873759
https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/SingleCluster.html
集群配置
https://blog.csdn.net/qq_39785575/article/details/106300628
2.3 環(huán)境變量設(shè)置

2.4 Hadoop 啟動
格式化namenode
hadoop namenode -format

啟動hdfs
start-all.sh

查看相應(yīng)進(jìn)程
jps
瀏覽器訪問
localhost:9870

3. Hadoop 應(yīng)用
3.1 上傳文件到HDFS
3.1.1 Linux 命令行
(1)格式畫NameNode
hdfs namenode -format
(2) 啟動hdfs
start-all.sh
(3)查看hdfs狀態(tài)
hdfs dfsadmin -report
(4)上傳文件到hdfs
hdfs dfs -put 要上傳的文件路徑 目標(biāo)路徑
#舉例
hdfs dfs -put test.txt /
(5)查看hdfs路徑下的文件
hdfs dfs -ls /
(6)讀取hdfs文件內(nèi)容
hdfs dfs -cat /test.txt
(7)刪除hdfs文件
hdfs dfs -rm /test.txt
3.1.2 Java
略
3.2 MapReduce
3.2.1 MapReduce是什么
(1) MapReduce是一種分布式計(jì)算框架 ,以一種可靠的,具有容錯能力的方式并行地處理上TB級別的海量數(shù)據(jù)集。主要用于搜索領(lǐng)域,解決海量數(shù)據(jù)的計(jì)算問題。
(2) MapReduce有兩個階段組成:Map和Reduce,用戶只需實(shí)現(xiàn)map()和reduce()兩個函數(shù),即可實(shí)現(xiàn)分布式計(jì)算。
3.2.2 MapReduce做什么
(1) MapReduce框架由Map和Reduce組成。
(2) Map()負(fù)責(zé)把一個大的block塊進(jìn)行切片并計(jì)算。
(3) Reduce() 負(fù)責(zé)把Map()切片的數(shù)據(jù)進(jìn)行匯總、計(jì)算。
3.2.3 MapReduce怎么做

(1) 第一步對輸入的數(shù)據(jù)進(jìn)行切片,每個切片分配一個map()任務(wù),map()對其中的數(shù)據(jù)進(jìn)行計(jì)算,對每個數(shù)據(jù)用鍵值對的形式記錄,然后輸出到環(huán)形緩沖區(qū)(圖中sort的位置)。
(2) map()中輸出的數(shù)據(jù)在環(huán)形緩沖區(qū)內(nèi)進(jìn)行快排,每個環(huán)形緩沖區(qū)默認(rèn)大小100M,當(dāng)數(shù)據(jù)達(dá)到80M時(默認(rèn)),把數(shù)據(jù)輸出到磁盤上。形成很多個內(nèi)部有序整體無序的小文件。
(3) 框架把磁盤中的小文件傳到Reduce()中來,然后進(jìn)行歸并排序,最終輸出。
3.2.4 要點(diǎn)是什么
MapReduce是六大過程,簡單來說:Input, Split, Map, Shuffle, Reduce, Finalize
(1) MapReduce將輸入的數(shù)據(jù)進(jìn)行邏輯切片,一片對應(yīng)一個Map任務(wù)
(2) Map以并行的方式處理切片
(3) 框架對Map輸出進(jìn)行排序,然后發(fā)給Reduce
(4) MapReduce的輸入輸出數(shù)據(jù)處于同一個文件系統(tǒng)(HDFS)
(5) 框架負(fù)責(zé)任務(wù)調(diào)度、任務(wù)監(jiān)控、失敗任務(wù)的重新執(zhí)行
(6) 框架會對鍵和值進(jìn)行序列化,因此鍵和值需要實(shí)現(xiàn)writable接口,框架會對鍵排序,因此必須實(shí)現(xiàn)writableComparable接口。
3.1.5 舉個例子
如何統(tǒng)計(jì)1TB或1PB文件里的單詞數(shù)呢

(1) Input: 我們輸入很多文檔,文檔的每一行有很多不同的單詞
(2) Split: 找不同的worker分配不同的任務(wù),就是Split過程,那怎么切分呢?每一行文檔分給一個worker
(3) Map: 就切分成單詞和它出現(xiàn)的次數(shù),寫成鍵值對,每次出現(xiàn)的次數(shù)是1,就寫1。
(4) Shuffle就是把不同的單詞繼續(xù)放到同樣的盒子里面,Bear放一起,Car放一起,這可以由Shuffle寫的時候算法來決定。當(dāng)然現(xiàn)在很多智能都不用做了,有時候還需要隨機(jī)采樣的方式來實(shí)現(xiàn)。
(5) Reduce,就是把相同的數(shù)據(jù)放一起,比如Car有3個就寫3,River是2個就寫2,最后再放到一起,這樣便于提供服務(wù),得到最終結(jié)果。
大家可以看到最后箭頭指過來是亂序的,也就是說這個執(zhí)行過程實(shí)際上是高度并行的,也不用等待每個都完成,所以說這是一個很好的優(yōu)化過程。
Hadoop Streaming是一種運(yùn)行作業(yè)的實(shí)用工具,它允許用戶創(chuàng)建和運(yùn)行任何可執(zhí)行程序 (例如:Java, Python, Shell工具)來做為mapper和reducer。
代碼形式:

Map輸入每一行的ID是key,value是這一行的單詞。有這個結(jié)果以后就可以統(tǒng)計(jì)每個單詞出現(xiàn)的次數(shù)。Reduce輸入的還是各個單詞,但后面跟的是一個串,是在里面出現(xiàn)的次數(shù),1,1,1,我們把1加到一起就是sum的過程,這就是MapReduce的整個過程。輸出我們的關(guān)鍵詞和它的出現(xiàn)次數(shù)
3.1.6 再舉個例子
參考:
https://zhuanlan.zhihu.com/p/55884610
https://www.cnblogs.com/duanzi6/p/11348960.html
將天氣數(shù)據(jù)每年整合成一個文件,存入hdfs
hdfs dfs -mkdir /ncdc
hdfs dfs -put ~/ncdc/raw/ /ncdc
hdfs 查看命令
hdfs dfs -ls -R
以NCDC原始數(shù)據(jù)求每個年份最高氣溫為例:
首先原始數(shù)據(jù)如下(去除了一些列用省略號表示),這些數(shù)據(jù)就是map函數(shù)的輸入:
0067011990999991950051507004...9999999N9+00001+99999999999999...
0043011990999991950051512004...9999999N9+00221+99999999999999...
0043011990999991950051518004...9999999N9-00111+99999999999999...
0043012650999991949032412004...0500001N9+01111+99999999999999...
0043012650999991949032418004...0500001N9+00781+99999999999999...
這些行以鍵值對的方式作為map函數(shù)的輸入:
(0,0067011990999991950051507004...9999999N9+00001+99999999999999...)
(106,0043011990999991950051512004...9999999N9+00221+99999999999999...)
(212,0043011990999991950051518004...9999999N9-00111+99999999999999...)
(318,0043012650999991949032412004...0500001N9+01111+99999999999999...)
(424,0043012650999991949032418004...0500001N9+00781+99999999999999...)
其中key是文件中的行偏移量,map函數(shù)會提取輸入中的年份和氣溫信息(粗體),并作為輸出:
(1950,0)
(1950,22)
(1950,-11)
(1949,111)
(1949,78)
map函數(shù)的輸出會經(jīng)過MapReduce計(jì)算框架基于鍵的排序和分組后發(fā)送到reduce函數(shù)作為輸入:
(1949,[111,78])
(1950,[0,22,-11])
reduce遍歷值列表找到最大溫度后返回:
(1949,111)
(1950,22)
整個MapReduce計(jì)算流程如下:

從本地模式到分布式集群計(jì)算
處理少量輸入數(shù)據(jù)并不能體現(xiàn)MapReduce計(jì)算框架的優(yōu)勢,當(dāng)有大量輸入的數(shù)據(jù)流時,我們需要分布式文件系統(tǒng)(HDFS)和Hadoop資源管理系統(tǒng)(YARN)實(shí)現(xiàn)集群分布式計(jì)算。
一、術(shù)語
先了解一下關(guān)于數(shù)據(jù)流的一些術(shù)語
Job:MapReduce作業(yè),是客戶端需要執(zhí)行的一個工作單元:包括輸入數(shù)據(jù)、MapReduce程序和配置信息
Task:Hadoop會將作業(yè)job分成若干個任務(wù)(task)執(zhí)行,其中包括兩類任務(wù):map任務(wù)和Reduce任務(wù)
Input split:輸入分片,Hadoop會將MapReduce的輸入數(shù)據(jù)劃分成等長的小數(shù)據(jù)塊,稱為“分片”,Hadoop為每個分片構(gòu)建一個map任務(wù),并由該任務(wù)運(yùn)行用戶自定義的map函數(shù)從而處理分片中的每條記錄。
二、分片
1、分片的意義
處理單個分片的時間小于處理整個輸入數(shù)據(jù)花費(fèi)的時間,因此并行處理每個分片且每個分片數(shù)據(jù)比較小的話,則整個處理過程會獲得更好的負(fù)載平衡(因?yàn)橐慌_較快的計(jì)算機(jī)能夠處理的數(shù)據(jù)分片比一臺較慢的計(jì)算機(jī)更多,且成一定比例)。
2、分片的大小
盡管隨著分片切分得更細(xì),負(fù)載平衡的質(zhì)量也會更高。但是分片切分得太小的時候,管理分片的總時間和構(gòu)建map任務(wù)的總時間將決定整個作業(yè)的執(zhí)行時間。
對于大多數(shù)作業(yè)來說,一個合理的分片大小趨于HDFS一個塊的大小,默認(rèn)是128MB。
三、數(shù)據(jù)本地化優(yōu)化(map任務(wù))
Hadoop在存儲有輸入數(shù)據(jù)(HDFS中的數(shù)據(jù))的節(jié)點(diǎn)上運(yùn)行map任務(wù),可以獲得最佳性能(因?yàn)闊o需使用寶貴的集群帶寬資源),這就是“數(shù)據(jù)本地化優(yōu)化”(data locality optimization)。
1、本地?cái)?shù)據(jù)、本地機(jī)架與跨機(jī)架map任務(wù)
有時候存儲該分片的HDFS數(shù)據(jù)塊復(fù)本的所有節(jié)點(diǎn)可能正在運(yùn)行其他map任務(wù),此時作業(yè)調(diào)度需要從某一數(shù)據(jù)塊所在的機(jī)架中一個節(jié)點(diǎn)尋找一個空閑的map槽(slot)來運(yùn)行該map任務(wù)分片。特別偶然的情況下(幾乎不會發(fā)生)會使用其他機(jī)架中的節(jié)點(diǎn)運(yùn)行該map任務(wù),這將導(dǎo)致機(jī)架與機(jī)架之間的網(wǎng)絡(luò)傳輸。下圖顯示了這三種可能性。

2、數(shù)據(jù)本地化原則決定了最佳分片大小
數(shù)據(jù)本地化的原則解釋了為什么最佳分片大小應(yīng)該與HDFS塊大小相同:因?yàn)檫@是確??梢源鎯υ趩蝹€節(jié)點(diǎn)上最大輸入塊的大小。
3、reduce任務(wù)不具備數(shù)據(jù)本地化的優(yōu)勢
單個reduce任務(wù)的輸入通常來自于所有mapper的輸出。排過序的map輸出需通過網(wǎng)絡(luò)傳輸發(fā)送到運(yùn)行reduce任務(wù)的節(jié)點(diǎn),數(shù)據(jù)在reduce端合并并由用戶定義的reduce函數(shù)處理。
四、MapReduce任務(wù)數(shù)據(jù)流
reduce任務(wù)的數(shù)量并非由輸入數(shù)據(jù)的大小決定,而是獨(dú)立指定的。
真實(shí)的應(yīng)用中,幾乎所有作業(yè)都會把reducer的個數(shù)設(shè)置成較大的數(shù)字,否則由于所有中間數(shù)據(jù)都會放到一個reduce任務(wù)中,作業(yè)的處理效率就會及其低下。
增加reducer的數(shù)量能縮短reduce進(jìn)程;但是reducer數(shù)量過多又會導(dǎo)致小文件過多而不夠優(yōu)化。一條經(jīng)驗(yàn)法則是:目標(biāo)reducer保持每個運(yùn)行在5分鐘左右,且產(chǎn)生至少一個HDFS塊的輸出比較合適。
1、單個reduce任務(wù)的MapReduce數(shù)據(jù)流

虛線框表示節(jié)點(diǎn),虛線箭頭表示節(jié)點(diǎn)內(nèi)部的數(shù)據(jù)傳輸,實(shí)線箭頭表示不同節(jié)點(diǎn)之間的數(shù)據(jù)傳輸。
2、多個reduce任務(wù)的MapReduce數(shù)據(jù)流

map任務(wù)到reduce任務(wù)的數(shù)據(jù)流稱為shuffle(混洗,類似洗牌的意思),每個reduce任務(wù)的輸入都來自許多map任務(wù)。shuffle比圖示的更加復(fù)雜而且調(diào)整shuffle參數(shù)對作業(yè)總執(zhí)行時間的影響非常大。
3、無reduce任務(wù)
當(dāng)數(shù)據(jù)完全可以并行處理時可能會出現(xiàn)無reduce任務(wù)的情況,唯一的非本地節(jié)點(diǎn)數(shù)據(jù)傳輸是map任務(wù)將結(jié)果寫入HDFS。

五、combiner函數(shù)(減少map和reduce之間的數(shù)據(jù)傳輸)
由前面的描述我們知道數(shù)據(jù)傳輸會占用集群上的可用帶寬資源,從而限制了MapReduce作業(yè)的數(shù)量,因此我們應(yīng)該盡量避免map和reduce任務(wù)之間的數(shù)據(jù)傳輸。combiner作為一個中間函數(shù)簡化map任務(wù)的輸出從而減少了map任務(wù)和reduce任務(wù)之間的數(shù)據(jù)傳輸。
舉個例子:
假設(shè)我們有一個計(jì)算每年最高氣溫的任務(wù),1950年的讀數(shù)由兩個map任務(wù)處理(因?yàn)樗鼈冊诓煌姆制校?。假設(shè)第一個map的輸出如下:
(1950,0)
(1950,20)
(1950,10)
第二個map的輸出如下:
(1950,25)
(1950,15)
reduce函數(shù)調(diào)用時,輸入如下:
(1950,[0,20,10,25,15)
reduce的輸出如下:
(1950,25)
為了優(yōu)化reduce函數(shù)的輸入,我們可以使用combiner找到每個map任務(wù)輸出結(jié)果中的最高氣溫。那么可以將reduce函數(shù)調(diào)用的輸入更改為:
(1950,[20,25])
其中20和25為每個map函數(shù)輸出結(jié)果中的最大值
我們可以用下面表達(dá)式來說明尋找最高氣溫的MapReduce過程中combiner的作用:
max(0,20,10,25,15) = max(max(0,20,10),max(25,15)) = max(20,25)=25
很遺憾,并非所有函數(shù)都具有該屬性,比如計(jì)算平均值時就不能用mean作為combiner函數(shù),因?yàn)椋?/p>
mean(0,20,10,25,15) = 14
mean(mean(0,20,10),mean(25,15)) = mean(10,20) = 15
另外,combiner函數(shù)并不能取代reduce函數(shù)的位置,因?yàn)槲覀內(nèi)匀恍枰猺educe函數(shù)來處理不同map輸出中具有相同鍵的記錄。
寫MapReduce程序(Hadoop streaming)
Hadoop Streaming使用Unix標(biāo)準(zhǔn)流作為Hadoop和應(yīng)用程序之間的接口,允許程序員用多種語言寫MapReduce程序。
Streaming天生適合于文本處理。map的輸入數(shù)據(jù)通過標(biāo)準(zhǔn)輸入傳遞給map函數(shù),并且是一行一行地傳輸,并且將結(jié)果行寫到標(biāo)準(zhǔn)輸出。map輸出的鍵-值對是以一個制表符分隔的行,reduce輸入格式與之相同。reduce函數(shù)從標(biāo)準(zhǔn)輸入流中讀取輸入行,該輸入已經(jīng)由Hadoop框架根據(jù)鍵排過序,最后將結(jié)果寫入標(biāo)準(zhǔn)輸出。
接下里我們用Streaming重寫按年份查找最高氣溫的MapReduce程序。
Map函數(shù)
import re
import sys
for line in sys.stdin:
val = line.strip()
(year,temp,q) = (val[15:19],val[87:92],val[92:93])
if (temp != "+9999" and re.match("[01459]",q)
print ("%s\t%s" % (year,temp))
Reduce函數(shù)
import sys
(last_key,max_val) = (None,-sys.maxint)
for line in sys.stdin:
(key,val) = line.strip().split("\t")
if last_key and last_key != key:
print ("%s\t%s" % (last_key,max_val))
(last_key,max_val) = (key,int(val))
else:
(last_key,max_val = (key,max(max_val,int(val)))
if last_key:
print("%s\t%s" % (last_key,max_val))
示例3:
https://zhuanlan.zhihu.com/p/34903460
參考
https://zhuanlan.zhihu.com/p/62135686
https://zhuanlan.zhihu.com/p/32172999
https://zhuanlan.zhihu.com/p/55884610
https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html