Hadoop MapReduce 學習筆記

前言

本文是個人之前紀錄的MapReduce學習筆記,主要涉及到MapReduce基本概念、Hadoop 經(jīng)典示例WordCount的使用解析、hdfs與hbase的簡單了解使用?,F(xiàn)在整理了一下分享出來,希望對別人有所幫助。

學習MapReduce一定要理解這種Map、Reduce的編程模型以及Mapper、Reducer數(shù)據(jù)處理的原理,否則只是一味的復制粘貼可能比較難上手。

同時學習大數(shù)據(jù)的知識,一定要將自己對分布式的理解研究透徹。

一、概念理解

  • MapReduce 是一種線性的可伸縮的編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算
  • 在MapReduce里,Map處理的是原始數(shù)據(jù),每條數(shù)據(jù)之間互相沒有關系(這一點一定要注意)。Reduce階段,以key為標識,對同一個key下的value進行統(tǒng)計,類似{key,[value1,value2……]}
  • 可以把MapReduce理解為,把一堆雜亂無章的數(shù)據(jù)按照某種特征歸納起來,然后處理并得到最后的結(jié)果。
    Map面對的是雜亂無章的互不相關的數(shù)據(jù),它解析每個數(shù)據(jù),從中提取出key和value,也就是提取了數(shù)據(jù)的特征。
    經(jīng)過MapReduce的Shuffle階段之后,在Reduce階段看到的都是已經(jīng)歸納好的數(shù)據(jù)了,在此基礎上我們可以做進一步的處理以便得到結(jié)果。
  • 缺點:不適用于實時計算,實時計算一般最低都是要求秒級響應的,MR很難滿足這個要求,實時計算一般采用storm等流式計算系統(tǒng)
  • MapReduce計算流程


    MapReduce計算流程--來源網(wǎng)絡

二、編程模型

  • 每個應用程序稱為一個作業(yè)(Job),每個Job是由一系列的Mapper和Reducer來完成
  • 任務過程分為兩個階段,map和reduce階段,兩個階段都是使用鍵值對(key/value)作為輸入輸出的
  • 每個Mapper處理一個Split,每個split對應一個map線程。Split中的數(shù)據(jù)作為map的輸入,map的輸出一定在map端
  • Map方法:Map(k1,v1) -> list(k2,v2) ,并行應用于每一個輸入的數(shù)據(jù)集,每一次調(diào)用都會產(chǎn)生一個(k2,v2)的隊列 。
  • Reduce方法:Reduce(k2,list(v2)) -> list(k3,v3)。收集map端輸出隊列l(wèi)ist(k2,v2)中有相同key的數(shù)據(jù)對,把它們聚集在一起,輸出時形成目的數(shù)據(jù) list(k3,v3)。
  • 新舊版本API的區(qū)別:
    • 新的api放在:org.apache.hadoop.mapreduce,舊版api放在:org.apache.hadoop.mapred
    • 新API使用虛類,舊版使用的是接口,虛類更加利于擴展

三、運行機制

  1. 輸入分片(input split)

    map計算之前,MapReduce會根據(jù)輸入文件計算輸入分片(input -> spliting),每個input split針對一個map任務。split存儲的并不是數(shù)據(jù),而是一個分片長度和一個記錄數(shù)據(jù)的位置的數(shù)組

  2. map階段

    map階段的操作一般都是在數(shù)據(jù)存儲節(jié)點上操作,所以有時候為了能夠減輕數(shù)據(jù)傳輸?shù)木W(wǎng)絡壓力,可以先combiner階段處理一下數(shù)據(jù),在進行reduce

  3. combiner階段

    此階段是可選的,不是必須經(jīng)過的一個階段,combiner其實也是一種reduce操作,可以說combiner是一種本地化的reduce操作,是map運算的后續(xù)操作,可以減輕網(wǎng)絡傳輸?shù)膲毫?。但是combiner的使用需要注意不要影響到reduce的最終結(jié)果,比如計算平均值的時候如果使用combiner就會影響最終的結(jié)果,但是計算總數(shù)的話則對最終結(jié)果沒影響

  4. shuffle階段

    將map的輸出作為reduce的輸入,這個過程就是shuffle,是MapReduce優(yōu)化的重要階段。

  5. reduce階段

    reducer階段,輸入是shuffle階段的輸出,對每個不同的鍵和該鍵對應的值的數(shù)據(jù)流進行獨立、并行的處理。

四、WordCount--官方提供的example

代碼

