提到Delta Lake,就不得不提数据湖这个概念了,因为Delta Lake实质上是数据湖的一种实现。数据湖是相对于数据仓库提出来的集中式存储概念,和数据仓库中主要存储结构化的数据不同,数据湖中可以存储结构化数据(一般指以行和列来呈现的数据)、半结构化数据(如日志、XML、JSON等)、非结构化数据(如Word文档、PDF等)和二进制数据(如视频、音频、图片等)。通常来说,数据湖中以存储原始数据为主,而数据仓库中以存储原始数据处理后的结构化数据为主。
Delta Lake是一个基于数据湖的开源项目,它能够在数据湖上构建湖仓一体的数据架构。该项目提供了对ACID数据事务的支持、可扩展的元数据处理能力,并在底层兼容Spark,以支持流批一体的数据计算处理。
Delta Lake的主要特征如下:
· 基于Spark的ACID数据事务功能,提供可序列化的事务隔离级别,确保数据读写操作的一致性。
· 利用Spark的分布式和可扩展处理能力,能够处理和存储PB级以上的数据。
· 数据支持版本管控,包括支持数据回滚以及完整的历史版本审计跟踪。
· 支持高性能的数据行级的Merge、Insert、Update、Delete操作,这一点是Hive不具备的。
· 以Parquet文件作为数据存储格式,同时由Transaction Log文件记录数据的变更过程,日志格式为JSON,如图2-12所示。
图2-12
Delta Lake对其他大数据架构Connector组件的支持情况如表2-2所示。
表2-2 Delta Lake对其他Connector组件的支持情况
Delta Lake提供的API参考网址为https://docs.delta.io/latest/delta-apidoc.xhtml#。
Delta Lake的元数据由自己管理,通常不依赖于类似Hive Metastore这样的第三方外部元数据组件。在Delta Lake中,元数据和数据一起存放在自己的文件系统的目录下,并且所有的元数据操作都被抽象成了相应的Action操作,表的元数据是由Action子类实现的。Delta Lake中源码的结构(源码GitHub地址:https://github.com/delta-io/delta),如图2-13所示。
图2-13
在Metadata.java这个实现类中提供了元数据的方法调用,说明如下。
· getId:获取数据表的唯一标志ID。
· getName:获取数据表的名称。
· getDescription:获取数据表的描述。
· getFormat:获取数据表的格式。
· getPartitionColumns:获取数据表的分区字段列表。
· getConfiguration:获取数据表的属性配置信息。
· getCreatedTime:获取数据表的创建时间。
· getSchema:获取数据表的schema信息。
源码中元数据的相关操作的核心类图如图2-14所示。
图2-14
Delta Lake可以通过如下几种方式来获取表级元数据。
(1)SQL:DESCRIBE DETAIL Table_Name或者数据表所有的数据文件路径,比如DESCRIBE DETAIL eventsTable或者DESCRIBE DETAIL '/data/events/'。
(2)Python SDK,示例代码如下:
from delta.tables import * deltaTable = DeltaTable.forPath(spark, pathToTable) detailDF = deltaTable.detail()
(3)Java SDK,示例代码如下:
import io.delta.tables.*; DeltaTable deltaTable = DeltaTable.forPath(spark, pathToTable); DataFrame detailDF = deltaTable.detail();
(4)Scala SDK,示例代码如下:
import io.delta.tables._ val deltaTable = DeltaTable.forPath(spark, pathToTable) val detailDF = deltaTable.detail()
通过以上方式可以获取到的表级元数据详情如表2-3所示。
表2-3 表级元数据详情
由于Delta Lake支持使用Spark来读取和写入数据,因此在Delta Lake的源码中实现了Spark提供的CatalogPlugin接口,相关的核心类图如图2-15所示。
图2-15
由于Delta Lake实现了Spark提供的CatalogPlugin接口,因此采用2.1.3节介绍的基于Spark Catalog的方式,也可以直接获取到Delta Lake的元数据信息,但是需要在Spark采集Job的代码中加入如下Spark Config的配置:
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("xxx") .master("xxxx") .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .getOrCreate()
其中,spark.sql.extensions是Delta Lake对Spark SQL支持Delta Lake SQL解析的扩展。在Delta Lake源码中,io.delta.sql.parser这个package下的DeltaSqlParser.scala,实现了Spark SQL对Delta Lake SQL Command的解析支持,相关的核心类图如图2-16所示。
图2-16
其中,org.apache.spark.sql.catalyst.parser.ParserInterface是Spark提供的、最底层的SQL解析抽象接口。