本节将演示如何编写Spark应用来实现词频统计的功能。
想要实现Spark应用程序的开发,可以使用Scala、Java或Python等编程语言来实现。这取决于个人的喜好,选择你擅长的语言即可。
本书所有示例主要采用Scala或Java语言来编写。本节示例是通过Java语言来编写的。
本例将使用Maven来管理应用。该应用pom.xml文件的内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.waylau.spark</groupId> <artifactId>spark-java-samples</artifactId> <version>1.0.0</version> <name>spark-java-samples</name> <packaging>jar</packaging> <organization> <name>waylau.com</name> <url>https://waylau.com</url> </organization> <properties> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <scala.version>2.13</scala.version> <spark.version>3.5.1</spark.version> </properties> <dependencies> <!-- Spark dependency --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> <scope>provided</scope> </dependency> </dependencies> </project>
应用编译完成之后,会生成一个名为spark-java-samples-1.0.0.jar的JAR文件。需要注意的是,Spark的依赖使用了provided,这意味着Spark依赖项不需要捆绑到应用程序JAR中,因为它们是由集群管理器在运行时提供的。
创建一个实现词频统计功能的应用JavaWordCountSample,代码如下:
package com.waylau.spark.java.samples.rdd; import java.util.Arrays; import java.util.List; import java.util.regex.Pattern; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.SparkSession; import scala.Tuple2; /** * Java Word Count sample * * @author <a href="https://waylau.com">Way Lau</a> * @since 2024-05-28 */ public class JavaWordCountSample { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) throws Exception { // 判断输入参数 if (args.length < 1) { System.err.println("Usage: JavaWordCount <file>"); System.exit(1); } // 初始化SparkSession SparkSession sparkSession = SparkSession .builder() // 设置应用名称 .appName("JavaWordCountSample") .getOrCreate(); // 读取文件内容,并将其转换为RDD结构的文本 JavaRDD<String> lines = sparkSession.read().textFile(args[0]).javaRDD(); // 将文本行按照空格作为分隔,转换成一个单词列表 JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)). iterator()); // 将列表中的每个单词作为键、1作为值创建JavaPairRDD JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1)); // 将相同键的值进行累加,从而得出每个单词出现的次数,即词频 JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2); // 收集结果,并打印 List<Tuple2<String, Integer>> output = counts.collect(); for (Tuple2<?, ?> tuple : output) { System.out.println(tuple._1() + ": " + tuple._2()); } // 关闭SparkSession sparkSession.stop(); } }
上述代码是一个采用Java编写的Spark程序,实现了典型的“词频统计”功能。
· SparkSession.builder()用于初始化SparkSession。SparkSession为用户提供了一个统一的切入点来使用Spark的各项功能。
· sparkSession.read().textFile(args[0]).javaRDD()方法用于读取文件内容,并将其转换为RDD结构的文本行。
· lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator())将文本行按照空格作为分隔,转换成一个单词列表。
· words.mapToPair(s -> new Tuple2<>(s, 1))将列表中的每个单词作为键、1作为值创建JavaPairRDD。
· ones.reduceByKey((i1, i2) -> i1 + i2)将相同键的值进行累加,从而得出了每个单词出现的次数,即词频。
· counts.collect()将结果收集并转换为List,最终将List的元素逐一打印出来。
应用程序执行完成之后,需要调用SparkSession的stop()方法来关闭与Spark集群的连接。
准备一份数据文件提供给JavaWordCountSample程序使用。数据文件JavaWordCountData.txt的内容如下:
You say that you love rain but you open your umbrella when it rains You say that you love the sun but you find a shadow spot when the sun shines You say that you love the wind But you close your windows when wind blows This is why I am afraid You say that you love me too
可以通过执行spark-submit命令来提交运行任务给Spark集群执行,命令如下:
spark-submit --class com.waylau.spark.java.samples.rdd.JavaWordCountSample spark-java-samples-1.0.0.jar JavaWordCountData.txt
命令说明:
· com.waylau.spark.java.samples.rdd.JavaWordCountSample为应用的主入口。
· spark-java-samples-1.0.0.jar为应用程序的编译文件。
· JavaWordCountData.txt作为应用程序的入参,也就是我们要统计的词频的文件。
最终,JavaWordCountSample程序词频统计的结果如下:
find: 1 spot: 1 it: 1 is: 1 But: 1 you: 7 shines: 1 shadow: 1 afraid: 1 rain: 1 that: 4 a: 1 am: 1 You: 4 say: 4 love: 4 when: 3 I: 1 wind: 2 This: 1 rains: 1 why: 1 umbrella: 1 blows: 1 but: 2 close: 1 sun: 2 too: 1 windows: 1 open: 1 your: 2 me: 1 the: 3