购买
下载掌阅APP,畅读海量书库
立即打开
畅读海量书库
扫码下载掌阅APP

2.4 Apache Hudi中的元数据采集

Apache Hudi和Delta Lake一样,是一款基于数据湖的开源项目,也能够在数据湖上构建湖仓一体的数据架构。通过访问网址https://hudi.apache.org/即可进入Hudi的官方首页,如图2-17所示。

图2-17

Apache Hudi(简称Hudi)的主要特征如下:

· 支持表、事务,以及快速进行Insert、Update、Delete等操作。

· 支持索引,数据存储压缩比高,并且支持常见的开源文件存储格式。

· 支持基于Spark、Flink的分布式流式数据处理。

· 支持Apache Spark、Flink、Presto、Trino、Hive等SQL查询引擎。

2.4.1 基于Spark Catalog采集元数据

由于Hudi支持使用Spark来读取和写入数据,因此在Hudi的源码中实现了Spark提供的CatalogPlugin接口,相关的核心类图如图2-18所示。

图2-18

由于Hudi和Delta Lake一样,也实现了Spark提供的CatalogPlugin接口,因此采用2.1.3节中基于Spark Catalog的方式,也可以直接获取Hudi的元数据信息,但是需要在Spark采集Job的代码中加入如下Spark Config的配置信息:

     import org.apache.spark.sql.SparkSession
  
     val spark = SparkSession
       .builder()
       .appName("xxx")
       .master("xxxx")
       .config("spark.sql.extensions","org.apache.spark.sql.hudi.HoodieSparkSessionE
xtension")
       .config("spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.hudi.catalog.HoodieCatalog")
       .getOrCreate()

其中,spark.sql.extensions和Delta Lake的设置一样,Hudi也对Spark SQL做了SQL解析扩展。在Hudi的源码中,org.apache.spark.sql.parser这个package下的HoodieCommonSqlParser.scala代码实现了Spark SQL对Hudi SQL Command的解析支持,相关的核心类图如图2-19所示。

图2-19

从这里可以看到,Hudi底层对Spark SQL支持的实现方式和Delta Lake对Spark SQL支持的实现方式非常类似,都需要用代码实现Spark提供的、最底层的org.apache.spark.sql.catalyst.parser.ParserInterface这个SQL解析接口类,从而达到支持Hudi的目的。

2.4.2 Hudi Timeline Meta Server

通常情况下,是通过追踪数据湖中的数据文件的方式来管理元数据的,无论是Delta Lake还是Hudi,底层均通过跟踪文件操作的方式来提取元数据。

1.Hudi中常见的Action操作

在Hudi中,对元数据的操作和Delta Lake的实现很类似,底层都是抽象成了相应的Action操作,只是Action操作的类型略微有些不同。Hudi中常见的Action操作说明如下。

· COMMITS:以原子提交的方式批量提交数据记录写入数据表中。

· CLEANS:在后台以异步的方式删除已经过时的旧的数据文件。

· DELTA_COMMIT:以原子提交的方式将数据记录写入Hudi中的MergeOnRead类型表,部分数据或者全部数据会以delta日志的方式直接写入。

· COMPACTION:在后台以异步的方式合并Hudi数据结构差异,比如将日志文件合并到数据文件中。

· ROLLBACK:回滚操作,同时删除已经生成的数据文件。

· SAVEPOINT:将数据文件组标记为已保存,这样CLEANS操作就不会删除这些文件,在需要恢复数据时,可以将数据恢复到某个历史时间点。

2.Timeline Meta Server

数据湖之所以不能直接用Hive Meta Store来管理元数据,是因为Hive Meta Store的元数据管理没有办法实现数据湖特有的数据跟踪能力。数据湖管理文件的管理粒度非常细,需要记录和跟踪哪些文件是新增操作,哪些文件是失效操作,哪些数据是新增的,哪些数据是更新的,而且还需要具备原子的事务性、支持回滚等操作。Hudi为了管理好元数据,记录数据的变更过程,设计了Timeline Meta Server,Timeline记录了在不同时刻对表执行所有操作的日志,有助于提供表的即时视图,同时也有效地支持按到达的先后时间顺序来检索数据和回滚数据。

(1)在Timeline中,抽象了三个概念,如表2-9所示。

表2-9 Timeline中抽象的三个概念

(2)图2-20展示了Hudi数据表在10:00~10:20发生的Upsert和Insert操作,大概每5分钟发生一次数据提交,并且同时在Hudi的Timeline上留下了元数据提交、Hudi后台数据清理和压缩的动作痕迹。当数据存在延迟时,Upsert操作会把新数据直接加入过去时间段的文件夹和桶中。这样,在Timeline的协助下,当查询10:00后新的增量数据时,能够非常有效地直接读取更改后的文件,而无须扫描7:00后的所有时间段的数据,从而加快数据查询操作。

图2-20

Timeline分为Active Timeline(活动的Timeline)和Archived Timeline(已经归档的Timeline)。Hudi会在后台异步地不断将Active Timeline归档到Archived Timeline中。

(3)Hudi Timeline Meta Server总体架构设计如图2-21所示。

(4)在Hudi的GitHub源码(GitHub地址:https://github.com/apache/hudi/tree/master/hudi-timeline-service)中,有一个专门的hudi-timeline-service子工程,这个子工程代码实现的就是Timeline Server的功能。

(5)Timeline Server的相关启动配置信息如表2-10所示。

图2-21

表2-10 Timeline Server的相关启动配置信息

3.Marker

在Hudi中抽象出了一个Marker(标记)的概念,翻译过来就是标记的意思。数据的写入操作可能在完成之前出现写入失败的情况,从而在存储中留下部分损坏的数据文件,而标记则用于跟踪和清除失败的写入操作。写入操作开始时会创建一个标记,表示正在进行文件写入。写入提交成功后,标记将被删除。如果写入操作中途失败,则会留下一个标记,表示这个写入的文件不完整。使用标记主要有如下两个目的。

· 正在删除重复/部分数据文件:标记有助于有效地识别写入的部分数据文件,与稍后成功写入的数据文件对比,这些文件包含重复的数据,并且在提交完成时会清除这些重复的数据文件。

· 回滚失败的提交:如果写入操作失败,则下一个写入请求将会在继续进行新的写入之前,先回滚该失败的提交。回滚是在标记的帮助下完成的,标记用于识别整体失败但已经提交的一部分写入的数据文件。

加入标记来跟踪每次提交的数据文件,那么Hudi将不得不列出文件系统中的所有文件,将其与Timeline中看到的文件关联起来做对比,然后删除部分写入失败的文件,这在一个像Hudi这样庞大的分布式系统中,性能的开销将会非常大。

如图2-22所示,标记创建请求都会基于Timeline Server先在队列中排队,等待接受线程池中工作线程的异步处理,对于每个批处理的间隔时长,Timeline服务会从队列中获取标记创建请求,循环写入下一个文件中,并且确保一致性和正确性。批处理间隔和批处理并发都可以通过表2-10所示的配置项进行配置。

图2-22

2.4.3 基于Hive Meta DB采集元数据

虽然Hudi元数据存储是通过Timeline来管理的,但是Hudi在设计时就考虑将自身元数据同步到Hive Meta Store中,如图2-23所示,其实就是将Hudi的Timeline中的元数据异步更新到Hive Meta Store中存储。

图2-23

1.HoodieMetaSyncOperations接口定义的方法

在Hudi的源码中定义了org.apache.hudi.sync.common.HoodieMetaSyncOperations.java这个接口抽象,用来作为元数据同步到类似Hive Meta DB这样的第三方外部元数据存储库。该接口定义的方法如下。

· createTable:在第三方外部元数据库上创建表。

· tableExists:判断第三方外部元数据库上是否有指定的表。

· dropTable:删除第三方外部元数据库上指定的表。

· addPartitionsToTable:给第三方外部元数据库上指定表添加分区数据信息。

· updatePartitionsToTable:更新第三方外部元数据库上指定表的分区数据信息。

· dropPartitions:删除第三方外部元数据库上指定表的分区数据信息。

· getAllPartitions:获取第三方外部元数据库上指定表的分区数据信息。

· getPartitionsByFilter:通过指定的过滤条件获取第三方外部元数据库上指定表的分区数据信息。

· databaseExists:判断第三方外部元数据存储中是否有指定的数据库。

· createDatabase:在第三方外部元数据存储中创建指定的数据库。

· getMetastoreSchema:获取第三方外部元数据库上指定表的Schema信息。

· getStorageSchema:从Hudi的表存储中获取Schema信息。

· updateTableSchema:更新第三方外部元数据库上指定表的Schema信息。

· getMetastoreFieldSchemas :从第三方外部元数据库上获取指定表的列信息。

· getStorageFieldSchemas:从Hudi的字段存储中获取列信息。

· updateTableComments:更新第三方外部元数据库上指定表的注释描述信息。

· getLastCommitTimeSynced:获取最后一次同步提交的时间戳。

· getLastCommitCompletionTimeSynced:获取最后一次同步提交处理完成的时间戳。

· updateLastCommitTimeSynced:更新最后一次同步提交的时间戳。

· updateTableProperties:更新第三方外部元数据库上指定表的属性配置信息。

· updateSerdeProperties:更新第三方外部元数据库上指定表的SerDe属性配置信息。

· getLastReplicatedTime:获取指定表最后一次复制的时间戳。

· updateLastReplicatedTimeStamp:更新指定表最后一次复制的时间戳。

· deleteLastReplicatedTimeStamp:删除指定表最后一次复制的时间戳。

org.apache.hudi.sync.common.HoodieMetaSyncOperations.java这个接口的上层实现类图如图2-24所示,从图中可以看到Hudi在源码中实现了将自身元数据同步到Hive、Datahub、AWS等第三方外部元数据存储。如果需要支持自定义的外部元数据存储同步,也可以自己实现org.apache.hudi.sync.common.HoodieSyncClient.java这个类。

图2-24

2.部署Hudi使用的配置项

Hudi在部署时提供了如下配置项,用于将自身的元数据同步到Hive Meta Store DB。

· hoodie.datasource.meta.sync.enable:默认值为false。设置为true时,表示启用将Hudi中的数据表同步到外部元数据存储中。

· hoodie.datasource.hive_sync.mode:无默认值。元数据同步模式,支持HMS、JDBC、HiveSQL三种模式。

· hoodie.datasource.hive_sync.enable:默认值为false。设置为true时,表示启用将Hudi元数据同步到Hive Meta Store中。

· hoodie.datasource.hive_sync.jdbcurl:默认值为jdbc:hive2://localhost:10000。指Hive Metastore的JDBC地址。

· hoodie.datasource.hive_sync.metastore.uris:默认值为thrift://localhost:9083。指Hive Meta Store的Thrift协议的URL地址。

将Hudi的元数据同步到Hive Meta Store中后,我们就可以通过2.1.1节中的基于Hive Meta DB采集的方式采集到Hudi中的元数据。 jVwgaV9srvsAQhZRg4yoWEnXHFKaYbZD1dz6/URleMhnjV1GTxk7iuSegIIkJosH

点击中间区域
呼出菜单
上一章
目录
下一章
×