



MapReduce为分布式计算模型,分布式计算最早由Google提出。MapReduce将运算的过程分为两个阶段,map和reduce阶段。用户只需要实现map和reduce两个函数即可。此处先为大家演示一个运行在本地的MapReduce程序,后续章节将会重点讲解MapReduce的开发。
前面曾经讲过,MapReduce可以直接在本地模式下运行。在项目中添加hadoop-mini-cluster依赖,即可以直接在本地IDE环境中运行MapReduce程序,而不需要依赖于Hadoop集群环境,这个特点在测试开发中非常有用。为了帮助读者开发,我们在代码中加入了开发的步骤,具体代码将会在第4章中详细讲解。
在本地运行Hadoop,需要将hadoop.dll文件放到Windows/system32目录下,此文件可以在winutils目录下找到。
【代码2-6】Demo02MapReduce.java
1. package org.hadoop;
2. /**
3. * MapReduce示例程序
4. */
5. //1:开发一个类,继承Configured实现接口Tool
6. public class Demo05MapReduce extends Configured implements Tool {
7. //3:添加main函数
8. public static void main(String[] args) throws Exception {
9. //7:开始任务
10. System.setProperty("HADOOP_HOME", "D:/program/hadoop-3.2.2");
11. System.setProperty("hadoop.home.dir", "D:/program/hadoop-3.2.2");
12. int res = ToolRunner.run(new Demo05MapReduce(),args);
13. System.exit(res);
14. }
15. //2:实现run函数
16. @Override
17. public int run(String[] strings) throws Exception {
18. //6:开发run函数内部代码
19. Configuration conf =getConf();
20. Job job = Job.getInstance(conf,"WordCount");
21. job.setJarByClass(getClass());
22. FileSystem fs = FileSystem.get(conf);
23. //声明输出目录
24. Path dest = new Path("D:/a/out");
25. //如果输出目录已存在则删除
26. if(fs.exists(dest)){
27. fs.delete(dest,true);
28. }
29. //设置Mapper及Mapper输出的类型
30. job.setMapperClass(MyMapper.class);
31. job.setMapOutputKeyClass(Text.class);
32. job.setMapOutputValueClass(LongWritable.class);
33. //设置Reduce及Reduce的输出类型
34. job.setReducerClass(MyReduce.class);
35. job.setOutputKeyClass(Text.class);
36. job.setOutputValueClass(LongWritable.class);
37. //设置输入和输出类型
38. job.setInputFormatClass(TextInputFormat.class);
39. job.setOutputFormatClass(TextOutputFormat.class);
40. //设置输入/输出目录
41. FileInputFormat.addInputPath(job,new Path("D:/a/a.txt"));
42. FileOutputFormat.setOutputPath(job,dest);
43. //开始执行任务
44. boolean boo = job.waitForCompletion(true);
45. return boo?0:1;
46. }
47. //4:开发Mapper类的实现类
48. public static class MyMapper extends
Mapper<LongWritable, Text,Text,LongWritable>{
49. @Override
50. protected void map(LongWritable key, Text value, Context context) thro
ws IOException, InterruptedException {
51. Text outKey = new Text();
52. LongWritable outValue = new LongWritable(1L);
53. if(value.getLength()>0){
54. String[] strs = value.toString().split("\\s+");
55. for(String str:strs){
56. outKey.set(str);
57. context.write(outKey,outValue);
58. }
59. }
60. }
61. }
62. //5:开发Reduce程序
63. public static class MyReduce extends Reducer<Text,LongWritable,Text,Long
Writable>{
64. @Override
65. protected void reduce(Text key, Iterable<LongWritable> values, Contex
t context) throws IOException, InterruptedException {
66. long sum = 0;
67. LongWritable resultValue = new LongWritable(0);
68. for(LongWritable v:values){
69. sum+=v.get();
70. }
71. resultValue.set(sum);
72. context.write(key,resultValue);
73. }
74. }
75. }
运行后,查看D:\a\out目录下输出的文件:
._SUCCESS.crc
.part-r-00000.crc
_SUCCESS
part-r-00000
打开part-r-0000即为字符统计的结果,根据源文件不同,统计的结果也会不同,以下仅为参考。默认会以key排序,所以输出的数据是以字母作为顺序排列输出的。
Configuration 1
Configured 1
Context 2
Demo05MapReduce 1
Demo05MapReduce(),args); 1
Exception 2