购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.3
自定义Writable

在Hadoop中,LongWritable、Text都是被序列化的类,它们都是org.apache.hadoop.io.Writable的子类。也只有被序列化的类,才可以在Mapper和Reducer之间传递。在实际开发中,为了适应不同业务的需求,有时必须自己开发Writable类的子类,以实现Hadoop中的个性化开发。以下是JDK的序列化与Hadoop序列化的比较。

● JDK的序列化接口——java.io.Serializable:用于将对象转换成字节流输出,即为序列化,再将字节流转换成对象,即为反序列化。

● Hadoop的序列化接口——org.apache.hadoop.io.Writable:它的特点是紧凑(高效地使用存储空间)、快速(读/写开销小)、可扩展(可以透明的读取数据)、互操作(支持多语言)。

Writable接口的两个主要方法:write(DataOutput)将成员变量按顺序写出,readFields(DataInput)顺序读取成员变量的值。以下是Writable的源代码:

 
    package org.apache.hadoop.io;
    public interface Writable {
        void write(DataOutput out) throws IOException;
        void readFields(DataInput in) throws IOException;
    }

Writable接口的基本实现,就是在write/readFields中顺序写出和读取成员变量的值。以下代码是一个实现了Writable接口的具体类,此类中省略了setters和getters方法,请注意write和readFields中的代码,必须按顺序读写数据。

【代码4-2】WritableDemo.java

 
    1.  package org.hadoop;
    2.  public class WritableDemo implements Writable {
    3.      private String name;
    4.      private Integer age;
    5.      @Override
    6.      public void write(DataOutput out) throws IOException {
    7.         out.writeUTF(name);
    8.         out.writeInt(age);
    9.     }
    10.    @Override
    11.    public void readFields(DataInput in) throws IOException {
    12.        name = in.readUTF();
    13.        age = in.readInt();
    14.    }
    15. }

在自定义的类实现接口Writable以后,就可以将这个类作为Key或是Value放到Mapper或是Reducer中当作参数。

现在我们来自己定义一个序列化类,用于统计文本文件中每一行字符的数量。根据这个要求,我们需要定义Mapper的输出类型为自定义的这个Writable,而Mapper的输出则为Reduce的输入。

步骤01 创建目标读取文件。

首先定义一个文本文件,并输入若干行内容。如创建一个a.txt文件并存储在D:/目录下。

 
    Hello This Is First Line .
    This Example show how
    to implements Writable

步骤02 创建Writable类的子类。

由于我们的工作是要读取这一行的数据和计算这一行有多少字符,所以应该定义两个成员变量,如【代码4-3】所示。注意,在【代码4-3】中,我们实现的是Writable的子类WritableComparable<T>,因为我们要将此类作为输出的key值,则输出的key必须实现排序的功能,此类WritableComparable的comparaTo方法可以实现排序。

【代码4-3】LineCharCountWritable.java

 
    1.  package org.hadoop.writable;
    2.  public class LineCharCountWritable implements
    3.                  WritableComparable<LineCharCountWritable> {
    4.      private String line;
    5.      private Integer count;
    6.      @Override
    7.      public void write(DataOutput out) throws IOException {
    8.          out.writeUTF(line);
    9.          out.writeInt(count);
    10.     }
    11.     @Override
    12.     public void readFields(DataInput in) throws IOException {
    13.         line = in.readUTF();
    14.         count = in.readInt();
    15.     }
    16.     @Override
    17.     public int compareTo(LineCharCountWritable o) {
    18.         return this.count - o.getCount();
    19.     }
    20. }

步骤03 在MapReduce中使用自定义Writable。

在项目中使用自定义的Writable子类,参见【代码4-4】。

【代码4-4】LineCharCountMR.java

 
    1.   package org.hadoop.writable;
    public class LineCharCountMR extends Configured implements Tool {
    2.       public static void main(String[] args) throws Exception {
    3.           int run = ToolRunner.run(new LineCharCountMR(), args);
    4.           System.exit(run);
    5.       }
    6.       @Override
    7.       public int run(String[] args) throws Exception {
    8.           if (args.length < 2) {
    9.               System.out.println("参数错误,使用方法:LineCharCountMR <Input> <O
utput>");
    10.             ToolRunner.printGenericCommandUsage(System.out);
    11.             return 1;
    12.         }
    13.         Configuration config = getConf();
    14.         FileSystem fs = FileSystem.get(config);
    15.         Path dest = new Path(args[1]);
    16.         if (fs.exists(dest)) {
    17.             fs.delete(dest, true);
    18.         }
    19.         Job job = Job.getInstance(config, "LineChar");
    20.         job.setJarByClass(getClass());
    21.         job.setMapperClass(LineMapper.class);
    22.         job.setMapOutputKeyClass(LineCharCountWritable.class);
    23.         job.setMapOutputValueClass(NullWritable.class);
    24.         job.setReducerClass(LineReducer.class);
    25.         job.setOutputKeyClass(Text.class);
    26.         job.setOutputValueClass(NullWritable.class);
    27.         FileInputFormat.addInputPath(job, new Path(args[0]));
    28.         FileOutputFormat.setOutputPath(job, dest);
    29.         boolean b = job.waitForCompletion(true);
    30.         return b ? 0 : 1;
    31.     }
    32.     //注意最后一个参数为NullWritable,可以理解为Null
    33.     public static class LineMapper extends Mapper<LongWritable, Text, Lin
eCharCountWritable, NullWritable> {
    34.         private LineCharCountWritable countWritable = new LineCharCountWri
table();
    35.         @Override
    36.         protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
    37.             String line = value.toString();
    38.             if (StringUtils.isBlank(line)) {
    39.                 return;
    40.             }
    41.             Integer charCount = line.length();
    42.             countWritable.setLine(line);
    43.             countWritable.setCount(charCount);
    44.             context.write(countWritable, NullWritable.get());
    45.         }
    46.     }
    47.    public static class LineReducer extends Reducer<LineCharCountWritable,
 NullWritable, Text, NullWritable> {
    48.         private Text text = new Text();
    49.         @Override
    50.         protected void reduce(LineCharCountWritable key, Iterable<NullWrit
able> values, Context context) throws IOException, InterruptedException {
    51.             text.set(key.getLine() + "\t" + key.getCount());
    52.             context.write(text, NullWritable.get());
    53.         }
    54.     }
    55. }

步骤04 运行项目。

现在运行项目,在IDEA的run > Edit Configuratin中添加两个参数,如图4-5所示。由于代码是在IDEA中直接运行,即运行在本地,所以传递本地的目录即可,如果运行在Hadoop集群中,请传递HDFS上的文件和目录。

图4-5

运行完成后查看输出的结果,如下所示:

 
    This Example show how      21
    to implements Writable 22
    Hello This Is First Line .     26

从上面输出的结果可以看出,已经按每行字符数量进行了排序,而这是comparaTo的功能。这个例子演示了如何自定义Writable实现序列化的功能。 bjzagC7eWMyI4oh55EoTfwVU6QA4dF7ZFMcAIeL3gKHT/l4GC5hJZWCxejCvkv1T

点击中间区域
呼出菜单
上一章
目录
下一章
×