购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

3.1 DataFrame是什么

DataFrame实质上是存储在不同节点计算机中的一张关系型数据表。分布式存储最大的好处是:可以让数据在不同的工作节点(worker)上并行存储,以便在需要数据的时候并行运算,从而获得最迅捷的运行效率。

3.1.1 DataFrame与RDD的关系

RDD(Resilient Distributed Datasets)是一种分布式弹性数据集,将数据分布存储在不同节点的计算机内存中进行存储和处理。每次RDD对数据处理的最终结果都分别存放在不同的节点中。Resilient是弹性的意思,在Spark中指的是数据的存储方式,即数据在节点中进行存储时候既可以使用内存也可以使用磁盘。这为使用者提供了很大的自由,提供了不同的持久化和运行方法,是一种有容错机制的特殊数据集合。

RDD可以说是DataFrame的前身,DataFrame是RDD的发展和拓展。RDD中可以存储任何单机类型的数据,但是直接使用RDD在字段需求明显时存在算子难以复用的缺点。例如,假设RDD存的数据是一个Person类型的数据,现在要求出所有年龄段(10年一个年龄段)中最高的身高与最大的体重。使用RDD接口时,因为RDD不了解其中存储的数据的具体结构,需要用户自己去写一个很特殊化的聚合函数来完成这样的功能。那么如何改进才可以让RDD了解其中存储的数据包含哪些列并在列上进行操作呢?

根据谷歌上的解释,DataFrame是表格或二维数组状结构,其中每一列包含对一个变量的度量,每一行包含一个案例,类似于关系型数据库中的表或者R/Python中的dataframe,可以说是一个具有良好优化技术的关系表。

有了DataFrame,框架会了解RDD中的数据具有什么样的结构和类型,使用者可以说清楚自己对每一列进行什么样的操作,这样就有可能实现一个算子,用在多列上比较容易进行算子的复用。甚至,在需要同时求出每个年龄段内不同的姓氏有多少个的时候使用RDD接口,在之前的函数需要很大的改动才能满足需求时使用DataFrame接口,这时只需要添加对这一列的处理,原来的max/min相关列的处理都可保持不变。

在Apache Spark里,DataFrame优于RDD,但也包含了RDD的特性。RDD和DataFrame的共同特征是不可变性、内存运行、弹性、分布式计算能力,即DataFrame = RDD[Row] +shcema。

这里尽量避免理论化探讨,尽量讲解得深入一些,毕竟这本书是以实战为主的。分布式数据的容错性处理是涉及面较广的问题,较为常用的方法主要是两种:

·检查节点:对每个数据节点逐个进行检测,随时查询每个节点的运行情况。这样做的好处是便于操作主节点,随时了解任务的真实数据运行情况;坏处是系统进行的是分布式存储和运算,节点检测的资源耗费非常大,而且一旦出现问题,就需要将数据在不同节点中搬运,反而更加耗费时间,从而极大地拉低了执行效率。

·更新记录:运行的主节点并不总是查询每个分节点的运行状态,而是将相同的数据在不同的节点(一般情况下是3个)中进行保存,各个工作节点按固定的周期更新在主节点中运行的记录,如果在一定时间内主节点查询到数据的更新状态超时或者有异常,就在存储相同数据的不同节点上重新启动数据计算工作。其缺点在于数据量过大时,更新数据和重新启动运行任务的资源耗费也相当大。

3.1.2 DataFrame理解及特性

DataFrame是一个不可变的分布式数据集合,与RDD不同,数据被组织成命名列,就像关系数据库中的表一样,即具有定义好的行、列的分布式数据表,如图3-1所示。

图3-1 DataFrame具体展现

