购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

4.4
Partitioner分区编程

Partitioner分区编程的主要功能是将不同的分类输出到不同的文件中。这样在查询数据时,可以根据某个规定的类型查询相关的数据。

使用Partitioner分区必须给Job设置以下两个参数。

设置Reducer的数量,默认为1:

 
    job.setNumReduceTasks(2);

设置Paratiner类:

 
    job.setPartitionerClass(Xxxx.class);

Map的结果会通过Partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat输出结果,如图4-6所示。

图4-6

Mapper最终处理的键值对<key, value>,需要送到Reducer中去合并,合并的时候,有相同key的键值对会输送到同一个Reducer上。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一种方法:

 
    getPartition(Text key, Text value, int numPartitions)

输入参数是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)的数量。就是指定Mapper输出的键值对到哪一个Reducer上去。系统默认的Partitioner是HashPartitioner,它以Key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的Key值,肯定会被分配到同一个Reducer上。如果有N个Reducer,编号就为0,1,2,3,…,(N-1)。

在执行job之前,设置Reduce的个数:

 
    job.setNumReduceTasks(2);

则默认会根据Key的Hash值,将数据分别输出到2个文件中。默认使用HashPartitioner类源代码如下:

 
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
        public void configure(JobConf job) {}
        /** Use {@link Object#hashCode()} to partition. */
        public int getPartition(K2 key, V2 value,
                              int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

通过上面的代码算法可知:key.hashCode & Integer.MAX_VALUE是将数据转成正数,然后与设置的Reduce的个数取模。

现在先来做一个测试,在【代码4-4】中,我们先修改Reducer的个数,给job添加Reducer个数的代码如下,其中“…”为前后省略的代码。

 
    ...
    FileOutputFormat.setOutputPath(job, dest);
    //设置Reducer的个数
    job.setNumReduceTasks(2);
    boolean b = job.waitForCompletion(true);
    ...

然后再重新执行【代码4-4】,查看输出结果,输出的文件成为两个,且这两个文件中的内容不会相同。

 
    part-r-00000
    part-r-00001

现在我们可以自己开发Partitioner,修改默认使用hashCode这个规则。比如我们根据字符的个数进行输出,将字符个数大于25的输出到一个文件中,小于等于25个字符的输出到另一个文件中。现在开发Partitioner类的继承类如【代码4-5】所示。第7行代码通过判断每一行的字符个数,将数据保存到不同的文件中,通过返回int的值,可以区分保存的文件。

【代码4-5】LineCharCountPartitioner.java

 
    1.   package org.hadoop.writable;
    2.   import org.apache.hadoop.io.NullWritable;
    3.   import org.apache.hadoop.mapreduce.Partitioner;
    4.   public class LineCharCountPartitioner extends Partitioner<LineCharCountW
ritable, NullWritable> {
    5.       @Override
    6.     public int getPartition(LineCharCountWritable key, NullWritable value,
 int i) {
    7.           if(key.getCount()>25){
    8.               return 1;
    9.           }else{
    10.             return 0;
    11.         }
    12.     }
    13. }

然后将自定义的Partitioner设置到job中去即可:

 
    job.setNumReduceTasks(2);
job.setPartitionerClass(LineCharCountPartitioner.class);

运行并查看结果,发现25个字符以下的数据输入到part-r-00000文件中,25个字符以上的数据输出到part-r-00001文件中去了。 LocmTghD//Ytrznx5QXe9yFbc+EORcUgLMhHLsm6/IAZG9UWdPDpusCyzQ15PUu0

点击中间区域
呼出菜单
上一章
目录
下一章
×