MapReduce 案例之倒排索引

1. 倒排索引

倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應用于全文搜索引擎。 它主要是用來存儲某個單詞(或詞組) 在一個文檔或一組文檔中的存儲位置的映射,即提供了一種根據(jù)內(nèi)容來查找文檔的方式。由于不是根據(jù)文檔來確定文檔所包含的內(nèi)容,而是進行相反的操作,因而稱為倒排索引( Inverted Index)。

2. 實例描述

通常情況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的 ID 號,或者是指文檔所在位置的 URL。如下圖所示:

image.png

從上圖可以看出,單詞 1 出現(xiàn)在{文檔 1,文檔 5,文檔 13, ……}中,單詞 2 出現(xiàn)在{文檔 2,文檔 3,文檔 5, ……}中,而單詞 3 出現(xiàn)在{文檔 2,文檔 10,文檔 16, ……}中。在實際應用中,還需要給每個文檔添加一個權(quán)值,用來指出每個文檔與搜索內(nèi)容的相關度,如下圖所示:

image.png

最常用的是使用詞頻作為權(quán)重,即記錄單詞在文檔中出現(xiàn)的次數(shù)。以英文為例,如下圖所示,索引文件中的“ MapReduce”一行表示:“ MapReduce”這個單詞在文本 T0 中 出現(xiàn)過 1 次,T1 中出現(xiàn)過 1 次,T2 中出現(xiàn)過 2 次。


image.png

3. 設計思路

3.1 Map過程

首先使用默認的 TextInputFormat 類對輸入文件進行處理,得到文本中每行的偏移量及其內(nèi)容。顯然, Map 過程首先必須分析輸入的key/value對,得到倒排索引中需要的三個信息:單詞、文檔 URL 和詞頻,如下圖所示。

image.png

這里存在兩個問題:第一, key/value對只能有兩個值,需要根據(jù)情況將其中兩個值合并成一個值,作為 key 或 value 值;
第二,通過一個 Reduce 過程無法同時完成詞頻統(tǒng)計和生成文檔列表,所以必須增加一個 Combine 過程完成詞頻統(tǒng)計。
這里將單詞和 URL 組成 key 值(如“ MapReduce: file1.txt”),將詞頻作為value,這樣做的好處是可以利用 MapReduce 框架自帶的Map 端排序,將同一文檔的相同單詞的詞頻組成列表,傳遞給 Combine 過程,實現(xiàn)類似于 WordCount 的功能。

3.2 Combine 過程

經(jīng)過 map 方法處理后, Combine 過程將 key 值相同 value 值累加,得到一個單詞在文檔中的詞頻。 如果直接將圖所示的輸出作為 Reduce 過程的輸入,在 Shuffle 過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、 URL 和詞頻組成)應該交由同一個Reducer 處理,但當前的 key 值無法保證這一點,所以必須修改 key 值和 value 值。這次將單詞作為 key 值, URL 和詞頻組成 value 值(如“ file1.txt: 1”)。這樣做的好處是可以利用 MapReduce 框架默認的 HashPartitioner 類完成 Shuffle 過程,將相同單詞的所有記錄發(fā)送給同一個 Reducer 進行處理。

image.png

3.3 Reduce 過程

經(jīng)過上述兩個過程后, Reduce 過程只需將相同 key 值的 value 值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給 MapReduce 框架進行處理了。

image.png

3.4 程序代碼

  • pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.itcast</groupId>
  <artifactId>invertedIndex</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>invertedIndex</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.4</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-core</artifactId>
      <version>2.6.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <archive>
            <manifest>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
              <mainClass>cn.itcast.hadoop.mrwc.WordCountDriver</mainClass>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

