前言
本文是個人之前紀錄的MapReduce學習筆記,主要涉及到MapReduce基本概念、Hadoop 經(jīng)典示例WordCount的使用解析、hdfs與hbase的簡單了解使用?,F(xiàn)在整理了一下分享出來,希望對別人有所幫助。
學習MapReduce一定要理解這種Map、Reduce的編程模型以及Mapper、Reducer數(shù)據(jù)處理的原理,否則只是一味的復制粘貼可能比較難上手。
同時學習大數(shù)據(jù)的知識,一定要將自己對分布式的理解研究透徹。
一、概念理解
- MapReduce 是一種線性的可伸縮的編程模型,用于大規(guī)模數(shù)據(jù)集(大于1TB)的并行運算
- 在MapReduce里,Map處理的是原始數(shù)據(jù),每條數(shù)據(jù)之間互相沒有關系(這一點一定要注意)。Reduce階段,以key為標識,對同一個key下的value進行統(tǒng)計,類似{key,[value1,value2……]}
- 可以把MapReduce理解為,把一堆雜亂無章的數(shù)據(jù)按照某種特征歸納起來,然后處理并得到最后的結(jié)果。
Map面對的是雜亂無章的互不相關的數(shù)據(jù),它解析每個數(shù)據(jù),從中提取出key和value,也就是提取了數(shù)據(jù)的特征。
經(jīng)過MapReduce的Shuffle階段之后,在Reduce階段看到的都是已經(jīng)歸納好的數(shù)據(jù)了,在此基礎上我們可以做進一步的處理以便得到結(jié)果。 - 缺點:不適用于實時計算,實時計算一般最低都是要求秒級響應的,MR很難滿足這個要求,實時計算一般采用storm等流式計算系統(tǒng)
-
MapReduce計算流程
MapReduce計算流程--來源網(wǎng)絡
二、編程模型
- 每個應用程序稱為一個作業(yè)(Job),每個Job是由一系列的Mapper和Reducer來完成
- 任務過程分為兩個階段,map和reduce階段,兩個階段都是使用鍵值對(key/value)作為輸入輸出的
- 每個Mapper處理一個Split,每個split對應一個map線程。Split中的數(shù)據(jù)作為map的輸入,map的輸出一定在map端
- Map方法:Map(k1,v1) -> list(k2,v2) ,并行應用于每一個輸入的數(shù)據(jù)集,每一次調(diào)用都會產(chǎn)生一個(k2,v2)的隊列 。
- Reduce方法:Reduce(k2,list(v2)) -> list(k3,v3)。收集map端輸出隊列l(wèi)ist(k2,v2)中有相同key的數(shù)據(jù)對,把它們聚集在一起,輸出時形成目的數(shù)據(jù) list(k3,v3)。
- 新舊版本API的區(qū)別:
- 新的api放在:org.apache.hadoop.mapreduce,舊版api放在:org.apache.hadoop.mapred
- 新API使用虛類,舊版使用的是接口,虛類更加利于擴展
三、運行機制
-
輸入分片(input split)
map計算之前,MapReduce會根據(jù)輸入文件計算輸入分片(input -> spliting),每個input split針對一個map任務。split存儲的并不是數(shù)據(jù),而是一個分片長度和一個記錄數(shù)據(jù)的位置的數(shù)組
-
map階段
map階段的操作一般都是在數(shù)據(jù)存儲節(jié)點上操作,所以有時候為了能夠減輕數(shù)據(jù)傳輸?shù)木W(wǎng)絡壓力,可以先combiner階段處理一下數(shù)據(jù),在進行reduce
-
combiner階段
此階段是可選的,不是必須經(jīng)過的一個階段,combiner其實也是一種reduce操作,可以說combiner是一種本地化的reduce操作,是map運算的后續(xù)操作,可以減輕網(wǎng)絡傳輸?shù)膲毫?。但是combiner的使用需要注意不要影響到reduce的最終結(jié)果,比如計算平均值的時候如果使用combiner就會影響最終的結(jié)果,但是計算總數(shù)的話則對最終結(jié)果沒影響
-
shuffle階段
將map的輸出作為reduce的輸入,這個過程就是shuffle,是MapReduce優(yōu)化的重要階段。
-
reduce階段
reducer階段,輸入是shuffle階段的輸出,對每個不同的鍵和該鍵對應的值的數(shù)據(jù)流進行獨立、并行的處理。
四、WordCount--官方提供的example
代碼
package com.smile.test;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
private static final String INPUT_PATH = "/user/cdh/yjq/input/words.txt";
//hdfs輸出路徑
private static final String OUTPUT_PATH = "/user/cdh/yjq/output/";
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
// Text 實現(xiàn)了BinaryComparable類可以作為key值
private Text word = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 解析鍵值對
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
@SuppressWarnings("deprecation")
public static void main(String[] args) throws Exception {
String[] paths = {INPUT_PATH,OUTPUT_PATH};
//獲得Configuration配置 Configuration: core-default.xml, core-site.xml
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, paths).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
// 設置Mapper類
job.setMapperClass(TokenizerMapper.class);
// 設置Combiner類
job.setCombinerClass(IntSumReducer.class);
// 設置Reduce類
job.setReducerClass(IntSumReducer.class);
// 設置輸出key的類型,注意跟reduce的輸出類型保持一致
job.setOutputKeyClass(Text.class);
// 設置輸出value的類型,注意跟reduce的輸出類型保持一致
job.setOutputValueClass(IntWritable.class);
// 設置輸入路徑
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
// 設置輸出路徑
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
解析
MapReduce的輸出路徑一定要保證文件夾不存在,最好的解決方法時在代碼中添加判斷,執(zhí)行之前刪除output文件夾(具體方法見下面的hdfs操作)
MapReduce可以沒有輸出,但必須設置輸出路徑
-
MapReduce的輸入路徑可以直接寫hdfs的目錄路徑,然后放在集群下執(zhí)行,
hadoop jar **.jar java類名 參數(shù)1 參數(shù)2 ... -
Mapper
//map public void map(Object key, Text value, Context context)前面兩個參數(shù)分別是輸入的key,value,Context context可以記錄輸入的key和value,context也可以記錄map運算的狀態(tài)
map中的context記錄了map執(zhí)行的上下文,在mapper類中,context可以存儲一些job conf的信息,也就是說context是作為參數(shù)傳遞的載體。比如runner中configuration的set信息[conf.set(Str, strValue)],map中可以get到[context.getConfiguration().get(Str)]//setup protected void setup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context) //cleanup protected void cleanup(Mapper<ImmutableBytesWritable, Result, Text, Text>.Context context)MapReduce框架內(nèi)的setup和cleanup方法只會執(zhí)行一次,所以一些相關變量或者是資源的初始化和釋放最好是在setup中執(zhí)行,如果放在map中執(zhí)行,則在解析每一行數(shù)據(jù)的時候都會執(zhí)行一次,嚴重影響程序運行效率。
-
Reducer
public void reduce(Text key, Iterable<IntWritable> values, Context context)reduce的輸入也是key/value形式,不過是values,也就是一個key對應的一組value,例如key,value1;key,value2...
reducer不是必須的,如果用不到reducer階段可以不寫reduce會接收到不同map傳遞過來的數(shù)據(jù) ,并且每個map傳遞過來的數(shù)據(jù)都是有序的。如果reduce端接收到的數(shù)據(jù)量比較小,那么會存儲在內(nèi)存中,如果超出緩沖區(qū)大小一定比例,則會合并后寫到磁盤上
-
調(diào)用 runner
Configuration conf = new Configuration(); //連接hbase,操作hbase Configuration conf = HBaseConfiguration.create();MapReduce運行之前都要初始化Configuration,主要是讀取MapReduce系統(tǒng)配置,如core-site.xml、hdfs-site.xml、mapred-site.xml、hbase-site.xml
scan.setCaching(500);增加緩存讀取條數(shù)(一次RPC調(diào)用返回多行紀錄,也就是每次從服務器端讀取的行數(shù)),加快scanner讀取速度,但耗費內(nèi)存增加,設太大會響應慢、超時或者OOM。
setBatch(int batch)設置獲取紀錄的列個數(shù),默認無限制,也就是返回所有的列。實際上就是控制一次next()傳輸多少個columns,如batch為5表示每個result實例返回5個columns
setBatch使用場景為,用客戶端的scanner緩存進行批量交互從而提高性能時,非常大的行可能無法放入客戶端的內(nèi)存,這時需要用HBase客戶端API中進行batching處理。
scan.setCacheBlocks(false);
默認是true,分內(nèi)存,緩存和磁盤,三種方式,一般數(shù)據(jù)的讀取為內(nèi)存->緩存->磁盤;
setCacheBlocks不適合MapReduce工作:
MR程序為非熱點數(shù)據(jù),不需要緩存,因為Blockcache is LRU,也就是最近最少訪問算法(扔掉最少訪問的),那么,前一個請求(比如map讀?。┳x入Blockcache的所有記錄在后一個請求(新的map讀取)中都沒有用,就必須全部被swap,那么RegionServer要不斷的進行無意義的swapping data,也就是無意義的輸入和輸出BlockCache,增加了無必要的IO。而普通讀取時局部查找,或者查找最熱數(shù)據(jù)時,會有提升性能的幫助。
runner方法中可以寫定義多個job,job會順序執(zhí)行。
五、常用hadoop fs命令 (類似Linux的文件操作命令,可類比學習使用)
-help
功能:輸出這個命令參數(shù)手冊
-ls
功能:顯示目錄信息
示例: hadoop fs -ls /yjq
-mkdir
功能:在hdfs上創(chuàng)建目錄
示例:hadoop fs -mkdir -p /yjq/test
-moveFromLocal
功能:從本地剪切粘貼到hdfs
示例:hadoop fs -moveFromLocal /home/cdh/a.txt /yjq/test
-moveToLocal
功能:從hdfs剪切粘貼到本地
示例:hadoop fs -moveToLocal /yjq/test/a.txt /home/cdh/
-copyFromLocal
功能:從本地文件系統(tǒng)中拷貝文件到hdfs路徑去
示例:hadoop fs -copyFromLocal /home/cdh/a.txt /yjq/test
-copyToLocal
功能:從hdfs拷貝到本地
示例:hadoop fs -copyToLocal /yjq/test/a.txt /home/cdh/
-get
功能:等同于copyToLocal,從hdfs下載文件到本地路徑(.表示當前路徑)
示例:hadoop fs -get /yjq/test/a.txt .
-getmerge
功能:合并下載多個文件
示例:將目錄下所有的TXT文件下載到本地,并合并成一個文件
hadoop fs -getmerge /yjq/test/*.txt /home/cdh/test.txt
-put
功能:等同于copyFromLocal
示例:hadoop fs -put /home/cdh/a.txt /yjq/test
-cp
功能:從hdfs的一個路徑拷貝hdfs的另一個路徑
示例: hadoop fs -cp /yjq/test1/a.txt /yjq/test2/
-mv
功能:在hdfs目錄中移動文件
示例: hadoop fs -mv /yjq/test1/a.txt /yjq/test2/
-appendToFile
功能:追加一個文件到已經(jīng)存在的文件末尾(本地文件追加到hdfs)
示例:Hadoop fs -appendToFile /home/cdh/a.txt /yjq/test1/a.txt
-cat
功能:顯示文件內(nèi)容
示例:hadoop fs -cat /yjq/test1/a.txt
-tail
功能:顯示一個文件的末尾
示例:hadoop fs -tail /yjq/test1/a.txt
-text
功能:以字符形式打印一個文件的內(nèi)容
示例:hadoop fs -text /yjq/test1/a.txt
-chgrp、-chmod、-chown
功能:修改文件所屬權限
示例:
hadoop fs -chmod 666 /yjq/test1/a.txt
# cdh為用戶名,hadoop為用戶組
hadoop fs -chown cdh:group /yjq/test1/a.txt
-rm
功能:刪除文件或文件夾
示例:hadoop fs -rm -r /yjq/test/a.txt
-df
功能:統(tǒng)計文件系統(tǒng)的可用空間信息
示例:hadoop fs -df -h /
-du
功能:統(tǒng)計文件夾的大小信息
示例:
hadoop fs -du -s -h /yjq/*
-count
功能:統(tǒng)計一個指定目錄下的文件節(jié)點數(shù)量
示例:hadoop fs -count /yjq/
六、HBase 相關操作
- 簡介
- HBase是一個分布式的、面向列的開源數(shù)據(jù)庫
- 表由行和列組成,列劃分為多個列族/列簇(column family)
- RowKey:是Byte array,是表中每條記錄的“主鍵”,方便快速查找,Rowkey的設計非常重要。
- Column Family:列族,擁有一個名稱(string),包含一個或者多個相關列
- Column:屬于某一個columnfamily,familyName:columnName,每條記錄可動態(tài)添加
- Hbase--圖片來源網(wǎng)絡
-
編碼
Configuration conf = HBaseConfiguration.create();會自動讀取hbase-site.xml配置文件
Scan scan = new Scan(); scan.setCaching(1000); scan.setStartRow(getBytes(startDate)); scan.setStopRow(getBytes(endDate)); TableMapReduceUtil.initTableMapperJob(HB_TABLE_NAME, scan, NewsStreamUrlMapper.class, Text.class, Text.class, job);參數(shù):hbase table name,scan,mapper class,outputKeyClass,outputValueClass,job
七、hdfs操作
-
運算之前清除hdfs上的文件夾
FileSystem fs = FileSystem.get(new Configuration()); Path outputDir = new Path(OUTPUT_PATH); //運算之前如果文件夾存在則清除文件夾 if(fs.exists(outputDir)) fs.delete(outputDir, true); -
HDFS讀流程
- 客戶端向NameNode發(fā)起讀數(shù)據(jù)請求
- NameNode找出距離最近的DataNode節(jié)點信息
- 客戶端從DataNode分塊下載文件
-
HDFS寫流程
- 客戶端向NameNode發(fā)起寫數(shù)據(jù)請求
- 分塊寫入DataNode節(jié)點,DataNode自動完成副本備份
- DataNode向NameNode匯報存儲完成,NameNode通知客戶端
八、多表操作
MultiTableInputFormat 支持多個mapper的輸出混合到一個shuffle,一個reducer,其中每個mapper擁有不同的inputFormat和mapper處理類。
所有的mapper需要輸出相同的數(shù)據(jù)類型,對于輸出value,需要標記該value來源,以便reducer識別
List<Scan> scans = new ArrayList<Scan>();
Scan scan1 = new Scan();
scan1.setCaching(100);
scan1.setCacheBlocks(false);
scan1.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inTable.getBytes());
scans.add(scan1);
Scan scan2 = new Scan();
scan2.setCaching(100);
scan2.setCacheBlocks(false);
scan2.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, inPhoneImsiTable.getBytes());
scans.add(scan2);
TableMapReduceUtil.initTableMapperJob(scans, ReadHbaseMapper.class, Text.class,Result.class, job);
九、錯誤處理
-
ScannerTimeoutException:org.apache.hadoop.hbase.client.ScannerTimeoutException
這是當從服務器傳輸數(shù)據(jù)到客戶端的時間,或者客戶端處理數(shù)據(jù)的時間大于了scanner設置的超時時間,scanner超時報錯,可在客戶端代碼中設置超時時間
Configuration conf = HBaseConfiguration.create() conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,120000)如果Mapper階段對每條數(shù)據(jù)的處理時間過長,可以將scan.setCaching(1000)的值設置小一點,如果值設置太大,則處理時間會很長就會出現(xiàn)超時錯誤。
寫在最后
很久之前寫的學習筆記了,資料來源網(wǎng)絡及項目組內(nèi)的討論,參考文獻就不一一標注了,侵刪~
如果您覺得本文對您有幫助,點個贊吧~~

