MapReduce为分布式计算模型,分布式计算最早由Google提出。MapReduce将运算的过程分为两个阶段,map和reduce阶段。用户只需要实现map和reduce两个函数即可。此处先为大家演示一个运行在本地的MapReduce程序,后续章节将会重点讲解MapReduce的开发。
前面曾经讲过,MapReduce可以直接在本地模式下运行。在项目中添加hadoop-mini-cluster依赖,即可以直接在本地IDE环境中运行MapReduce程序,而不需要依赖于Hadoop集群环境,这个特点在测试开发中非常有用。为了帮助读者开发,我们在代码中加入了开发的步骤,具体代码将会在第4章中详细讲解。
在本地运行Hadoop,需要将hadoop.dll文件放到Windows/system32目录下,此文件可以在winutils目录下找到。
【代码2-6】Demo02MapReduce.java
1. package org.hadoop; 2. /** 3. * MapReduce示例程序 4. */ 5. //1:开发一个类,继承Configured实现接口Tool 6. public class Demo05MapReduce extends Configured implements Tool { 7. //3:添加main函数 8. public static void main(String[] args) throws Exception { 9. //7:开始任务 10. System.setProperty("HADOOP_HOME", "D:/program/hadoop-3.2.2"); 11. System.setProperty("hadoop.home.dir", "D:/program/hadoop-3.2.2"); 12. int res = ToolRunner.run(new Demo05MapReduce(),args); 13. System.exit(res); 14. } 15. //2:实现run函数 16. @Override 17. public int run(String[] strings) throws Exception { 18. //6:开发run函数内部代码 19. Configuration conf =getConf(); 20. Job job = Job.getInstance(conf,"WordCount"); 21. job.setJarByClass(getClass()); 22. FileSystem fs = FileSystem.get(conf); 23. //声明输出目录 24. Path dest = new Path("D:/a/out"); 25. //如果输出目录已存在则删除 26. if(fs.exists(dest)){ 27. fs.delete(dest,true); 28. } 29. //设置Mapper及Mapper输出的类型 30. job.setMapperClass(MyMapper.class); 31. job.setMapOutputKeyClass(Text.class); 32. job.setMapOutputValueClass(LongWritable.class); 33. //设置Reduce及Reduce的输出类型 34. job.setReducerClass(MyReduce.class); 35. job.setOutputKeyClass(Text.class); 36. job.setOutputValueClass(LongWritable.class); 37. //设置输入和输出类型 38. job.setInputFormatClass(TextInputFormat.class); 39. job.setOutputFormatClass(TextOutputFormat.class); 40. //设置输入/输出目录 41. FileInputFormat.addInputPath(job,new Path("D:/a/a.txt")); 42. FileOutputFormat.setOutputPath(job,dest); 43. //开始执行任务 44. boolean boo = job.waitForCompletion(true); 45. return boo?0:1; 46. } 47. //4:开发Mapper类的实现类 48. public static class MyMapper extends Mapper<LongWritable, Text,Text,LongWritable>{ 49. @Override 50. protected void map(LongWritable key, Text value, Context context) thro ws IOException, InterruptedException { 51. Text outKey = new Text(); 52. LongWritable outValue = new LongWritable(1L); 53. if(value.getLength()>0){ 54. String[] strs = value.toString().split("\\s+"); 55. for(String str:strs){ 56. outKey.set(str); 57. context.write(outKey,outValue); 58. } 59. } 60. } 61. } 62. //5:开发Reduce程序 63. public static class MyReduce extends Reducer<Text,LongWritable,Text,Long Writable>{ 64. @Override 65. protected void reduce(Text key, Iterable<LongWritable> values, Contex t context) throws IOException, InterruptedException { 66. long sum = 0; 67. LongWritable resultValue = new LongWritable(0); 68. for(LongWritable v:values){ 69. sum+=v.get(); 70. } 71. resultValue.set(sum); 72. context.write(key,resultValue); 73. } 74. } 75. }
运行后,查看D:\a\out目录下输出的文件:
._SUCCESS.crc .part-r-00000.crc _SUCCESS part-r-00000
打开part-r-0000即为字符统计的结果,根据源文件不同,统计的结果也会不同,以下仅为参考。默认会以key排序,所以输出的数据是以字母作为顺序排列输出的。
Configuration 1 Configured 1 Context 2 Demo05MapReduce 1 Demo05MapReduce(),args); 1 Exception 2