INFO-Spark 高性能读写 Hive 表
介绍、对比 Spark 内置的两种 Hive 表读写方式:Data Source 模式、Hive SerDe 模式。
本文未做特别说明之处,假设 Spark 版本为 3.3.0
为什么要兼容 Hive?
Hive 是 Hadoop 快速发展阶段最成功的 SQL on Hadoop 解决方案,一度成为 Hadoop 数仓解决方案的代名词。基于其在数仓领域无可争议的地位,后继者如 Spark、Presto/Trino、Impala 等都将兼容 Hive 数仓作为重要目标之一。
但站在技术的角度,“除非代码完全一致,否则完全兼容几乎是不可能的事情”。当然,这是一句废话,实际场景中用户对“兼容”的期望并没有这么苛刻,但确实存在一些客观原因,使得其他计算引擎对 Hive 表的兼容或多或少存在一些“瑕疵”。
兼容 Hive 很难吗?
兼容很简单,完全兼容很难。综合来讲,Spark 对 Hive 的兼容属于第一梯队。
主要因为 Hive 本身是一个开放系统,并且经过了漫长的野蛮生长时期,其很多地方并未经过精心的设计和严谨的工程实现,Hive 自身的版本升级也会带来众多的“不兼容”。
Hive 作为一个 SQL on Hadoop 解决方案,其数据组织的基本理念是:
- 将元数据,如表结构、分区信息,存储在 RDBMS 中,并提供一个中心化的 Metastore 服务;
- 使用开放数据格式如 Parquet、CSV、JSON,将数据以松散的结构存储在 HDFS、S3 等分布式文件系统中,比如
. └── test.db ├── tbl_1 │ ├── _SUCCESS │ ├── date=2025-03-01 │ │ ├── part-00000-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ │ ├── part-00001-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ │ ├── part-00002-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ │ ├── part-00003-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ │ └── part-00004-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ └── date=2025-03-02 │ ├── part-00000-5bf478f0-ebc9-4621-be84-5a69c0a3e9ea.c000.gz.parquet │ ├── part-00000-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ ├── part-00001-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet │ └── part-00002-fa90407e-ff8d-43dd-8fc1-50874b52d0e5.c000.gz.parquet └── tbl_2 ├── _SUCCESS ├── part-00000-860f1ec2-2413-42e4-9fa1-ebacabebdeb1-c000.snappy.orc └── part-00001-860f1ec2-2413-42e4-9fa1-ebacabebdeb1-c000.snappy.orc
读 Hive 表时,计算引擎连接 Metastore 服务获取表元数据,然后根据其中的 table location 去 HDFS 读取数据;写 Hive 表时,数据引擎将数据文件写入到 HDFS,然后连接 Metastore 服务更新表元数据。
考虑到与 Metastore 通信有现成的客户端,读写数据文件也有现成的 SDK,手搓一个能读写 Hive 表的小项目可能工作量也就几天,然后就可以宣布“兼容” Hive 了!
但 Hive 并未规范其中的很多细节,导致不同计算引擎在实现上有众多的差异:
Metastore 记录的一个表路径,在 HDFS 上不存在,该怎么办?
- 部分数据文件有损坏,该丢弃吗?
- 修改表定义,如加列、删列,需要同步修改 HDFS 上的所有数据文件吗?
- Metastore 定义的表结构,与 HDFS 中数据文件,如 Parquet、ORC,Schema 不一致,该怎么办?
- 若多个任务对表并发修改,该如何处理冲突?
- 复写已存在的分区,或者向已存在分区追加文件,要更新 Metastore 吗?
- ...
Hive 在演进过程中,也不断增加新的功能和扩展接口,本文主要要论 Hive 最常用的功能,某些非常用功能如 StorageHandler,Spark 仅提供有限的支持,本文不做深入讨论。
Spark 数据源接口
作为计算引擎,考虑到支持接入多种数据源,一般都会提供一些抽象的公用层:
- Hive 提供了 SerDe 接口,并内置了 SequenceFile、RCFile、ORC、Parquet、Textfile、Avro 等常见格式的实现
- Spark 提供了 DataSource API,并内置了 ORC、Parquet、Text、Avro、Csv、JSON、JDBC、Hive 等常见格式的实现;特别地,Hive 作为一种特殊的数据源,支持使用 Hive SerDe 读写 Hive 表
- Spark 3.0 提供了全新的 DataSource V2 API,内置 JDBC 数据源已完成迁移,并建议第三方 Connector 迁移到此 API;特别地,Iceberg 可以视作 DSv2 API 的参考实现。
Spark 建表语法
以下讨论仅限于使用默认 Hive Session Catalog 的场景,DataSource V2 表与 Multiple Catalog 特性相关,暂不做讨论。
作为一个数仓系统,Spark 必须要支持表的持久化,显然 Spark 的 DataSource 格式与 Hive 所支持的格式并不完全一致,比如若不使用第三方 SerDe 实现,Hive 并不支持 JSON。因此 Spark 引入了 DataSource 表的概念,将其元数据存储在 table properties 等字段,持久化到 Hive Metastore 中,理论上这种表只能被 Spark 识别和读取。Spark DataSource 表建表语法为:
CREATE TABLE .... USING <format> ...
例如:创建一张 DataSource Parquet 表
CREATE TABLE src(id int) USING parquet
创建一张 JDBC Data Source 表,schema 未指定时可以自动推导
CREATE TABLE pg_tbl
USING jdbc
OPTIONS (
url "jdbc:postgresql:<dbserver>",
dbtable "schema.tablename",
user 'username',
password 'password'
)
创建一张 Hive Parquet 表
CREATE TABLE src(id int) USING hive OPTIONS (fileFormat 'parquet')
为了进一步兼容 Hive 语法,Spark SQL 也支持了 Hive 建表语法的 STORED AS 关键字,下述是其等价写法
CREATE TABLE src(id int) STORED AS parquet
Spark 同时提供参数控制建表的默认格式,配置以下参数即可实现省略指定格式时,默认创建 STORED AS parquet 表
spark.sql.legacy.createHiveTableByDefault=true (默认值)
spark.hadoop.hive.default.fileformat=parquet
理论上:
- Spark 只有创建 Hive SerDe 表,才会在 Hive Metastore 中持久化与 Hive 表兼容的元数据,从而可以正确地被其他计算引擎读写。
- Spark 必须使用 Hive SerDe Class 读写 Hive DerDe 表。
实际上,Parquet / ORC 是大数据场景中最常用的两种数据格式,Spark SQL 对其做了特殊优化:
- Parquet/ORC DataSource 表在 Hive Metastore 中持久化的元数据,也会“尽力”保证与 Hive SerDe 表兼容
- 读写 Parquet/ORC Hive SerDe 表时,可以转化为 DataSource 表,避免使用 Hive SerDe Class 低效代码,从而获得更好的性能。
Spark 对于 Parquet / ORC 格式的 Hive SerDe 表读写方案对比
注意:本章节只适用于通过该创建的 Hive 表;
CREATE TABLE … STORED AS [parquet|orc]
对于使用以下语法创建的 Data Source 表,Spark SQL 总会使用 Data Source 模式读写。
CREATE TABLE … USING [parquet|orc]
另外 Spark 对 Parquet 的优化要比 ORC 更好,在网易多计算引擎混用的大数据技术栈背景下,强烈建议使用 Hive Parquet 表。
CREATE TABLE … STORED AS parquet
以下是创建 Parquet 格式的 Hive SerDe/DataSource 表时元数据的样例
# spark 执行建表
CREATE TABLE ... STORED AS [parquet|orc]
CREATE TABLE ... USING [parquet|orc]
# hive 查看建表语句
show create table ...
这样可以看到两者元数据的区别:
Hive SerDe 表:使用 LOCATION 里的地址进行读取
DataSource 表:使用 WITH SERDEPROPERTIES 的 path 里的地址进行读取
整体对比
Data Source 模式作为 Spark 内置原生高性能实现,更快、行为更贴近 ANSI SQL 标准;
Hive SerDe 模式尽量复用 Hive 代码完成读写,最大程度兼容和模拟 Hive 的行为(甚至 bug)
备注:Spark 调用了很多 Hive 私有 API 完成对 SerDe 模式的兼容,深度耦合特定的 Hive 版本,且升级频率极低,比如 Spark 2.x 使用 Hive 1.2.1,Spark 3.x/4.x 使用 Hive 2.3.x,因此 Hive 低版本存在的 bug 可能会长期影响 Spark;碰到 bug 排查链路长、修复成本高,例如:SPARK-33144
具体差异(持续更新)
向量化读取(一般有 2x~15x 性能提升)
- Data Source 模式默认使用向量化读取,性能高;Spark 3.3 起,嵌套字段(ARRAY、STRUCT、MAP)也支持向量化读取
- 由于 Hive 2.3 Parquet/ORC 向量化读取 bug 较多,Hive SerDe 模式默认关闭此特性;性能低
读取并发控制
- Data Source 模式读取表时支持合并小文件,切分大文件(若文件本身可切割),使得每个 Task 尽量处理均匀的数据:默认根据 maxPartitionBytes 切分,当切分结果不在 (minPartitionNum, maxPartitionNum) 区间内,则做调整防止过低或者过高的并行度
spark.sql.files.openCostInBytes=4m (默认) spark.sql.files.maxPartitionBytes=128m (默认) spark.sql.files.minPartitionNum=200 spark.sql.files.maxPartitionNum=50000 (网易版本 3.3.1.18 起可用,开源版本 3.5.0 起可用)
- Hive SerDe 模式读取表时,开源版本不支持合并小文件,切分大文件(若文件本身可切割),因此若上游表存在大量小文件,Spark 读取时会生成海量 Task,给 Spark Driver 极大的调度开销和内存压力,从而导致性能和稳定性降低。网易版本可通过配置启用小文件合并。
# 以下参数均为 3.3.4.60 起可用 spark.sql.hive.combineFileInputFormat.enabled=true spark.sql.hive.combineFileInputFormat.splitNumPerCore=3 spark.sql.hive.combineFileInputFormat.minSplitBytes=64m spark.sql.hive.combineFileInputFormat.maxSplitNum=10000
- Data Source 模式读取表时支持合并小文件,切分大文件(若文件本身可切割),使得每个 Task 尽量处理均匀的数据:默认根据 maxPartitionBytes 切分,当切分结果不在 (minPartitionNum, maxPartitionNum) 区间内,则做调整防止过低或者过高的并行度
读取混合分区表,如表 t 为 Parquet 格式,但其分区 hour=01 为 Parquet 格式;hour=02 为 JSON 格式
- Data Source 模式只识别表格式,所有分区数据都按照表格式读取,因此会报错;
- Hive SerDe 模式可以识别每个分区的格式,支持读取混合分区表,因此可以正确读取数据
Parquet Decimal 字段类型支持
- Data Source 模式完整支持 Parquet Decimal 所有编码规范,可以使用 spark.sql.parquet.writeLegacyFormat=true 来确保写入时,使用 Hive、Impala 兼容的老格式(FIXED_LEN_BYTE_ARRAY)
- Hive SerDe 模式仅支持老格式(FIXED_LEN_BYTE_ARRAY)
- Hive on Tez 引擎 UNION ALL 语句写入的数据,会在分区目录下生成子路径
.../table/part/HIVE_UNION_SUBDIR_1/000000_0 .../table/part/HIVE_UNION_SUBDIR_1/000000_1 .../table/part/HIVE_UNION_SUBDIR_2/000000_1
- Data Source 模式读,会忽略子路径,因此读取时返回空结果
- Hive SerDe 模式可以结合 Hadoop 参数 mapred.input.dir.recursive=true 支持递归读取子目录中的数据文件
Parquet 文件 Schema 与表 Schema 映射
- Data Source 模式读取 Parquet 表时,按 Column Name 映射
- Hive SerDe 模式默认按 Column Ordinal 映射,可以通过参数调整行为 parquet.column.index.access=false
写入文件名
- Data Source 模式写入文件时,文件名带 .
. 后缀,如
Hive SerDe 模式写入文件时,开源版本文件名不带 .part-00000-740e0249-c090-4240-90ed-b4e170dd8899-c000.snappy.parquet
. 后缀,如
网易版本自 Spark 3.3.1.17 起可通过配置控制启用part-00000-5a481e57-caf3-471c-9cf3-0ec26e94e7a3-c000
备注:对于 Text、CSV、JSON 等文本格式来讲,压缩作用于文件本身,Spark 读取时会调用对应的 Hadoop InputFormat,该过程将自动根据文件扩展名进行解压;但对于 Parquet 和 ORC 格式的文件来说,压缩作用于文件内的数据段部分,文件名后缀仅起到标识作用,Spark 在读取时会根据文件 metadata 中的压缩算法标识对相应的数据段进行解压缩。spark.sql.hive.fileExtensionParquet.enabled=true spark.sql.hive.fileExtensionOrc.enabled=true
- Data Source 模式写入文件时,文件名带 .
并发写入(极不推荐,有相关需求考虑迁移到 Iceberg)
- 无论是否为分区表,也无论是否写同一分区,Data Source 模式不支持多任务并发写同一张表,也不支持复写读取的分区(常见于小文件合并)
- 可能导致任务失败、数据错乱、丢数据等严重后果,用户若选择使用此功能需自行承担风险,Hive SerDe 模式允许多任务并发写同一张分区表的不同分区,也允许覆写读取的分区(常见于小文件合并)
INSERT OVERWRITE 动态分区写入
- Data Source 模式写入时,仅会调用 HMS API 删除或新增分区,而不会更新已存在的分区元数据;这种行为可能导致一些 OLAP 系统(如 Impala)无法感知分区数据变化
- Hive SerDe 模式写入时,会调用 HMS API 更新作业产出结果中涉及的所有分区的元数据
非分区表、分区表静态分区写入
- Data Source 模式写入时,staging 目录移动次数少,可以降低 NameNode RPC call,但配合 mapreduce.fileoutputcommitter.algorithm.version=2 时存在数据错误风险
- Hive SerDe 模式写入时,staging 目录移动次数多,且有两次发生在 Driver 侧,存在单点瓶颈,但配合 mapreduce.fileoutputcommitter.algorithm.version=2 时仍然可以保证数据正确性。
备注:更多细节可以参阅 Spark 关于 Hadoop commit 算法原理
参数
- Data Source 模式绝大部分 Hive 参数无效;Spark 参数生效
- Hive SerDe 模式下部分 Hive 参数生效;Spark 参数无效
读取 Parquet 文件时对 HDFS Router/NameNode 的压力
- Data Source 模式在使用向量化读取时,自网易内部版本 Spark 3.3.4.89 起,可以大幅降低(75%)对 HDFS Router/NameNode 的 RPC 次数,详见 SPARK-52011;
- Hive SerDe 模式,以及 Data Source 模式的非向量化读取场景,每个 task 读取一个 Parquet File/Spilt 产生 4 次 NameNode RPC 调用,若小文件较多,会给 HDFS Router/NameNode 产生极大的压力。
... 待补充
如何识别任务使用什么模式读写 Hive 表?
Data Source 与 Hive SerDe 模式切换
出于性能、数据质量考虑,Spark 默认使用 Data Source 模式读写 Parquet/ORC 表,但提供了参数用于精细化控制。
- (不推荐,仅应在个别任务使用)使用 Hive SerDe 模式读写 Hive Parquet/ORC 表
spark.sql.hive.convertMetastoreParquet=false spark.sql.hive.convertMetastoreOrc=false
- (推荐)出于性能和兼容性综合考虑,结合网易的内部大规模实践,我们建议使用 Data Source 读、Hive SerDe 写 Hive Parquet/ORC 表
spark.sql.hive.convertMetastoreParquet=true (默认) spark.sql.hive.convertMetastoreOrc=true (默认) spark.sql.hive.convertInsertingPartitionedTable=false spark.sql.hive.convertInsertingUnpartitionedTable=false (开源版 4.0.0 起支持,网易内部版本 3.3.1.43 起支持,详见 SPARK-47850) spark.sql.hive.convertMetastoreCtas=false spark.sql.hive.convertMetastoreInsertDir=false
- 同时配置以下参数,进一步增强 Spark 对 Hive 的兼容性
备注1:spark.sql.storeAssignmentPolicy 参数影响的场景为,当 INSERT INTO table SELECT ... 语句的 table schema 与 SELECT 结果集 schema 不一致时,会发生隐式 CAST,默认不允许非安全的隐式 CAST(如 CAST STRING TO BIGINT 可能导致溢出),参数设置为 LEGACY 时,将保持与 Hive 一致的行为,但可能带来数据质量问题;若目标表为 DataSource v2 表,如 Iceberg,则此参数不允许设为 LEGACY。spark.sql.storeAssignmentPolicy=LEGACY (不推荐) spark.sql.legacy.timeParserPolicy=LEGACY spark.sql.parquet.writeLegacyFormat=true spark.sql.parquet.int96AsTimestamp=true (默认)
备注2:spark.sql.legacy.timeParserPolicy 如何影响 Spark 对日期的处理,可以参考 Spark 3 中 Parquet 古代日期导致的异常分析
以上内容对您是否有帮助?