Hive SQL Connector
Hive
Hive Catalog
Create Catalog
The Sloth platform is compatible with the Flink community Create Catalog syntax.
- The syntax for connecting to a Hive data source is shown below (reads and writes use the same connection configuration).
Note that the cluster configuration files used in a job must first be uploaded to the platform and then referenced in the job dependencies; see File Management for the detailed steps.
Multiple sets of Kerberos credentials are currently not supported.
Connection configuration with Kerberos authentication enabled:
create catalog myhive WITH (
'type' = 'hive', -- required
'default-database'='sloth', -- required, hive database
'hive-version'='2.1.1', -- required, hive version
'hive-site'='mammut-hive-site.xml', --required, hive-site.xml file name
'hdfs-site'='mammut-hdfs-site.xml', --required, hdfs-site.xml file name
'core-site'='mammut-core-site.xml', --required, core-site.xml file name
'warehouse'='hdfs://bdms-test/user/sloth/hive_db', --required, hive warehouse
'krb.keytab'='sloth.keytab', -- optional, required if auth.method=kerberos
'krb.conf'='krb5.conf', -- optional, required if auth.method=kerberos
'krb.principal'='sloth/dev@BDMS.163.COM', -- optional, required if auth.method=kerberos
'auth.method'='kerberos', -- optional, kerberos or simple or using_env_kerberos
);
Kerberos authentication enabled, reusing the Kerberos credentials configured in flink-conf for submitting the job to the Hadoop cluster. Note: this method must be used when the credentials for connecting to Hive are the same as those used for job submission; otherwise the job will fail over after running for 10 hours:
create catalog myhive WITH (
'type' = 'hive', -- required
'default-database'='sloth', -- required, hive database
'hive-version'='2.1.1', -- required, hive version
'hive-site'='mammut-hive-site.xml', --required, hive-site.xml file name
'hdfs-site'='mammut-hdfs-site.xml', --required, hdfs-site.xml file name
'core-site'='mammut-core-site.xml', --required, core-site.xml file name
'warehouse'='hdfs://bdms-test/user/sloth/hive_db', --required, hive warehouse
'krb.keytab'='sloth.keytab', -- optional; when auth.method=using_env_kerberos it can be omitted if the job is only run, but is required for syntax check and debug
'krb.conf'='krb5.conf', -- optional; when auth.method=using_env_kerberos it can be omitted if the job is only run, but is required for syntax check and debug
'krb.principal'='sloth/dev@BDMS.163.COM', -- optional; when auth.method=using_env_kerberos it can be omitted if the job is only run, but is required for syntax check and debug
'auth.method'='using_env_kerberos', -- optional, kerberos or simple or using_env_kerberos
);
Connection configuration without Kerberos authentication:
create catalog myhive WITH (
'type' = 'hive', -- required
'default-database'='sloth', -- required, hive database
'hive-version'='2.1.1', -- required, hive version
'hive-site'='mammut-hive-site.xml', --required, hive-site.xml file name
'hdfs-site'='mammut-hdfs-site.xml', --required, hdfs-site.xml file name
'core-site'='mammut-core-site.xml', --required, core-site.xml file name
'warehouse'='hdfs://bdms-test/user/sloth/hive_db', --required, hive warehouse
'auth.method'='simple', -- optional, kerberos or simple or using_env_kerberos
'simple.user.name' = 'sloth', -- optional, user name
);
Alternatively, the Hive source can be registered as a data source via Metahub and then referenced in jobs as catalog.database.table; the configuration files it depends on must be set with the SET syntax (a sketch follows).
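A minimal sketch of this usage, assuming a Metahub-registered catalog named metahub_hive and reusing the table from the write demo later on this page; the SET keys for the configuration files are omitted because they depend on the registration:
-- the configuration files the catalog depends on must first be provided with the SET syntax
select sensor_id, water_level
from metahub_hive.sloth.ods_water_sensor; -- catalog.database.table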
Using the Hive Data Source
Hive Read
- The Hive connector supports reading Hive tables in both streaming and batch mode.
- Streaming jobs can periodically monitor new data and emit it downstream.
- Streaming jobs support both partitioned and non-partitioned tables: for partitioned tables, new partitions are monitored; for non-partitioned tables, new files are monitored and then read.
- By default, the Hive source parallelism is inferred automatically; set table.exec.hive.infer-source-parallelism to false if this is not desired. If the inferred parallelism is too large, limit it with table.exec.hive.infer-source-parallelism.max. (A streaming-read sketch follows this list.)
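A minimal sketch of a streaming read, with the options passed as dynamic table options (SQL hints); the catalog, database, and table names are placeholders taken from the write demo later on this page:
-- stream-read a Hive table; new partitions/files are checked every minute
SELECT sensor_id, water_level
FROM hive_catalog.sloth.ods_water_sensor
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval'='1 min') */;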
Dimension Table
A Hive table can be joined as a dimension (lookup) table in Flink SQL.
- If the dimension table is partitioned, Flink can read it like unbounded data: whenever a partition becomes effective, the Hive source is triggered to read the dimension table data and load it into Flink memory. (Streaming jobs only.)
- If the dimension table is non-partitioned, all of its data is loaded into Flink memory at once.
- For batch jobs, the data of all partitions is loaded into Flink memory at once.
Properties:
key | default | type | description |
---|---|---|---|
streaming-source.enable | false | Boolean | Whether streaming read is enabled. Make sure partition visibility and the atomicity of data within a partition are guaranteed, otherwise incomplete data may be read |
streaming-source.partition.include | all | String | Supports all/latest. all reads the data of all partitions; latest is only used in streaming jobs when reading a dimension table |
streaming-source.monitor-interval | None | Duration | Supports '1 m', '60 m', '12 h'. The interval at which new partitions/data files are monitored |
streaming-source.partition-order | partition-name | String | Supports partition-name/create-time/partition-time |
streaming-source.consume-start-offset | None | String | The partition from which streaming read starts; used together with partition-order |
Source parallelism:
key | default | type | description |
---|---|---|---|
table.exec.hive.infer-source-parallelism | true | Boolean | True: the Hive source parallelism is derived from the number of table partitions/files/InputSplits; False: the user-configured parallelism is used |
table.exec.hive.infer-source-parallelism.max | 1000 | Integer | Maximum parallelism |
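As a sketch, assuming the platform supports Flink's SET syntax mentioned above, these inference options can be set as job configuration (the values are arbitrary examples):
-- disable inference and use the configured parallelism instead
SET 'table.exec.hive.infer-source-parallelism' = 'false';
-- or keep inference but cap the inferred parallelism
SET 'table.exec.hive.infer-source-parallelism.max' = '100';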
DIM Cache TTL:
When streaming-source.partition.include=all is configured and the whole Hive table is loaded into memory for the dimension join, you can set lookup.join.cache.ttl=60 min so that the cache expires every 60 minutes and the latest Hive data is reloaded into memory after expiry (see the sketch after the table below).
key | default | type | description |
---|---|---|---|
lookup.join.cache.ttl | 60 min | Duration | TTL of the in-memory cache for the dimension join; defaults to 60 minutes. NOTE: this option only works when looking up a bounded Hive table source; if you are using a streaming Hive source as a temporal table, use 'streaming-source.monitor-interval' to configure the data update interval |
lookup.cache.metric.enable | false | Boolean | Whether to enable cache metrics. View them in the Flink web UI: Running Jobs -> click the job name -> click the chained operator to inspect -> Metrics tab on the right -> Add Metric box: search for 'cache'. Note: the frontend currently has a bug, so fractional values such as the hit rate only show as 0 or 1; check the backend API response for the real value. Available metric: XXX.cache.cache-record-count: the number of cached records |
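A sketch of a bounded (non-streaming) dimension join with the cache options passed via SQL hints; the table and column names are placeholders:
-- the whole dimension table is cached in memory and reloaded every 30 minutes
SELECT o.order_id, dim.product_id
FROM orders_table AS o
JOIN hive_catalog.hive_db.dimension_table
/*+ OPTIONS('lookup.join.cache.ttl'='30 min', 'lookup.cache.metric.enable'='true') */
FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;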
Query optimization:
key | default | type | description |
---|---|---|---|
table.optimizer.source.filter-snapshot-transpose-enabled | true | Boolean | Available in 1.12: when Hive is joined as a dimension table, whether to push filter conditions down, which can reduce the amount of data cached by the Hive source. Currently supported predicates: =, >, <, >=, <=, <>, is null, is not null, is true, is false, is not true, is not false; functions: upper(field_name), lower(field_name), field_name=value; compound conditions: and/or. e.g. WHERE dim.user_id IS NOT NULL AND dim.user_id <> ''; WHERE dim.age > 60 OR CAST(dim.hr as int) = 8; WHERE dim.hr = '8' IS NOT FALSE |
Dim Demo:
Join a Kafka stream table with a Hive dimension table. Set streaming-source.enable=true and streaming-source.partition.include=latest so that the join uses the data of the latest partition (a variant that passes these options via SQL hints follows the demo).
CREATE TABLE orders_table (
order_id STRING,
order_amount DOUBLE,
product_id STRING,
log_ts TIMESTAMP(3),
proctime as PROCTIME()
) WITH (...);
-- streaming sql, kafka temporal join a hive dimension table. Flink will automatically reload data from the
-- configured latest partition in the interval of 'streaming-source.monitor-interval'.
SELECT * FROM orders_table AS o
JOIN hive_catalog.hive_db.dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;
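If the two options are not set on the catalog, a sketch of passing them as dynamic table options on the dimension table instead (same tables as the demo above):
SELECT * FROM orders_table AS o
JOIN hive_catalog.hive_db.dimension_table
/*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include'='latest', 'streaming-source.monitor-interval'='12 h') */
FOR SYSTEM_TIME AS OF o.proctime AS dim
ON o.product_id = dim.product_id;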
Hive Write
Note: the Hive sink uses the catalog connection described above. The Hive table can be created in Hive ahead of time or created inside the Flink SQL job, and its columns can then be used directly; the example below creates the table in Hive first. Both partitioned and non-partitioned tables can be written; the value of the partition column is simply specified in the SQL. By default, the partitions produced by the job are not created in the Hive metastore automatically and must be added manually before they can be queried. Alternatively, automatic partition commit can be declared via SQL hints in the job, in which case partitions are created automatically.
Table creation demo:
create table sloth.ods_water_sensor(
sensor_id String,
ts timestamp,
water_level double
)partitioned by (ds string)
stored as parquet;
-- run in the Hive database; the table must be created in advance.
If the option for automatic partition creation is not specified, partitions must be added manually with the following SQL (run in the Hive database before querying the data):
alter table sloth.ods_water_sensor add if not exists partition(ds='2021-11-16');
Flink SQL development demo:
--SQL
-- register the Kafka source
create table water_sensor(
sensor_id VARCHAR,
ts timestamp,
water_level double
)with(
'connector' = 'kafka',
'topic' = 'demo_1',
'scan.startup.mode' = 'latest-offset',
'properties.bootstrap.servers' = 'xxx',
'properties.zookeeper.connect' = 'xxx',
'format' = 'json',
'properties.group.id' = 'test'
);
-- register the Hive catalog
create catalog hive_catalog WITH (
'type' = 'hive',
'default-database'='sloth',
'hive-version'='2.1.1',
'hive-site'='hive-site.xml',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'warehouse'='hdfs://bdms-test/user/sloth/hive_db',
'krb.keytab'='sloth.keytab',
'krb.conf'='krb5.conf',
'krb.principal'='sloth/dev@BDMS.163.COM',
'auth.method'='kerberos',
'sys.db.url'='',
'sys.db.user'='',
'sys.db.password'=''
);
insert into hive_catalog.sloth.ods_water_sensor /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */ -- SQL hints: commit partitions automatically when data is flushed
select
sensor_id,
ts,
water_level,
date_format(ts, 'yyyy-MM-dd') as ds
from water_sensor;
When developing with DDL, the parameters prefixed with sys are system parameters required for parsing; they have no actual meaning but must be kept when the job is built.
During the sink phase, several parameters that control table writing and flushing can be set with the SQL hints syntax, as in the sketch below.
See SQL Hints for detailed usage.
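As a sketch, the write behaviour can be tuned the same way; the rolling-policy and partition-commit keys below are standard Flink Hive/filesystem sink options and the values are arbitrary examples:
insert into hive_catalog.sloth.ods_water_sensor
/*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file', 'sink.partition-commit.trigger'='process-time', 'sink.rolling-policy.file-size'='128MB', 'sink.rolling-policy.rollover-interval'='30 min') */
select sensor_id, ts, water_level, date_format(ts, 'yyyy-MM-dd') as ds
from water_sensor;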
The query result of the job is as follows:
Hive Formats
Hive tables support the following data formats:
- Text
- CSV
- SequenceFile
- ORC
- Parquet
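The format is chosen in the Hive DDL with STORED AS; as a sketch (run in Hive), a variant of the earlier demo table stored as ORC, with a hypothetical table name:
create table sloth.ods_water_sensor_orc(
sensor_id String,
ts timestamp,
water_level double
)partitioned by (ds string)
stored as orc;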
Hive Function
Hive user-defined functions can be used in Flink SQL. The Hive UDF classes/JAR must be added to the job dependencies, otherwise the Flink job will fail with a ClassNotFoundException.
Supported Hive function types:
- UDF
- GenericUDF
- GenericUDTF
- GenericUDAF
Hive UDX:
TestHiveSimpleUDF
package com.netease.sloth.hive.exmaples.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
/**
* Test simple udf. Registered under name 'myudf'
*/
public class TestHiveSimpleUDF extends UDF {
public IntWritable evaluate(IntWritable i) {
return new IntWritable(i.get());
}
public Text evaluate(Text text) {
return new Text(text.toString());
}
}
TestHiveGenericUDF
package com.netease.sloth.hive.exmaples.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Test generic udf. Registered under name 'mygenericudf'
*/
public class TestHiveGenericUDF extends GenericUDF {
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
checkArgument(arguments.length == 2);
checkArgument(arguments[1] instanceof ConstantObjectInspector);
Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
checkArgument(constant instanceof IntWritable);
checkArgument(((IntWritable) constant).get() == 1);
if (arguments[0] instanceof IntObjectInspector ||
arguments[0] instanceof StringObjectInspector) {
return arguments[0];
} else {
throw new RuntimeException("Not support argument: " + arguments[0]);
}
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
return arguments[0].get();
}
@Override
public String getDisplayString(String[] children) {
return "TestHiveGenericUDF";
}
}
TestHiveUDTF
package com.netease.sloth.hive.exmaples.udf;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import java.util.Collections;
import static com.google.common.base.Preconditions.checkArgument;
/**
* Test split udtf. Registered under name 'mygenericudtf'
*/
public class TestHiveUDTF extends GenericUDTF {
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
checkArgument(argOIs.length == 2);
// TEST for constant arguments
checkArgument(argOIs[1] instanceof ConstantObjectInspector);
Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
checkArgument(constant instanceof IntWritable);
checkArgument(((IntWritable) constant).get() == 1);
return ObjectInspectorFactory.getStandardStructObjectInspector(
Collections.singletonList("col1"),
Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
}
@Override
public void process(Object[] args) throws HiveException {
String str = (String) args[0];
for (String s : str.split(",")) {
forward(s);
forward(s);
}
}
@Override
public void close() {
}
}
Hive Shell
hive> add jar /home/sloth/tmp_yexx/original-hive-udf-1.0.jar;
Added [/home/sloth/tmp_yexx/original-hive-udf-1.0.jar] to class path
Added resources: [/home/sloth/tmp_yexx/original-hive-udf-1.0.jar]
hive> create function myudf as 'com.netease.sloth.hive.exmaples.udf.TestHiveSimpleUDF';
OK
Time taken: 1.59 seconds
hive> create function mygenericudf as 'com.netease.sloth.hive.exmaples.udf.TestHiveGenericUDF';
OK
Time taken: 0.014 seconds
hive> create function mygenericudtf as 'com.netease.sloth.hive.exmaples.udf.TestHiveUDTF';
OK
Flink Sql
select
mygenericudf(myudf(name), 1) as a,
mygenericudf(myudf(id), 1) as b,
s
from
source,
lateral table(mygenericudtf(name, 1)) as T(s)
1.14
Hive Data Source
Hive Read
Properties:
key | default | type | description |
---|---|---|---|
streaming-source.enable | false | Boolean | Whether streaming read is enabled. Make sure partition visibility and the atomicity of data within a partition are guaranteed, otherwise incomplete data may be read |
streaming-source.partition.include | all | String | Supports all/latest/specific. all reads all partitions; latest reads the most recent partition (the largest by partition-order); specific reads a range of partitions indexed in descending partition-order |
partition.latest.size | 1 | Integer | streaming-source.partition.include must be latest. The number of most recent partitions to read; the default 1 means the single most recent (largest) partition, ordered by partition-order |
partition.specific.from | 0 | Integer | streaming-source.partition.include must be specific. Start index of the partition range, with partitions sorted by partition-order in descending order and indexed from 0. Used together with partition.specific.end; must be <= end. e.g. with partitions [2021-04-08,2021-04-09,2021-04-10,2021-04-11], from 1, end 3 selects [2021-04-08,2021-04-09,2021-04-10] |
partition.specific.end | 0 | Integer | streaming-source.partition.include must be specific. End index of the partition range, with partitions sorted by partition-order in descending order and indexed from 0. Used together with partition.specific.from; must be >= from. e.g. with partitions [2021-04-08,2021-04-09,2021-04-10,2021-04-11], from 1, end 3 selects [2021-04-08,2021-04-09,2021-04-10] |
streaming-source.monitor-interval | None | Duration | Supports '1 m', '60 m', '12 h'. The interval at which new partitions/data files are monitored |
streaming-source.partition-order | partition-name | String | Supports partition-name/create-time/partition-time |
streaming-source.consume-start-offset | None | String | The partition from which streaming read starts; used together with partition-order |
Example
create catalog music with (
'type' = 'hive',
'hive-version'='2.1.1',
'hive-site'='hive-site.xml',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'auth.method'='kerberos',
'krb.keytab'='da_music.keytab',
'krb.conf'='krb5.conf',
'krb.principal'='da_music/dev@HADOOP.HZ.NETEASE.COM',
'default-database'='music_db_test',
'streaming-source.partition.include' = 'specific',
'streaming-source.partition-order' = 'partition-time',
-- Format for extracting the partition timestamp. The default extractor expects the pattern yyyy-MM-dd HH:mm:ss, with the corresponding partition field of the Hive table substituted as the placeholder. A custom extractor can also be used.
'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
-- 'partition.latest.size' = '2'
'partition.specific.from' = '1',
'partition.specific.end' = '1'
);