在Hadoop开发中,通过Mapper输出给Reducer的数据已经根据key值进行了排序,即Mapper<Key1_input,Value1_input,Key2_out,Value2_out>,其中默认根据key2_out进行排序。
所以,如果希望输出的数据可以实现排序,可以通过以下方式实现:
● 开发JavaBean实现接口WritableComparable,此类是Writable的子类。
● 将JavaBean作为Mapper的Key2输出给Reducer。
● 实现JavaBean的comparaTo方法,实现比较。
在开发时,可以开发多个MapReduce处理过程,形成一个处理链,后面的MapReduce处理前面MapReduce输出的结果,直接到满足最后所需要的数据格式。
以下开发一个自定义排序的Wriable类,为大家演示自定义排序的开发过程。
之前WordCount输出的数据,默认是升序排序的。现在我们可以自定义一个排序规则,让它按倒序排序。
【代码4-6】MyText.java
1. package org.hadoop.sort; 2. public class MyText implements WritableComparable<MyText> { 3. private String text; 4. @Override 5. public int compareTo(MyText o) { 6. return o.text.compareTo(this.text); 7. } 8. @Override 9. public void write(DataOutput out) throws IOException { 10. out.writeUTF(text); 11. } 12. @Override 13. public void readFields(DataInput in) throws IOException { 14. text = in.readUTF(); 15. } 16. public String getText() { 17. return text; 18. } 19. public void setText(String text) { 20. this.text = text; 21. } 22. }
然后将MyText作为Mapper的输出Key值。首先设置job输出的格式:
job.setMapOutputKeyClass(MyText.class); job.setMapOutputValueClass(LongWritable.class);
设置为Mapper的输出:
public static class WordCountMapper extends Mapper<LongWritable, Text, MyText, LongWritable> { ... }
设置Reducer接收MyText作为输入:
public static class WordCountReducer extends Reducer<MyText, LongWritable, Te xt, LongWritable> { .. }
执行完成以后,查看输出的结果。
mrchiian 1 value.toString(); 1 throws 4 text.set(s); 1
由结果可以看出,已经是倒序了。
如果不希望修改Mapper输出的Key值部分,则可以定义一个Comparator,如【代码4-7】所示。
【代码4-7】MyComparator.java
1. package org.hadoop.sort; 2. public class MyComparator extends WritableComparator { 3. public MyComparator() { 4. //注意构造部分,第二个的true必须传递,否则会抛出异常 5. super(Text.class,true); 6. } 7. @Override 8. public int compare(WritableComparable a, WritableComparable b) { 9. int c = b.compareTo(a); 10. return c; 11. } 12. }
然后在job中添加一个排序对象即可:
job.setSortComparatorClass(MyComparator.class);
最后,执行并查看结果,已经可以按指定的顺序显示了。