(一)單表關聯(lián)--mapreduce關聯(lián)性操作

package mr;

import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;   

public class MyGL {
    private static class MyGLMapper  extends  Mapper<LongWritable, Text, Text, Text>{  
        /*輸入類型是LongWritable,Text(上下文);
                               輸出類型是Text,Text(也就是Reduce的輸入類型)*/
         public void map(LongWritable k1, Text v1, Context context)   
                             throws java.io.IOException, java.lang.InterruptedException
                                 //map()函數(shù)是固定模式的,三個參數(shù)
         {
             // 1 2
            String[]  lines= v1.toString().split("\t");  
                                              // \t 在同一個緩沖區(qū)內(nèi)橫向跳8個空格(Tab鍵);split()方法用于把一個字符串分割
                                              //成字符串數(shù)組;v1指的是一行,把第一行的兩個單詞存進lines
            if(lines.length!=2 || lines[0].trim().equals("child"))      
                return;    //child parent
        
            String word1=lines[0].trim();   //tom       ->去掉 lines[0]里面的空格符
            String word2=lines[1].trim();   //lucy  
            
            context.write(new Text(word1), new Text("1"+","+word1+","+word2));
                                          //第一個Text對應Mapper的第三個Text;第二個Text對應Mapper的第四個Text
            context.write(new Text(word2), new Text("2"+","+word1+","+word2));
            
            //tom,1+tom+lucy
             System.out.println("map......"+word1+"-"+word2);
         }
        
    }
    
    private static class  MyGLReduce extends Reducer<Text, Text, Text, Text>{
         public void reduce(Text key, Iterable<Text> values, Context context) 
                                       throws java.io.IOException, java.lang.InterruptedException
                                  //context:上下文對象,在整個wordcount運算生命周期內(nèi)存活
         {
              List<String> grandch=new ArrayList();   //泛型
              List<String> grandpa=new ArrayList();
             
                                        /*  lucy 2+tom+lucy
             lucy 1+lucy+mary
             
             2->split[1]    tom     2的話取1
             1->split[2]    mary    1的話取2
             
             k3=tom  v3=mary   把這兩個放在上下文
             */

             Iterator<Text>  it=values.iterator();  
                                                // Iterator<Text>--輸進來的第二個值
             while(it.hasNext()){ 
                String lines= it.next().toString();      
                                                                         //2,tom,lucy(對應MyGLMapper的context.write())
                String [] words=lines.split(",");    
                                                                          //劈開 string 數(shù)組 ["2","tom","lucy"]
                if(words[0].equals("1")){
                    grandpa.add(words[2]);
                }else if(words[0].equals("2")){
                    grandch.add(words[1]);
                }
                else
                    return;
                
             }
              for(String ch:grandch)
             for(String pa:grandpa)  
             context.write(new Text(ch), new Text(pa));    
             
             System.out.println("reduce......");
         }
         protected void cleanup(Context context) 
                                      throws java.io.IOException, java.lang.InterruptedException{
             
         }
            
    }

    private static String INPUT_PATH="hdfs://master:9000/input/gl.dat";
    private static String OUTPUT_PATH="hdfs://master:9000/output/c/";

    public static void main(String[] args) throws Exception {   
        
        Configuration  conf=new Configuration();
        FileSystem  fs=FileSystem.get(new URI(OUTPUT_PATH),conf);
     
        if(fs.exists(new Path(OUTPUT_PATH)))
                fs.delete(new Path(OUTPUT_PATH));
        
        Job  job=new Job(conf,"myjob");
        
        job.setJarByClass(MyGL.class);
        job.setMapperClass(MyGLMapper.class);
        job.setReducerClass(MyGLReduce.class);
        
         
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        //job.setCombinerClass(MyReduce.class);
         
        
        FileInputFormat.addInputPath(job,new Path(INPUT_PATH));
        FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
        
        job.waitForCompletion(true);

    }

}
image.png

當line[0]=1,line[1]=child;
當line[0]=2,line[2]=grandpa;
測試數(shù)據(jù):

child   parent 
Tom Lucy
Tom Jack
Jone    Lucy
Jone    Jack
Lucy    Mary
Lucy    Ben
Jack    Alice
Jack    Jesse
Terry   Alice
Terry   Jesse
Philip  Terry
Philip  Alma
Mark    Terry
Mark    Alma
最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 背景 一年多以前我在知乎上答了有關LeetCode的問題, 分享了一些自己做題目的經(jīng)驗。 張土汪:刷leetcod...
    土汪閱讀 12,912評論 0 33
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,551評論 19 139
  • 1. Java基礎部分 基礎部分的順序:基本語法,類相關的語法,內(nèi)部類的語法,繼承相關的語法,異常的語法,線程的語...
    子非魚_t_閱讀 34,671評論 18 399
  • 當陽英雄閱讀 132評論 0 0
  • 4月的太湖半程馬拉松讓我很享受跑步帶來的愉悅感和成就感,賽后意猶未盡,感覺“半馬不是馬,全馬才是馬”,好像不跑...
    胡林海閱讀 896評論 5 8

友情鏈接更多精彩內(nèi)容