一小時(shí)搞定Mapreduce程序

之前一直用hive處理數(shù)據(jù),覺得MR程序打包上傳的比較麻煩,后來偶遇hive搞不定的文件網(wǎng)上找了個(gè)MR的例子稍微改一下感覺也比較方便,主要是處理速度快。

MR程序主要是有3各類:main函數(shù)類,map重載類,reduce重載類。

第一步:maven里面添加幾個(gè)jar包:

代碼如下:


第二步:main類:主要是調(diào)度MR程序的啟動(dòng)運(yùn)行

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

public static void main(String[]args)throws Exception

{

Configuration conf =new Configuration();//從hadoop配置文件讀取參數(shù)

?String [] otherArgs =new GenericOptionsParser(conf,args).getRemainingArgs();//從命令行讀取參數(shù)

?if(otherArgs.length!=2)

{

System.err.println("Usage:wordcount");

System.exit(2);

}

Job job =new Job(conf,"wordcount");

job.setJarByClass(WordCount.class);

job.setMapperClass(TokenizerMapper.class);

job.setReducerClass(IntSumReducer.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(IntWritable.class);

FileInputFormat.addInputPath(job,new Path(otherArgs[0]));

FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

System.exit(job.waitForCompletion(true)?0:1);

}

}

第三步:Map類:主要是按行讀取文件內(nèi)容,根據(jù)自己需要處理(默認(rèn)按回車換行分割,如果要改動(dòng)需要重載某個(gè)函數(shù))

代碼如下:

import java.io.IOException; //導(dǎo)入異常處理類

import org.apache.hadoop.io.IntWritable; //導(dǎo)入整數(shù)類

import org.apache.hadoop.io.Text; //導(dǎo)入文本類

import org.apache.hadoop.mapreduce.Mapper; //導(dǎo)入Mapper類

public class TokenizerMapper extends Mapper{

? ? Text word = new Text();? ? ? ? ? ? ? ? //定義輸出鍵

? ? //統(tǒng)計(jì)每行數(shù)據(jù)每個(gè)id,aaa字符出現(xiàn)的次數(shù)

? ? public void map(Object key,Text value,Context context)throws IOException,InterruptedException

? ? {

? ? ? ? int len = 0;

? ? ? ? String id="";

? ? ? ? len = appearNumber(value.toString(),"aaa");? ? //統(tǒng)計(jì)aaa出現(xiàn)次數(shù)

? ? ? ? index = value.toString().indexOf("\"id\":\"");

? ? ? ? if(index>0)

? ? ? ? {

? ? ? ? ? ? id = value.toString().substring(index+n,index+m); //取每行id值

? ? ? ? ? ? IntWritable one = new IntWritable(len);

? ? ? ? ? ? Text id_t = new Text(id);

? ? ? ? ? ? context.write(id_t,one);

? ? ? ? }

? ? }

? ? public static int appearNumber(String srcText, String findText) {

? ? ? ? int count = 0;

? ? ? ? int index = 0;

? ? ? ? while ((index = srcText.indexOf(findText, index)) != -1) {

? ? ? ? ? ? index = index + findText.length();

? ? ? ? ? ? count++;

? ? ? ? }

? ? ? ? return count;

? ? }

}

第四步:重載reduce類

? 代碼如下:

import java.io.IOException;import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

public class IntSumReducer extends Reducer{

?IntWritable result = new IntWritable();

?public void reduce(Text key,Iterable values,Context context)

? ? ? ? ? ? throws IOException,InterruptedException

? ? {

? ? ? ? int sum = 0;

? ? ? ? for(IntWritable val:values) //不斷地將values中的IntWritable整數(shù)提取出來給val

? ? ? ? {

? ? ? ? ? ? sum=sum + val.get();

? ? ? ? }

? ? ? ? result.set(sum);

? ? ? ? context.write(key,result);? //每個(gè)id的aaa出現(xiàn)的次數(shù)求和輸出

? ? }

}

第五步:打成jar包。上傳hadoop,運(yùn)行

hadoop? jar? mr_test1.jar? WordCount? hdfs:///myfile/test.log? ? hdfs:///myfile/output10

PS:hdfs:///myfile/2018042319.log? 為輸入日志文件

? ? ? ? hdfs:///myfile/output10 指定的輸出目錄

結(jié)果文件在:hdfs:///myfile/output10/part-r-00000

為了驗(yàn)證reduce階段的作用,我曾把main函數(shù)中job.setReducerClass(IntSumReducer.class); 這句代碼注掉,跑出的結(jié)果為每個(gè)id每行aaa出現(xiàn)的次數(shù)。將結(jié)果文件某個(gè)id? grep出來,例如結(jié)果有17行,然后對(duì)17行的value求和,和加上job.setReducerClass(IntSumReducer.class)這句,這個(gè)id的結(jié)果完全一致。

MR程序的運(yùn)行速度對(duì)比

MR程序的運(yùn)行速度感覺比較快,像這個(gè)日志文件約10G,一開始的時(shí)候出于簡(jiǎn)單的想法寫了一個(gè)shell腳本去處理,放在內(nèi)存192G,CPU64核物理機(jī)運(yùn)行,發(fā)現(xiàn)每秒大概只能處理不到1000條數(shù)據(jù),算下來跑完需要約10小時(shí),想shell的結(jié)果和mr的結(jié)果進(jìn)行對(duì)比,所以就沒有kill。如果只是慢也就算了,shell腳本執(zhí)行完幾個(gè)小時(shí),運(yùn)維忽然通知這臺(tái)服務(wù)器重啟了,也沒有太詳細(xì)的dump信息,說大概是內(nèi)存不足。。。幸虧只是一臺(tái)節(jié)點(diǎn)機(jī),無其它定時(shí)任務(wù),所以對(duì)集群無影響。估計(jì)是shell管理內(nèi)存有硬傷,系統(tǒng)本身也有一些內(nèi)存沒來得及釋放。

上面的MR程序運(yùn)行在同樣配置的物理機(jī)集群,約40臺(tái),10G文件運(yùn)行也就幾分鐘,對(duì)比shell的結(jié)果基本相同,感覺速度快了不止一個(gè)數(shù)量級(jí)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 Netflix電影推薦的百萬美金比賽,把“推薦”變成了時(shí)下最熱門的數(shù)據(jù)挖掘算法之一。也正是由于Netflix...
    Alukar閱讀 1,578評(píng)論 0 11
  • MapReduce編程重點(diǎn)把握 MapReduce核心概念 思考幾個(gè)問題 詞頻統(tǒng)計(jì)wordcount的具體執(zhí)行過程...
    胖胖的大羅閱讀 781評(píng)論 0 1
  • 一場(chǎng)大雨嚇跑了所有行人,給我一個(gè)與湖獨(dú)處的機(jī)會(huì)。我撐起傘到湖畔散步。 腳下的灰猶豫不定,湖面的影躲躲藏藏。建筑物隱...
    陳果_周綠閱讀 276評(píng)論 0 3
  • 在那些青春年少的歲月中,多少都會(huì)有些青澀美好的記憶,有的人相識(shí),有的人分離,有的人成為朋友而有的人卻咫尺天涯。 故...
    隨峰星起閱讀 211評(píng)論 0 0
  • 蒙頂甘露茶樹花,欣逢電商發(fā)新芽。 昔日皇家貢品茶,今入尋常網(wǎng)民家。 ——素生 2017年8月16日 蒙頂山下
    楊共同學(xué)閱讀 281評(píng)論 0 2

友情鏈接更多精彩內(nèi)容