</project>
  • Map程序

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>{

    private static Text keyInfo = new Text();// 存儲單詞和 URL 組合  
    private static final Text valueInfo = new Text("1");// 存儲詞頻,初始化為1  

    @Override  
    protected void map(LongWritable key, Text value, Context context)  
            throws IOException, InterruptedException {  

        String line = value.toString();  
        String[] fields = line.split(" ");// 得到字段數(shù)組  

        FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行數(shù)據(jù)所在的文件切片  
        String fileName = fileSplit.getPath().getName();// 根據(jù)文件切片得到文件名  

        for (String field : fields) {  
            // key值由單詞和URL組成,如“MapReduce:file1”  
            keyInfo.set(field + ":" + fileName);  
            context.write(keyInfo, valueInfo);  
        }  
    }  

}

  • combine程序

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{

    private static Text info = new Text();  

    // 輸入: <MapReduce:file3 {1,1,...}>  
    // 輸出:<MapReduce file3:2>  
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException {  
        int sum = 0;// 統(tǒng)計詞頻  
        for (Text value : values) {  
            sum += Integer.parseInt(value.toString());  
        }  

        int splitIndex = key.toString().indexOf(":");  
        // 重新設置 value 值由 URL 和詞頻組成  
        info.set(key.toString().substring(splitIndex + 1) + ":" + sum);  
        // 重新設置 key 值為單詞  
        key.set(key.toString().substring(0, splitIndex));  

        context.write(key, info);  
    }  

}

  • reduce程序

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
    private static Text result = new Text();  

    // 輸入:<MapReduce file3:2>  
    // 輸出:<MapReduce file1:1;file2:1;file3:2;>  
    @Override  
    protected void reduce(Text key, Iterable<Text> values, Context context)  
            throws IOException, InterruptedException {  
        // 生成文檔列表  
        String fileList = new String();  
        for (Text value : values) {  
            fileList += value.toString() + ";";  
        }  

        result.set(fileList);  
        context.write(key, result);  
    }  

}

  • 主程序

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class InvertedIndexRunner {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(InvertedIndexRunner.class);

        job.setMapperClass(InvertedIndexMapper.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(InvertedIndexReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\ziliao\\data\\InvertedIndex\\input"));
        // 指定處理完成之后的結(jié)果所保存的位置
        FileOutputFormat.setOutputPath(job, new Path("D:\\ziliao\\data\\InvertedIndex\\output"));

        // 向 yarn 集群提交這個 job
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }

}

按權(quán)重排序



/**
 * Created by Administrator on 2018/8/15.
 */
public class FileCount implements Comparable<FileCount> {

    private String filename;
    private long count;

    //按照總流量倒序排
    public int compareTo(FileCount bean) {
        return bean.count>this.count?1:-1;
    }

    public FileCount(String filename, long count) {
        this.filename = filename;
        this.count = count;
    }

    @Override
    public String toString() {
        return filename + ":" + count;
    }

}

新reduce程序

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
    private static Text result = new Text();

    // 輸入:<MapReduce file3:2>
    // 輸出:<MapReduce file1:1;file2:1;file3:2;>
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // 生成文檔列表
        String fileList = new String();
        List<FileCount> FileCountList = new ArrayList<FileCount>();

        for (Text value : values) {
            String[] arr = value.toString().split(":");
            FileCount FileCount = new FileCount(arr[0],Long.parseLong(arr[1]));
            FileCountList.add(FileCount);
        }

        Collections.sort(FileCountList);

        for(FileCount FileCount : FileCountList)
        {
            fileList += FileCount.toString() + ";";
        }
        result.set(fileList);
        context.write(key, result);
    }

}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 1. 倒排索引 倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應用于全文搜索引擎。 它主要是用來存儲某個單詞(...
    __元昊__閱讀 834評論 0 0
  • 1. 倒排索引 倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應用于全文搜索引擎。 它主要是用來存儲某個單詞(...
    weare_b646閱讀 441評論 0 0
  • 1. 倒排索引 倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應用于全文搜索引擎。 它主要是用來存儲某個單詞(...
    piziyang12138閱讀 440評論 0 0
  • 1. 倒排索引 倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應用于全文搜索引擎。 它主要是用來存儲某個單詞(...
    數(shù)據(jù)萌新閱讀 2,123評論 0 0
  • 1.數(shù)據(jù)準備 2.上傳HDFS 3.執(zhí)行Mapreduce分布式并行計算 3.1業(yè)務邏輯處理。 業(yè)務理解:通俗理解...
    起個什么呢稱呢閱讀 1,482評論 0 1

友情鏈接更多精彩內(nèi)容