Using MapReduce to compute yearly sales volume and sales revenue

1. Map side

package Task7.productsSales;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable; 
import org.apache.hadoop.io.Text; 
import org.apache.hadoop.mapreduce.Mapper; 
// k2 = year (Text), v2 = sales record (Products)
public class ProductsMapper extends Mapper<LongWritable, Text, Text, Products> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // read one line of input
        String data = value1.toString();
        // split the CSV record into fields
        String[] words = data.split(",");
        // create a product object
        Products p = new Products();

        // date --- keep only the year; the date is the third field (index 2) of the words array
        String date = words[2];
        Date d = null;
        try {
            // "MM" means month; lowercase "mm" (minutes) would parse the month incorrectly
            d = new SimpleDateFormat("yyyy-MM-dd").parse(date);
        } catch (ParseException e) {
            // a malformed date leaves d null and fails below; acceptable for a demo
            e.printStackTrace();
        }
        SimpleDateFormat Syear = new SimpleDateFormat("yyyy");
        String year = Syear.format(d);
        p.setPdate(year);

        // sales quantity
        p.setPnum(Integer.parseInt(words[5]));

        // sales total
        p.setPtotal(Double.valueOf(words[6]));

        // output: k2 = year, v2 = the sales record
        context.write(new Text(p.getPdate()), p);
    }
}
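The mapper assumes comma-separated records in which field index 2 holds a yyyy-MM-dd date, index 5 the unit count, and index 6 the sale amount. The full input schema is not shown in the original; a hypothetical record consistent with the Products fields (pno, bno, pdate, pmethod, psales, pnum, ptotal) would look like:

1001,1,2012-03-15,2,1,3,89.70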

2. Reduce side

package Task7.productsSales;

import java.io.IOException;
import java.text.DecimalFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer; 
public class ProductsReducer extends Reducer<Text, Products, Text, Text> {
    @Override
    protected void reduce(Text k3, Iterable<Products> v3, Context context)
            throws IOException, InterruptedException {
        // k3 = year, v3 = all sales records for that year
        int total_number = 0;
        double total_money = 0;

        for (Products p : v3) {
            // total quantity sold
            total_number = total_number + p.getPnum();
            // total revenue
            total_money = total_money + p.getPtotal();
        }
        DecimalFormat df = new DecimalFormat("#.00");
        String show = "Quantity sold: " + Integer.toString(total_number) + "---------" + "Total sales: " + df.format(total_money);
        context.write(k3, new Text(show));
    }
}
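Each reduce call emits one line per year; with the default TextOutputFormat the key and value are separated by a tab. A hypothetical line of final output (numbers invented for illustration):

2012	Quantity sold: 1375---------Total sales: 48250.00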

3. Main (driver) class

package Task7.productsSales;

import org.apache.hadoop.conf.Configuration; 
import org.apache.hadoop.fs.Path; 
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job; 
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
public class ProductsMain {
    public static void main(String[] args) throws Exception {
        // create a job
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ProductsMain.class);
        // set the job's mapper and its output types (k2, v2)
        job.setMapperClass(ProductsMapper.class);
        job.setMapOutputKeyClass(Text.class); // year
        job.setMapOutputValueClass(Products.class); // sales record

        // set the job's reducer and its output types (k4, v4)
        job.setReducerClass(ProductsReducer.class);
        job.setOutputKeyClass(Text.class); // year
        job.setOutputValueClass(Text.class); // summary string

        // set the job's input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // run the job
        job.waitForCompletion(true);
    }
}
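The driver above ignores the boolean returned by waitForCompletion. A small optional refinement (not in the original) is to surface job failure through the process exit code, so shell scripts can detect it:

// exit 0 on success, 1 on failure
System.exit(job.waitForCompletion(true) ? 0 : 1);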

4. Products serialization (Writable)

package Task7.productsSales;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;

// sample record: 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
public class Products implements Writable {
    private int pno;
    private int bno;
    private String pdate;
    private int pmethod;
    private int psales;
    private int pnum;
    private Double ptotal;

    @Override
    public void readFields(DataInput input) throws IOException {
        // deserialize: fields must be read in exactly the same order they are written
        this.pno = input.readInt();
        this.bno = input.readInt();
        this.pdate = input.readUTF();
        this.pmethod = input.readInt();
        this.psales = input.readInt();
        this.pnum = input.readInt();
        this.ptotal = input.readDouble();
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // serialize: write fields in a fixed order
        output.writeInt(this.pno);
        output.writeInt(this.bno);
        output.writeUTF(this.pdate);
        output.writeInt(this.pmethod);
        output.writeInt(this.psales);
        output.writeInt(this.pnum);
        output.writeDouble(this.ptotal);
    }

    public int getPno() {
        return pno;
    }

    public void setPno(int pno) {
        this.pno = pno;
    }

    public int getBno() {
        return bno;
    }

    public void setBno(int bno) {
        this.bno = bno;
    }

    public String getPdate() {
        return pdate;
    }

    public void setPdate(String pdate) {
        this.pdate = pdate;
    }

    public int getPmethod() {
        return pmethod;
    }

    public void setPmethod(int pmethod) {
        this.pmethod = pmethod;
    }
    
    public int getPsales() {
        return psales;
    }

    public void setPsales(int psales) {
        this.psales = psales;
    }


    public int getPnum() {
        return pnum;
    }

    public void setPnum(int pnum) {
        this.pnum = pnum;
    }
    
    public Double getPtotal() {
        return ptotal;
    }

    public void setPtotal(Double ptotal) {
        this.ptotal = ptotal;
    }

}

5. In a Linux terminal, upload the txt file to be analyzed to HDFS

hdfs dfs -put <path to the file on Linux> <destination path in HDFS>
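For example, with a placeholder data file and HDFS directory (substitute your own paths):

hdfs dfs -put /home/hadoop/products.txt /input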

6. In the terminal, go to the directory containing the Java code above and package it into a jar

mvn clean package

7. Then run cd target to enter the target directory
8. Run the program

hadoop jar ... (I forget the exact command; it is in the textbook, so look it up there)
Note: the output directory for the results must not already exist; do not create it in advance.
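The general form is hadoop jar <jar file> <main class> <input path> <output path>. A hypothetical invocation, with the jar name and paths as placeholders (adjust to your own setup):

hadoop jar productsSales.jar Task7.productsSales.ProductsMain /input/products.txt /output/productsSales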

9. Fetch and view the result

hdfs dfs -cat ... (fill in the output path you configured)
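Reducer output lands in files named part-r-00000, part-r-00001, and so on inside the output directory, so with the hypothetical paths above this would be:

hdfs dfs -cat /output/productsSales/part-r-00000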