购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

2.2 经典的wordCount

上节成功安装完Spark 3.0单机版,下面开始ML的学习,这是我们学习Spark的第一步。

2.2.1 Spark 3.0实现wordCount

经典的wordCount(统计文章中的词频)是MapReduce入门必看的例子,可以称为分布式框架的Hello World,也是大数据处理人员必须掌握的入门技能:考察对基本Spark语法的运用,还是一个简单的自然语言处理程序。源代码参照Spark官网的wordCount中的示例代码,实现了text文件的单词计数功能,单词之间按照空格来区分,形式如图2-46所示。

图2-46 wordCount统计流程

首先是数据的准备工作。这里为了简化起见,采用小数据集(本书将以小数据为主,演示Spark机器学习的使用和原理)。

在C盘创建名为wc.txt的文本文件(数据位置://DATA//D02//wc.txt),文件名也可以自行设置,内容如下:

good bad cool
hadoop spark mllib
good spark mllib
cool spark bad

这是需要计数的数据内容,我们需要计算出文章中每个单词出现的次数,Spark代码如程序2-3所示。

代码位置://SRC//C02//wordCount.scala

程序2-3 Spark代码

下面是对程序进行分析。

(1)首先创建一个SparkSession(),目的是创建一个会话变量实例,告诉系统开始Spark计算。之后的master("local")启动本地化运算、appName("wordCount")设置本程序名称。

(2)getOrCreate()的作用是创建环境变量实例,准备开始任务。

(3)spark.read.text("c://wc.txt")的作用是读取文件。顺便提一下,此时的文件读取是按照正常顺序读取,本书后面章节会介绍如何读取特定格式的文件。这种形式读出来的格式为Spark DataFrame,并非之前的RDD形式。

(4)flatMap()是Scala中提取相关数据按行处理的一个方法。在_.split(" ")中,_是一个占位符,代表传送进来的任意一个数据,对其按" "分割。map((_, 1))对每个字符进行计数,在这个过程中并不涉及合并和计算,只是单纯地将每个数据行中的单词加1。最后的reduceByKey方法对传递进来的数据按key值相加,最终形成wordCount计算结果。

目前程序流程如图2-47所示。

图2-47 wordCount流程图

(5)collect()对程序进行启动。因为Spark编程的优化,很多方法在计算过程中属于lazy模式,操作是延迟计算的,需要等到有Action操作的时候,才会真正触发运算,所以需要一个显性启动支持。foreach(println)是打印的一个调用方法,打印出数据内容。

具体打印结果如下:

(cool,2)
(spark,3)
(hadoop,1)
(bad,2)
(good,2)
(mllib,2)

2.2.2 MapReduce实现wordCount

可以与Spark对比的是MapReduce中wordCount程序的设计,如程序2-4所示。笔者只是为了做对比,如果有读者想深入学习MapReduce程序设计,请参考相关的专业图书。

代码位置://SRC//C02//wordCount.java

程序2-4 MapReduce中wordCount程序的设计

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class wordCount {

  public static class Map extends MapReduceBase implements
   //创建固定Map格式
    Mapper<LongWritable, Text, Text, IntWritable> {
   //创建数据1格式
    private final static IntWritable one = new IntWritable(1);
   //设定输入格式
    private Text word = new Text();
   //开始Map程序
    public void map(LongWritable key, Text value,
           OutputCollector<Text, IntWritable> output, Reporter reporter)
           throws IOException {
      //将传入值定义为line
      String line = value.toString();
      //格式化传入值
      StringTokenizer tokenizer = new StringTokenizer(line);
      //开始迭代计算
      while (tokenizer.hasMoreTokens()) {
        //设置输入值
        word.set(tokenizer.nextToken());
        //写入输出值
        output.collect(word, one);
      }
    }
  }

   public static class Reduce extends MapReduceBase implements
     //创建固定Reduce格式
     Reducer<Text, IntWritable, Text, IntWritable> {
     //开始Reduce程序
     public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter)
                throws IOException {
       //初始化计算器
       int sum = 0;
       //开始迭代计算输入值

       while (values.hasNext()) {
         sum += values.next().get();                //计数器计算
       }
       //创建输出结果
       output.collect(key, new IntWritable(sum));
    }
  }
 //开始主程序
  public static void main(String[] args) throws Exception {
     //设置主程序
     JobConf conf = new JobConf(wordCount.class);
     //设置主程序名
     conf.setJobName("wordcount");
     //设置输出Key格式
     conf.setOutputKeyClass(Text.class);
     //设置输出Value格式
     conf.setOutputValueClass(IntWritable.class);
     //设置主Map
     conf.setMapperClass(Map.class);
     //设置第一次Reduce方法
     conf.setCombinerClass(Reduce.class);
     //设置主Reduce方法
     conf.setReducerClass(Reduce.class);
    //设置输入格式
     conf.setInputFormat(TextInputFormat.class);
    //设置输出格式
     conf.setOutputFormat(TextOutputFormat.class);
    //设置输入文件路径
     FileInputFormat.setInputPaths(conf, new Path(args[0]));
     //设置输出路径

     FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    //开始主程序
     JobClient.runJob(conf);
  }
}

Scala于2001年由瑞士洛桑联邦理工学院(EPFL)编程方法实验室研发,由Martin Odersky(马丁·奥德斯基)创建。Spark采用Scala程序设计,能够简化程序编写的过程与步骤,同时在后端对编译后的文件有较好的优化,更容易表达思路。这些都是目前使用Java语言所欠缺的。

实际上,Scala在使用中主要进行整体化考虑,而非Java的面向对象的思考方法,这一点请读者注意。 9wBPQSG+biXEnSBWvbbX2n/eqnPSRz3SC+2BZu3efmk7fKqNTEjKRLLNZ88IxL2Q

点击中间区域
呼出菜单
上一章
目录
下一章
×