在Hadoop中,LongWritable、Text都是被序列化的类,它们都是org.apache.hadoop.io.Writable的子类。也只有被序列化的类,才可以在Mapper和Reducer之间传递。在实际开发中,为了适应不同业务的需求,有时必须自己开发Writable类的子类,以实现Hadoop中的个性化开发。以下是JDK的序列化与Hadoop序列化的比较。
● JDK的序列化接口——java.io.Serializable:用于将对象转换成字节流输出,即为序列化,再将字节流转换成对象,即为反序列化。
● Hadoop的序列化接口——org.apache.hadoop.io.Writable:它的特点是紧凑(高效地使用存储空间)、快速(读/写开销小)、可扩展(可以透明的读取数据)、互操作(支持多语言)。
Writable接口的两个主要方法:write(DataOutput)将成员变量按顺序写出,readFields(DataInput)顺序读取成员变量的值。以下是Writable的源代码:
package org.apache.hadoop.io; public interface Writable { void write(DataOutput out) throws IOException; void readFields(DataInput in) throws IOException; }
Writable接口的基本实现,就是在write/readFields中顺序写出和读取成员变量的值。以下代码是一个实现了Writable接口的具体类,此类中省略了setters和getters方法,请注意write和readFields中的代码,必须按顺序读写数据。
【代码4-2】WritableDemo.java
1. package org.hadoop; 2. public class WritableDemo implements Writable { 3. private String name; 4. private Integer age; 5. @Override 6. public void write(DataOutput out) throws IOException { 7. out.writeUTF(name); 8. out.writeInt(age); 9. } 10. @Override 11. public void readFields(DataInput in) throws IOException { 12. name = in.readUTF(); 13. age = in.readInt(); 14. } 15. }
在自定义的类实现接口Writable以后,就可以将这个类作为Key或是Value放到Mapper或是Reducer中当作参数。
现在我们来自己定义一个序列化类,用于统计文本文件中每一行字符的数量。根据这个要求,我们需要定义Mapper的输出类型为自定义的这个Writable,而Mapper的输出则为Reduce的输入。
步骤01 创建目标读取文件。
首先定义一个文本文件,并输入若干行内容。如创建一个a.txt文件并存储在D:/目录下。
Hello This Is First Line . This Example show how to implements Writable
步骤02 创建Writable类的子类。
由于我们的工作是要读取这一行的数据和计算这一行有多少字符,所以应该定义两个成员变量,如【代码4-3】所示。注意,在【代码4-3】中,我们实现的是Writable的子类WritableComparable<T>,因为我们要将此类作为输出的key值,则输出的key必须实现排序的功能,此类WritableComparable的comparaTo方法可以实现排序。
【代码4-3】LineCharCountWritable.java
1. package org.hadoop.writable; 2. public class LineCharCountWritable implements 3. WritableComparable<LineCharCountWritable> { 4. private String line; 5. private Integer count; 6. @Override 7. public void write(DataOutput out) throws IOException { 8. out.writeUTF(line); 9. out.writeInt(count); 10. } 11. @Override 12. public void readFields(DataInput in) throws IOException { 13. line = in.readUTF(); 14. count = in.readInt(); 15. } 16. @Override 17. public int compareTo(LineCharCountWritable o) { 18. return this.count - o.getCount(); 19. } 20. }
步骤03 在MapReduce中使用自定义Writable。
在项目中使用自定义的Writable子类,参见【代码4-4】。
【代码4-4】LineCharCountMR.java
1. package org.hadoop.writable; public class LineCharCountMR extends Configured implements Tool { 2. public static void main(String[] args) throws Exception { 3. int run = ToolRunner.run(new LineCharCountMR(), args); 4. System.exit(run); 5. } 6. @Override 7. public int run(String[] args) throws Exception { 8. if (args.length < 2) { 9. System.out.println("参数错误,使用方法:LineCharCountMR <Input> <O utput>"); 10. ToolRunner.printGenericCommandUsage(System.out); 11. return 1; 12. } 13. Configuration config = getConf(); 14. FileSystem fs = FileSystem.get(config); 15. Path dest = new Path(args[1]); 16. if (fs.exists(dest)) { 17. fs.delete(dest, true); 18. } 19. Job job = Job.getInstance(config, "LineChar"); 20. job.setJarByClass(getClass()); 21. job.setMapperClass(LineMapper.class); 22. job.setMapOutputKeyClass(LineCharCountWritable.class); 23. job.setMapOutputValueClass(NullWritable.class); 24. job.setReducerClass(LineReducer.class); 25. job.setOutputKeyClass(Text.class); 26. job.setOutputValueClass(NullWritable.class); 27. FileInputFormat.addInputPath(job, new Path(args[0])); 28. FileOutputFormat.setOutputPath(job, dest); 29. boolean b = job.waitForCompletion(true); 30. return b ? 0 : 1; 31. } 32. //注意最后一个参数为NullWritable,可以理解为Null 33. public static class LineMapper extends Mapper<LongWritable, Text, Lin eCharCountWritable, NullWritable> { 34. private LineCharCountWritable countWritable = new LineCharCountWri table(); 35. @Override 36. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { 37. String line = value.toString(); 38. if (StringUtils.isBlank(line)) { 39. return; 40. } 41. Integer charCount = line.length(); 42. countWritable.setLine(line); 43. countWritable.setCount(charCount); 44. context.write(countWritable, NullWritable.get()); 45. } 46. } 47. public static class LineReducer extends Reducer<LineCharCountWritable, NullWritable, Text, NullWritable> { 48. private Text text = new Text(); 49. @Override 50. protected void reduce(LineCharCountWritable key, Iterable<NullWrit able> values, Context context) throws IOException, InterruptedException { 51. text.set(key.getLine() + "\t" + key.getCount()); 52. context.write(text, NullWritable.get()); 53. } 54. } 55. }
步骤04 运行项目。
现在运行项目,在IDEA的run > Edit Configuratin中添加两个参数,如图4-5所示。由于代码是在IDEA中直接运行,即运行在本地,所以传递本地的目录即可,如果运行在Hadoop集群中,请传递HDFS上的文件和目录。
图4-5
运行完成后查看输出的结果,如下所示:
This Example show how 21 to implements Writable 22 Hello This Is First Line . 26
从上面输出的结果可以看出,已经按每行字符数量进行了排序,而这是comparaTo的功能。这个例子演示了如何自定义Writable实现序列化的功能。