创建环境之后,就可以构建数据处理的业务逻辑了,如图5-2所示,本节将主要讲解Flink的数据源(Source)。想要处理数据,先得有数据,所以首要任务就是把数据读进来。
图5-2 数据源(Source)
Flink可以从各种来源获取数据,然后构建DataStream进行转换处理。一般将数据的输入来源称为数据源,而读取数据的算子就是源算子。所以,Source就是我们整个处理程序的输入端。
Flink代码中通用的添加Source的方式,是调用执行环境的addSource()方法:
方法传入一个对象参数,需要实现SourceFunction接口,返回一个DataStream。
这里可能会有些麻烦:传入的参数是一个“源函数”,需要实现SourceFunction接口。这是何方神圣,又该怎么实现呢?
自己去实现它显然不会是一件容易的事。好在Flink直接提供了很多预实现的接口,此外还有很多外部连接工具也帮我们实现了对应的SourceFunction接口,通常情况下足以应对我们的实际需求。接下来我们就详细展开讲解。
为了更好地理解,我们先构建一个实际应用场景。比如网站的访问操作,可以抽象成一个三元组(用户名,用户访问的url,用户访问url的时间戳),所以在这里,我们可以创建一个类Event,将用户行为包装成它的一个对象。Event包含了以下一些字段,如表5-1所示。
表5-1 Event类字段设计
具体代码如下:
这里需要注意,使用case class关键字创建的类称为样例类。样例类是一种特殊类,它可以用来快速定义一个用于保存数据的类(类似于Java的POJO类)。样例类有以下几个重要特点:
(1)样例类对象的创建不需要new,直接引用赋值即可。如Event("Mary","/home",1000L)的方式就可以直接创建一个Event对象。
(2)样例类为用户实现了重写后的toString()方法,对样例类对象进行打印会直接打印出类的属性信息而不是对象的引用地址。
(3)样例类还为用户实现了其他一些,如equals、copy、hashCode、unApply等方法。
最简单的读取数据的方式,就是在代码中直接创建一个集合,然后调用执行环境的fromCollection方法进行读取。相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。
我们也可以不构建集合,直接将元素列举出来,调用fromElements方法进行读取数据:
在实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式是读取日志文件。这也是批处理中最常见的读取方式。
说明:
· 参数可以是目录,也可以是文件。
· 路径可以是相对路径,也可以是绝对路径。
· 相对路径是从系统属性user.dir获取路径:在IDEA下是project的根目录,standalone模式下是集群节点根目录。
· 也可以从HDFS目录下读取,使用路径hdfs://...,由于Flink没有提供hadoop相关依赖,需要pom中添加相关依赖。
不论从集合还是文件读取数据,我们读取的其实都是有界数据。在流处理的场景中,数据往往是无界的。这时又从哪里读取呢?
一个简单的方式,就是我们之前用到的读取socket文本流。这种方式由于吞吐量小、稳定性较差,一般也是用于测试。
那对于真正的流数据,实际项目应该怎样读取呢?
Kafka作为分布式消息传输队列,是一个高吞吐、易于扩展的消息系统。而消息队列的传输方式,恰恰和流处理是完全一致的。所以可以说Kafka和Flink是天生一对,是当前处理流式数据的双子星。在如今的实时流处理应用中,由Kafka进行数据的收集和传输,Flink进行分析计算,这样的架构已经成为众多企业的首选,如图5-3所示。
图5-3 Flink与Kafka
遗憾的是,与Kafka的连接比较复杂,Flink内部并没有提供预实现的方法。所以我们只能采用通用的调用addSource()方法的方式,传入一个SourceFunction的实现类了。
好在Kafka与Flink非常契合,所以Flink官方提供了连接工具flink-connector-kafka,直接帮我们实现了一个消费者FlinkKafkaConsumer,它就是用来读取Kafka数据的SourceFunction。
所以想要以Kafka作为数据源获取数据,我们只需要引入Kafka连接器的依赖即可。Flink官方提供的是一个通用的Kafka连接器,它会自动跟踪最新版本的Kafka客户端。目前,最新版本只支持0.10.0版本以上的Kafka,读者使用时可以根据自己安装的Kafka版本选定连接器的依赖版本。这里我们需要导入的依赖如下。
然后调用env.addSource(),传入FlinkKafkaConsumer的对象实例即可。
创建FlinkKafkaConsumer时需要传入以下三个参数。
· 第一个参数topic,定义了从哪些主题中读取数据。可以是一个topic,也可以是topic列表,还可以是匹配所有想要读取的topic的正则表达式。当从多个topic中读取数据时,Kafka连接器将会处理所有topic的分区,将这些分区的数据放到一条数据流中去。
· 第二个参数是一个DeserializationSchema或者KeyedDeserializationSchema。Kafka消息被存储为原始的字节数据,所以需要反序列化成Java或者Scala对象。上面代码中使用的SimpleStringSchema,是一个内置的DeserializationSchema,它只是将字节数组简单地反序列化成字符串。DeserializationSchema和KeyedDeserializationSchema是公共接口,所以我们也可以自定义反序列化逻辑。
· 第三个参数是一个Properties对象,设置了Kafka客户端的一些属性。
大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,我们想要读取的数据源来自某个外部系统,而Flink既没有预实现的方法,也没有提供连接器,又该怎么办呢?
那就只好自定义实现SourceFunction接口了。
接下来我们创建一个自定义的数据源,实现SourceFunction接口。主要重写两个关键方法:run()和cancel()。
· run()方法:使用运行时上下文对象(SourceContext)向下游发送数据。
· cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果。
代码如下:
我们先来自定义一下数据源:
这个数据源,我们后面会频繁使用,之后的代码若涉及ClickSource数据源,使用上面的代码即可。
下面的代码我们来读取自定义的数据源。有了自定义的SourceFunction,接下来只要调用addSource()即可:
下面是完整的代码:
这里要注意的是SourceFunction接口定义的数据源,并行度只能设置为1,如果数据源设置为大于1的并行度,则会抛出异常。示例程序如下:
输出的异常如下:
所以如果我们想要自定义并行的数据源,需要使用ParallelSourceFunction,示例程序如下:
输出结果如下:
我们已经了解了Flink怎样从不同的来源读取数据。在之前的代码中,我们的数据都是定义好的Event类型,而且在第5.2.1节中特意说明了对这个类的要求。那么还有没有其他更灵活的类型可以使用呢?Flink支持的数据类型到底有哪些?
1.Flink的类型系统
为什么会出现“不支持”的数据类型呢?因为Flink作为一个分布式处理框架,处理的是以数据对象作为元素的流。如果用水流来类比,那么我们要处理的数据元素就是随着水流漂动的物体。在这条流动的河里,可能漂浮着小木块,也可能行驶着内部错综复杂的大船。要分布式地处理这些数据,就不可避免地要面对数据的网络传输、状态的落盘和故障恢复等问题,这就需要对数据进行序列化和反序列化。小木块是容易序列化的;而大船想要在序列化之后进行传输,就需要将它拆解、清晰地知道其中每个零件的类型。
为了方便地处理数据,Flink有一整套类型系统。Flink使用“类型信息”(TypeInformation)来统一表示数据类型。TypeInformation类是Flink中所有类型描述符的基类。它涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化器、反序列化器和比较器。
2.Flink支持的数据类型
简单来说,对于常见的Java和Scala数据类型,Flink都是支持的。Flink对支持不同的类型进行了划分,这些类型可以在Types工具类中找到。
(1)基本类型。
所有Java基本类型及其包装类,再加上Void、String、Date、BigDecimal和BigInteger。
(2)数组类型。
基本类型数组(PRIMITIVE_ARRAY)和对象数组(OBJECT_ARRAY)。
(3)复合数据类型。
· Java元组类型(TUPLE):这是Flink内置的元组类型,是Java API的一部分。最多25个字段,也就是从Tuple0~Tuple25,不支持空字段。
· Scala样例类及Scala元组:不支持空字段。
· 行类型(ROW):可以认为是具有任意个字段的元组,并支持空字段。
· POJO:Flink自定义的,类似于Java bean模式的类。
(4)辅助类型。
Option、Either、List、Map等。
(5)泛型类型(GENERIC)。
Flink支持所有的Java类和Scala类。不过如果没有按照上面POJO类型的要求来定义,就会被Flink当作泛型类来处理。Flink会把泛型类型当作黑盒,无法获取它们内部的属性;它们也不是由Flink本身序列化的,而是由Kryo序列化的。
在这些类型中,元组类型和POJO类型最为灵活,因为它们支持创建复杂类型。而相比之下,POJO还支持在键的定义中直接使用字段名,这会让代码的可读性大大提升。所以,在项目实践中,往往会将流处理程序中的元素类型定为Flink的POJO类型。
Flink对POJO类型的要求如下:
· 类是公共的(public)和独立的(standalone,也就是说没有非静态的内部类)。
· 类有一个公共的无参构造方法。
· 类中的所有字段是public且非final修饰的;或者有一个公共的getter和setter方法,这些方法需要符合Java bean的命名规范。
Scala的样例类就类似于Java中的POJO类,所以我们看到,之前的Event,就是我们创建的一个Scala样例类,使用起来非常方便。