上节成功安装完Spark 3.0单机版,下面开始ML的学习,这是我们学习Spark的第一步。
经典的wordCount(统计文章中的词频)是MapReduce入门必看的例子,可以称为分布式框架的Hello World,也是大数据处理人员必须掌握的入门技能:考察对基本Spark语法的运用,还是一个简单的自然语言处理程序。源代码参照Spark官网的wordCount中的示例代码,实现了text文件的单词计数功能,单词之间按照空格来区分,形式如图2-46所示。
图2-46 wordCount统计流程
首先是数据的准备工作。这里为了简化起见,采用小数据集(本书将以小数据为主,演示Spark机器学习的使用和原理)。
在C盘创建名为wc.txt的文本文件(数据位置://DATA//D02//wc.txt),文件名也可以自行设置,内容如下:
good bad cool hadoop spark mllib good spark mllib cool spark bad
这是需要计数的数据内容,我们需要计算出文章中每个单词出现的次数,Spark代码如程序2-3所示。
代码位置://SRC//C02//wordCount.scala
程序2-3 Spark代码
下面是对程序进行分析。
(1)首先创建一个SparkSession(),目的是创建一个会话变量实例,告诉系统开始Spark计算。之后的master("local")启动本地化运算、appName("wordCount")设置本程序名称。
(2)getOrCreate()的作用是创建环境变量实例,准备开始任务。
(3)spark.read.text("c://wc.txt")的作用是读取文件。顺便提一下,此时的文件读取是按照正常顺序读取,本书后面章节会介绍如何读取特定格式的文件。这种形式读出来的格式为Spark DataFrame,并非之前的RDD形式。
(4)flatMap()是Scala中提取相关数据按行处理的一个方法。在_.split(" ")中,_是一个占位符,代表传送进来的任意一个数据,对其按" "分割。map((_, 1))对每个字符进行计数,在这个过程中并不涉及合并和计算,只是单纯地将每个数据行中的单词加1。最后的reduceByKey方法对传递进来的数据按key值相加,最终形成wordCount计算结果。
目前程序流程如图2-47所示。
图2-47 wordCount流程图
(5)collect()对程序进行启动。因为Spark编程的优化,很多方法在计算过程中属于lazy模式,操作是延迟计算的,需要等到有Action操作的时候,才会真正触发运算,所以需要一个显性启动支持。foreach(println)是打印的一个调用方法,打印出数据内容。
具体打印结果如下:
(cool,2) (spark,3) (hadoop,1) (bad,2) (good,2) (mllib,2)
可以与Spark对比的是MapReduce中wordCount程序的设计,如程序2-4所示。笔者只是为了做对比,如果有读者想深入学习MapReduce程序设计,请参考相关的专业图书。
代码位置://SRC//C02//wordCount.java
程序2-4 MapReduce中wordCount程序的设计
import java.io.IOException; import java.util.Iterator; import java.util.StringTokenizer; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.TextOutputFormat; public class wordCount { public static class Map extends MapReduceBase implements //创建固定Map格式 Mapper<LongWritable, Text, Text, IntWritable> { //创建数据1格式 private final static IntWritable one = new IntWritable(1); //设定输入格式 private Text word = new Text(); //开始Map程序 public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { //将传入值定义为line String line = value.toString(); //格式化传入值 StringTokenizer tokenizer = new StringTokenizer(line); //开始迭代计算 while (tokenizer.hasMoreTokens()) { //设置输入值 word.set(tokenizer.nextToken()); //写入输出值 output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements //创建固定Reduce格式 Reducer<Text, IntWritable, Text, IntWritable> { //开始Reduce程序 public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { //初始化计算器 int sum = 0; //开始迭代计算输入值 while (values.hasNext()) { sum += values.next().get(); //计数器计算 } //创建输出结果 output.collect(key, new IntWritable(sum)); } } //开始主程序 public static void main(String[] args) throws Exception { //设置主程序 JobConf conf = new JobConf(wordCount.class); //设置主程序名 conf.setJobName("wordcount"); //设置输出Key格式 conf.setOutputKeyClass(Text.class); //设置输出Value格式 conf.setOutputValueClass(IntWritable.class); //设置主Map conf.setMapperClass(Map.class); //设置第一次Reduce方法 conf.setCombinerClass(Reduce.class); //设置主Reduce方法 conf.setReducerClass(Reduce.class); //设置输入格式 conf.setInputFormat(TextInputFormat.class); //设置输出格式 conf.setOutputFormat(TextOutputFormat.class); //设置输入文件路径 FileInputFormat.setInputPaths(conf, new Path(args[0])); //设置输出路径 FileOutputFormat.setOutputPath(conf, new Path(args[1])); //开始主程序 JobClient.runJob(conf); } }
Scala于2001年由瑞士洛桑联邦理工学院(EPFL)编程方法实验室研发,由Martin Odersky(马丁·奥德斯基)创建。Spark采用Scala程序设计,能够简化程序编写的过程与步骤,同时在后端对编译后的文件有较好的优化,更容易表达思路。这些都是目前使用Java语言所欠缺的。
实际上,Scala在使用中主要进行整体化考虑,而非Java的面向对象的思考方法,这一点请读者注意。