Purpose
This document comprehensively describes all user-facing facets of the Hadoop MapReduce framework and serves as a tutorial.
Prerequisites
Ensure that Hadoop is installed, configured and running. More details:
Single Node Setup for first-time users.
Cluster Setup for large, distributed clusters.
Overview
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them and re-executing failed tasks.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and HDFS run on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster node, and one MRAppMaster per application.
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract classes. These, and other job parameters, comprise the job configuration.
The Hadoop job client then submits the job and configuration to the ResourceManager, which then takes responsibility for distributing the software/configuration to the slaves, scheduling tasks and monitoring them, and providing status and diagnostic information to the job client.
Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java.
Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (not JNI based).
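As an aside on Streaming, its mapper/reducer contract (text lines in, tab-separated key/value lines out, with the reducer receiving records sorted by key) can be sketched in a few lines of Python. This is only an illustrative sketch, not Hadoop code: the names streaming_mapper and streaming_reducer and the in-memory driver are invented here, and a real Streaming job would be two separate executables reading sys.stdin and printing to stdout.

```python
import itertools

def streaming_mapper(lines):
    """Emit one 'word\t1' record per word, as a Streaming mapper would print."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"

def streaming_reducer(sorted_records):
    """Sum counts per key; Streaming delivers records to the reducer sorted by key."""
    keyed = (rec.split("\t") for rec in sorted_records)
    for word, group in itertools.groupby(keyed, key=lambda kv: kv[0]):
        total = sum(int(count) for _, count in group)
        yield f"{word}\t{total}"

if __name__ == "__main__":
    # sorted() stands in for the framework's shuffle-and-sort between the two stages
    mapped = sorted(streaming_mapper(["hello world", "hello hadoop"]))
    print(list(streaming_reducer(mapped)))
    # -> ['hadoop\t1', 'hello\t2', 'world\t1']
```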
Inputs and Outputs
The MapReduce framework operates exclusively on <key, value> pairs: the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
Input and output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
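This flow can be simulated in plain Python as a conceptual sketch (map_fn, reduce_fn and run_job are names invented for this illustration, not Hadoop API), using word count with (k1, v1) = (byte offset, line) and (k2, v2) = (word, count):

```python
from collections import defaultdict

def map_fn(offset, line):
    """(k1, v1) -> list of (k2, v2): note the types change across the boundary."""
    return [(word, 1) for word in line.split()]

def reduce_fn(word, counts):
    """(k2, [v2, ...]) -> (k3, v3)."""
    return (word, sum(counts))

def run_job(records):
    # shuffle phase: group all intermediate values by key, then sort by key
    groups = defaultdict(list)
    for offset, line in records:
        for k2, v2 in map_fn(offset, line):
            groups[k2].append(v2)
    return [reduce_fn(k, vs) for k, vs in sorted(groups.items())]

print(run_job([(0, "hello world"), (12, "hello hadoop")]))
# -> [('hadoop', 1), ('hello', 2), ('world', 1)]
```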
MapReduce - User Interfaces
This section provides a reasonable amount of detail on every user-facing aspect of the MapReduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the Javadoc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.
Let us first take the Mapper and Reducer interfaces. Applications typically implement them to provide the map and reduce methods.
We will then discuss other core interfaces, including Job, Partitioner, InputFormat, OutputFormat, and others.
Finally, we will wrap up by discussing some useful features of the framework such as the DistributedCache, IsolationRunner etc.
Payload
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.
Mapper
Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.
Overall, Mapper implementations are passed to the Job via the Job.setMapperClass(Class) method. The framework then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task. Applications can then override the cleanup(Context) method to perform any required cleanup.
Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to context.write(WritableComparable, Writable).
Applications can use the Counter to report their statistics.
All intermediate values associated with a given output key are subsequently grouped by the framework and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via Job.setGroupingComparatorClass(Class).
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
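For illustration, the default rule (Hadoop's HashPartitioner sends a record to partition hashCode(key) mod numReduceTasks) can be sketched in Python. CRC32 is used here as a deterministic stand-in for Java's hashCode, which is an assumption of this sketch, not Hadoop's actual hash function:

```python
import zlib

def partition(key: str, num_reduce_tasks: int) -> int:
    """Deterministic stand-in for the default HashPartitioner rule."""
    return zlib.crc32(key.encode("utf-8")) % num_reduce_tasks

# Identical keys always land in the same partition, so all values for a
# given key reach the same reduce task.
assignments = {k: partition(k, 3) for k in ["hello", "world", "hadoop"]}
print(assignments)  # every assigned partition is in range(3)
```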
Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
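A small sketch of what a combiner buys, in plain Python (the data is made up; this is not Hadoop API code): the map-side aggregation shrinks the number of records that have to cross the network.

```python
from collections import Counter

# One map task's raw output: four records to ship across the network.
map_output = [("hello", 1), ("world", 1), ("hello", 1), ("hello", 1)]

# The combiner sums values per key on the map side, before the shuffle.
totals = Counter()
for key, value in map_output:
    totals[key] += value
combined = sorted(totals.items())

print(combined)                              # -> [('hello', 3), ('world', 1)]
print(len(map_output), "->", len(combined))  # 4 records shrink to 2
```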
The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed, and the CompressionCodec to be used, via the Configuration.
How Many Maps?
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per node, although it has been set as high as 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a block size of 128 MB, you'll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
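The arithmetic behind that example, assuming binary units (1 TB = 1024^4 bytes, 1 MB = 1024^2 bytes):

```python
# 10 TB of input divided into 128 MB blocks, one map task per block.
input_bytes = 10 * 1024**4   # 10 TB
block_size = 128 * 1024**2   # 128 MB
num_maps = input_bytes // block_size
print(num_maps)  # -> 81920, i.e. roughly 82,000 map tasks
```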
Below is the original text:
Purpose
This document comprehensively describes all user-facing facets of the Hadoop MapReduce framework and serves as a tutorial.
Prerequisites
Ensure that Hadoop is installed, configured and is running. More details:
Single Node Setup for first-time users.
Cluster Setup for large, distributed clusters.
Overview
Hadoop MapReduce is a software framework for easily writing applications which process vast amounts of data (multi-terabyte data-sets) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file-system. The framework takes care of scheduling tasks, monitoring them and re-executes the failed tasks.
Typically the compute nodes and the storage nodes are the same, that is, the MapReduce framework and the Hadoop Distributed File System (see HDFS Architecture Guide) are running on the same set of nodes. This configuration allows the framework to effectively schedule tasks on the nodes where data is already present, resulting in very high aggregate bandwidth across the cluster.
The MapReduce framework consists of a single master ResourceManager, one slave NodeManager per cluster-node, and MRAppMaster per application (see YARN Architecture Guide).
Minimally, applications specify the input/output locations and supply map and reduce functions via implementations of appropriate interfaces and/or abstract-classes. These, and other job parameters, comprise the job configuration.
The Hadoop job client then submits the job (jar/executable etc.) and configuration to the ResourceManager which then assumes the responsibility of distributing the software/configuration to the slaves, scheduling tasks and monitoring them, providing status and diagnostic information to the job-client.
Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java.
Hadoop Streaming is a utility which allows users to create and run jobs with any executables (e.g. shell utilities) as the mapper and/or the reducer.
Hadoop Pipes is a SWIG-compatible C++ API to implement MapReduce applications (non JNI based).
Inputs and Outputs
The MapReduce framework operates exclusively on <key, value> pairs, that is, the framework views the input to the job as a set of <key, value> pairs and produces a set of <key, value> pairs as the output of the job, conceivably of different types.
The key and value classes have to be serializable by the framework and hence need to implement the Writable interface. Additionally, the key classes have to implement the WritableComparable interface to facilitate sorting by the framework.
Input and Output types of a MapReduce job:
(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)
MapReduce - User Interfaces
This section provides a reasonable amount of detail on every user-facing aspect of the MapReduce framework. This should help users implement, configure and tune their jobs in a fine-grained manner. However, please note that the java doc for each class/interface remains the most comprehensive documentation available; this is only meant to be a tutorial.
Let us first take the Mapper and Reducer interfaces. Applications typically implement them to provide the map and reduce methods.
We will then discuss other core interfaces including Job, Partitioner, InputFormat, OutputFormat, and others.
Finally, we will wrap up by discussing some useful features of the framework such as the DistributedCache, IsolationRunner etc.
Payload
Applications typically implement the Mapper and Reducer interfaces to provide the map and reduce methods. These form the core of the job.
Mapper
Mapper maps input key/value pairs to a set of intermediate key/value pairs.
Maps are the individual tasks that transform input records into intermediate records. The transformed intermediate records do not need to be of the same type as the input records. A given input pair may map to zero or many output pairs.
The Hadoop MapReduce framework spawns one map task for each InputSplit generated by the InputFormat for the job.
Overall, Mapper implementations are passed the Job for the job via the Job.setMapperClass(Class) method. The framework then calls map(WritableComparable, Writable, Context) for each key/value pair in the InputSplit for that task. Applications can then override the cleanup(Context) method to perform any required cleanup.
Output pairs do not need to be of the same types as input pairs. A given input pair may map to zero or many output pairs. Output pairs are collected with calls to context.write(WritableComparable, Writable).
Applications can use the Counter to report its statistics.
All intermediate values associated with a given output key are subsequently grouped by the framework, and passed to the Reducer(s) to determine the final output. Users can control the grouping by specifying a Comparator via Job.setGroupingComparatorClass(Class).
The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job. Users can control which keys (and hence records) go to which Reducer by implementing a custom Partitioner.
Users can optionally specify a combiner, via Job.setCombinerClass(Class), to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer.
The intermediate, sorted outputs are always stored in a simple (key-len, key, value-len, value) format. Applications can control if, and how, the intermediate outputs are to be compressed and the CompressionCodec to be used via the Configuration.
How Many Maps?
The number of maps is usually driven by the total size of the inputs, that is, the total number of blocks of the input files.
The right level of parallelism for maps seems to be around 10-100 maps per-node, although it has been set up to 300 maps for very cpu-light map tasks. Task setup takes a while, so it is best if the maps take at least a minute to execute.
Thus, if you expect 10TB of input data and have a blocksize of 128 MB, you'll end up with 82,000 maps, unless Configuration.set(MRJobConfig.NUM_MAPS, int) (which only provides a hint to the framework) is used to set it even higher.
Please point out, and forgive, any errors caused by my limited translation ability.