搭好项目框架,接下来就是我们的核心工作——填充代码。我们会用一个最简单的示例来说明Flink代码怎样编写:统计一段文字中每个单词出现的频次。这就是传说中的WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的Hello World。
我们首先在src/main路径下新建一个源码目录scala,本书源码将位于src/main/scala目录下。在这个目录下新建一个包,命名为com.atguigu.chapter02,在这个包下我们将编写Flink入门的WordCount程序。
我们已经知道,尽管Flink自身的定位是流式处理引擎,但它同样拥有批处理的能力。所以接下来,我们会针对不同的处理模式、不同的输入数据形式,分别讲述WordCount代码的实现。
对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一个文本文档,然后读取这个文件处理数据就可以了。
(1)在工程根目录下新建一个input文件夹,并在其下创建文本文件words.txt。
(2)在words.txt中输入一些文字,例如:
(3)在com.atguigu.chapter02包下新建Scala的单例对象(object)BatchWordCount,在静态main方法中编写测试代码。
我们进行单词频次统计的基本思路是:先逐行读入文件数据,然后将每行文字都拆分成单词,接着按照单词分组,统计每组数据的个数,就是对应单词的频次。
具体代码实现如下:
代码说明和注意事项:
① Flink在执行应用程序前应该获取执行环境对象,也就是运行时上下文环境。
② Flink同时提供了Java和Scala两种语言的API,有些类在两套API中名称是一样的,所以在引入包时,如果有Java和Scala两种选择,要注意选用对应开发语言的包。
③ 在导入类时,需要执行import org.apache.flink.api.scala._,这里使用下画线的方式导入类是因为要将该包下的所有隐式类型转换都导入。
④ 直接调用ExecutionEnvironment对象的readTextFile()方法,可以从文件中读取数据。
⑤ 我们的目标是将每个单词对应的个数都统计出来,所以调用flatMap()方法可以对一行文字进行分词转换。将文件中每行文字都拆分成单词后,要转换成(word,count)形式的二元组,初始count都为1。
⑥ 在分组时调用了groupBy()方法,它不能使用分组选择器,只能采用位置索引或类属性名称进行分组。
⑦ 在分组之后调用sum()方法进行聚合,同样只能指定聚合字段的位置索引或类属性名称。
(4)运行程序,控制台会打印出如下结果:
可以看到,我们将文档中的所有单词的频次全部统计出来,以二元组的形式在控制台打印输出了。
需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是将数据看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现,所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时将执行模式设为BATCH来进行批处理。
这样,DataSet API就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只要维护一套DataStream API就可以了。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。
我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用DataStream API来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。
DataStream API作为“数据流”的处理接口,又怎样处理批数据呢?
回忆上一章中我们讲到的在Flink的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流,所以批处理其实就可以看作有界流的处理。
对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。
下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。
1.读取文件
我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次,整体思路与之前的批处理非常类似,代码模式也基本一致。
(1)在com.atguigu.chapter02包下新建Scala的单例对象BoundedStreamWordCount,在静态main方法中编写测试代码。具体代码实现如下:
主要观察与批处理程序BatchWordCount的不同:
· 创建执行环境的不同,流处理程序使用的是StreamExecutionEnvironment。
· 每步处理转换之后,得到的数据对象类型都不同。
· 分组操作调用的是keyBy()方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key是什么。
· 代码末尾需要调用env的execute()方法,开始执行任务。
(2)运行程序,控制台输出结果如下:
我们可以看到,这与批处理的结果是完全不同的。批处理针对每个单词,只会输出一个最终的统计个数;而在流处理的打印结果中,“hello”这个单词每出现一次,就会有一个频次统计数据输出。这就是流处理的特点,数据逐个处理,每来一条数据就会处理输出一次。我们通过打印结果,可以清晰地看到单词“hello”数量增长的过程。
看到这里大家可能又会有新的疑惑:我们读取文件,第一行应该是“hello flink”,怎么这里输出的第一个单词是“world”呢?每个输出的结果二元组,前面都有一个数字,这又是什么呢?
我们可以先做个简单的解释。Flink是一个分布式处理引擎,所以我们的程序应该也是分布式运行的。在开发环境里,会通过多线程来模拟Flink集群运行。这里结果前的数字,其实就指示了本地执行的不同线程,对应着Flink运行时不同的并行资源。这样第一个乱序的问题也就解决了:既然是并行执行的,不同线程的输出结果,自然也就无法保持输入的顺序了。
另外需要说明,这里显示的编号为1~4,是由于运行电脑的CPU是4核的,所以默认模拟的并行线程有4个。这段代码在不同的运行环境,得到的结果会是不同的。关于Flink程序并行执行的数量,可以通过设定“并行度”(Parallelism)来配置,我们会在后续章节详细讲解这些内容。
2.读取文本流
在实际的生产环境中,真正的数据流其实是无界的,有开始却没有结束,这就要求我们需要保持一个监听事件的状态,持续地处理捕获的数据。
为了模拟这种场景,我们就不再通过读取文件来获取数据了,而是监听数据发送端主机的指定端口,统计发送来的文本数据中出现过的单词的个数。在具体实现上,我们只要对BoundedStreamWordCount代码中读取数据的步骤稍做修改,就可以实现对真正无界流的处理。
(1)将BoundedStreamWordCount代码中读取文件数据的readTextFile()方法,替换成读取socket文本流的socketTextStream()方法。具体代码实现如下:
代码说明和注意事项如下:
· socket文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码中指定了主机“hadoop102”的7777端口作为发送数据的socket端口,读者可以根据测试环境自行配置。
· 在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者传入程序运行参数的方式来指定。
· socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。
(2)在Linux环境的主机hadoop102上,执行下列命令,发送数据进行测试。
(3)启动StreamWordCount程序。
我们会发现程序启动之后没有任何输出,也不会退出。这是正常的,因为Flink的流处理是事件驱动的,当前程序会一直处于监听状态,只有接收到数据才会执行任务、输出统计结果。
(4)从hadoop102发送如下数据:
可以看到控制台输出结果如下:
我们会发现,输出的结果与之前读取文件的流处理非常相似,而且可以非常明显地看到,每输入一条数据,就有一次对应的输出。具体对应关系是:输入“hello flink”,就会输出两条统计结果(flink,1)和(hello,1);之后再输入“hello world”,同样会将hello和world的个数统计输出,hello的个数会对应增长为2。