购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

2.7
快速MapReduce程序示例

MapReduce为分布式计算模型,分布式计算最早由Google提出。MapReduce将运算的过程分为两个阶段,map和reduce阶段。用户只需要实现map和reduce两个函数即可。此处先为大家演示一个运行在本地的MapReduce程序,后续章节将会重点讲解MapReduce的开发。

前面曾经讲过,MapReduce可以直接在本地模式下运行。在项目中添加hadoop-mini-cluster依赖,即可以直接在本地IDE环境中运行MapReduce程序,而不需要依赖于Hadoop集群环境,这个特点在测试开发中非常有用。为了帮助读者开发,我们在代码中加入了开发的步骤,具体代码将会在第4章中详细讲解。

在本地运行Hadoop,需要将hadoop.dll文件放到Windows/system32目录下,此文件可以在winutils目录下找到。

【代码2-6】Demo02MapReduce.java

 
    1.  package org.hadoop;
    2.  /**
    3.  * MapReduce示例程序
    4.  */
    5.  //1:开发一个类,继承Configured实现接口Tool
    6.  public class Demo05MapReduce extends Configured implements Tool {
    7.  //3:添加main函数
    8.  public static void main(String[] args) throws Exception {
    9.      //7:开始任务
    10.     System.setProperty("HADOOP_HOME", "D:/program/hadoop-3.2.2");
    11.     System.setProperty("hadoop.home.dir", "D:/program/hadoop-3.2.2");
    12.     int res = ToolRunner.run(new Demo05MapReduce(),args);
    13.     System.exit(res);
    14. }
    15. //2:实现run函数
    16. @Override
    17. public int run(String[] strings) throws Exception {
    18.     //6:开发run函数内部代码
    19.     Configuration conf =getConf();
    20.     Job job = Job.getInstance(conf,"WordCount");
    21.     job.setJarByClass(getClass());
    22.     FileSystem fs = FileSystem.get(conf);
    23.     //声明输出目录
    24.     Path dest = new Path("D:/a/out");
    25.     //如果输出目录已存在则删除
    26.     if(fs.exists(dest)){
    27.         fs.delete(dest,true);
    28.     }
    29.     //设置Mapper及Mapper输出的类型
    30.     job.setMapperClass(MyMapper.class);
    31.     job.setMapOutputKeyClass(Text.class);
    32.     job.setMapOutputValueClass(LongWritable.class);
    33.     //设置Reduce及Reduce的输出类型
    34.     job.setReducerClass(MyReduce.class);
    35.     job.setOutputKeyClass(Text.class);
    36.     job.setOutputValueClass(LongWritable.class);
    37.     //设置输入和输出类型
    38.     job.setInputFormatClass(TextInputFormat.class);
    39.     job.setOutputFormatClass(TextOutputFormat.class);
    40.     //设置输入/输出目录
    41.     FileInputFormat.addInputPath(job,new Path("D:/a/a.txt"));
    42.     FileOutputFormat.setOutputPath(job,dest);
    43.     //开始执行任务
    44.     boolean boo = job.waitForCompletion(true);
    45.     return boo?0:1;
    46. }
    47. //4:开发Mapper类的实现类
    48. public static class MyMapper extends
    Mapper<LongWritable, Text,Text,LongWritable>{
    49.     @Override
    50.     protected void map(LongWritable key, Text value, Context context) thro
ws   IOException, InterruptedException {
    51.     Text outKey = new Text();
    52.     LongWritable outValue = new LongWritable(1L);
    53.     if(value.getLength()>0){
    54.         String[] strs = value.toString().split("\\s+");
    55.         for(String str:strs){
    56.             outKey.set(str);
    57.            context.write(outKey,outValue);
    58.         }
    59.     }
    60. }
    61. }
    62. //5:开发Reduce程序
    63. public static class MyReduce extends Reducer<Text,LongWritable,Text,Long
Writable>{
    64.     @Override
    65.     protected void reduce(Text key, Iterable<LongWritable> values, Contex
t context) throws IOException, InterruptedException {
    66.         long sum = 0;
    67.         LongWritable resultValue = new LongWritable(0);
    68.         for(LongWritable v:values){
    69.             sum+=v.get();
    70.         }
    71.         resultValue.set(sum);
    72.         context.write(key,resultValue);
    73.     }
    74. }
    75. }

运行后,查看D:\a\out目录下输出的文件:

 
    ._SUCCESS.crc
    .part-r-00000.crc
    _SUCCESS
    part-r-00000

打开part-r-0000即为字符统计的结果,根据源文件不同,统计的结果也会不同,以下仅为参考。默认会以key排序,所以输出的数据是以字母作为顺序排列输出的。 MBl0t8/o62aqK2c37EkPXGL3K0G/NNZrUOOp/Imnbd0+DeNMZ6w48TIpI8RNyC4K

 
    Configuration     1
    Configured 1
    Context 2
    Demo05MapReduce 1
    Demo05MapReduce(),args);  1
    Exception     2
点击中间区域
呼出菜单
上一章
目录
下一章
×