簡單的 MapReduce 作業(yè),需要一個 map 函數(shù),一個 reduce 函數(shù)和一些用來運行作業(yè)的代碼
// Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
// Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values,
Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
橫向擴展(Scaling out)
需要把數(shù)據(jù)存儲在分布式文件系統(tǒng)中,通過使用 Hadoop 資源管理系統(tǒng) YARN,Hadoop 可以將 MapReduce 計算轉(zhuǎn)移到存儲有部分?jǐn)?shù)據(jù)的各機器上
相關(guān)概念
MapReduce 作業(yè)
MapReduce 作業(yè) == 輸入數(shù)據(jù) + MapReduce程序 + 配置信息
任務(wù)分類
Hadoop 將作業(yè)分成若干個任務(wù)(task)來執(zhí)行,其中包括兩類任務(wù):map 任務(wù)和 reduce 任務(wù),這些任務(wù)運行在集群幾點上,并通過 YARN 進行調(diào)度。如果一個任務(wù)失敗,它將在另一個不同的節(jié)點上自動重新調(diào)度運行
分片(input split)
Hadoop 將 MapReduce 的輸入數(shù)據(jù)劃分成等長的小數(shù)據(jù)塊,成為輸入分片(input split)或簡稱“分片”
Hadoop 為每個分片構(gòu)建一個 map 任務(wù),并由該任務(wù)來運行用戶自定義的 map 函數(shù)從而處理分片中的每條記錄
分片切分的粒度
相對來說,分片被切分的越細,作業(yè)的負載平衡質(zhì)量會更高。但是如果分片切分的太細,那么管理分片的總時間和構(gòu)建 map 任務(wù)的總時間將決定作業(yè)的整個執(zhí)行時間
對于大多數(shù)作業(yè)來說,一個合理的分片大小趨向于 HDFS 的一個塊的大小(128MB)
數(shù)據(jù)本地化優(yōu)化(data locality optimization)
Hadoop 在存儲輸入數(shù)據(jù)的節(jié)點上運行 map 任務(wù),可以獲得最佳性能,而無需使用寶貴的集群帶寬資源
跨機架的 map 任務(wù)
有時對于一個 map 任務(wù)的輸入分片來說,存儲該分片的 HDFS 數(shù)據(jù)塊副本的所有節(jié)點可能正在運行其他的 map 任務(wù),此時作業(yè)調(diào)度需要從某一個數(shù)據(jù)塊所在的機架中的一個節(jié)點上尋找一個空閑的 map 槽(slot)來運行該 map 任務(wù),這將導(dǎo)致機架與機架之間的網(wǎng)絡(luò)傳輸
為何最佳分片的大小應(yīng)該與塊大小相同?
如果分片跨越兩個數(shù)據(jù)塊,那么對于任何一個 HDFS 節(jié)點,基本上都不可能同時存儲這兩個數(shù)據(jù)塊,因此分片中的部分?jǐn)?shù)據(jù)需要通過網(wǎng)絡(luò)傳輸?shù)?map 任務(wù)運行的節(jié)點。這與使用本地數(shù)據(jù)運行整個 map 任務(wù)相比,顯然效率更低
reduce 任務(wù)并不具備數(shù)據(jù)本地化的優(yōu)勢,單個 reduce 任務(wù)的輸入通常來自于所有的 mapper 的輸出;多個 reduce 任務(wù),每個 map 任務(wù)針對輸出進行分區(qū)
reduce 的輸出通常存儲在 HDFS 中以實現(xiàn)可靠存儲。第一個副本存儲在本地節(jié)點上,其他的副本處于可靠性考慮存儲在其他機架的節(jié)點上
reduce 任務(wù)的數(shù)量并非由輸入數(shù)據(jù)的大小決定,反而是獨立指定的
combiner 函數(shù)
combiner 函數(shù)能夠幫助減少 mapper 和 reducer 之間的數(shù)據(jù)傳輸量
// 通過如下方式調(diào)用來啟用 combiner 函數(shù)
job.setComiberClass(XXXReducer.class)
Hadoop Streaming
Hadoop Streaming 使用 Unix 標(biāo)準(zhǔn)流作為 Hadoop 和應(yīng)用程序之間的接口,所以可以使用任何編程語言通過標(biāo)準(zhǔn)輸入/輸出來寫 MapReduce 程序
Streaming 天生適合用于文本處理。map 的輸入數(shù)據(jù)通過標(biāo)準(zhǔn)輸入流傳遞給 map 函數(shù),并且是一行一行地傳輸,最后將結(jié)果行寫到標(biāo)準(zhǔn)輸出。map 輸出的鍵-值對以一個制表符分隔的行,reduce 函數(shù)的輸入格式與之相同并通過標(biāo)準(zhǔn)輸入流進行傳輸。reduce 函數(shù)從標(biāo)準(zhǔn)輸入流中讀取輸入行,該輸入已由 Hadoop 框架根據(jù)鍵排過序,最后將結(jié)果寫入標(biāo)準(zhǔn)輸出