學(xué)習(xí)筆記—MapReduce

MapReduce是什么

MapReduce是一種分布式計算編程框架,是Hadoop主要組成部分之一,可以讓用戶專注于編寫核心邏輯代碼,最后以高可靠、高容錯的方式在大型集群上并行處理大量數(shù)據(jù)。

MapReduce的存儲

MapReduce的數(shù)據(jù)是存儲在HDFS上的,HDFS也是Hadoop的主要組成部分之一。下邊是MapReduce在HDFS上的存儲的圖解

HDFS Architecture

HDFS主要有Namenode和Datanode兩部分組成,整個集群有一個Namenode和多個DataNode,通常每一個節(jié)點一個DataNode,Namenode的主要功能是用來管理客戶端client對數(shù)據(jù)文件的操作請求和儲存數(shù)據(jù)文件的地址。DataNode主要是用來儲存和管理本節(jié)點的數(shù)據(jù)文件。節(jié)點內(nèi)部數(shù)據(jù)文件被分為一個或多個block塊(block默認(rèn)大小原來是64MB,后來變?yōu)?28MB),然后這些塊儲存在一組DataNode中。(這里不對HDFS做過多的介紹,后續(xù)會寫一篇詳細(xì)的HDFS筆記)

MapReduce的運行流程

屏幕快照 2018-12-05 下午10.43.38
屏幕快照 2018-12-05 下午10.56.38

1、首先把需要處理的數(shù)據(jù)文件上傳到HDFS上,然后這些數(shù)據(jù)會被分為好多個小的分片,然后每個分片對應(yīng)一個map任務(wù),推薦情況下分片的大小等于block塊的大小。然后map的計算結(jié)果會暫存到一個內(nèi)存緩沖區(qū)內(nèi),該緩沖區(qū)默認(rèn)為100M,等緩存的數(shù)據(jù)達(dá)到一個閾值的時候,默認(rèn)情況下是80%,然后會在磁盤創(chuàng)建一個文件,開始向文件里邊寫入數(shù)據(jù)。

2、map任務(wù)的輸入數(shù)據(jù)的格式是<key,value>對的形式,我們也可以自定義自己的<key,value>類型。然后map在往內(nèi)存緩沖區(qū)里寫入數(shù)據(jù)的時候會根據(jù)key進(jìn)行排序,同樣溢寫到磁盤的文件里的數(shù)據(jù)也是排好序的,最后map任務(wù)結(jié)束的時候可能會產(chǎn)生多個數(shù)據(jù)文件,然后把這些數(shù)據(jù)文件再根據(jù)歸并排序合并成一個大的文件。

3、然后每個分片都會經(jīng)過map任務(wù)后產(chǎn)生一個排好序的文件,同樣文件的格式也是<key,value>對的形式,然后通過對key進(jìn)行hash的方式把數(shù)據(jù)分配到不同的reduce里邊去,這樣對每個分片的數(shù)據(jù)進(jìn)行hash,再把每個分片分配過來的數(shù)據(jù)進(jìn)行合并,合并過程中也是不斷進(jìn)行排序的。最后數(shù)據(jù)經(jīng)過reduce任務(wù)的處理就產(chǎn)生了最后的輸出。

4、在我們開發(fā)中只需要對中間map和reduce的邏輯進(jìn)行開發(fā)就可以了,中間分片,排序,合并,分配都有MapReduce框架幫我完成了。

MapReduce的資源調(diào)度系統(tǒng)

最后我們來看一下MapReduce的資源調(diào)度系統(tǒng)Yarn。

MapReduce NextGen??????

Yarn的基本思想是將資源管理和作業(yè)調(diào)度/監(jiān)視的功能分解為單獨的守護(hù)進(jìn)程。全局唯一的ResourceManager是負(fù)責(zé)所有應(yīng)用程序之間的資源的調(diào)度和分配,每個程序有一個ApplicationMaster,ApplicationMaster實際上是一個特定于框架的庫,其任務(wù)是協(xié)調(diào)來自ResourceManager的資源,并與NodeManager一起執(zhí)行和監(jiān)視任務(wù)。NodeManager是每臺機器框架代理,監(jiān)視其資源使用情況(CPU,內(nèi)存,磁盤,網(wǎng)絡(luò))并將其報告給ResourceManager。

WordConut代碼

  • python實現(xiàn)

map.py

#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys

for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print('%s\t%s' % (word, 1))

reduce.py

#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys

current_word = None
sum = 0

for line in sys.stdin:
    word, count = line.strip().split(' ')

    if current_word == None:
        current_word = word

    if word != current_word:
        print('%s\t%s' % (current_word, sum))
        current_word = word
        sum = 0

    sum += int(count)

print('%s\t%s' % (current_word, sum))

我們先把輸入文件上傳到HDFS上去

hadoop fs -put /input.txt /

? 然后在Linux下運行,為了方便我們把命令寫成了shell文件

HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

INPUT_FILE_PATH="/input.txt"
OUTPUT_FILE_PATH="/output"

$HADOOP_CMD fs -rmr -skipTrush $OUTPUT_FILE_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH \
    -output $OUTPUT_FILE_PATH \
    -mapper "python map.py" \
    -reducer "python reduce.py" \
    -file "./map.py" \
    -file "./reduce.py" 

  • java實現(xiàn)

MyMap.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class MyMap extends Mapper<LongWritable, Text, Text, IntWritable> {

    private IntWritable one = new IntWritable(1);
    private Text text = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] words = line.split(" ");

        for (String word: words){
            text.set(word);
            context.write(text,one);
        }
    }
}

MyReduce.java

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable i:values){
            sum+=i.get();
        }
        result.set(sum);
        context.write(key,result);
    }
}

WordCount.java

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCount {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "WordCount");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(MyMap.class);
        job.setReducerClass(MyReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

把工程打成jar包,然后把jar包和輸入文件上傳到HDfs

$ hadoop fs -put /wordcount.jar /
$ hadoop fs -put /input.txt /

執(zhí)行wordcount任務(wù)

$ bin/hadoop jar wordcount.jar WordCount /input.txt /user/joe/wordcount/output
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Zookeeper用于集群主備切換。 YARN讓集群具備更好的擴(kuò)展性。 Spark沒有存儲能力。 Spark的Ma...
    Yobhel閱讀 7,604評論 0 34
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,311評論 2 89
  • 【什么是大數(shù)據(jù)、大數(shù)據(jù)技術(shù)】 大數(shù)據(jù),又稱巨量資料,指的是所涉及的數(shù)據(jù)資料量規(guī)模巨大到無法在合理時間內(nèi)通過傳統(tǒng)的應(yīng)...
    kimibob閱讀 2,901評論 0 51
  • 三個月前發(fā)誓要寫的日志終于在今天戰(zhàn)勝了拖延癥。 2018年3月7日,通過層層隨機分配,終于還是蹭...
    cxy_368a閱讀 556評論 0 0
  • 父母智慧 做了一回啞人挽扶著聾人帶著復(fù)雜的心情走完了一程,但我的心還在路上,想起...
    48班3614閱讀 137評論 0 0

友情鏈接更多精彩內(nèi)容