购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

5.6 案例分析:二次排序

MapReduce在传递<key,value>对时默认根据key进行排序,而有时候除了key以外,还需要根据value或value中的某一个字段进行排序,基于这种需求进行的自定义排序称为“二次排序”。

例如有以下数据:

     A 3
     B 5
     C 1
     B 6
     A 4
     C 5

现需要对上述数据先按照第一字段进行升序排列,若第一字段相同,则按照第二字段进行降序排列,期望的输出结果如下:

     A 4
     A 3
     B 6
     B 5
     C 5
     C 1

1.设计思路

由于MapReduce中主要是对key的比较和排序,因此可以将需要排序的两个字段组合成一个复合key,而value值不变,则组合后的<key,value>对形如<(key,value),value>。

在编程时可以自定义一个MyKeyPair类,该类中包含要排序的两个字段,然后将该类作为<key,value>对中的key(Hadoop中的任何类型都可以作为key),形如<MyKeyPair,value>,相当于自定义key的类型。由于所有的key都是可序列化并且可比较的,因此自定义的key需要实现接口WritableComparable。

与按照一个字段排序相比,本次二次排序需要自定义的地方如下:

·自定义组合key类,需要实现WritableComparable接口。

·自定义分区类,按照第一个字段进行分区,需要继承Partitioner类。

·自定义分组类,按照第一个字段进行分组,需要继承WritableComparator类。

2.编写程序

(1)自定义组合key类

新建自定义组合key类MyKeyPair.java,该类需要实现Hadoop提供的org.apache.hadoop.io.WritableComparable接口,该接口继承了org.apache.hadoop.io.Writable接口和java.lang.Comparable接口,定义源码如下:

     public interface WritableComparable<T> extends Writable, Comparable<T> {
     }

然后需要实现WritableComparable接口中的序列化方法write()、反序列化方法readFields()、比较方法compareTo()。write()方法用于将数据写入输出流;readFields()方法用于从输入流读取数据;compareTo()方法用于将两个对象进行比较,以便能够进行排序。

自定义组合key类MyKeyPair.java的源码如下:

     import java.io.DataInput;
     import java.io.DataOutput;
     import java.io.IOException;
     import org.apache.hadoop.io.WritableComparable;
  
     /**
      * 自定义组合key类
      */
     public class MyKeyPair implements WritableComparable<MyKeyPair> {
      //组合key属性
      private String first;//第一个排序字段
      private int second;//第二个排序字段
      /**
       * 实现该方法,反序列化对象input中的字段
       */
      public void readFields(DataInput input) throws IOException {
       this.first = input.readUTF();
       this.second = input.readInt();
      }
      /**
       * 实现该方法,序列化对象output中的字段
       */
      public void write(DataOutput output) throws IOException {
       output.writeUTF(first);
       output.writeInt(second);
      }
      /**
       * 实现比较器
       */
      public int compareTo(MyKeyPair o) {
       //默认升序排列
       int res = this.first.compareTo(o.first);
       if (res != 0) {//若第一个字段不相等,则返回
        return res;
       } else { //若第一个字段相等,则比较第二个字段,且降序排列
        return -Integer.valueOf(this.second).compareTo(
          Integer.valueOf(o.getSecond()));
       }
      }
      /**
       * 字段的get和set方法
       */
      public int getSecond() {
       return second;
      }
      public void setSecond(int second) {
       this.second = second;
      }
      public String getFirst() {
       return first;
      }
      public void setFirst(String first) {
       this.first = first;
      }
     }

(2)自定义分区类

新建自定义分区类MyPartitioner.java,该类需要继承Hadoop提供的org.apache.hadoop.mapreduce.Partitioner类,并实现其中的抽象方法getPartition()。Partitioner类是一个抽象泛型类,用于控制对Map任务输出结果的分区,泛型的两个参数分别表示<key,value>对中key的类型和value的类型。Partitioner类的源码如下:

     /**
     * 分区类Partitioner
     */
     public abstract class Partitioner<KEY, VALUE> {
  
       /**
        * 得到分区编号
        *
        * @param key:需要分区的<key,value>对中的key
        * @param value:需要分区的<key,value>对中的value
        * @param numPartitions:分区数量(与Reduce任务数量相同)
        * @return 分区编号
        */
       public abstract int getPartition(KEY key, VALUE value, int numPartitions);
     }

关于MapReduce的分区规则可参考5.1.3节的MapReduce工作原理,此处不再赘述。

