之前一直用hive處理數(shù)據(jù),覺得MR程序打包上傳的比較麻煩,后來偶遇hive搞不定的文件網(wǎng)上找了個(gè)MR的例子稍微改一下感覺也比較方便,主要是處理速度快。
MR程序主要是有3各類:main函數(shù)類,map重載類,reduce重載類。
第一步:maven里面添加幾個(gè)jar包:
代碼如下:

第二步:main類:主要是調(diào)度MR程序的啟動(dòng)運(yùn)行
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String[]args)throws Exception
{
Configuration conf =new Configuration();//從hadoop配置文件讀取參數(shù)
?String [] otherArgs =new GenericOptionsParser(conf,args).getRemainingArgs();//從命令行讀取參數(shù)
?if(otherArgs.length!=2)
{
System.err.println("Usage:wordcount");
System.exit(2);
}
Job job =new Job(conf,"wordcount");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
第三步:Map類:主要是按行讀取文件內(nèi)容,根據(jù)自己需要處理(默認(rèn)按回車換行分割,如果要改動(dòng)需要重載某個(gè)函數(shù))
代碼如下:
import java.io.IOException; //導(dǎo)入異常處理類
import org.apache.hadoop.io.IntWritable; //導(dǎo)入整數(shù)類
import org.apache.hadoop.io.Text; //導(dǎo)入文本類
import org.apache.hadoop.mapreduce.Mapper; //導(dǎo)入Mapper類
public class TokenizerMapper extends Mapper{
? ? Text word = new Text();? ? ? ? ? ? ? ? //定義輸出鍵
? ? //統(tǒng)計(jì)每行數(shù)據(jù)每個(gè)id,aaa字符出現(xiàn)的次數(shù)
? ? public void map(Object key,Text value,Context context)throws IOException,InterruptedException
? ? {
? ? ? ? int len = 0;
? ? ? ? String id="";
? ? ? ? len = appearNumber(value.toString(),"aaa");? ? //統(tǒng)計(jì)aaa出現(xiàn)次數(shù)
? ? ? ? index = value.toString().indexOf("\"id\":\"");
? ? ? ? if(index>0)
? ? ? ? {
? ? ? ? ? ? id = value.toString().substring(index+n,index+m); //取每行id值
? ? ? ? ? ? IntWritable one = new IntWritable(len);
? ? ? ? ? ? Text id_t = new Text(id);
? ? ? ? ? ? context.write(id_t,one);
? ? ? ? }
? ? }
? ? public static int appearNumber(String srcText, String findText) {
? ? ? ? int count = 0;
? ? ? ? int index = 0;
? ? ? ? while ((index = srcText.indexOf(findText, index)) != -1) {
? ? ? ? ? ? index = index + findText.length();
? ? ? ? ? ? count++;
? ? ? ? }
? ? ? ? return count;
? ? }
}
第四步:重載reduce類
? 代碼如下:
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class IntSumReducer extends Reducer{
?IntWritable result = new IntWritable();
?public void reduce(Text key,Iterable values,Context context)
? ? ? ? ? ? throws IOException,InterruptedException
? ? {
? ? ? ? int sum = 0;
? ? ? ? for(IntWritable val:values) //不斷地將values中的IntWritable整數(shù)提取出來給val
? ? ? ? {
? ? ? ? ? ? sum=sum + val.get();
? ? ? ? }
? ? ? ? result.set(sum);
? ? ? ? context.write(key,result);? //每個(gè)id的aaa出現(xiàn)的次數(shù)求和輸出
? ? }
}
第五步:打成jar包。上傳hadoop,運(yùn)行
hadoop? jar? mr_test1.jar? WordCount? hdfs:///myfile/test.log? ? hdfs:///myfile/output10
PS:hdfs:///myfile/2018042319.log? 為輸入日志文件
? ? ? ? hdfs:///myfile/output10 指定的輸出目錄
結(jié)果文件在:hdfs:///myfile/output10/part-r-00000
為了驗(yàn)證reduce階段的作用,我曾把main函數(shù)中job.setReducerClass(IntSumReducer.class); 這句代碼注掉,跑出的結(jié)果為每個(gè)id每行aaa出現(xiàn)的次數(shù)。將結(jié)果文件某個(gè)id? grep出來,例如結(jié)果有17行,然后對(duì)17行的value求和,和加上job.setReducerClass(IntSumReducer.class)這句,這個(gè)id的結(jié)果完全一致。
MR程序的運(yùn)行速度對(duì)比
MR程序的運(yùn)行速度感覺比較快,像這個(gè)日志文件約10G,一開始的時(shí)候出于簡(jiǎn)單的想法寫了一個(gè)shell腳本去處理,放在內(nèi)存192G,CPU64核物理機(jī)運(yùn)行,發(fā)現(xiàn)每秒大概只能處理不到1000條數(shù)據(jù),算下來跑完需要約10小時(shí),想shell的結(jié)果和mr的結(jié)果進(jìn)行對(duì)比,所以就沒有kill。如果只是慢也就算了,shell腳本執(zhí)行完幾個(gè)小時(shí),運(yùn)維忽然通知這臺(tái)服務(wù)器重啟了,也沒有太詳細(xì)的dump信息,說大概是內(nèi)存不足。。。幸虧只是一臺(tái)節(jié)點(diǎn)機(jī),無其它定時(shí)任務(wù),所以對(duì)集群無影響。估計(jì)是shell管理內(nèi)存有硬傷,系統(tǒng)本身也有一些內(nèi)存沒來得及釋放。
上面的MR程序運(yùn)行在同樣配置的物理機(jī)集群,約40臺(tái),10G文件運(yùn)行也就幾分鐘,對(duì)比shell的結(jié)果基本相同,感覺速度快了不止一個(gè)數(shù)量級(jí)。