



Partitioner分区编程的主要功能是将不同的分类输出到不同的文件中。这样在查询数据时,可以根据某个规定的类型查询相关的数据。
使用Partitioner分区必须给Job设置以下两个参数。
设置Reducer的数量,默认为1:
job.setNumReduceTasks(2);
设置Paratiner类:
job.setPartitionerClass(Xxxx.class);
Map的结果会通过Partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat输出结果,如图4-6所示。
图4-6
Mapper最终处理的键值对<key, value>,需要送到Reducer中去合并,合并的时候,有相同key的键值对会输送到同一个Reducer上。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一种方法:
getPartition(Text key, Text value, int numPartitions)
输入参数是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)的数量。就是指定Mapper输出的键值对到哪一个Reducer上去。系统默认的Partitioner是HashPartitioner,它以Key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的Key值,肯定会被分配到同一个Reducer上。如果有N个Reducer,编号就为0,1,2,3,…,(N-1)。
在执行job之前,设置Reduce的个数:
job.setNumReduceTasks(2);
则默认会根据Key的Hash值,将数据分别输出到2个文件中。默认使用HashPartitioner类源代码如下:
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
public void configure(JobConf job) {}
/** Use {@link Object#hashCode()} to partition. */
public int getPartition(K2 key, V2 value,
int numReduceTasks) {
return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
通过上面的代码算法可知:key.hashCode & Integer.MAX_VALUE是将数据转成正数,然后与设置的Reduce的个数取模。
现在先来做一个测试,在【代码4-4】中,我们先修改Reducer的个数,给job添加Reducer个数的代码如下,其中“…”为前后省略的代码。
...
FileOutputFormat.setOutputPath(job, dest);
//设置Reducer的个数
job.setNumReduceTasks(2);
boolean b = job.waitForCompletion(true);
...
然后再重新执行【代码4-4】,查看输出结果,输出的文件成为两个,且这两个文件中的内容不会相同。
part-r-00000
part-r-00001
现在我们可以自己开发Partitioner,修改默认使用hashCode这个规则。比如我们根据字符的个数进行输出,将字符个数大于25的输出到一个文件中,小于等于25个字符的输出到另一个文件中。现在开发Partitioner类的继承类如【代码4-5】所示。第7行代码通过判断每一行的字符个数,将数据保存到不同的文件中,通过返回int的值,可以区分保存的文件。
【代码4-5】LineCharCountPartitioner.java
1. package org.hadoop.writable;
2. import org.apache.hadoop.io.NullWritable;
3. import org.apache.hadoop.mapreduce.Partitioner;
4. public class LineCharCountPartitioner extends Partitioner<LineCharCountW
ritable, NullWritable> {
5. @Override
6. public int getPartition(LineCharCountWritable key, NullWritable value,
int i) {
7. if(key.getCount()>25){
8. return 1;
9. }else{
10. return 0;
11. }
12. }
13. }
然后将自定义的Partitioner设置到job中去即可:
job.setNumReduceTasks(2);
job.setPartitionerClass(LineCharCountPartitioner.class);
运行并查看结果,发现25个字符以下的数据输入到part-r-00000文件中,25个字符以上的数据输出到part-r-00001文件中去了。