Hadoop的核心数据处理框架是MapReduce,该框架能为海量的数据提供计算处理。本节就MapReduce开发相关内容进行分析,包括使用IDEA开发工具搭建MapReduce环境以及通过源码认识MapReduce编程。
Hadoop框架是基于Java语言开发的,而IntelliJ IDEA是一个常用的Java集成开发工具,因此通常选用IntelliJ IDEA作为MapReduce的编程工具。为了能够成功地进行MapReduce编程,本节将首先在本机系统(通常为Windows系统)中安装Java,再安装IntelliJ IDEA工具,然后使用IntelliJ IDEA创建一个MapReduce工程,并配置MapReduce集成环境。
JDK是Java语言的软件开发工具包,主要用于移动设备、嵌入式设备上的Java应用程序。JDK是整个Java开发的核心,包含Java的运行环境、Java工具和Java基础的类库。因为本书后续的Hadoop开发是基于Java语言的,所以需要在Windows下安装JDK。具体安装步骤如下。
1)双击“jdk-8u281-windows-x64.exe”可执行文件,单击“下一步”按钮,如图2-63所示。
图2-63 安装JDK
2)更改安装目录。单击“更改”按钮,自主选定一个目录,如图2-64所示,等待JDK安装完成即可。
图2-64 选择JDK的安装目录
3)待JDK安装完毕时,系统将弹出安装JRE的提示窗口,根据自己的需要更改JRE的安装目录即可。需要注意的是,JDK和JRE的安装目录最好在同一个文件夹下,比如都在C:\Program Files\java\目录下,如图2-65所示,单击“下一步”按钮进行JRE的安装。JRE安装完成之后单击“关闭”按钮即可。
图2-65 选择JRE的安装目录
4)配置环境变量。
安装完Java后,需要在Windows系统配置环境变量,只有配置了环境变量,Java才能正常使用。
右键单击“此电脑”桌面快捷方式,选择“属性”选项,在出现的系统设置窗口中选择“高级系统设置”选项,进入“系统属性”对话框,单击“环境变量”按钮,出现“环境变量”对话框,如图2-66和图2-67所示。
图2-66 “系统属性”对话框
图2-67 “环境变量”对话框
新建JAVA_HOME变量,在变量值中输入JDK安装路径,如图2-68所示。
图2-68 新建JAVA_HOME变量
在“系统变量”中找到Path变量,单击“编辑”按钮,在弹出的“编辑环境变量”界面单击“新建”按钮,输入变量值“%JAVA_HOME%\bin”。再次单击“新建”按钮,输入变量值“%JAVA_HOME%\jre\bin”,最后单击“确定”按钮完成配置,如图2-69和图2-70所示。
图2-69 打开“Path”变量
图2-70 配置“Path”变量
在“系统变量”下方单击“新建”按钮,输入变量名CLASSPATH,再输入变量值“.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar”,如图2-71所示。
图2-71 配置“Path”变量
测试环境变量是否配置成功。打开“命令提示符”应用(在键盘上按住win+R,输入cmd,点击“确定”按钮),输入java -version命令查看Java版本,出现如图2-72所示的信息,说明安装配置成功。
图2-72 测试Java安装
IntelliJ IDEA是一个常用的Java集成开发工具,本书将使用IDEA作为Hadoop编程的开发工具。IDEA的下载和安装步骤如下。
1)下载安装包。在官网 https://www.jetbrains.com/ 可以下载IntelliJ IDEA的安装包ideaIC-2018.3.6.exe(Community版),社区版是免费开源的,当然也可以购买发行版。
2)安装IntelliJ IDEA。双击下载的安装包“ideaIC-2018.3.6.exe”,在弹出的界面中单击“Next”按钮,弹出如图2-73所示界面。设置好安装目录后,单击“Next”按钮。
图2-73 选择安装目录
弹出如图2-74所示的界面,单击“Finish”按钮完成安装。
图2-74 安装完成
3)启动IntelliJ IDEA。双击生成的桌面图标或选择“开始”→“JetBrains”→“IntelliJ IDEA Community Edition 2018.3.6”命令,运行IntelliJ IDEA,弹出询问是否导入以前设定的对话框,选择不导入,如图2-75所示,单击“OK”按钮进入下一步。
图2-75 询问是否导入以前设定的对话框
进入选择UI主题,可以选择白色或黑色背景,单击左下角的“Skip Remaining and Set Defaults”按钮,跳过其他设置并采用默认设置,如图2-76所示。
图2-76 选择UI主题
设置完成后将出现如图2-77所示的开始界面,说明安装成功。
图2-77 运行界面
安装好IntelliJ IDEA开发工具后,即可在IDEA中创建MapReduce工程。本节将使用IDEA创建Maven项目搭建MapReduce工程。Maven是Apache基金会下的一个顶级项目,是一个用Java编写的开源项目管理工具,用于对Java项目进行项目构建、依赖管理以及信息管理。使用Maven项目搭建MapReduce工程能够有效地对工程进行管理。具体工程搭建步骤如下。
1)如图2-77所示,单击“Create New Project”选项进入如图2-78所示界面,在左侧选择“Maven”选项,在右上方单击“New”选项,在弹出的选项框中选择“jdk1.8.0_281”Java JDK,单击“OK”按钮,再单击“Next”按钮。
图2-78 选择JDK
2)接着输入如图2-79所示内容,再单击“Next”按钮,进入如图2-80所示界面,选择工程要保存的位置,单击“Finish”按钮完成创建。
图2-79 输入工程名称
图2-80 保存工程位置
3)工程创建完成后其目录结构如图2-81所示。
图2-81 工程目录结构
虽然我们创建了MapReduce工程,但是该工程并不能运行MapReduce程序,因为没有配置MapReduce环境,程序找不到相关的Hadoop jar包。因此,我们还需要配置MapReduce环境,配置步骤如下。
1)创建好工程后,需要配置MapReduce环境。在图2-81所示工程界面中选择菜单栏中的“File”→“Project Structure”命令,快捷键为“Ctrl+Alt+Shift+S”,打开如图2-82所示界面。
图2-82 工程结构设置的弹窗界面
2)单击“Libraries”选项,单击右侧的“+”选项,在弹出的选项中单击“Java”选项,如图2-83所示。
图2-83 添加jar包
3)执行步骤2)后,将弹出如图2-84所示界面。在界面里选择要添加的jar包,这里需要将hadoop-3.1.4安装包的/share/hadoop目录下的全部jar包导入,再单击“OK”按钮。
图2-84 选择Hadoop jar包
4)全部jar包导入后,单击“Apply”按钮,再单击“OK”按钮即可添加完成,如图2-85所示。
图2-85 添加完成
在理解MapReduce的基本原理后,以词频统计为例,我们需要进一步了解MapReduce各部分的执行流程。词频统计的输入与输出内容如表2-11所示。
表2-11 词频统计的输入与输出
下面通过示意图的方式,依次分析Map阶段与Reduce阶段的处理过程。
键值对(Key-Value Pair)是一种数据格式,每个键都有一个对应的值。输入文件的每一行记录经过映射处理后输出为若干组键值对,如图2-86所示。在<Hello,1>中,Hello是键,1是值,因为需要统计单词的词频数,所以这里的1代表每个单词的初始频数。在Map阶段生成键值对后,提交中间输出结果给Reduce任务。
图2-86 Map阶段的处理过程
在Map输出与Reduce输入之间存在一个Shuffle过程。Shuffle过程也被称为数据混洗过程,作用是将键相同的键值对进行汇集,并将键相同的值存入同一列表中,如图2-87所示。例如,<World,1>与<World,1>经过Shuffle后生成了<World, <1,1>>。混洗后的键值对根据键(Key)进行排序。在Reduce阶段将处理所有的键值对数据,对键相同的值进行求和汇总(将各个单词对应的初始频数进行累加),得到每个单词的词频数,最后以<单词,词频>键值对的形式输出统计结果。
图2-87 Reduce阶段的处理过程
在实际编写一个MapReduce程序时,仅了解MapReduce程序的基本工作原理与执行流程是远远不够的,还需要掌握MapReduce编程的具体规范。Hadoop官方提供了一些示例源码,十分适合初学者学习,因此,接下来将以Hadoop官方提供的示例源码中的WordCount程序为例,进行代码级别的分析和说明。
首先需要获取WordCount的源代码。进入Hadoop 3.1.4安装目录下的“share\hadoop\mapreduce\sources”目录,解压hadoop-mapreduce- examples-3.1.4-sources.jar文件,在子目录“org/apache/hadoop/examples”中找到一个名称为“WordCount.java”的代码文件,即为WordCount程序的源代码,如图2-88所示。
图2-88 WordCount源代码的存储路径
使用文本编辑器或IDE工具(IntelliJ IDEA)打开代码文件WordCount,完整的源代码如代码清单2-31所示。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException; import java.util.StringTokenizer; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
WordCount的源代码十分简单,从结构上可以分为3个部分,分别是应用程序Driver模块、Mapper模块(执行Map任务)与Reducer模块(执行Reduce任务)。下面依次对3个部分代码块进行解读。
WordCount的Driver程序如图2-89所示。Driver程序是MapReduce程序的入口,主要是main方法。main方法中实现了MapReduce程序的一些初始化设置,包括任务提交并等待程序运行完成。
图2-89 Driver程序
1)代码第69行:初始化相关Hadoop配置,通过关键字new创建一个实例即可。
2)代码第75行:新建Job并设置主类。Job实例化需要两个参数,第一个参数是Configuration的实例对象conf,第二个参数jobName:"word count"指的是MapReduce任务的任务名称。
3)代码第77~79行:设置Mapper、Combiner、Reducer。这一部分的代码为固定写法,但可以修改里面的类名。一般情况下,括号里的类名为实际任务的Mapper、Combiner、Reducer。其中,Mapper与Reducer是必须设置的类,而Combiner是可选项。因为在这个示例中Combiner和Reducer的处理逻辑是完全相同的,所以在本例的词频统计中Combiner的设置与Reducer完全相同。关于Combiner的作用,将在第5章继续讲解。
4)代码第80~81行:设置输出键值对格式。在MapReduce任务中涉及4种键值对格式:Mapper输入键值对格式<K1,V1>,Mapper输出键值对格式<K2,V2>,Reducer输入键值对格式<K2,V2>,Reducer输出键值对格式<K3,V3>。当Mapper输出键值对格式<K2,V2>和Reducer输出键值对格式<K3,V3>一样时,可以只设置Reducer输出键值对的格式。关于输入与输出的键值对格式的选择,将在后文中进一步说明。
5)代码第82~86行:设置输入与输出路径。若有必要,则这里还可以增加对输入与输出文件格式的设置。
6)代码第87行:提交任务等待运行。提交MapReduce任务,并等待任务运行结束(为固定写法)。
综合应用程序Driver模块的代码块描述,可以总结出MapReduce任务初始化的通用代码,如代码清单2-32所示。开发者可以根据具体应用需求修改其中的参数,直接使用即可。
Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); job.setCombinerClass(MyCombiner.class); job.setMapOutputKeyClass(MyMapKeyWritable.class); job.setMapOutputValueClass(MyMapValueWritable.class); job.setOutputKeyClass(MyKeyWritable.class); job.setOutputValueClass(MyValueWritable.class); job.setInputFormatClass(MyInputFormat.class); job.setOutputFormatClass(MyOutputFormat.class); for (int i = 0; i < args.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(args[i])); } FileOutputFormat.setOutputPath(job,new Path(args[args.length - 1])); job.waitForCompletion(true);
在MapReduce程序中,主要的代码实现包括Mapper模块中的map()方法以及Reducer模块中的reduce()方法。在WordCount源码中,Mapper模块对应源码中的TokenizerMapper类,如图2-90所示。
图2-90 Mapper代码
1)自定义TokenizerMapper类(代码第36~37行),该类需要继承Mapper父类,同时需要设置输入/输出键值对格式。其中输入键值对格式需要和输入格式设置的类读取生成的键值对格式匹配,而输出键值对格式需要和Driver模块中设置的Mapper类输出的键值对格式匹配。
2)重写Mapper模块中的map()方法(代码第39~49行)。Mapper类共有3个方法,分别是setup()、map()、cleanup()。若TokenizerMapper类要使用Mapper类的方法,则需要重写Mapper类里面的方法。
Mapper任务启动后首先执行setup()方法,该方法主要用于初始化工作。
map()方法用于针对每条输入键值对执行方法中定义的处理逻辑,并按规定的键值对格式输出。map()方法的代码实现要与实际业务逻辑挂钩,由开发者自行编写。因为实际业务需求是词频统计,所以处理时将每个输入键值对(键值对组成为<行的偏移量,行字符串>)的值(行字符串)按照分隔符进行分割,得到每个单词,再对每个单词进行处理,输出<单词,1>键值对形式的中间结果。
处理完所有键值对后,再调用cleanup()方法。cleanup()方法主要用于关闭资源等操作。
在WordCount源码中,Reducer模块对应源码中的IntSumReducer类,如图2-91所示。
图2-91 Reducer代码
1)自定义IntSumReducer类(代码第52~53行),该类需要继承Reducer父类,与Mapper类一样,需要设置输入/输出键值对格式。其中输入键值对格式需要和Mapper类的输出键值对格式保持一致,输出键值对格式需要和Driver模块中设置的输出键值对格式保持一致。
2)重写Reducer模块中的reduce()方法(代码第56~65行)。Reducer也有3个方法:setup()、cleanup()、reduce()。如果IntSumReducer类需要使用Reducer类中的方法,那么需要重写Reducer类中的方法。
setup()、cleanup()方法和Mapper类的同名方法功能一致,并且setup()方法也是在最开始执行一次,而cleanup()方法在最后执行一次。
核心部分是reduce()方法的实现。reduce()方法需要实现与实际业务相关的处理逻辑。reduce()方法需要根据相同键对应的列表值全部进行累加,最后输出<单词,词频>的键值对形式的结果。
通过对WordCount源代码的解读,可以使读者对使用MapReduce编程实现词频统计有更全面的认识。在进行MapReduce编程时,开发者主要实现Mapper与Reducer两个模块,其中包括定义输入/输出的键值对格式、编写map()与reduce()方法中定义的处理逻辑等。