package com.smile.test;

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

    private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
    //hdfs輸出路徑
    private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
    
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        // Text 實現(xiàn)了BinaryComparable類可以作為key值
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            // 解析鍵值對
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            
            for (IntWritable val : values) {
                sum += val.get();
            }

            result.set(sum);
            context.write(key, result);
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) throws Exception {

        String[] paths = {INPUT_PATH,OUTPUT_PATH};
        //獲得Configuration配置 Configuration: core-default.xml, core-site.xml 
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: wordcount <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        // 設置Mapper類
        job.setMapperClass(TokenizerMapper.class);
        // 設置Combiner類 
        job.setCombinerClass(IntSumReducer.class); 
        // 設置Reduce類
        job.setReducerClass(IntSumReducer.class); 
         // 設置輸出key的類型,注意跟reduce的輸出類型保持一致
        job.setOutputKeyClass(Text.class);
        // 設置輸出value的類型,注意跟reduce的輸出類型保持一致
        job.setOutputValueClass(IntWritable.class);
        // 設置輸入路徑
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
      // 設置輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); 
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

解析

  • MapReduce的輸出路徑一定要保證文件夾不存在,最好的解決方法時在代碼中添加判斷,執(zhí)行之前刪除output文件夾(具體方法見下面的hdfs操作

  • MapReduce可以沒有輸出,但必須設置輸出路徑

  • MapReduce的輸入路徑可以直接寫hdfs的目錄路徑,然后放在集群下執(zhí)行,

      hadoop jar **.jar java類名 參數(shù)1 參數(shù)2 ...
    
  • Mapper

      //map
      public void map(Object key, Text value, Context context)
    

    前面兩個參數(shù)分別是輸入的key,value,Context context可以記錄輸入的key和value,context也可以記錄map運算的狀態(tài)
    map中的context記錄了map執(zhí)行的上下文,在mapper類中,context可以存儲一些job conf的信息,也就是說context是作為參數(shù)傳遞的載體。比如runner中configuration的set信息[conf.set(Str, strValue)],map中可以get到[context.getConfiguration().get(Str)]

      //setup
      protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
      //cleanup
      protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)
    

    MapReduce框架內(nèi)的setup和cleanup方法只會執(zhí)行一次,所以一些相關變量或者是資源的初始化和釋放最好是在setup中執(zhí)行,如果放在map中執(zhí)行,則在解析每一行數(shù)據(jù)的時候都會執(zhí)行一次,嚴重影響程序運行效率。

  • Reducer

      public void reduce(Text key, Iterable<IntWritable> values, Context context)
    

    reduce的輸入也是key/value形式,不過是values,也就是一個key對應的一組value,例如key,value1;key,value2...
    reducer不是必須的,如果用不到reducer階段可以不寫

    reduce會接收到不同map傳遞過來的數(shù)據(jù) ,并且每個map傳遞過來的數(shù)據(jù)都是有序的。如果reduce端接收到的數(shù)據(jù)量比較小,那么會存儲在內(nèi)存中,如果超出緩沖區(qū)大小一定比例,則會合并后寫到磁盤上

  • 調(diào)用 runner

      Configuration conf = new Configuration();
      //連接hbase,操作hbase
      Configuration conf = HBaseConfiguration.create();
    

    MapReduce運行之前都要初始化Configuration,主要是讀取MapReduce系統(tǒng)配置,如core-site.xml、hdfs-site.xml、mapred-site.xml、hbase-site.xml

      scan.setCaching(500); 
    

    增加緩存讀取條數(shù)(一次RPC調(diào)用返回多行紀錄,也就是每次從服務器端讀取的行數(shù)),加快scanner讀取速度,但耗費內(nèi)存增加,設太大會響應慢、超時或者OOM。

      setBatch(int batch)
    

    設置獲取紀錄的列個數(shù),默認無限制,也就是返回所有的列。實際上就是控制一次next()傳輸多少個columns,如batch為5表示每個result實例返回5個columns
    setBatch使用場景為,用客戶端的scanner緩存進行批量交互從而提高性能時,非常大的行可能無法放入客戶端的內(nèi)存,這時需要用HBase客戶端API中進行batching處理。

    scan.setCacheBlocks(false); 

默認是true,分內(nèi)存,緩存和磁盤,三種方式,一般數(shù)據(jù)的讀取為內(nèi)存->緩存->磁盤;

setCacheBlocks不適合MapReduce工作:
MR程序為非熱點數(shù)據(jù),不需要緩存,因為Blockcache is LRU,也就是最近最少訪問算法(扔掉最少訪問的),那么,前一個請求(比如map讀?。┳x入Blockcache的所有記錄在后一個請求(新的map讀取)中都沒有用,就必須全部被swap,那么RegionServer要不斷的進行無意義的swapping data,也就是無意義的輸入和輸出BlockCache,增加了無必要的IO。而普通讀取時局部查找,或者查找最熱數(shù)據(jù)時,會有提升性能的幫助。

runner方法中可以寫定義多個job,job會順序執(zhí)行。

五、常用hadoop fs命令 (類似Linux的文件操作命令,可類比學習使用)

-help
功能:輸出這個命令參數(shù)手冊

-ls
功能:顯示目錄信息
示例: hadoop fs -ls /yjq

-mkdir 
功能:在hdfs上創(chuàng)建目錄
示例:hadoop fs -mkdir -p /yjq/test

-moveFromLocal
功能:從本地剪切粘貼到hdfs
示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test

-moveToLocal
功能:從hdfs剪切粘貼到本地
示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/ 

-copyFromLocal
功能:從本地文件系統(tǒng)中拷貝文件到hdfs路徑去
示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test

-copyToLocal
功能:從hdfs拷貝到本地
示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/ 

-get
功能:等同于copyToLocal,從hdfs下載文件到本地路徑(.表示當前路徑)
示例:hadoop fs -get /yjq/test/a.txt .

-getmerge
功能:合并下載多個文件
示例:將目錄下所有的TXT文件下載到本地,并合并成一個文件
hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt

-put
功能:等同于copyFromLocal
示例:hadoop fs -put /home/cdh/a.txt /yjq/test

-cp
功能:從hdfs的一個路徑拷貝hdfs的另一個路徑
示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/

-mv
功能:在hdfs目錄中移動文件
示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/

-appendToFile
功能:追加一個文件到已經(jīng)存在的文件末尾(本地文件追加到hdfs)
示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt

-cat
功能:顯示文件內(nèi)容
示例:hadoop fs -cat /yjq/test1/a.txt

-tail
功能:顯示一個文件的末尾
示例:hadoop fs -tail /yjq/test1/a.txt

-text
功能:以字符形式打印一個文件的內(nèi)容
示例:hadoop fs -text /yjq/test1/a.txt

-chgrp、-chmod、-chown
功能:修改文件所屬權限 
示例:
hadoop fs -chmod 666 /yjq/test1/a.txt
# cdh為用戶名,hadoop為用戶組
hadoop fs -chown cdh:group /yjq/test1/a.txt

-rm
功能:刪除文件或文件夾
示例:hadoop fs -rm -r /yjq/test/a.txt

-df
功能:統(tǒng)計文件系統(tǒng)的可用空間信息
示例:hadoop fs -df -h /

-du
功能:統(tǒng)計文件夾的大小信息
示例:
hadoop fs -du -s -h /yjq/*

-count
功能:統(tǒng)計一個指定目錄下的文件節(jié)點數(shù)量
示例:hadoop fs -count /yjq/

六、HBase 相關操作

  1. 簡介
    • HBase是一個分布式的、面向列的開源數(shù)據(jù)庫
    • 表由行和列組成,列劃分為多個列族/列簇(column family)
    • RowKey:是Byte array,是表中每條記錄的“主鍵”,方便快速查找,Rowkey的設計非常重要。
    • Column Family:列族,擁有一個名稱(string),包含一個或者多個相關列
    • Column:屬于某一個columnfamily,familyName:columnName,每條記錄可動態(tài)添加
    • Hbase--圖片來源網(wǎng)絡
  1. 編碼

     Configuration conf = HBaseConfiguration.create();
    

    會自動讀取hbase-site.xml配置文件

     Scan scan = new Scan();
     scan.setCaching(1000);
     scan.setStartRow(getBytes(startDate));
     scan.setStopRow(getBytes(endDate));
    
     TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);
    

    參數(shù):hbase table name,scan,mapper class,outputKeyClass,outputValueClass,job

七、hdfs操作

  1. 運算之前清除hdfs上的文件夾

     FileSystem fs = FileSystem.get(new Configuration());
     Path outputDir = new Path(OUTPUT_PATH);
     //運算之前如果文件夾存在則清除文件夾
     if(fs.exists(outputDir))
         fs.delete(outputDir, true);
    
  2. HDFS讀流程

    • 客戶端向NameNode發(fā)起讀數(shù)據(jù)請求
    • NameNode找出距離最近的DataNode節(jié)點信息
    • 客戶端從DataNode分塊下載文件
  3. HDFS寫流程

    • 客戶端向NameNode發(fā)起寫數(shù)據(jù)請求
    • 分塊寫入DataNode節(jié)點,DataNode自動完成副本備份
    • DataNode向NameNode匯報存儲完成,NameNode通知客戶端

八、多表操作

MultiTableInputFormat 支持多個mapper的輸出混合到一個shuffle,一個reducer,其中每個mapper擁有不同的inputFormat和mapper處理類。
所有的mapper需要輸出相同的數(shù)據(jù)類型,對于輸出value,需要標記該value來源,以便reducer識別

List<Scan> scans = new ArrayList<Scan>();  

Scan scan1 = new Scan();  
scan1.setCaching(100);  
scan1.setCacheBlocks(false);  
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());  
scans.add(scan1);  
  
Scan scan2 = new Scan();  
scan2.setCaching(100);  
scan2.setCacheBlocks(false);  
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());  
scans.add(scan2);   

TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);

九、錯誤處理

  1. ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException

    這是當從服務器傳輸數(shù)據(jù)到客戶端的時間,或者客戶端處理數(shù)據(jù)的時間大于了scanner設置的超時時間,scanner超時報錯,可在客戶端代碼中設置超時時間

     Configuration conf = HBaseConfiguration.create()              
     conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000) 
    

    如果Mapper階段對每條數(shù)據(jù)的處理時間過長,可以將scan.setCaching(1000)的值設置小一點,如果值設置太大,則處理時間會很長就會出現(xiàn)超時錯誤。

寫在最后

很久之前寫的學習筆記了,資料來源網(wǎng)絡及項目組內(nèi)的討論,參考文獻就不一一標注了,侵刪~

如果您覺得本文對您有幫助,點個贊吧~~

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容