



在Hadoop开发中,通过Mapper输出给Reducer的数据已经根据key值进行了排序,即Mapper<Key1_input,Value1_input,Key2_out,Value2_out>,其中默认根据key2_out进行排序。
所以,如果希望输出的数据可以实现排序,可以通过以下方式实现:
● 开发JavaBean实现接口WritableComparable,此类是Writable的子类。
● 将JavaBean作为Mapper的Key2输出给Reducer。
● 实现JavaBean的comparaTo方法,实现比较。
在开发时,可以开发多个MapReduce处理过程,形成一个处理链,后面的MapReduce处理前面MapReduce输出的结果,直接到满足最后所需要的数据格式。
以下开发一个自定义排序的Wriable类,为大家演示自定义排序的开发过程。
之前WordCount输出的数据,默认是升序排序的。现在我们可以自定义一个排序规则,让它按倒序排序。
【代码4-6】MyText.java
1. package org.hadoop.sort;
2. public class MyText implements WritableComparable<MyText> {
3. private String text;
4. @Override
5. public int compareTo(MyText o) {
6. return o.text.compareTo(this.text);
7. }
8. @Override
9. public void write(DataOutput out) throws IOException {
10. out.writeUTF(text);
11. }
12. @Override
13. public void readFields(DataInput in) throws IOException {
14. text = in.readUTF();
15. }
16. public String getText() {
17. return text;
18. }
19. public void setText(String text) {
20. this.text = text;
21. }
22. }
然后将MyText作为Mapper的输出Key值。首先设置job输出的格式:
job.setMapOutputKeyClass(MyText.class);
job.setMapOutputValueClass(LongWritable.class);
设置为Mapper的输出:
public static class WordCountMapper extends Mapper<LongWritable, Text, MyText,
LongWritable> { ... }
设置Reducer接收MyText作为输入:
public static class WordCountReducer extends Reducer<MyText, LongWritable, Te
xt, LongWritable> { .. }
执行完成以后,查看输出的结果。
mrchiian 1
value.toString(); 1
throws 4
text.set(s); 1
由结果可以看出,已经是倒序了。
如果不希望修改Mapper输出的Key值部分,则可以定义一个Comparator,如【代码4-7】所示。
【代码4-7】MyComparator.java
1. package org.hadoop.sort;
2. public class MyComparator extends WritableComparator {
3. public MyComparator() {
4. //注意构造部分,第二个的true必须传递,否则会抛出异常
5. super(Text.class,true);
6. }
7. @Override
8. public int compare(WritableComparable a, WritableComparable b) {
9. int c = b.compareTo(a);
10. return c;
11. }
12. }
然后在job中添加一个排序对象即可:
job.setSortComparatorClass(MyComparator.class);
最后,执行并查看结果,已经可以按指定的顺序显示了。