package mr;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MyGL {
private static class MyGLMapper extends Mapper<LongWritable, Text, Text, Text>{
/*輸入類型是LongWritable,Text(上下文);
輸出類型是Text,Text(也就是Reduce的輸入類型)*/
public void map(LongWritable k1, Text v1, Context context)
throws java.io.IOException, java.lang.InterruptedException
//map()函數(shù)是固定模式的,三個參數(shù)
{
// 1 2
String[] lines= v1.toString().split("\t");
// \t 在同一個緩沖區(qū)內(nèi)橫向跳8個空格(Tab鍵);split()方法用于把一個字符串分割
//成字符串數(shù)組;v1指的是一行,把第一行的兩個單詞存進lines
if(lines.length!=2 || lines[0].trim().equals("child"))
return; //child parent
String word1=lines[0].trim(); //tom ->去掉 lines[0]里面的空格符
String word2=lines[1].trim(); //lucy
context.write(new Text(word1), new Text("1"+","+word1+","+word2));
//第一個Text對應Mapper的第三個Text;第二個Text對應Mapper的第四個Text
context.write(new Text(word2), new Text("2"+","+word1+","+word2));
//tom,1+tom+lucy
System.out.println("map......"+word1+"-"+word2);
}
}
private static class MyGLReduce extends Reducer<Text, Text, Text, Text>{
public void reduce(Text key, Iterable<Text> values, Context context)
throws java.io.IOException, java.lang.InterruptedException
//context:上下文對象,在整個wordcount運算生命周期內(nèi)存活
{
List<String> grandch=new ArrayList(); //泛型
List<String> grandpa=new ArrayList();
/* lucy 2+tom+lucy
lucy 1+lucy+mary
2->split[1] tom 2的話取1
1->split[2] mary 1的話取2
k3=tom v3=mary 把這兩個放在上下文
*/
Iterator<Text> it=values.iterator();
// Iterator<Text>--輸進來的第二個值
while(it.hasNext()){
String lines= it.next().toString();
//2,tom,lucy(對應MyGLMapper的context.write())
String [] words=lines.split(",");
//劈開 string 數(shù)組 ["2","tom","lucy"]
if(words[0].equals("1")){
grandpa.add(words[2]);
}else if(words[0].equals("2")){
grandch.add(words[1]);
}
else
return;
}
for(String ch:grandch)
for(String pa:grandpa)
context.write(new Text(ch), new Text(pa));
System.out.println("reduce......");
}
protected void cleanup(Context context)
throws java.io.IOException, java.lang.InterruptedException{
}
}
private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
private static String OUTPUT_PATH="hdfs://master:9000/output/c/";
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
FileSystem fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
if(fs.exists(new Path(OUTPUT_PATH)))
fs.delete(new Path(OUTPUT_PATH));
Job job=new Job(conf,"myjob");
job.setJarByClass(MyGL.class);
job.setMapperClass(MyGLMapper.class);
job.setReducerClass(MyGLReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//job.setCombinerClass(MyReduce.class);
FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
job.waitForCompletion(true);
}
}

image.png
When words[0] == "1", the key is the child, so words[2] (the parent) is collected as a grandparent candidate;
when words[0] == "2", the key is the parent, so words[1] (the child) is collected as a grandchild candidate.
Test data:
child parent
Tom Lucy
Tom Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Ben
Jack Alice
Jack Jesse
Terry Alice
Terry Jesse
Philip Terry
Philip Alma
Mark Terry
Mark Alma