购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

2.3 实战:通过Spark进行词频统计

本节将演示如何编写Spark应用来实现词频统计的功能。

想要实现Spark应用程序的开发,可以使用Scala、Java或Python等编程语言来实现。这取决于个人的喜好,选择你擅长的语言即可。

本书所有示例主要采用Scala或Java语言来编写。本节示例是通过Java语言来编写的。

2.3.1 初始化应用

本例将使用Maven来管理应用。该应用pom.xml文件的内容如下:

     <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.waylau.spark</groupId>
        <artifactId>spark-java-samples</artifactId>
        <version>1.0.0</version>
        <name>spark-java-samples</name>
        <packaging>jar</packaging>
        <organization>
            <name>waylau.com</name>
            <url>https://waylau.com</url>
        </organization>

        <properties>
            <maven.compiler.source>17</maven.compiler.source>
            <maven.compiler.target>17</maven.compiler.target>
            <scala.version>2.13</scala.version>
            <spark.version>3.5.1</spark.version>
        </properties>
        <dependencies>
            <!-- Spark dependency -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>

     </project>

应用编译完成之后,会生成一个名为spark-java-samples-1.0.0.jar的JAR文件。需要注意的是,Spark的依赖使用了provided,这意味着Spark依赖项不需要捆绑到应用程序JAR中,因为它们是由集群管理器在运行时提供的。

2.3.2 创建Spark应用程序

创建一个实现词频统计功能的应用JavaWordCountSample,代码如下:

     package com.waylau.spark.java.samples.rdd;

     import java.util.Arrays;
     import java.util.List;
     import java.util.regex.Pattern;

     import org.apache.spark.api.java.JavaPairRDD;
     import org.apache.spark.api.java.JavaRDD;
     import org.apache.spark.sql.SparkSession;

     import scala.Tuple2;

     /**
      * Java Word Count sample
      *
      * @author <a href="https://waylau.com">Way Lau</a>
      * @since 2024-05-28
      */
     public class JavaWordCountSample {
        private static final Pattern SPACE = Pattern.compile(" ");

        public static void main(String[] args) throws Exception {
            // 判断输入参数
            if (args.length < 1) {
                System.err.println("Usage: JavaWordCount <file>");
                System.exit(1);
            }

            // 初始化SparkSession
            SparkSession sparkSession = SparkSession
                   .builder()
                   // 设置应用名称
                   .appName("JavaWordCountSample")
                   .getOrCreate();

            // 读取文件内容,并将其转换为RDD结构的文本
            JavaRDD<String> lines = sparkSession.read().textFile(args[0]).javaRDD();

            // 将文本行按照空格作为分隔,转换成一个单词列表
            JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).
iterator());

            // 将列表中的每个单词作为键、1作为值创建JavaPairRDD
            JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));

            // 将相同键的值进行累加,从而得出每个单词出现的次数,即词频
            JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

            // 收集结果,并打印
            List<Tuple2<String, Integer>> output = counts.collect();
            for (Tuple2<?, ?> tuple : output) {
                System.out.println(tuple._1() + ": " + tuple._2());
            }

            // 关闭SparkSession
            sparkSession.stop();

        }
     }
 

上述代码是一个采用Java编写的Spark程序,实现了典型的“词频统计”功能。

· SparkSession.builder()用于初始化SparkSession。SparkSession为用户提供了一个统一的切入点来使用Spark的各项功能。

· sparkSession.read().textFile(args[0]).javaRDD()方法用于读取文件内容,并将其转换为RDD结构的文本行。

· lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator())将文本行按照空格作为分隔,转换成一个单词列表。

· words.mapToPair(s -> new Tuple2<>(s, 1))将列表中的每个单词作为键、1作为值创建JavaPairRDD。

· ones.reduceByKey((i1, i2) -> i1 + i2)将相同键的值进行累加,从而得出了每个单词出现的次数,即词频。

· counts.collect()将结果收集并转换为List,最终将List的元素逐一打印出来。

应用程序执行完成之后,需要调用SparkSession的stop()方法来关闭与Spark集群的连接。

2.3.3 准备数据文件

准备一份数据文件提供给JavaWordCountSample程序使用。数据文件JavaWordCountData.txt的内容如下:

     You say that you love rain
     but you open your umbrella when it rains
     You say that you love the sun
     but you find a shadow spot when the sun shines
     You say that you love the wind
     But you close your windows when wind blows
     This is why I am afraid
     You say that you love me too

2.3.4 运行程序

可以通过执行spark-submit命令来提交运行任务给Spark集群执行,命令如下:

     spark-submit --class com.waylau.spark.java.samples.rdd.JavaWordCountSample
spark-java-samples-1.0.0.jar JavaWordCountData.txt

命令说明:

· com.waylau.spark.java.samples.rdd.JavaWordCountSample为应用的主入口。

· spark-java-samples-1.0.0.jar为应用程序的编译文件。

· JavaWordCountData.txt作为应用程序的入参,也就是我们要统计的词频的文件。

最终,JavaWordCountSample程序词频统计的结果如下: PMa36shIoulEwzRkWkVUxHThUil51Mi4ASjPWInUII5vPnLLvDCv/dn9zdvKzlV/

     find: 1
     spot: 1
     it: 1
     is: 1
     But: 1
     you: 7
     shines: 1
     shadow: 1
     afraid: 1
     rain: 1
     that: 4
     a: 1
     am: 1
     You: 4
     say: 4
     love: 4
     when: 3
     I: 1
     wind: 2
     This: 1
     rains: 1
     why: 1
     umbrella: 1
     blows: 1
     but: 2
     close: 1
     sun: 2
     too: 1
     windows: 1
     open: 1
     your: 2
     me: 1
     the: 3
点击中间区域
呼出菜单
上一章
目录
下一章
×