为了使读者快速掌握MapReduce,本节再次为大家开发和演示WordCount示例程序,并以本地运行和服务器运行的方式分别部署,使读者更深入了解MapReduce的开发、运行和部署。
前面已经介绍MapReduce程序可以运行在本地,也可以打包后运行在Hadoop集群上。之前已经开发过运行在本地的MapReduce程序,这里我们将使用打包的方式将程序打包后放到Hadoop集群上运行。
步骤01 创建Java项目并添加依赖。
创建Java项目,并添加以下依赖。注意,本次以添加的依赖为hadoop-mincluster,且设置scope的值为provided(意思是,在打包时将不会被打包到依赖的jar包中)。
1. <dependency> 2. <groupId>org.apache.hadoop</groupId> 3. <artifactId>hadoop-minicluster</artifactId> 4. <scope>provided</scope> 5. </dependency>
步骤02 开发WordCount的完整代码。
【代码4-1】WordCount.java
1. package org.hadoop; 2. public class WordCount extends Configured implements Tool { 3. public static void main(String[] args) throws Exception { 4. int result = ToolRunner.run(new WordCount(), args); 5. System.exit(result); 6. } 7. private static String server = "hdfs://server201:8020"; 8. public int run(String[] args) throws Exception { 9. if (args.length != 2) { 10. System.err.println("usage: " + this.getClass().getSimpleName() + " <inPath> <outPath>"); 11. ToolRunner.printGenericCommandUsage(System.out); 12. return -1; 13. } 14. Configuration config = getConf(); 15. config.set("fs.defaultFS", server); 16. //指定resourcemanger的地址 17. config.set("yarn.resourcemanager.hostname", "server201"); 18. config.set("dfs.replication", "1"); 19. config.set("dfs.permissions.enabled", "false"); 20. FileSystem fs = FileSystem.get(config); 21. Path dest = new Path(server + args[1]); 22. if (fs.exists(dest)) { 23. fs.delete(dest, true); 24. } 25. Job job = Job.getInstance(config,"WordCount"); 26. job.setJarByClass(getClass()); 27. job.setMapperClass(WordCountMapper.class); 28. job.setReducerClass(WordCountReducer.class); 29. job.setOutputKeyClass(Text.class); 30. job.setOutputValueClass(LongWritable.class); 31. FileInputFormat.addInputPath(job, new Path(server + args[0])); 32. FileOutputFormat.setOutputPath(job, dest); 33. boolean boo = job.waitForCompletion(true); 34. return boo ? 0 : 1; 35. } 36. public static class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> { 37. private LongWritable count = new LongWritable(1); 38. private Text text = new Text(); 39. @Override 40. public void map(LongWritable key, Text value, Context context) thr ows IOException, InterruptedException { 41. String str = value.toString(); 42. String[] strs = str.split("\\s+"); 43. for (String s : strs) { 44. text.set(s); 45. context.write(text, count); 46. } 47. } 48. } 49. public static class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> { 50. @Override 51. public void reduce(Text key, Iterable<LongWritable> values,Context context) throws IOException, InterruptedException { 52. long sum = 0; 53. for (LongWritable w : values) { 54. sum += w.get(); 55. } 56. context.write(key, new LongWritable(sum)); 57. } 58. } 59. }
上例代码中,由于我们声明了完整的地址,所以可以在本地运行测试。在本地运行测试需要输入两个参数。选择IDEA的run > Edit Configurations,并在Program Arguments位置输入读取文件的地址和输出结果的目录,如图4-3所示。
图4-3
在本地环境下直接运行,并查看HDFS上的结果目录,WordCount程序已经将结果输出到指定的目录中。
[hadoop@server201 ~]$ hdfs dfs -ls /out001 Found 2 items -rw-r--r-- 1 mrchi supergroup 0 2021-03-13 22:14 /out001/_SUCCESS -rw-r--r-- 1 mrchi supergroup 520 2021-03-13 22:14 /out001/part-r-00000
步骤03 使用Maven打包程序。
在IDEA右侧栏的Maven视图中,单击package并运行,可以得到一个jar包,如图4-4所示。
图4-4
打完的包可以在target目录下找到,将jar包上传到server201服务器的/root目录下,并使用yarn jar执行。
使用yarn jar执行,使用以下命令:
$ yarn jar chapter04-1.0.jar org.hadoop.WordCount /test/a.txt /out002
查看执行结果,即为单词统计的结果。根据处理的文件不同,这个结果文件的内容会有所不同。
[root@server201 ~]# hdfs dfs -cat /out002/* | head -> 4 0 2 1 4
至此,你已经学会如何在本地及打包到服务器上运行MapReduce程序了。接下来,我们将详解MapReduce的更多细节。
注意: 在本地运行时,有可能会出现以下错误:
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.Nati veIO$Windows.access0(Ljava/lang/String;I)Z
解决方案是:将hadoop.dll文件放到windows/system32目录下即可。
可以将Mapper和Reducer开发成内部类,但这两个内部类必须使用public static修饰符。
上例的程序,在IDEA中执行package打包,将得到一个没有任何依赖,只有WordCount代码的jar文件。之后,就可以发布到Linux上并在Hadoop集群中执行。因为在Linux上已经存在了Hadoop的所有依赖包,所以不需要再将Hadoop的所有依赖都打包到jar文件中去。