通俗来说,Hive是一个基于Hadoop大数据框架的数据仓库工具,提供了类似SQL的HQL查询语言,降低了数据分析的使用门槛,让会使用传统SQL语言的数据分析人员可以无缝使用Hive进行数据分析。
Hive将元数据存储在单独的默认数据库中,也可以在部署时由用户来指定存储在哪种数据库中,通常支持存储在MySQL、SQL Server、Derby、PostgreSQL、Oracle等数据库中。
Hive在进行架构设计时,已经将Hive的元数据设计为一个单独的模块,并且提供标准的API服务,如图2-1所示。在大数据的生态体系中,很多大数据组件都是围绕着Hive的元数据来构建生态的。
图2-1
由于Hive在部署时将元数据单独存储在指定的数据库中,因此从技术实现上来说,肯定可以直接从Hive元数据存储的数据库中获取需要的元数据信息。Hive的元数据是由Hive自己管理的,它存储着数据仓库中各种表和分区的所有结构信息,包括字段和字段类型信息、读写数据所需的串行器和解串器,以及存储数据的相应HDFS文件路径等。Hive表结构的任何变更,都会自动触发Hive元数据的修改。
在Hive的官方网站的网址https://cwiki.apache.org/confluence/display/hive/design#Design-HiveDataModel中提供了如图2-2所示的Hive架构设计图。
图2-2
从图2-2中可以看到,Hive的Metastore所处的位置,以及和Hive中的其他组件的交互关系。在进行HQL查询时,SQL的编译和执行都需要获取元数据信息。
Hive中包含以下几种数据模型。
· Tables(表):和关系数据库中表的概念很像,表的创建除需要指定字段外,还需要指定数据的存储介质。在Hive中,表几乎都是存储在HDFS(Hadoop Distributed File System,Hadoop分布式文件系统)中。
· Partitions(分区):每个表都可以有分区,分区的作用是方便快速地进行数据检索,一个表可以有一个或者多个分区。
· Buckets(桶):每个分区中的数据又可以进行分桶,每个Bucket以文件的形式存储在分区的目录下。Bucket是最终真正用来存储数据的。
Hive元数据数据库中常见的关键表信息说明如下。
(1)DBS:存储着Hive中数据库的相关基础信息,该表中的常见字段如下。
· DB_ID:数据库ID。
· DESC:数据库描述信息。
· DB_LOCATION_URI:数据库数据的存储路径。
· NAME:数据库的名称。
· OWNER_NAME:数据库所有者的用户名。
· OWNER_TYPE:数据库所有者的类型。
(2)DATABASE_PARAMS:存储着Hive中数据库参数的相关信息,一个数据库可以存在多个参数,该表中的常见字段说明如下。
· DB_ID:数据库ID,对应到DBS表中的DB_ID字段。
· PARAM_KEY:数据库的参数名。
· PARAM_VALUE:数据库的参数名对应的值。
(3)TBLS:存储着Hive数据库中数据表的相关基础信息,该表中的常见字段说明如下。
· TBL_ID:数据表的ID。
· CREATE_TIME:创建时间,数据以UNIX时间戳的形式展现。
· DB_ID:数据库ID,对应到Dbs表中的DB_ID字段。
· LAST_ACCESS_TIME:最后一次访问时间,数据以UNIX时间戳的形式展现。
· OWNER:数据表的所有者。
· RETENTION:保留字段。
· SD_ID:数据表数据的存储ID,对应SDS表的SD_ID字段。
· TBL_NAME:数据表的表名称。
· TBL_TYPE:数据表的表类型,表的常见类型包括MANAGED_TABLE、EXTERNAL_TABLE、INDEX_TABLE和VIRTUAL_VIEW。
· VIEW_EXPANDED_TEXT:数据表类型为视图的详细SQL查询语句,视图并不是真正存在的一张物理表,一般是通过SELECT查询映射出来的一个逻辑虚拟表,比如select * from table_xx。
· VIEW_ORIGINAL_TEXT:数据表类型为视图的原始SQL查询语句。
(4)COLUMNS_V2:存储着数据表的字段信息,该表中的常见字段说明如下。
· CD_ID:字段ID。
· COMMENT:字段注释。
· COLUMN_NAME:字段名。
· TYPE_NAME:字段类型。
· INTEGER_IDX:字段展示顺序。
(5)TABLE_PARAMS:存储着Hive数据库中数据表参数或者属性的相关基础信息,该表中的常见字段说明如下。
· TBL_ID:数据表的ID。
· PARAM_KEY:数据表的参数或者属性名。
· PARAM_VALUE:数据表的参数值或者属性值。
(6)TBL_PRIVS:存储着表或者视图的授权信息,该表中的常见字段说明如下。
· TBL_GRANT_ID:授权ID,字段类型为数值型。
· CREATE_TIME:授权创建的时间,数据以UNIX时间戳的形式展现。
· GRANT_OPTION:授权选项。
· GRANTOR:执行授权操作的对象。
· GRANTOR_TYPE:执行授权操作的对象类型。
· PRINCIPAL_NAME:被授权的对象。
· PRINCIPAL_TYPE:被授权的对象类型。
· TBL_PRIV:被授予的权限。
· TBL_ID:被授权的表的ID,对应TBLS表中的TBL_ID字段。
(7)SERDES:存储着数据序列化的相关配置信息,该表中的常见字段说明如下。
· SERDE_ID:序列化配置ID。
· NAME:序列化配置名,值可以为空。
· SLIB:序列化类名称,例如org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe。
(8)SERDE_PARAMS:存储着数据序列化的属性或者参数信息,该表中的常见字段说明如下。
· SERDE_ID:序列化配置ID。
· PARAM_KEY:属性名或者参数名。
· PARAM_VALUE:属性值或者参数值。
(9)SDS:存储着数据表的数据文件存储相关信息,该表中的常见字段说明如下。
· SD_ID:存储ID。
· CD_ID:字段ID,对应CDS表中的CD_ID字段。
· INPUT_FORMAT:数据文件的输入格式,例如org.apache.hadoop.mapred.SequenceFileInputFormat、org.apache.hadoop.mapred.TextInputFormat等。
· IS_COMPRESSED:数据文件是否压缩,值为true或者false。
· IS_STOREDASSUBDIRECTORIES:是否包含子目录存储,值为true或者false。
· LOCATION:数据文件的存储路径。
· NUM_BUCKETS:数据存储的分桶数量,如果为-1,则代表没有限制。
· OUTPUT_FORMAT:数据文件的输出格式,例如org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat、org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat等。
· SERDE_ID:序列化ID,对应SERDES表的SERDE_ID字段。
(10)SD_PARAMS:存储着数据表的存储相关属性或者参数信息,该表中的常见字段说明如下。
· SD_ID:存储ID,对应SDS表的SD_ID字段。
· PARAM_KEY:存储的属性名或者参数名。
· PARAM_VALUE:存储的属性值或者参数值。
(11)PARTITIONS:存储着数据表分区的相关信息,该表中的常见字段说明如下。
· PART_ID:分区ID。
· CREATE_TIME:分区的创建时间,数据以UNIX时间戳的形式展现。
· LAST_ACCESS_TIME:最后一次访问的时间,数据以UNIX时间戳的形式展现。
· PART_NAME:分区的名称。
· SD_ID:分区的存储ID,对应SDS表中的SD_ID字段。
· TBL_ID:分区关联的数据表ID,对应TBLS表中的TBL_ID字段。
(12)PARTITION_KEYS:存储着数据表分区的字段信息,该表中的常见字段说明如下。
· TBL_ID:数据表ID,对应TBLS表中的TBL_ID字段。
· PKEY_COMMENT:分区字段的注释。
· PKEY_NAME:分区字段名。
· PKEY_TYPE:分区字段类型。
· INTEGER_IDX:分区字段的顺序。
(13)PARTITION_PARAMS:存储分区的属性或者参数信息,该表中的常见字段说明如下。
· PART_ID:分区ID,对应PARTITIONS表中的PART_ID字段。
· PARAM_KEY:分区的属性名或者参数名。
· PARAM_VALUE:分区的属性值或者参数值。
Hive元数据中常见的关键表之间的关联关系如图2-3所示。
图2-3
根据图2-3的关联关系,我们就可以用SQL语句查询到所需要的元数据的数据信息。
Hive Catalog是Hive提供的一个重要的组件,专门用于元数据的管理。它管理着所有Hive库表的结构、存储位置、分区等相关信息。同时,Hive Catalog提供了RESTful API或者Client包供用户来查询或者修改元数据信息,其底层核心的JAR包为hive-standalone-metastore.jar。在该JAR包中的org.apache.hadoop.hive.metastore.IMetaStoreClient.java接口中定义了对Hive元数据的管理的抽象。其核心类图如图2-4所示,org.apache.hadoop.hive.metastore.HiveMetaStoreClient是org.apache.hadoop.hive.metastore.ImetaStoreClient接口的实现类。
图2-4
org.apache.hadoop.hive.metastore.HiveMetaStoreClient作为客户端,在连接Hive Meta服务时,需要传入对应的Hive连接配置,这个配置被定义在org.apache.hadoop.hive.conf.HiveConf.java中。
如图2-5所示为通过客户端获取Hive元数据时和服务端的交互过程。
图2-5
HiveConf的定义以及HiveMetaStoreClient的初始化代码如下:
... //初始化HiveConf配置 org.apache.hadoop.hive.conf.HiveConf hiveConf = new org.apache.hadoop.hive.conf.HiveConf(getClass()); hiveConf.set("hive.metastore.uris","thrift://your-ip:port"); //初始化HiveMetaStoreClient org.apache.hadoop.hive.metastore. IMetaStoreClient client= new org.apache.hadoop.hive.metastore.HiveMetaStoreClient(hiveConf); ...
在org.apache.hadoop.hive.metastore.ImetaStoreClient.java中定义的、获取Hive元数据的核心方法如下。
· getDatabases:获取指定的数据库。
· getAllDatabases:获取所有的数据库。
· getAllTables:获取指定数据库下所有的表。
· getTables:获取指定的表。
· getTableMeta:获取表的元数据信息。
· listTableNamesByFilter:通过过滤条件获取所有的表名。
· dropTable:删除指定的表。
· truncateTable:清除指定表中的所有数据。
· tableExists:判断指定的表是否存在。
· getPartition:获取分区。
· listPartitions:获取指定表的所有分区信息。
· listPartitionNames:获取指定表的所有分区名称。
· createTable:在数据库下创建表。
· alter_table:修改表的相关信息。
· dropDatabase:删除指定的数据库。
· alterDatabase:修改指定数据库的相关信息。
· dropPartitions:删除符合条件的分区。
· dropPartition:删除指定的分区。
· alter_partition:修改指定的分区。
· renamePartition:修改分区的名称。
· getFields:获取指定表下的字段信息。
· getSchema:获取Schema信息。
· getTableColumnStatistics:获取表字段(列)的统计信息。
· createFunction:创建函数。
· alterFunction:修改函数。
· dropFunction:删除函数。
在Hive 2.2.0版本之前,Hive还提供了以Hcatalog REST API的形式对外访问Hive Catalog(Hive 2.2.0版本后,已经移除了Hcatalog REST API这个功能),REST API的访问地址为http://yourserver/templeton/v1/resource。在Hive的Wiki网站https://cwiki.apache.org/confluence/display/Hive/WebHCat+Reference中详细列出了REST API支持哪些接口访问,如图2-6所示。
比如,通过调用REST API接口http://yourserver/templeton/v1/ddl/database,便可以获取到Catalog中所有数据库的信息,如图2-7所示。
图2-6
图2-7
Spark是一个基于分布式的大数据计算框架。Spark和Hadoop的最大不同是,Spark的数据主要是基于内存计算的,所以Spark的计算性能远远高于Hadoop,深受大数据开发者喜爱。Spark提供了Java、Scala、Python和R等多种开发语言的API。
Spark Catalog是Spark提供的一个元数据管理组件,专门用于Spark对元数据的读取和存储管理,它管理着Spark支持的所有数据源的元数据。Spark Catalog支持的数据源包括HDFS、Hive、JDBC等。Spark Catalog将外部数据源中的数据表映射为Spark中的表,所以通过Spark Catalog也可以采集到我们需要的元数据信息。
如图2-8所示,在Spark源码中使用org.apache.spark.sql.catalog.Catalog这个抽象类来定义Catalog的底层抽象,而在org.apache.spark.sql.internal.CatalogImpl中对Catalog进行了抽象类的实现。在org.apache.spark.sql.SparkSession中提供了Catalog的对外调用,用户在使用Catalog时可以通过SparkSession进行调用。
图2-8
Catalog类中提供的方法如表2-1所示。
表2-1 Catalog类中提供的方法
(续表)
(续表)
自Spark 3.0版本起,引入了Catalog Plugin,虽然org.apache.spark.sql.catalog.Catalog提供了一些常见的元数据查询和操作方法,但是并不够全面、强大和灵活,比如无法支持多个Catalog等,所以Catalog Plugin是Spark为了解决这些问题应运而生的。
Spark源码中提供了使用Java语言定义的Catalog Plugin接口,代码如下:
/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License */ package org.apache.spark.sql.connector.catalog; import org.apache.spark.annotation.Evolving; import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.util.CaseInsensitiveStringMap; /** * A marker interface to provide a catalog implementation for Spark. * <p> * Implementations can provide catalog functions by implementing additional interfaces for tables, * views, and functions. * <p> * Catalog implementations must implement this marker interface to be loaded by * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the * required public no-arg constructor. After creating an instance, it will be configured by calling * {@link #initialize(String, CaseInsensitiveStringMap)}. * <p> * Catalog implementations are registered to a name by adding a configuration option to Spark: * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties * in the Spark configuration that share the catalog name prefix, * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive * string map of options in initialization with the prefix removed. * {@code name}, is also passed and is the catalog's name; in this case, "catalog-name" * * @since 3.0.0 */ @Evolving public interface CatalogPlugin { /** * Called to initialize configuration. * <p> * This method is called once, just after the provider is instantiated. * * @param name the name used to identify and load this catalog * @param options a case-insensitive string map of configuration */ void initialize(String name, CaseInsensitiveStringMap options); /** * Called to get this catalog's name. * <p> * This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is * called to pass the catalog's name */ String name(); /** * Return a default namespace for the catalog. * <p> * When this catalog is set as the current catalog, the namespace returned by this method will be * set as the current namespace. * <p> * The namespace returned by this method is not required to exist. * * @return a multi-part namespace */ default String[] defaultNamespace() { return new String[0]; } }
在该接口的源码中定义了以下3个方法。
· initialize:配置初始化,该方法只执行一次。
· name:获取Catalog的名字。
· defaultNamespace:获取Catalog默认的命名空间。
Catalog Plugin在源码中的位置如图2-9所示,CatalogPlugin接口位于Spark源码的spark-catalyst子工程下。
图2-9
如图2-10所示为CatalogPlugin接口在Spark源码中的代码实现类图,从中可以看到CatalogPlugin是如何对外暴露给用户进行调用的。
图2-10
若需自定义一个Catalog,则可以直接或间接实现org.apache.spark.sql.connector.catalog.CatalogPlugin接口。间接实现指的是,也可以选择实现CatalogPlugin的扩展接口,例如TableCatalog。自定义Catalog实现完成后,在运行Spark任务时,需要添加以下配置:
spark.sql.catalog.catalog_name=com.example.YourCatalogClass(自定义的 package.Catalog类名)
Spark还提供org.apache.spark.sql.catalyst.catalog.ExternalCatalog接口来扩展支持外部数据源的元数据操作,比如Spark对Hive的操作就是通过org.apache.spark.sql.hive.HiveExternalCatalog这个接口来实现的。相关的核心类图如图2-11所示。
图2-11
可以看到,org.apache.spark.sql.hive.HiveExternalCatalog实现了ExternalCatalog接口来提供对Hive的Catalog操作。
通过Spark Catalog获取元数据的代码如下:
上述代码中需要通过Maven引入Spark的相关依赖包:
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.12</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <scope>provided</scope> </dependency>
然后,在Spark集群中以Spark Job的方式运行上述代码,就可以获取到相关的元数据信息。