Hadoop支持多种语言开发MapReduce程序,但是对Java语言最为支持,为其提供了很多方便的Java API接口。
那么如何使用Java来编写一个MapReduce程序呢?编写一个MapReduce程序需要新建三个类:Mapper类、Reducer类、程序执行主类。当然,Mapper类和Reducer类也可以作为内部类放在程序执行主类中。
新建一个自定义Mapper类MyMapper.java,该类需要继承MapReduce API提供的Mapper类并重写Mapper类中的map()方法,例如以下代码:
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> { //重写map()方法 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //业务逻辑省略 } }
上述代码中的map()方法有三个参数,解析如下:
·LongWritable key:输入文件中每一行的起始位置。即从输入文件中解析出的<key,value>对中的key值。
·Text value:输入文件中每一行的内容。即从输入文件中解析出的<key,value>对中的value值。
·Context context:程序上下文。
MapReduce框架会自动调用map()方法并向其传入所需参数的值。每传入一个<key,value>对将调用一次map()方法。
Mapper是MapReduce提供的泛型类,继承Mapper需要传入四个泛型参数,前两个参数为输入key和value的数据类型,后两个参数为输出key和value的数据类型。
Mapper类的定义源码如下:
public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {}
新建一个自定义Reducer类MyReducer.java,该类需要继承MapReduce API提供的Reducer类并重写Reducer类中的reduce()方法,例如以下代码:
public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> { //重写reduce()方法 public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //业务逻辑省略 } }
上述代码中的reduce ()方法有三个参数,解析如下:
·Text key:Map任务输出的key值。即接收到的<key,value-list>对中的key值。
·Iterable<IntWritable> values:Map任务输出的value值的集合(相同key的集合)。即接收到的<key,value-list>对中的value-list集合。
·Context context:程序上下文。
MapReduce框架会自动调用reduce()方法并向其传入所需参数的值。每传入一个<key,value-list>对将调用一次reduce()方法。
与Mapper类似,Reducer也是MapReduce提供的泛型类,继承Reducer同样需要传入四个泛型参数,前两个参数为输入key和value的数据类型,后两个参数为输出key和value的数据类型。
Reducer类的定义源码如下:
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {}
程序执行主类为MapReduce程序的入口类,主要用于启动一个MapReduce作业。
新建一个程序执行主类MyMRApplication.java,在该类的main()方法中添加任务的配置信息,并指定任务的自定义Mapper类和Reducer类,代码结构如下:
/**程序入口类**/ public class MyMRApplication { public static void main(String[] args) throws Exception{ //构建Configuration实例 Configuration conf = new Configuration(); //其他配置信息代码省略 //获得Job实例 Job job = Job.getInstance(conf, "My job name"); //其他job配置代码省略 //设置Mapper和Reducer处理类 job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); //设置输入和输出目录代码省略 //提交任务代码省略 } }
提交程序之前需要启动Hadoop集群,包括HDFS和YARN。因为HDFS存储了MapReduce程序的数据来源,而YARN则负责MapReduce任务的执行、调度以及集群的资源管理。
将包含自定义的Mapper类、Reducer类和程序执行主类的Java项目打包为jar包并上传到HDFS的NameNode节点,然后执行以下命令提交任务到Hadoop集群:
$ hadoop jar MyMRApplication.jar com.hadoop.mr. MyMRApplication
上述命令中的MyMRApplication.jar为程序打包后的jar文件,com.hadoop.mr为程序执行主类MyMRApplication.java所在的包的名称。
在5.6节的案例分析中,会对MapReduce的程序编写与任务提交进行详细讲解。