An inverted index records, for each word, which files it appears in, together with how many times (and where) it occurs in each file. We can implement a simple algorithm that counts how many times each word appears in each file. Suppose there are the following two files:
The content of a.txt is:
Hello Jack Hello Jack
The content of b.txt is:
Hello Mary
After counting, the expected result is:

Word     File      Count    File      Count    Total
Hello    a.txt     2        b.txt     1        3
Jack     a.txt     2                           2
Mary     b.txt     1                           1
One way to process this is to first count by word + file name, which yields:

Word     File      Count
Hello    a.txt     2
Hello    b.txt     1
Jack     a.txt     2
Mary     b.txt     1
This intermediate result is then processed a second time, using the word as the key and the (file name, count) pair as the value, to produce the final required output.
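Before turning to MapReduce, the two-pass logic can be illustrated with plain Java collections. The following is only a minimal sketch (the class name InMemoryInvertedIndex and the hard-coded sample contents are assumptions for illustration, not part of this section's code); the actual MapReduce programs follow below.

import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative in-memory version of the two-pass inverted-index counting.
public class InMemoryInvertedIndex {
    public static void main(String[] args) {
        // Sample input: file name -> file content, matching a.txt and b.txt above.
        Map<String, String> files = new LinkedHashMap<>();
        files.put("a.txt", "Hello Jack Hello Jack");
        files.put("b.txt", "Hello Mary");

        // Pass 1: count by "word \t file", like the first MapReduce job.
        Map<String, Long> wordFileCount = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : files.entrySet()) {
            for (String word : e.getValue().split("\\s+")) {
                wordFileCount.merge(word + "\t" + e.getKey(), 1L, Long::sum);
            }
        }

        // Pass 2: regroup by word and accumulate a total, like the second job.
        Map<String, StringBuilder> postings = new LinkedHashMap<>();
        Map<String, Long> totals = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : wordFileCount.entrySet()) {
            String[] parts = e.getKey().split("\t"); // [word, file]
            postings.computeIfAbsent(parts[0], k -> new StringBuilder())
                    .append("\t").append(parts[1]).append("\t").append(e.getValue());
            totals.merge(parts[0], e.getValue(), Long::sum);
        }

        // Print: word, then each (file, count) pair, then the total count.
        for (String word : postings.keySet()) {
            System.out.println(word + postings.get(word) + "\t" + totals.get(word));
        }
    }
}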
The first MapReduce program counts the words in the two files and outputs one line per word in the form word + \t + file + \t + count, as shown in Code 4-9.
Code 4-9  InverseMR1.java
package org.hadoop.inverse;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverseMR1 extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists so the job can be rerun.
        Path dest = new Path("D:/a/out002");
        if (fs.exists(dest)) {
            fs.delete(dest, true);
        }
        Job job = Job.getInstance(conf, "InverseIndex");
        job.setJarByClass(getClass());
        job.setMapperClass(IIMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setReducerClass(IIReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path("D:/a/in"));
        FileOutputFormat.setOutputPath(job, dest);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static class IIMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        private String fileName = "";
        private Text key = new Text();
        private LongWritable value = new LongWritable(0L);

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Emit <word \t fileName, 1> for every word in the current line.
            String[] strs = value.toString().split("\\s+");
            for (String str : strs) {
                this.key.set(str + "\t" + fileName);
                this.value.set(1L);
                context.write(this.key, this.value);
            }
        }

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Remember which file this split comes from.
            InputSplit split = context.getInputSplit();
            if (split instanceof FileSplit) {
                FileSplit fileSplit = (FileSplit) split;
                fileName = fileSplit.getPath().getName();
            }
        }
    }

    public static class IIReducer extends Reducer<Text, LongWritable, Text, NullWritable> {
        @Override
        public void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            // Sum the occurrences of one word within one file.
            long sum = 0L;
            for (LongWritable l : values) {
                sum += l.get();
            }
            key.set(key.toString() + "\t" + sum);
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new InverseMR1(), args);
        System.exit(code);
    }
}
The result of this first pass is as follows:

Hello    a.txt    2
Hello    b.txt    1
Jack     a.txt    2
Mary     b.txt    1
The second MapReduce program aggregates the result above by word, as shown in Code 4-10.
Code 4-10  InverseMR2.java
package org.hadoop.inverse;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class InverseMR2 extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        FileSystem fs = FileSystem.get(conf);
        // Delete the output directory if it already exists so the job can be rerun.
        Path dest = new Path("D:/a/out003");
        if (fs.exists(dest)) {
            fs.delete(dest, true);
        }
        Job job = Job.getInstance(conf, "InverseIndex2");
        job.setJarByClass(getClass());
        job.setMapperClass(IIMapper2.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setReducerClass(IIReducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        // The input of this job is the output of the first job.
        FileInputFormat.setInputPaths(job, new Path("D:/a/out002"));
        FileOutputFormat.setOutputPath(job, dest);
        int code = job.waitForCompletion(true) ? 0 : 1;
        return code;
    }

    public static class IIMapper2 extends Mapper<LongWritable, Text, Text, Text> {
        private Text key = new Text();
        private Text value = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Each input line looks like: word \t file \t count
            String[] strs = value.toString().split("\\s+");
            this.key.set(strs[0]);                     // e.g. Hello
            this.value.set(strs[1] + "\t" + strs[2]);  // e.g. a.txt \t 2
            context.write(this.key, this.value);
        }
    }

    public static class IIReducer2 extends Reducer<Text, Text, Text, LongWritable> {
        private LongWritable sum = new LongWritable(0L);

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // Collect every (file, count) pair for the word and accumulate the total.
            this.sum.set(0L);
            String str = "";
            for (Text t : values) {
                String[] strs = t.toString().split("\t");
                this.sum.set(this.sum.get() + Long.parseLong(strs[1]));
                str += "\t" + t.toString();
            }
            key.set(key.toString() + str);
            context.write(key, this.sum);
        }
    }

    public static void main(String[] args) throws Exception {
        int code = ToolRunner.run(new InverseMR2(), args);
        System.exit(code);
    }
}
The result after execution is as follows:

Hello    b.txt    1    a.txt    2    3
Jack     a.txt    2    2
Mary     b.txt    1    1
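When the two programs are run by hand, InverseMR1 must finish before InverseMR2 starts, because the second job reads the D:/a/out002 directory produced by the first. A driver that chains the two jobs might look like the following minimal sketch (the class name InverseIndexDriver is an assumption for illustration, and the sketch assumes both classes live in the same package org.hadoop.inverse):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver (not from the book): runs InverseMR1 and then InverseMR2
// in sequence, so the output of the first job can feed the second job.
public class InverseIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        int code = ToolRunner.run(conf, new InverseMR1(), args);
        if (code == 0) {
            // Only run the second pass if the first job completed successfully.
            code = ToolRunner.run(conf, new InverseMR2(), args);
        }
        System.exit(code);
    }
}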