MapReduce在传递<key,value>对时默认根据key进行排序,而有时候除了key以外,还需要根据value或value中的某一个字段进行排序,基于这种需求进行的自定义排序称为“二次排序”。
例如有以下数据:
A 3 B 5 C 1 B 6 A 4 C 5
现需要对上述数据先按照第一字段进行升序排列,若第一字段相同,则按照第二字段进行降序排列,期望的输出结果如下:
A 4 A 3 B 6 B 5 C 5 C 1
由于MapReduce中主要是对key的比较和排序,因此可以将需要排序的两个字段组合成一个复合key,而value值不变,则组合后的<key,value>对形如<(key,value),value>。
在编程时可以自定义一个MyKeyPair类,该类中包含要排序的两个字段,然后将该类作为<key,value>对中的key(Hadoop中的任何类型都可以作为key),形如<MyKeyPair,value>,相当于自定义key的类型。由于所有的key都是可序列化并且可比较的,因此自定义的key需要实现接口WritableComparable。
与按照一个字段排序相比,本次二次排序需要自定义的地方如下:
·自定义组合key类,需要实现WritableComparable接口。
·自定义分区类,按照第一个字段进行分区,需要继承Partitioner类。
·自定义分组类,按照第一个字段进行分组,需要继承WritableComparator类。
(1)自定义组合key类
新建自定义组合key类MyKeyPair.java,该类需要实现Hadoop提供的org.apache.hadoop.io.WritableComparable接口,该接口继承了org.apache.hadoop.io.Writable接口和java.lang.Comparable接口,定义源码如下:
public interface WritableComparable<T> extends Writable, Comparable<T> { }
然后需要实现WritableComparable接口中的序列化方法write()、反序列化方法readFields()、比较方法compareTo()。write()方法用于将数据写入输出流;readFields()方法用于从输入流读取数据;compareTo()方法用于将两个对象进行比较,以便能够进行排序。
自定义组合key类MyKeyPair.java的源码如下:
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.WritableComparable; /** * 自定义组合key类 */ public class MyKeyPair implements WritableComparable<MyKeyPair> { //组合key属性 private String first;//第一个排序字段 private int second;//第二个排序字段 /** * 实现该方法,反序列化对象input中的字段 */ public void readFields(DataInput input) throws IOException { this.first = input.readUTF(); this.second = input.readInt(); } /** * 实现该方法,序列化对象output中的字段 */ public void write(DataOutput output) throws IOException { output.writeUTF(first); output.writeInt(second); } /** * 实现比较器 */ public int compareTo(MyKeyPair o) { //默认升序排列 int res = this.first.compareTo(o.first); if (res != 0) {//若第一个字段不相等,则返回 return res; } else { //若第一个字段相等,则比较第二个字段,且降序排列 return -Integer.valueOf(this.second).compareTo( Integer.valueOf(o.getSecond())); } } /** * 字段的get和set方法 */ public int getSecond() { return second; } public void setSecond(int second) { this.second = second; } public String getFirst() { return first; } public void setFirst(String first) { this.first = first; } }
(2)自定义分区类
新建自定义分区类MyPartitioner.java,该类需要继承Hadoop提供的org.apache.hadoop.mapreduce.Partitioner类,并实现其中的抽象方法getPartition()。Partitioner类是一个抽象泛型类,用于控制对Map任务输出结果的分区,泛型的两个参数分别表示<key,value>对中key的类型和value的类型。Partitioner类的源码如下:
/** * 分区类Partitioner */ public abstract class Partitioner<KEY, VALUE> { /** * 得到分区编号 * * @param key:需要分区的<key,value>对中的key * @param value:需要分区的<key,value>对中的value * @param numPartitions:分区数量(与Reduce任务数量相同) * @return 分区编号 */ public abstract int getPartition(KEY key, VALUE value, int numPartitions); }
关于MapReduce的分区规则可参考5.1.3节的MapReduce工作原理,此处不再赘述。
自定义分区类MyPartitioner.java的源码如下:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.Partitioner; /** * 自定义分区类 */ public class MyPartitioner extends Partitioner<MyKeyPair, IntWritable> { /** * 实现抽象方法getPartition(),自定义分区字段 * @param myKeyPair:<key,value>对中key的类型 * @param value:<key,value>对中value的类型 * @param numPartitions:分区的数量(等于Reduce任务数量) * @return 分区编号 */ public int getPartition(MyKeyPair myKeyPair, IntWritable value, int numPartitions) { //将第一个字段作为分区字段 return (myKeyPair.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } }
上述代码继承Partitioner类的同时指定了<key,value>对中key的类型为MyKeyPair,value的类型为IntWritable。
(3)自定义分组类
新建自定义分组类MyGroupComparator.java,该类需要继承Hadoop提供的org.apache.hadoop.io.WritableComparator类,并重写其中的compare()方法,以实现按照指定的字段进行分组。
自定义分组类MyGroupComparator.java的源码如下:
import org.apache.hadoop.io.WritableComparator; /** * 自定义分组类 */ public class MyGroupComparator extends WritableComparator { protected MyGroupComparator() { //指定分组<key,value>对中key的类型,true为创建该类型的实例,若不指定类型将报空值错误 super(MyKeyPair.class, true); } //重写compare()方法,以第一个字段进行分组 public int compare(MyKeyPair o1, MyKeyPair o2) { return o1.getFirst().compareTo(o2.getFirst()); } }
上述代码首先通过构造方法指定了<key,value>对中key的类型为MyKeyPair,由于MapReduce默认以<key,value>对中的key值进行分组,因此接下来重写了compare()方法,实现了按照MyKeyPair对象中的first字段进行对比,若值相等则会将当前<key,value>对分为一组。
(4)定义Mapper类
新建Mapper类MyMapper.java,将输入的数据封装为<MyKeyPair, IntWritable>形式的<key,value>对进行输出,即输出的key的类型为MyKeyPair,value的类型为IntWritable。
Mapper类MyMapper.java的源码如下:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.StringTokenizer; /** * 定义Mapper类 */ public class MyMapper extends Mapper<LongWritable, Text, MyKeyPair, IntWritable> { /** * 重写map()方法 */ public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); //将输入的一行数据默认按空格、制表符\t、换行符\n、回车符\r进行分割,也可以加入一个参数指定 分隔符 StringTokenizer itr = new StringTokenizer(line); String first = itr.nextToken();//得到第一个字段值 String second = itr.nextToken();//得到第二个字段值 //设置组合key和value ==> <(key,value),value> //设置MyKeyPair类型的输出key MyKeyPair outKey = new MyKeyPair(); outKey.setFirst(first); outKey.setSecond(Integer.valueOf(second)); //设置IntWritable类型的输出value IntWritable outValue = new IntWritable(); outValue.set(Integer.valueOf(second)); //输出<key,value>对 context.write(outKey, outValue); } }
(5)定义Reducer类
新建Reducer类MyReducer.java,将接收到的分组后的<key,value-list>对进行循环输出。
Reducer类MyReducer.java的源码如下:
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; /** * 定义Reducer类 */ public class MyReducer extends Reducer<MyKeyPair, IntWritable, Text, IntWritable> { /** * 重写reduce()方法 */ public void reduce(MyKeyPair key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定义Text类型的输出key Text outKey = new Text(); //循环输出<key,value>对 for (IntWritable value : values) { outKey.set(key.getFirst()); context.write(outKey, value); } } }
上述代码将MyKeyPair类型的key中的first字段值作为输出的key,输出的value从集合values中进行遍历。
(6)定义应用程序主类
新建应用程序主类MySecondSortApp.java,在该类中需要指定自定义的分区类和分组类,同时需要显式设置Map任务输出的key和value的类型。
应用程序主类MySecondSortApp.java的源码如下:
上述代码解析如下:
设置map()方法输出的key和value的类型。若省略,则默认采用 中设置的输出类型。也就是说,若map()方法和reduce()方法的输出类型一致,可以省略对map()方法输出类型的设置。若map()方法和reduce()方法实际的输出类型与此处的设置不匹配,则程序运行过程中将会报错。
在MapReduce程序运行的过程中会通过JobConf类获取map()方法的输出类型,获取map()方法输出key的类型的源码如下:
public Class<?> getMapOutputKeyClass() { Class<?> retv = this.getClass("mapreduce.map.output.key.class", (Class)null, Object.class); if (retv == null) {//没有设置map()的输出类型 retv = this.getOutputKeyClass(); } return retv; }
从上述源码可以看出,当没有设置map()方法的输出类型时,会调用getOutputKeyClass()方法使用reduce()方法的输出类型。
在执行MapReduce程序时,会首先从HDFS中读取数据块,然后按行拆分成<key,value>对,这个过程是由TextInputFormat类完成的。TextInputFormat类继承了抽象类FileInputFormat<K, V>,而FileInputFormat<K, V>又继承了抽象类InputFormat<K, V>。抽象类InputFormat<K, V>中定义了两个方法:getSplits()和createRecordReader()。getSplits()方法负责将HDFS数据解析为InputSplit集合,createRecordReader()方法负责将一个InputSplit解析为一个<key,value>对记录。抽象类InputFormat<K, V>的源码如下:
public abstract class InputFormat<K, V> { public InputFormat() { } public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException; public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException; }
程序的打包和执行参考前面的“单词计数”和“数据去重”案例,此处不再赘述。
执行完成后,查看执行结果,如图5-11所示。
图5-11 查看二次排序程序的执行结果