



为了使读者快速掌握MapReduce,本节再次为大家开发和演示WordCount示例程序,并以本地运行和服务器运行的方式分别部署,使读者更深入了解MapReduce的开发、运行和部署。
前面已经介绍MapReduce程序可以运行在本地,也可以打包后运行在Hadoop集群上。之前已经开发过运行在本地的MapReduce程序,这里我们将使用打包的方式将程序打包后放到Hadoop集群上运行。
步骤01 创建Java项目并添加依赖。
创建Java项目,并添加以下依赖。注意,本次以添加的依赖为hadoop-mincluster,且设置scope的值为provided(意思是,在打包时将不会被打包到依赖的jar包中)。
1. <dependency>
2. <groupId>org.apache.hadoop</groupId>
3. <artifactId>hadoop-minicluster</artifactId>
4. <scope>provided</scope>
5. </dependency>
步骤02 开发WordCount的完整代码。
【代码4-1】WordCount.java
1. package org.hadoop;
2. public class WordCount extends Configured implements Tool {
3. public static void main(String[] args) throws Exception {
4. int result = ToolRunner.run(new WordCount(), args);
5. System.exit(result);
6. }
7. private static String server = "hdfs://server201:8020";
8. public int run(String[] args) throws Exception {
9. if (args.length != 2) {
10. System.err.println("usage: " + this.getClass().getSimpleName()
+ " <inPath> <outPath>");
11. ToolRunner.printGenericCommandUsage(System.out);
12. return -1;
13. }
14. Configuration config = getConf();
15. config.set("fs.defaultFS", server);
16. //指定resourcemanger的地址
17. config.set("yarn.resourcemanager.hostname", "server201");
18. config.set("dfs.replication", "1");
19. config.set("dfs.permissions.enabled", "false");
20. FileSystem fs = FileSystem.get(config);
21. Path dest = new Path(server + args[1]);
22. if (fs.exists(dest)) {
23. fs.delete(dest, true);
24. }
25. Job job = Job.getInstance(config,"WordCount");
26. job.setJarByClass(getClass());
27. job.setMapperClass(WordCountMapper.class);
28. job.setReducerClass(WordCountReducer.class);
29. job.setOutputKeyClass(Text.class);
30. job.setOutputValueClass(LongWritable.class);
31. FileInputFormat.addInputPath(job, new Path(server + args[0]));
32. FileOutputFormat.setOutputPath(job, dest);
33. boolean boo = job.waitForCompletion(true);
34. return boo ? 0 : 1;
35. }
36. public static class WordCountMapper extends Mapper<LongWritable, Text,
Text, LongWritable> {
37. private LongWritable count = new LongWritable(1);
38. private Text text = new Text();
39. @Override
40. public void map(LongWritable key, Text value, Context context) thr
ows IOException, InterruptedException {
41. String str = value.toString();
42. String[] strs = str.split("\\s+");
43. for (String s : strs) {
44. text.set(s);
45. context.write(text, count);
46. }
47. }
48. }
49. public static class WordCountReducer extends Reducer<Text, LongWritable,
Text, LongWritable> {
50. @Override
51. public void reduce(Text key, Iterable<LongWritable> values,Context
context) throws IOException, InterruptedException {
52. long sum = 0;
53. for (LongWritable w : values) {
54. sum += w.get();
55. }
56. context.write(key, new LongWritable(sum));
57. }
58. }
59. }
上例代码中,由于我们声明了完整的地址,所以可以在本地运行测试。在本地运行测试需要输入两个参数。选择IDEA的run > Edit Configurations,并在Program Arguments位置输入读取文件的地址和输出结果的目录,如图4-3所示。
图4-3
在本地环境下直接运行,并查看HDFS上的结果目录,WordCount程序已经将结果输出到指定的目录中。
[hadoop@server201 ~]$ hdfs dfs -ls /out001
Found 2 items
-rw-r--r-- 1 mrchi supergroup 0 2021-03-13 22:14 /out001/_SUCCESS
-rw-r--r-- 1 mrchi supergroup 520 2021-03-13 22:14 /out001/part-r-00000
步骤03 使用Maven打包程序。
在IDEA右侧栏的Maven视图中,单击package并运行,可以得到一个jar包,如图4-4所示。
图4-4
打完的包可以在target目录下找到,将jar包上传到server201服务器的/root目录下,并使用yarn jar执行。
使用yarn jar执行,使用以下命令:
$ yarn jar chapter04-1.0.jar org.hadoop.WordCount /test/a.txt /out002
查看执行结果,即为单词统计的结果。根据处理的文件不同,这个结果文件的内容会有所不同。
[root@server201 ~]# hdfs dfs -cat /out002/* | head
-> 4
0 2
1 4
至此,你已经学会如何在本地及打包到服务器上运行MapReduce程序了。接下来,我们将详解MapReduce的更多细节。
注意: 在本地运行时,有可能会出现以下错误:
Exception in thread "main"
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.Nati
veIO$Windows.access0(Ljava/lang/String;I)Z
解决方案是:将hadoop.dll文件放到windows/system32目录下即可。
可以将Mapper和Reducer开发成内部类,但这两个内部类必须使用public static修饰符。
上例的程序,在IDEA中执行package打包,将得到一个没有任何依赖,只有WordCount代码的jar文件。之后,就可以发布到Linux上并在Hadoop集群中执行。因为在Linux上已经存在了Hadoop的所有依赖包,所以不需要再将Hadoop的所有依赖都打包到jar文件中去。