购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.2
WordCount示例

为了使读者快速掌握MapReduce,本节再次为大家开发和演示WordCount示例程序,并以本地运行和服务器运行的方式分别部署,使读者更深入了解MapReduce的开发、运行和部署。

前面已经介绍MapReduce程序可以运行在本地,也可以打包后运行在Hadoop集群上。之前已经开发过运行在本地的MapReduce程序,这里我们将使用打包的方式将程序打包后放到Hadoop集群上运行。

步骤01 创建Java项目并添加依赖。

创建Java项目,并添加以下依赖。注意,本次以添加的依赖为hadoop-mincluster,且设置scope的值为provided(意思是,在打包时将不会被打包到依赖的jar包中)。

 
    1.  <dependency>
    2.      <groupId>org.apache.hadoop</groupId>
    3.      <artifactId>hadoop-minicluster</artifactId>
    4.      <scope>provided</scope>
    5.  </dependency>

步骤02 开发WordCount的完整代码。

【代码4-1】WordCount.java

 
    1.  package org.hadoop;
    2.  public class WordCount extends Configured implements Tool {
    3.      public static void main(String[] args) throws Exception {
    4.          int result = ToolRunner.run(new WordCount(), args);
    5.          System.exit(result);
    6.      }
    7.      private static String server = "hdfs://server201:8020";
    8.      public int run(String[] args) throws Exception {
    9.          if (args.length != 2) {
    10.             System.err.println("usage: " + this.getClass().getSimpleName()
         + " <inPath> <outPath>");
    11.             ToolRunner.printGenericCommandUsage(System.out);
    12.             return -1;
    13.         }
    14.         Configuration config = getConf();
    15.         config.set("fs.defaultFS", server);
    16.         //指定resourcemanger的地址
    17.         config.set("yarn.resourcemanager.hostname", "server201");
    18.         config.set("dfs.replication", "1");
    19.         config.set("dfs.permissions.enabled", "false");
    20.         FileSystem fs = FileSystem.get(config);
    21.         Path dest = new Path(server + args[1]);
    22.         if (fs.exists(dest)) {
    23.             fs.delete(dest, true);
    24.         }
    25.         Job job = Job.getInstance(config,"WordCount");
    26.         job.setJarByClass(getClass());
    27.         job.setMapperClass(WordCountMapper.class);
    28.         job.setReducerClass(WordCountReducer.class);
    29.         job.setOutputKeyClass(Text.class);
    30.         job.setOutputValueClass(LongWritable.class);
    31.         FileInputFormat.addInputPath(job, new Path(server + args[0]));
    32.         FileOutputFormat.setOutputPath(job, dest);
    33.         boolean boo = job.waitForCompletion(true);
    34.         return boo ? 0 : 1;
    35.     }
    36.    public static class WordCountMapper extends Mapper<LongWritable, Text,
        Text, LongWritable> {
    37.         private LongWritable count = new LongWritable(1);
    38.         private Text text = new Text();
    39.         @Override
    40.         public void map(LongWritable key, Text value, Context context) thr
       ows IOException, InterruptedException {
    41.             String str = value.toString();
    42.             String[] strs = str.split("\\s+");
    43.             for (String s : strs) {
    44.                 text.set(s);
    45.                 context.write(text, count);
    46.             }
    47.         }
    48.     }
    49.    public static class WordCountReducer extends Reducer<Text, LongWritable,
        Text, LongWritable> {
    50.         @Override
    51.         public void reduce(Text key, Iterable<LongWritable> values,Context
        context) throws IOException, InterruptedException {
    52.             long sum = 0;
    53.             for (LongWritable w : values) {
    54.                 sum += w.get();
    55.             }
    56.             context.write(key, new LongWritable(sum));
    57.         }
    58.     }
    59. }

上例代码中,由于我们声明了完整的地址,所以可以在本地运行测试。在本地运行测试需要输入两个参数。选择IDEA的run > Edit Configurations,并在Program Arguments位置输入读取文件的地址和输出结果的目录,如图4-3所示。

图4-3

在本地环境下直接运行,并查看HDFS上的结果目录,WordCount程序已经将结果输出到指定的目录中。

 
    [hadoop@server201 ~]$ hdfs dfs -ls /out001
    Found 2 items
    -rw-r--r--   1 mrchi supergroup     0 2021-03-13 22:14 /out001/_SUCCESS
    -rw-r--r--   1 mrchi supergroup     520 2021-03-13 22:14 /out001/part-r-00000

步骤03 使用Maven打包程序。

在IDEA右侧栏的Maven视图中,单击package并运行,可以得到一个jar包,如图4-4所示。

图4-4

打完的包可以在target目录下找到,将jar包上传到server201服务器的/root目录下,并使用yarn jar执行。

使用yarn jar执行,使用以下命令:

 
    $ yarn jar chapter04-1.0.jar org.hadoop.WordCount /test/a.txt /out002

查看执行结果,即为单词统计的结果。根据处理的文件不同,这个结果文件的内容会有所不同。

 
    [root@server201 ~]# hdfs dfs -cat /out002/* | head
    ->  4
    0   2
    1   4

至此,你已经学会如何在本地及打包到服务器上运行MapReduce程序了。接下来,我们将详解MapReduce的更多细节。

注意: 在本地运行时,有可能会出现以下错误:

 
    Exception in thread "main"
    java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.Nati
veIO$Windows.access0(Ljava/lang/String;I)Z

解决方案是:将hadoop.dll文件放到windows/system32目录下即可。

可以将Mapper和Reducer开发成内部类,但这两个内部类必须使用public static修饰符。

上例的程序,在IDEA中执行package打包,将得到一个没有任何依赖,只有WordCount代码的jar文件。之后,就可以发布到Linux上并在Hadoop集群中执行。因为在Linux上已经存在了Hadoop的所有依赖包,所以不需要再将Hadoop的所有依赖都打包到jar文件中去。 ZRoLX4dYml1RYuay5i6MAG6Xy24XXGuqez5bcEKiCh+vQp4wCh0DfLDNMFph2O0r

点击中间区域
呼出菜单
上一章
目录
下一章
×