1. 倒排索引
倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛地應用于全文搜索引擎。 它主要是用來存儲某個單詞(或詞組) 在一個文檔或一組文檔中的存儲位置的映射,即提供了一種根據(jù)內(nèi)容來查找文檔的方式。由于不是根據(jù)文檔來確定文檔所包含的內(nèi)容,而是進行相反的操作,因而稱為倒排索引( Inverted Index)。
2. 實例描述
通常情況下,倒排索引由一個單詞(或詞組)以及相關的文檔列表組成,文檔列表中的文檔或者是標識文檔的 ID 號,或者是指文檔所在位置的 URL。如下圖所示:

從上圖可以看出,單詞 1 出現(xiàn)在{文檔 1,文檔 5,文檔 13, ……}中,單詞 2 出現(xiàn)在{文檔 2,文檔 3,文檔 5, ……}中,而單詞 3 出現(xiàn)在{文檔 2,文檔 10,文檔 16, ……}中。在實際應用中,還需要給每個文檔添加一個權(quán)值,用來指出每個文檔與搜索內(nèi)容的相關度,如下圖所示:

最常用的是使用詞頻作為權(quán)重,即記錄單詞在文檔中出現(xiàn)的次數(shù)。以英文為例,如下圖所示,索引文件中的“ MapReduce”一行表示:“ MapReduce”這個單詞在文本 T0 中 出現(xiàn)過 1 次,T1 中出現(xiàn)過 1 次,T2 中出現(xiàn)過 2 次。

3. 設計思路
3.1 Map過程
首先使用默認的 TextInputFormat 類對輸入文件進行處理,得到文本中每行的偏移量及其內(nèi)容。顯然, Map 過程首先必須分析輸入的key/value對,得到倒排索引中需要的三個信息:單詞、文檔 URL 和詞頻,如下圖所示。

這里存在兩個問題:第一, key/value對只能有兩個值,需要根據(jù)情況將其中兩個值合并成一個值,作為 key 或 value 值;
第二,通過一個 Reduce 過程無法同時完成詞頻統(tǒng)計和生成文檔列表,所以必須增加一個 Combine 過程完成詞頻統(tǒng)計。
這里將單詞和 URL 組成 key 值(如“ MapReduce: file1.txt”),將詞頻作為value,這樣做的好處是可以利用 MapReduce 框架自帶的Map 端排序,將同一文檔的相同單詞的詞頻組成列表,傳遞給 Combine 過程,實現(xiàn)類似于 WordCount 的功能。
3.2 Combine 過程
經(jīng)過 map 方法處理后, Combine 過程將 key 值相同 value 值累加,得到一個單詞在文檔中的詞頻。 如果直接將圖所示的輸出作為 Reduce 過程的輸入,在 Shuffle 過程時將面臨一個問題:所有具有相同單詞的記錄(由單詞、 URL 和詞頻組成)應該交由同一個Reducer 處理,但當前的 key 值無法保證這一點,所以必須修改 key 值和 value 值。這次將單詞作為 key 值, URL 和詞頻組成 value 值(如“ file1.txt: 1”)。這樣做的好處是可以利用 MapReduce 框架默認的 HashPartitioner 類完成 Shuffle 過程,將相同單詞的所有記錄發(fā)送給同一個 Reducer 進行處理。

3.3 Reduce 過程
經(jīng)過上述兩個過程后, Reduce 過程只需將相同 key 值的 value 值組合成倒排索引文件所需的格式即可,剩下的事情就可以直接交給 MapReduce 框架進行處理了。

3.4 程序代碼
- pom文件
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itcast</groupId>
<artifactId>invertedIndex</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>invertedIndex</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.6.4</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>cn.itcast.hadoop.mrwc.WordCountDriver</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>
- Map程序
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
public class InvertedIndexMapper extends Mapper<LongWritable, Text, Text, Text>{
private static Text keyInfo = new Text();// 存儲單詞和 URL 組合
private static final Text valueInfo = new Text("1");// 存儲詞頻,初始化為1
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(" ");// 得到字段數(shù)組
FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到這行數(shù)據(jù)所在的文件切片
String fileName = fileSplit.getPath().getName();// 根據(jù)文件切片得到文件名
for (String field : fields) {
// key值由單詞和URL組成,如“MapReduce:file1”
keyInfo.set(field + ":" + fileName);
context.write(keyInfo, valueInfo);
}
}
}
- combine程序
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class InvertedIndexCombiner extends Reducer<Text, Text, Text, Text>{
private static Text info = new Text();
// 輸入: <MapReduce:file3 {1,1,...}>
// 輸出:<MapReduce file3:2>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
int sum = 0;// 統(tǒng)計詞頻
for (Text value : values) {
sum += Integer.parseInt(value.toString());
}
int splitIndex = key.toString().indexOf(":");
// 重新設置 value 值由 URL 和詞頻組成
info.set(key.toString().substring(splitIndex + 1) + ":" + sum);
// 重新設置 key 值為單詞
key.set(key.toString().substring(0, splitIndex));
context.write(key, info);
}
}
- reduce程序
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
private static Text result = new Text();
// 輸入:<MapReduce file3:2>
// 輸出:<MapReduce file1:1;file2:1;file3:2;>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 生成文檔列表
String fileList = new String();
for (Text value : values) {
fileList += value.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}
- 主程序
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class InvertedIndexRunner {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(InvertedIndexRunner.class);
job.setMapperClass(InvertedIndexMapper.class);
job.setCombinerClass(InvertedIndexCombiner.class);
job.setReducerClass(InvertedIndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("D:\\ziliao\\data\\InvertedIndex\\input"));
// 指定處理完成之后的結(jié)果所保存的位置
FileOutputFormat.setOutputPath(job, new Path("D:\\ziliao\\data\\InvertedIndex\\output"));
// 向 yarn 集群提交這個 job
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
按權(quán)重排序
/**
* Created by Administrator on 2018/8/15.
*/
public class FileCount implements Comparable<FileCount> {
private String filename;
private long count;
//按照總流量倒序排
public int compareTo(FileCount bean) {
return bean.count>this.count?1:-1;
}
public FileCount(String filename, long count) {
this.filename = filename;
this.count = count;
}
@Override
public String toString() {
return filename + ":" + count;
}
}
新reduce程序
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class InvertedIndexReducer extends Reducer<Text, Text, Text, Text>{
private static Text result = new Text();
// 輸入:<MapReduce file3:2>
// 輸出:<MapReduce file1:1;file2:1;file3:2;>
@Override
protected void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
// 生成文檔列表
String fileList = new String();
List<FileCount> FileCountList = new ArrayList<FileCount>();
for (Text value : values) {
String[] arr = value.toString().split(":");
FileCount FileCount = new FileCount(arr[0],Long.parseLong(arr[1]));
FileCountList.add(FileCount);
}
Collections.sort(FileCountList);
for(FileCount FileCount : FileCountList)
{
fileList += FileCount.toString() + ";";
}
result.set(fileList);
context.write(key, result);
}
}