Hadoop 之 MapReduce 作業(yè)初體驗

簡單的 MapReduce 作業(yè),需要一個 map 函數(shù),一個 reduce 函數(shù)和一些用來運行作業(yè)的代碼

// Mapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper
  extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final int MISSING = 9999;
  
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature;
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93);
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      context.write(new Text(year), new IntWritable(airTemperature));
    }
  }
}

// Reducer
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer
  extends Reducer<Text, IntWritable, Text, IntWritable> {
  
  @Override
  public void reduce(Text key, Iterable<IntWritable> values,
      Context context)
      throws IOException, InterruptedException {
    
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}

橫向擴展(Scaling out)

需要把數(shù)據(jù)存儲在分布式文件系統(tǒng)中,通過使用 Hadoop 資源管理系統(tǒng) YARN,Hadoop 可以將 MapReduce 計算轉(zhuǎn)移到存儲有部分?jǐn)?shù)據(jù)的各機器上

相關(guān)概念

MapReduce 作業(yè)

MapReduce 作業(yè) == 輸入數(shù)據(jù) + MapReduce程序 + 配置信息

任務(wù)分類

Hadoop 將作業(yè)分成若干個任務(wù)(task)來執(zhí)行,其中包括兩類任務(wù):map 任務(wù)和 reduce 任務(wù),這些任務(wù)運行在集群幾點上,并通過 YARN 進行調(diào)度。如果一個任務(wù)失敗,它將在另一個不同的節(jié)點上自動重新調(diào)度運行

分片(input split)

Hadoop 將 MapReduce 的輸入數(shù)據(jù)劃分成等長的小數(shù)據(jù)塊,成為輸入分片(input split)或簡稱“分片”
Hadoop 為每個分片構(gòu)建一個 map 任務(wù),并由該任務(wù)來運行用戶自定義的 map 函數(shù)從而處理分片中的每條記錄

分片切分的粒度

相對來說,分片被切分的越細,作業(yè)的負載平衡質(zhì)量會更高。但是如果分片切分的太細,那么管理分片的總時間和構(gòu)建 map 任務(wù)的總時間將決定作業(yè)的整個執(zhí)行時間

對于大多數(shù)作業(yè)來說,一個合理的分片大小趨向于 HDFS 的一個塊的大小(128MB)

數(shù)據(jù)本地化優(yōu)化(data locality optimization)

Hadoop 在存儲輸入數(shù)據(jù)的節(jié)點上運行 map 任務(wù),可以獲得最佳性能,而無需使用寶貴的集群帶寬資源

跨機架的 map 任務(wù)

有時對于一個 map 任務(wù)的輸入分片來說,存儲該分片的 HDFS 數(shù)據(jù)塊副本的所有節(jié)點可能正在運行其他的 map 任務(wù),此時作業(yè)調(diào)度需要從某一個數(shù)據(jù)塊所在的機架中的一個節(jié)點上尋找一個空閑的 map 槽(slot)來運行該 map 任務(wù),這將導(dǎo)致機架與機架之間的網(wǎng)絡(luò)傳輸

為何最佳分片的大小應(yīng)該與塊大小相同?

如果分片跨越兩個數(shù)據(jù)塊,那么對于任何一個 HDFS 節(jié)點,基本上都不可能同時存儲這兩個數(shù)據(jù)塊,因此分片中的部分?jǐn)?shù)據(jù)需要通過網(wǎng)絡(luò)傳輸?shù)?map 任務(wù)運行的節(jié)點。這與使用本地數(shù)據(jù)運行整個 map 任務(wù)相比,顯然效率更低

reduce 任務(wù)并不具備數(shù)據(jù)本地化的優(yōu)勢,單個 reduce 任務(wù)的輸入通常來自于所有的 mapper 的輸出;多個 reduce 任務(wù),每個 map 任務(wù)針對輸出進行分區(qū)

reduce 的輸出通常存儲在 HDFS 中以實現(xiàn)可靠存儲。第一個副本存儲在本地節(jié)點上,其他的副本處于可靠性考慮存儲在其他機架的節(jié)點上

reduce 任務(wù)的數(shù)量并非由輸入數(shù)據(jù)的大小決定,反而是獨立指定的

combiner 函數(shù)

combiner 函數(shù)能夠幫助減少 mapper 和 reducer 之間的數(shù)據(jù)傳輸量

// 通過如下方式調(diào)用來啟用 combiner 函數(shù)
job.setComiberClass(XXXReducer.class)

Hadoop Streaming

Hadoop Streaming 使用 Unix 標(biāo)準(zhǔn)流作為 Hadoop 和應(yīng)用程序之間的接口,所以可以使用任何編程語言通過標(biāo)準(zhǔn)輸入/輸出來寫 MapReduce 程序

Streaming 天生適合用于文本處理。map 的輸入數(shù)據(jù)通過標(biāo)準(zhǔn)輸入流傳遞給 map 函數(shù),并且是一行一行地傳輸,最后將結(jié)果行寫到標(biāo)準(zhǔn)輸出。map 輸出的鍵-值對以一個制表符分隔的行,reduce 函數(shù)的輸入格式與之相同并通過標(biāo)準(zhǔn)輸入流進行傳輸。reduce 函數(shù)從標(biāo)準(zhǔn)輸入流中讀取輸入行,該輸入已由 Hadoop 框架根據(jù)鍵排過序,最后將結(jié)果寫入標(biāo)準(zhǔn)輸出

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 先思考問題 我們處在一個大數(shù)據(jù)的時代已經(jīng)是不爭的事實,這主要表現(xiàn)在數(shù)據(jù)源多且大,如互聯(lián)網(wǎng)數(shù)據(jù),人們也認識到數(shù)據(jù)里往...
    墻角兒的花閱讀 7,674評論 0 9
  • 參考:hadoop 學(xué)習(xí)筆記:mapreduce框架詳解 [toc] 總結(jié) Mapreduce是一個計算框架,既然...
    小小少年Boy閱讀 899評論 0 12
  • 參考:hadoop 學(xué)習(xí)筆記:mapreduce框架詳解 [toc] 總結(jié) Mapreduce是一個計算框架,既然...
    小小少年Boy閱讀 1,220評論 0 4
  • 思考問題 MapReduce總結(jié) MapReduce MapReduce的定義MapReduce是一種編程模型, ...
    Sakura_P閱讀 1,022評論 0 1
  • 今天晚上班級舉行了一次篝火晚會有幾個人借機喝酒然后裝醉或許待在喝醉的皮囊里能讓自己舒服些酒壯慫人膽再次恭喜這些影帝...
    3流浪閱讀 285評論 0 0

友情鏈接更多精彩內(nèi)容