自定义分区类MyPartitioner.java的源码如下:

     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.mapreduce.Partitioner;
     /**
      * 自定义分区类
      */
     public class MyPartitioner extends Partitioner<MyKeyPair, IntWritable> {
      /**
       * 实现抽象方法getPartition(),自定义分区字段
       * @param myKeyPair:<key,value>对中key的类型
       * @param value:<key,value>对中value的类型
       * @param numPartitions:分区的数量(等于Reduce任务数量)
       * @return 分区编号
       */
      public int getPartition(MyKeyPair myKeyPair, IntWritable value,
        int numPartitions) {
       //将第一个字段作为分区字段
       return (myKeyPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions;
      }
     }

上述代码继承Partitioner类的同时指定了<key,value>对中key的类型为MyKeyPair,value的类型为IntWritable。

(3)自定义分组类

新建自定义分组类MyGroupComparator.java,该类需要继承Hadoop提供的org.apache.hadoop.io.WritableComparator类,并重写其中的compare()方法,以实现按照指定的字段进行分组。

自定义分组类MyGroupComparator.java的源码如下:

     import org.apache.hadoop.io.WritableComparator;
  
     /**
      * 自定义分组类
      */
     public class MyGroupComparator extends WritableComparator {
  
      protected MyGroupComparator() {
       //指定分组<key,value>对中key的类型,true为创建该类型的实例,若不指定类型将报空值错误
       super(MyKeyPair.class, true);
      }
  
      //重写compare()方法,以第一个字段进行分组
      public int compare(MyKeyPair o1, MyKeyPair o2) {
       return o1.getFirst().compareTo(o2.getFirst());
      }
     }

上述代码首先通过构造方法指定了<key,value>对中key的类型为MyKeyPair,由于MapReduce默认以<key,value>对中的key值进行分组,因此接下来重写了compare()方法,实现了按照MyKeyPair对象中的first字段进行对比,若值相等则会将当前<key,value>对分为一组。

(4)定义Mapper类

新建Mapper类MyMapper.java,将输入的数据封装为<MyKeyPair, IntWritable>形式的<key,value>对进行输出,即输出的key的类型为MyKeyPair,value的类型为IntWritable。

Mapper类MyMapper.java的源码如下:

     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.LongWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Mapper;
  
     import java.io.IOException;
     import java.util.StringTokenizer;
  
     /**
      * 定义Mapper类
      */
     public class MyMapper extends
       Mapper<LongWritable, Text, MyKeyPair, IntWritable> {
      /**
       * 重写map()方法
       */
      public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
       String line = value.toString();
       //将输入的一行数据默认按空格、制表符\t、换行符\n、回车符\r进行分割,也可以加入一个参数指定
         分隔符
       StringTokenizer itr = new StringTokenizer(line);
       String first = itr.nextToken();//得到第一个字段值
       String second = itr.nextToken();//得到第二个字段值
       //设置组合key和value ==> <(key,value),value>
       //设置MyKeyPair类型的输出key
       MyKeyPair outKey = new MyKeyPair();
       outKey.setFirst(first);
       outKey.setSecond(Integer.valueOf(second));
       //设置IntWritable类型的输出value
       IntWritable outValue = new IntWritable();
       outValue.set(Integer.valueOf(second));
       //输出<key,value>对
       context.write(outKey, outValue);
      }
     }

(5)定义Reducer类

新建Reducer类MyReducer.java,将接收到的分组后的<key,value-list>对进行循环输出。

Reducer类MyReducer.java的源码如下:

     import org.apache.hadoop.io.IntWritable;
     import org.apache.hadoop.io.Text;
     import org.apache.hadoop.mapreduce.Reducer;
     import java.io.IOException;
     /**
      * 定义Reducer类
      */
     public class MyReducer extends
       Reducer<MyKeyPair, IntWritable, Text, IntWritable> {
      /**
       * 重写reduce()方法
       */
      public void reduce(MyKeyPair key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
       //定义Text类型的输出key
       Text outKey = new Text();
       //循环输出<key,value>对
       for (IntWritable value : values) {
        outKey.set(key.getFirst());
        context.write(outKey, value);
       }
      }
     }

上述代码将MyKeyPair类型的key中的first字段值作为输出的key,输出的value从集合values中进行遍历。

(6)定义应用程序主类

新建应用程序主类MySecondSortApp.java,在该类中需要指定自定义的分区类和分组类,同时需要显式设置Map任务输出的key和value的类型。

应用程序主类MySecondSortApp.java的源码如下:

上述代码解析如下:

设置map()方法输出的key和value的类型。若省略,则默认采用 中设置的输出类型。也就是说,若map()方法和reduce()方法的输出类型一致,可以省略对map()方法输出类型的设置。若map()方法和reduce()方法实际的输出类型与此处的设置不匹配,则程序运行过程中将会报错。

在MapReduce程序运行的过程中会通过JobConf类获取map()方法的输出类型,获取map()方法输出key的类型的源码如下:

     public Class<?> getMapOutputKeyClass() {
       Class<?> retv = this.getClass("mapreduce.map.output.key.class", (Class)null,
                          Object.class);
       if (retv == null) {//没有设置map()的输出类型
           retv = this.getOutputKeyClass();
       }
       return retv;
     }

从上述源码可以看出,当没有设置map()方法的输出类型时,会调用getOutputKeyClass()方法使用reduce()方法的输出类型。

在执行MapReduce程序时,会首先从HDFS中读取数据块,然后按行拆分成<key,value>对,这个过程是由TextInputFormat类完成的。TextInputFormat类继承了抽象类FileInputFormat<K, V>,而FileInputFormat<K, V>又继承了抽象类InputFormat<K, V>。抽象类InputFormat<K, V>中定义了两个方法:getSplits()和createRecordReader()。getSplits()方法负责将HDFS数据解析为InputSplit集合,createRecordReader()方法负责将一个InputSplit解析为一个<key,value>对记录。抽象类InputFormat<K, V>的源码如下:

     public abstract class InputFormat<K, V> {
         public InputFormat() {
         }
         public abstract List<InputSplit> getSplits(JobContext var1) throws IOException,
               InterruptedException;
  
         public abstract RecordReader<K, V> createRecordReader(InputSplit var1,
               TaskAttemptContext var2) throws IOException, InterruptedException;
     }

3.程序运行

程序的打包和执行参考前面的“单词计数”和“数据去重”案例,此处不再赘述。

执行完成后,查看执行结果,如图5-11所示。

图5-11 查看二次排序程序的执行结果 DiZoJIAiv6pWvAMXlyz8egCijGeTFtww8CqTeS+6VGpTaLltmDuQN/5ZKKqoZ4BJ

点击中间区域
呼出菜单
上一章
目录
下一章
×