DataFrame背后的思想是允许处理大量结构化数据。DataFrame包含带schema的行。schema是数据结构的说明,意为模式。schema是Spark的StructType类型,由一些域(StructFields)组成,域中明确了列名、列类型以及一个布尔类型的参数(表示该列是否可以有缺失值或null值),最后还可以可选择地明确该列关联的元数据(在机器学习库中,元数据是一种存储列信息的方式,平常很少用到)。schema提供了详细的数据结构信息,例如包含哪些列、每列的名称和类型各是什么。DataFrame由于其表格格式而具有其他元数据,这使得Spark可以在最终查询中运行某些优化。

使用一行代码即可输出schema,代码如下:

df.printSchema()
//看看schema到底长什么样子

DataFrame的另外一大特性是延迟计算(懒惰执行),即一个完整的DataFrame运行任务被分成两部分:Transformation和Action(转化操作和行动操作)。转化操作就是从一个RDD产生一个新的RDD,行动操作就是进行实际的计算。只有当执行一个行动操作时,才会执行并返回结果。下面仍然以RDD这种数据集解释一下这两种操作。

1.Transformation

Transformation用于创建RDD。在Spark中,RDD只能使用Transformation创建,同时Transformation还提供了大量的操作方法,例如map、filter、groupBy、join等。除此之外,还可以利用Transformation生成新的RDD,在有限的内存空间中生成尽可能多的数据对象。有一点要牢记,无论发生了多少次Transformation,在RDD中真正数据计算运行的操作都不可能真正运行。

2.Action

Action是数据的执行部分,通过执行count、reduce、collect等方法真正执行数据的计算部分。实际上,RDD中所有的操作都是使用Lazy模式(一种程序优化的特殊形式)进行的。运行在编译的过程中,不会立刻得到计算的最终结果,而是记住所有的操作步骤和方法,只有显式地遇到启动命令才进行计算。

这样做的好处在于大部分优化和前期工作在Transformation中已经执行完毕,当Action进行工作时只需要利用全部资源完成业务的核心工作。

Spark SQL可以使用其他RDD对象、parquet文件、json文件、hive表以及通过JDBC连接到其他关系型数据库作为数据源,来生成DataFrame对象。它还能处理存储系统HDFS、Hive表、MySQL等。

3.1.3 DataFrame与DataSet的区别

DataSet是DataFrame API的一个扩展,也是Spark最新的数据抽象。DataFrame是Dataset的特列(DataFrame=Dataset[Row]),所以可以通过as方法将DataFrame转换为Dataset。Row是一个类型,跟Car、Person这些类型一样。DataSet是强类型的,比如可以有Dataset[Car]、Dataset[Person]。

在结构化API中,DataFrame是非类型化(untyped)的,Spark只在运行(runtime)的时候检查数据的类型是否与指定的schema一致;Dataset是类型化(typed)的,在编译(compile)的时候就检查数据类型是否符合规范。

DataFrame和Dataset实质上都是一个逻辑计划,并且是懒加载的,都包含着scahema信息,只有到数据要读取的时候才会对逻辑计划进行分析和优化,并最终转化为RDD。二者的API是统一的,所以都可以采用DSL和SQL方式进行开发,都可以通过SparkSession对象进行创建或者通过转化操作得到。

提示: 在Scala API中,DataFrame是Dataset[Row]的类型别名。在Java API中,用户使用数据集<Row>来表示数据流。

3.1.4 DataFrame的缺陷

如果有不同的需求,DataFrame和DataSet是可以相互转化的,即df.as[ElementType]可以把DataFrame转化为DataSet,ds.toDF()可以把DataSet转化为DataFrame。DataFrame编译时不能进行类型转化安全检查,运行时才能确定是否有问题,如果结构未知,则不能操作数据。对于对象支持不友好(相对而论),RDD内部数据直接以Java对象存储,DataFrame内存存储的是row对象,而不能是自定义对象。

有一些特殊情况需要将DataFrame转化为RDD,比如解决一些使用SQL难以处理的统计分析、将数据写入MySQL等。 ryxb2E8D88yTC8fC//de2MTnrFf2IlsGRGd66FtcNGo8RGLQuXLv/mgrFCFiA4e7

点击中间区域
呼出菜单
上一章
目录
下一章
×