1.Map端
package Task7.productsSales;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
// k2 v2
// k2 = year (Text), v2 = Products (one record per sales line)
public class ProductsMapper extends Mapper<LongWritable, Text, Text, Products> {
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        // Raw CSV line of one sale record
        String data = value1.toString();
        // Split into fields; indices used below: [2]=date, [5]=quantity, [6]=amount
        String[] words = data.split(",");
        // Skip malformed/short lines instead of throwing ArrayIndexOutOfBoundsException
        if (words.length < 7) {
            return;
        }
        // Build the product sales record
        Products p = new Products();
        // Sale date is the 3rd field; keep only the year.
        // BUG FIX: the pattern was "yyyy-mm-dd" — lowercase 'mm' means MINUTES in
        // SimpleDateFormat; months must be uppercase 'MM', otherwise every parsed
        // date gets the wrong month (and grouping by year still works only by luck
        // of the year digits coming first — the parse itself was incorrect).
        Date d;
        try {
            d = new SimpleDateFormat("yyyy-MM-dd").parse(words[2]);
        } catch (ParseException e) {
            // BUG FIX: the original swallowed this exception and then called
            // format(d) on a null reference (guaranteed NPE). Skip bad records.
            return;
        }
        String year = new SimpleDateFormat("yyyy").format(d);
        p.setPdate(year);
        // Units sold (6th field)
        p.setPnum(Integer.parseInt(words[5]));
        // Sale amount (7th field)
        p.setPtotal(Double.valueOf(words[6]));
        // Emit: k2 = year, v2 = the sale record
        context.write(new Text(p.getPdate()), p);
    }
}
2.Reduce端
package Task7.productsSales;
import java.io.IOException;
import java.text.DecimalFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ProductsReducer extends Reducer<Text, Products, Text, Text> {
protected void reduce(Text k3, Iterable<Products> v3,Context context) throws IOException, InterruptedException {
/*
* k3 v3
*/
int total_number = 0;
double total_money = 0;
for(Products p:v3){
//總銷量
total_number = total_number+p.getPnum();
//總金額
total_money = total_money+p.getPtotal();
}
DecimalFormat df = new DecimalFormat("#.00");
String show = "銷售量為:" + Integer.toString(total_number) + "---------" + "銷售總金額為: " + df.format(total_money);
context.write(k3, new Text(show));
}
}
3.Main主類
package Task7.productsSales;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver class: wires up the per-year product-sales aggregation job.
 * Usage: hadoop jar <jar> ProductsMain <input path> <output path>
 * (the output path must NOT already exist in HDFS).
 */
public class ProductsMain {
    public static void main(String[] args) throws Exception {
        // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
        if (args.length < 2) {
            System.err.println("Usage: ProductsMain <input path> <output path>");
            System.exit(2);
        }
        // Create the job and set the jar by the driver class
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ProductsMain.class);
        // Mapper and its output types: k2 = year, v2 = sales record
        job.setMapperClass(ProductsMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Products.class);
        // Reducer and the final output types: k4 = year, v4 = summary text
        job.setReducerClass(ProductsReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Input and output paths from the command line
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // BUG FIX: the original ignored waitForCompletion's return value, so the
        // JVM exited with status 0 even when the job FAILED. Propagate the result.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
4.Product序列化
package Task7.productsSales;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
//數(shù)據(jù): 7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30
// One product-sales record. Expected CSV layout (inferred from ProductsMapper's
// indices — TODO confirm against the real data file):
//   [0]=pno, [1]=bno, [2]=date(yyyy-MM-dd), [3]=pmethod, [4]=psales, [5]=pnum, [6]=ptotal
// NOTE(review): the original comment here showed an emp-table row
// ("7654,MARTIN,SALESMAN,...") which does not match these seven fields.
public class Products implements Writable {
    private int pno;        // product number
    private int bno;        // branch/batch number
    private String pdate;   // sale date; the mapper stores only the year ("yyyy")
    private int pmethod;    // sales method code
    private int psales;     // sales code/count (unused by this job)
    private int pnum;       // units sold
    private Double ptotal;  // total sale amount

    /**
     * Deserializes the record. Field order MUST mirror {@link #write(DataOutput)}
     * exactly, or the stream desynchronizes.
     */
    public void readFields(DataInput input) throws IOException {
        this.pno = input.readInt();
        this.bno = input.readInt();
        this.pdate = input.readUTF();
        this.pmethod = input.readInt();
        this.psales = input.readInt();
        this.pnum = input.readInt();
        this.ptotal = input.readDouble();
    }

    /**
     * Serializes the record. Field order MUST mirror {@link #readFields(DataInput)}.
     * NOTE(review): writeUTF throws NPE if pdate was never set, and writeDouble
     * unboxes ptotal (NPE if null) — callers must populate both before writing.
     */
    public void write(DataOutput output) throws IOException {
        output.writeInt(this.pno);
        output.writeInt(this.bno);
        output.writeUTF(this.pdate);
        output.writeInt(this.pmethod);
        output.writeInt(this.psales);
        output.writeInt(this.pnum);
        output.writeDouble(this.ptotal);
    }

    public int getPno() {
        return pno;
    }
    public void setPno(int pno) {
        this.pno = pno;
    }
    public int getBno() {
        return bno;
    }
    public void setBno(int bno) {
        this.bno = bno;
    }
    public String getPdate() {
        return pdate;
    }
    public void setPdate(String pdate) {
        this.pdate = pdate;
    }
    public int getPmethod() {
        return pmethod;
    }
    public void setPmethod(int pmethod) {
        this.pmethod = pmethod;
    }
    public int getPsales() {
        return psales;
    }
    public void setPsales(int psales) {
        this.psales = psales;
    }
    public int getPnum() {
        return pnum;
    }
    public void setPnum(int pnum) {
        this.pnum = pnum;
    }
    public Double getPtotal() {
        return ptotal;
    }
    // FIX: parameter was misspelled "potal"
    public void setPtotal(Double ptotal) {
        this.ptotal = ptotal;
    }
}
5.在Linux終端,使用命令將要統(tǒng)計的txt文件上傳到HDFS中
hdfs dfs -put linux下的文件路徑 要上傳到hdfs中的路徑
6.在Linux終端輸入進入上述java代碼所在路徑,打包成jar包
mvn clean package
7.接著使用命令:cd target 進入target目錄
8.運行程序
hadoop jar 打包生成的jar包名.jar Task7.productsSales.ProductsMain 输入文件的HDFS路径 输出结果的HDFS路径
注意此處運行后存放結(jié)果的那個文件,必須是不存在的,不能事先創(chuàng)建好
9.得到并查看結(jié)果
hdfs dfs -cat 输出结果的HDFS路径/part-r-00000 （即第8步中指定的输出目录下的结果文件）

image.png