



An inverted index records, for each word, how often (and where) it appears in each file. We can implement a simple algorithm that counts how many times each word appears in each file. Suppose the following two files exist:
The contents of a.txt are:
Hello Jack
Hello Jack
The contents of b.txt are:
Hello Mary
After counting, the result should be:

Word    File     Count   File     Count   Total
Hello   a.txt    2       b.txt    1       3
Jack    a.txt    2                        2
Mary    b.txt    1                        1
One way to process this is to first aggregate by word + file name, which gives:

Word    File     Count
Hello   a.txt    2
Hello   b.txt    1
Jack    a.txt    2
Mary    b.txt    1
Then that intermediate result is processed again, taking the word as the key and the (file name, count) pairs as the value, to produce the final output required above. A minimal in-memory sketch of this two-pass idea is shown below.
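The following plain-Java sketch runs the two passes on the sample data in memory, without MapReduce, just to make the data flow concrete. The class name TwoPassSketch and its variables are illustrative only and are not part of the listings that follow.

import java.util.LinkedHashMap;
import java.util.Map;

// In-memory sketch of the two-pass idea on the sample data.
// Pass 1 counts per (word, file); pass 2 regroups by word and sums a total.
public class TwoPassSketch {
    public static void main(String[] args) {
        String[][] lines = {
            {"a.txt", "Hello Jack"}, {"a.txt", "Hello Jack"}, {"b.txt", "Hello Mary"}
        };
        // Pass 1: count per (word, file), keyed as "word \t file"
        Map<String, Long> perFile = new LinkedHashMap<>();
        for (String[] line : lines) {
            for (String word : line[1].split("\\s+")) {
                perFile.merge(word + "\t" + line[0], 1L, Long::sum);
            }
        }
        // Pass 2: regroup by word into (file, count) pairs plus a running total
        Map<String, StringBuilder> postings = new LinkedHashMap<>();
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : perFile.entrySet()) {
            String[] parts = e.getKey().split("\t");   // parts[0] = word, parts[1] = file
            postings.computeIfAbsent(parts[0], k -> new StringBuilder())
                    .append("\t").append(parts[1]).append("\t").append(e.getValue());
            totals.merge(parts[0], e.getValue(), Long::sum);
        }
        for (String word : postings.keySet()) {
            System.out.println(word + postings.get(word) + "\t" + totals.get(word));
        }
    }
}

Note that in the real MapReduce run the order of the (file, count) pairs within one word is not guaranteed, which is why Hello's postings appear with b.txt before a.txt in the final output further below.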
The first MapReduce program counts the data in the two files and emits records in the form word + \t + file + \t + count, as shown in Listing 4-9.
Listing 4-9: InverseMR1.java
package org.hadoop.inverse;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverseMR1 extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists
        Path dest = new Path("D:/a/out002");
        if (fs.exists(dest)) {
            fs.delete(dest, true);
        }
        Job job = Job.getInstance(conf, "InverseIndex");
        job.setJarByClass(getClass());
        job.setMapperClass(IIMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(IIReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("D:/a/in"));
        FileOutputFormat.setOutputPath(job, dest);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static class IIMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private String fileName = "";
        private Text key = new Text();
        private LongWritable value = new LongWritable(0L);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit (word \t fileName, 1) for every word in the line
            String[] strs = value.toString().split("\\s+");
            for (String str : strs) {
                this.key.set(str + "\t" + fileName);
                this.value.set(1L);
                context.write(this.key, this.value);
            }
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Obtain the name of the file this split belongs to
            InputSplit split = context.getInputSplit();
            if (split instanceof FileSplit) {
                FileSplit fileSplit = (FileSplit) split;
                fileName = fileSplit.getPath().getName();
            }
        }
    }

    public static class IIReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the counts for each (word, file) key
            long sum = 0L;
            for (LongWritable l : values) {
                sum += l.get();
            }
            key.set(key.toString() + "\t" + sum);
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new InverseMR1(), args);
        System.exit(code);
    }
}
The output of this first pass is:
Hello a.txt 2
Hello b.txt 1
Jack a.txt 2
Mary b.txt 1
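Since the reduce step of Listing 4-9 is a plain summation, shuffle traffic could be cut down with a combiner. The existing IIReducer cannot be reused for that role because its output value type (NullWritable) does not match the map output value type (LongWritable). One possible combiner, sketched here as an additional nested class inside InverseMR1 (the name IICombiner is hypothetical and not part of the book's listing), would be:

// Hypothetical map-side combiner: pre-sums the 1s emitted by IIMapper.
// Its input and output types both equal the map output types (Text, LongWritable).
public static class IICombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
    private final LongWritable out = new LongWritable();

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0L;
        for (LongWritable v : values) {
            sum += v.get();
        }
        out.set(sum);
        context.write(key, out);
    }
}

It would be registered in run() with job.setCombinerClass(IICombiner.class); the final result is the same either way.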
The second MapReduce program aggregates the above result again by word, as shown in Listing 4-10.
Listing 4-10: InverseMR2.java
package org.hadoop.inverse;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverseMR2 extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists
        Path dest = new Path("D:/a/out003");
        if (fs.exists(dest)) {
            fs.delete(dest, true);
        }
        Job job = Job.getInstance(conf, "InverseIndex2");
        job.setJarByClass(getClass());
        job.setMapperClass(IIMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(IIReducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // The input is the output directory of the first job
        FileInputFormat.setInputPaths(job, new Path("D:/a/out002"));
        FileOutputFormat.setOutputPath(job, dest);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static class IIMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        private Text key = new Text();
        private Text value = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Input line format: word \t file \t count
            String[] strs = value.toString().split("\\s+");
            this.key.set(strs[0]);                       // e.g. Hello
            this.value.set(strs[1] + "\t" + strs[2]);    // e.g. a.txt \t 2
            context.write(this.key, this.value);
        }
    }

    public static class IIReducer2 extends Reducer<Text, Text, Text, LongWritable> {
        private LongWritable sum = new LongWritable(0L);

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            this.sum.set(0L);
            String str = "";
            for (Text t : values) {
                // Each value is "file \t count"; accumulate the total count
                String[] strs = t.toString().split("\t");
                this.sum.set(this.sum.get() + Long.parseLong(strs[1]));
                str += "\t" + t.toString();
            }
            // str already starts with a tab, so append it to the word directly
            key.set(key.toString() + str);
            context.write(key, this.sum);
        }
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new InverseMR2(), args);
        System.exit(code);
    }
}
The final output is:
Hello b.txt 1 a.txt 2 3
Jack a.txt 2 2
Mary b.txt 1 1
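Listings 4-9 and 4-10 are run as two separate drivers, and the second simply reads the hard-coded output directory of the first (D:/a/out002). If the two jobs were to be launched in one step, one possible driver, sketched below under the assumption that both classes live in the same package (the class name InverseDriver is illustrative), would run the second job only when the first succeeds:

package org.hadoop.inverse;

import org.apache.hadoop.util.ToolRunner;

// Illustrative driver chaining the two jobs: InverseMR2 reads the output
// directory written by InverseMR1, so it runs only if the first job succeeds.
public class InverseDriver {
    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new InverseMR1(), args);
        if (code == 0) {
            code = ToolRunner.run(new InverseMR2(), args);
        }
        System.exit(code);
    }
}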