Partitioner分区编程的主要功能是将不同的分类输出到不同的文件中。这样在查询数据时,可以根据某个规定的类型查询相关的数据。
使用Partitioner分区必须给Job设置以下两个参数。
设置Reducer的数量,默认为1:
job.setNumReduceTasks(2);
设置Paratiner类:
job.setPartitionerClass(Xxxx.class);
Map的结果会通过Partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat输出结果,如图4-6所示。
图4-6
Mapper最终处理的键值对<key, value>,需要送到Reducer中去合并,合并的时候,有相同key的键值对会输送到同一个Reducer上。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一种方法:
getPartition(Text key, Text value, int numPartitions)
输入参数是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)的数量。就是指定Mapper输出的键值对到哪一个Reducer上去。系统默认的Partitioner是HashPartitioner,它以Key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的Key值,肯定会被分配到同一个Reducer上。如果有N个Reducer,编号就为0,1,2,3,…,(N-1)。
在执行job之前,设置Reduce的个数:
job.setNumReduceTasks(2);
则默认会根据Key的Hash值,将数据分别输出到2个文件中。默认使用HashPartitioner类源代码如下:
@InterfaceAudience.Public @InterfaceStability.Stable public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> { public void configure(JobConf job) {} /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; } }
通过上面的代码算法可知:key.hashCode & Integer.MAX_VALUE是将数据转成正数,然后与设置的Reduce的个数取模。
现在先来做一个测试,在【代码4-4】中,我们先修改Reducer的个数,给job添加Reducer个数的代码如下,其中“…”为前后省略的代码。
... FileOutputFormat.setOutputPath(job, dest); //设置Reducer的个数 job.setNumReduceTasks(2); boolean b = job.waitForCompletion(true); ...
然后再重新执行【代码4-4】,查看输出结果,输出的文件成为两个,且这两个文件中的内容不会相同。
part-r-00000 part-r-00001
现在我们可以自己开发Partitioner,修改默认使用hashCode这个规则。比如我们根据字符的个数进行输出,将字符个数大于25的输出到一个文件中,小于等于25个字符的输出到另一个文件中。现在开发Partitioner类的继承类如【代码4-5】所示。第7行代码通过判断每一行的字符个数,将数据保存到不同的文件中,通过返回int的值,可以区分保存的文件。
【代码4-5】LineCharCountPartitioner.java
1. package org.hadoop.writable; 2. import org.apache.hadoop.io.NullWritable; 3. import org.apache.hadoop.mapreduce.Partitioner; 4. public class LineCharCountPartitioner extends Partitioner<LineCharCountW ritable, NullWritable> { 5. @Override 6. public int getPartition(LineCharCountWritable key, NullWritable value, int i) { 7. if(key.getCount()>25){ 8. return 1; 9. }else{ 10. return 0; 11. } 12. } 13. }
然后将自定义的Partitioner设置到job中去即可:
job.setNumReduceTasks(2); job.setPartitionerClass(LineCharCountPartitioner.class);
运行并查看结果,发现25个字符以下的数据输入到part-r-00000文件中,25个字符以上的数据输出到part-r-00001文件中去了。