Hive

Hive Catalog

Create Catalog

Sloth平台兼容Flink社区Create Catalog语法

  • 连接hive 数据源语法如下:(读取和写入使用同样的配置连接即可)
    需要注意的是在任务中使用的集群配置必须先上传至平台,再在任务依赖处引用。具体步骤可参考文件管理

目前不支持多套kerberos认证

开启kerberos 认证的连接配置:
create catalog myhive WITH (
    'type' = 'hive',        --requird 
    'default-database'='sloth', --required, hive database
    'hive-version'='2.1.1', --required hive version
    'hive-site'='mammut-hive-site.xml', --required, hive-site.xml file name
    'hdfs-site'='mammut-hdfs-site.xml', --required, hdfs-site.xml file name
    'core-site'='mammut-core-site.xml', --required, core-site.xml file name
    'warehouse'='hdfs://bdms-test/user/sloth/hive_db', --required, hive warehouse
    'krb.keytab'='sloth.keytab', -- optional, if auth.method=kerberos it is required.
    'krb.conf'='krb5.conf',  -- optional, if auth.method=kerberos it is required.
    'krb.principal'='sloth/dev@BDMS.163.COM',  -- optional, if auth.method=kerberos it is required.
    'auth.method'='kerberos',  -- optional, kerberos or simple or using_env_kerberos
);

开启kerberos认证,使用flink-conf中配置的提交任务到hadoop集群需要的krb。注意:当连接hive使用的krb与提交任务使用的一致时需使用该方法,否则会导致任务运行10小时后出现failover
create catalog myhive WITH (
    'type' = 'hive',        --requird 
    'default-database'='sloth', --required, hive database
    'hive-version'='2.1.1', --required hive version
    'hive-site'='mammut-hive-site.xml', --required, hive-site.xml file name
    'hdfs-site'='mammut-hdfs-site.xml', --required, hdfs-site.xml file name
    'core-site'='mammut-core-site.xml', --required, core-site.xml file name
    'warehouse'='hdfs://bdms-test/user/sloth/hive_db', --required, hive warehouse
    'krb.keytab'='sloth.keytab', -- optional, auth.method=using_env_kerberos,如果只运行任务可不提供,语法检查与debug需提供.
    'krb.conf'='krb5.conf',  -- optional, auth.method=using_env_kerberos,如果只运行任务可不提供,语法检查与debug需提供.
    'krb.principal'='sloth/dev@BDMS.163.COM',  -- optional, auth.method=using_env_kerberos,如果只运行任务可不提供,语法检查与debug需提供.
    'auth.method'='using_env_kerberos',  -- optional, kerberos or simple or using_env_kerberos
);

未开启kerberos 认证的连接配置:
create catalog myhive WITH (
    'type' = 'hive',        --requird 
    'default-database'='sloth', --required, hive database
    'hive-version'='2.1.1', --required hive version
    'hive-site'='mammut-hive-site.xml', --required, hive-site.xml file name
    'hdfs-site'='mammut-hdfs-site.xml', --required, hdfs-site.xml file name
    'core-site'='mammut-core-site.xml', --required, core-site.xml file name
    'warehouse'='hdfs://bdms-test/user/sloth/hive_db', --required, hive warehouse
    'auth.method'='simple',  -- optional, kerberos or simple or using_env_kerberos
    'simple.user.name' = 'sloth', --用户名,非必填
);

同时也可以使用metahub 的方式在数据源处引入,使用catalog.库名.表名的方式在任务中使用即可,同时所依赖的配置文件需要通过set 语法设置

使用Hive数据源

官方文档

Hive Read

  • Hive Connector支持流/批的方式读取hive表。
  • 流任务可以定时监控新数据,并emit到下游。
  • 流任务支持读取分区表和非分区表,对于分区表,监控分区生成;对于非分区表,监控新文件生成,然后读取文件。

  • hive 读默认会自动推导source并行度,如不需要,可将table.exec.hive.infer-source-parallelism设为false。如果因为推导出的并行度过大,可通过配置table.exec.hive.infer-source-parallelism.max,限制最大并行度。

    维表

可以在Flink Sql中Join Hive Dim表。

  • 对于dim表是分区表来说,flink可以像读取无解数据一样读取,每个分区时生效触发Hive Source自动读取dim表数据并加载在flink内存中。(仅在流任务有效)
  • 对于dim表是非分区表的话,会一次性加载所有dim表数据到flink内存
  • 对于任务是批任务的场景,会一次性加载所有分区数据到flink内存

Properties:

key default type description
streaming-source.enable false Boolean 是否能流式读取。保证分区可见与分区内数据的原子性,否则有可能读取不完整数据
streaming-source.partition.include all String 支持all/latest。all表示读取所有分区数据,latest仅在流任务,并且读取维表时使用。
streaming-source.monitor-interval None Duration 支持’1 m’, ‘60 m’, ‘12 h’。 周期监控最新分区/数据文件周期
streaming-source.partition-order partition-name String 支持partition-name/create-time/partition-time
streaming-source.consume-start-offset None String 开始流读取的分区,搭配partition-order来使用

Source并行度:

key default type description
table.exec.hive.infer-source-parallelism true Boolean True:Hive source的并发度由Table 分区/文件数/InputSplit进行计算所得;False:使用用户配置的并发度
table.exec.hive.infer-source-parallelism.max 1000 Integer 最大并发度

DIM Cache TTL:

当配置streaming-source.partition.include=all加载整个hive表数据加载到内存进行Dim Join时,可以配置lookup.join.cache.ttl=60 min, 每六十分钟对cache过期,并且过期后会重新load最新的hive数据到内存。

key default type description
lookup.join.cache.ttl 60 min Duration 维表关联缓存在内存的TTL,默认60分钟. NOTES: The option only works when lookup bounded hive table source, if you’re using streaming hive source as temporal table, please use ‘streaming-source.monitor-interval’ to configure the interval of data update.
lookup.cache.metric.enable false Boolean 是否开启缓存监控。可以在flink-webui->Running Jobs-> 点击要查看的任务名-> 点击要查看的chainedOperator->右边的metrics页->Add Metric框:搜索cache进行查看。注意:目前前端有bug,命中率等小数无法看到真实值,只能看到0或1,需看与后端交互的接口返回值。监控指标有:XXX.cache.cache-record-count:缓存的记录数

查询优化:

key default type description
table.optimizer.source.filter-snapshot-transpose-enabled true Boolean 1.12可用:hive作为维表关联join时,是否进行过滤条件下推,可以减少hive source缓存的数据量。目前仅支持过滤条件:=,>,<,>=,<=,<>,is null, is not null, is true, is false, is not true, is not false;函数:upper(field_name),lower(field_name),field_name=value;组合条件:and/or。 e.g. WHERE dim.user_id IS NOT NULL AND dim.user_id <> ‘’;WHERE dim.age > 60 OR CAST(dim.hr as int) = 8;WHERE dim.hr = ‘8’ IS NOT FALSE

Dim Demo:

kafka流表与hive维表关联,设置 streaming-source.enable=truestreaming-source.partition.include=latest 以支持维表关联最新分区的数据。

CREATE TABLE orders_table (
   order_id STRING,
   order_amount DOUBLE,
   product_id STRING,
   log_ts TIMESTAMP(3),
   proctime as PROCTIME()
 ) WITH (...);

 -- streaming sql, kafka temporal join a hive dimension table. Flink will automatically reload data from the
 -- configured latest partition in the interval of 'streaming-source.monitor-interval'.

 SELECT * FROM orders_table AS o
 JOIN hive_catalog.hive_db.dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
 ON o.product_id = dim.product_id;

流读 Demo:

  1. 使用HINT的方式让上述 Properties 生效。即/*+ OPTIONS()*/
set 'table.dynamic-table-options.enabled' = 'true';
-- 注册hivecatalog
create catalog hive_catalog WITH (
    'type' = 'hive',
    'hive-version'='2.1.1',
    'hive-site'='krb-hive-site.xml',
    'hdfs-site'='krb-hdfs-site.xml',
    'core-site'='krb-core-site.xml',
    'auth.method'='kerberos',
    'krb.keytab'='sloth.keytab',
    'krb.principal'='sloth/dev@BDMS.163.COM',
    'krb.conf'='krb5.conf',
    'is.file.name.encrypt'='false',
    'default-database'='default'
);

CREATE TABLE print_table WITH ('connector' = 'print')
    LIKE `hive_catalog`.sloth.user_dim (EXCLUDING ALL);

insert into print_table
select * from `hive_catalog`.sloth.user_dim
/*+ OPTIONS('streaming-source.enable'='true'
  ,'streaming-source.partition-order'='create-time'
  ,'streaming-source.monitor-interval'='20s') */;
  1. 使用元数据catalog时,利用set方式让 Properties 参数生效
set 'user_dim.streaming-source.enable'='true';
set 'user_dim.streaming-source.partition-order'='create-time';
set 'user_dim.streaming-source.monitor-interval'='20s';

CREATE TABLE print_table WITH ('connector' = 'print')
    LIKE `hive_catalog`.sloth.user_dim (EXCLUDING ALL);

insert into print_table
select * from `hive_catalog`.sloth.user_dim;

Hive Write

注意:hive sink 使用前面的catalog 方式连接hive即可,hive表可以提前在hive 中创建,也可以在flink sql 任务中创建,字段可以直接使用。下面案列提前在hive中创建表 写hive 时支持分区表和非分区表,分区字段取值在sql 中指定即可。 默认任务创建生成的分区,在hive metastore 中不会自行创建,需要手动添加分区后可查询。 也可以通过sql hints 在任务中指定自动申报,指定后,分区会自行构建。

创建表demo:
create table sloth.ods_water_sensor(
    sensor_id String,
    ts timestamp,
    water_level double
)partitioned by (ds string)
stored as parquet;
-- hive 数据库中执行,必须提前创建。

未指定分区自行构建参数,需要手动添加分区,sql 如下(在查询数据时hive库中执行):
alter table sloth.ods_water_sensor add if not exists partition(ds='2021-11-16');

flink sql开发demo:
--SQL
-- 注册kafkasource 
create table water_sensor(
    sensor_id VARCHAR,
    ts timestamp,
    water_level double
)with(
        'connector' = 'kafka',
        'topic' = 'demo_1',
        'scan.startup.mode' = 'latest-offset',
        'properties.bootstrap.servers' = 'xxx',
        'properties.zookeeper.connect' = 'xxx',
        'format' = 'json',
        'properties.group.id' = 'test'
        );

-- 注册hivecatalog
create catalog hive_catalog WITH (
    'type' = 'hive',       
    'default-database'='sloth', 
    'hive-version'='2.1.1', 
    'hive-site'='hive-site.xml', 
    'hdfs-site'='hdfs-site.xml', 
    'core-site'='core-site.xml', 
    'warehouse'='hdfs://bdms-test/user/sloth/hive_db', 
    'krb.keytab'='sloth.keytab', 
    'krb.conf'='krb5.conf',
    'krb.principal'='sloth/dev@BDMS.163.COM',
    'auth.method'='kerberos',
    'sys.db.url'='',
    'sys.db.user'='',
    'sys.db.password'=''
);

insert into hive_catalog.sloth.ods_water_sensor   /*+ OPTIONS('sink.partition-commit.policy.kind'='metastore,success-file') */ -- sql Hints 指定刷写数据时自动创建分区
    select 
        sensor_id,
        ts,
        water_level,
        date_format(ts, 'yyyy-MM-dd') as ds
    from water_sensor;

使用DDL 方式开发时,sys 开头参数为解析必备的系统参数,无实际含义,但构建是需要保留 在sink 过程中,可以控制表写入及刷新的一些参数。可以利用sql hints 语法进行设置 具体使用可参考sql Hints
作业查询结果如下: Hive SQL 连接器 - 图1

Hive Formats

hive表支持以下几种数据格式:

  • Text
  • CSV
  • SequenceFile
  • ORC
  • Parquet

Hive Function

可以在Flink Sql中使用Hive自定义函数,需要将Hive Udf类/Jar添加到任务依赖中, 不然flink任务会报class not find异常。

支持hive:

  • UDF
  • GenericUDF
  • GenericUDTF
  • GenericUDAF

Hive UDX:

TestHiveSimpleUDF

package com.netease.sloth.hive.exmaples.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

/**
 * Test simple udf. Registered under name 'myudf'
 */
public class TestHiveSimpleUDF extends UDF {
   public IntWritable evaluate(IntWritable i) {
      return new IntWritable(i.get());
   }

   public Text evaluate(Text text) {
      return new Text(text.toString());
   }
}

TestHiveGenericUDF

package com.netease.sloth.hive.exmaples.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.IntWritable;

import static com.google.common.base.Preconditions.checkArgument;


/**
 * Test generic udf. Registered under name 'mygenericudf'
 */
public class TestHiveGenericUDF extends GenericUDF {

   @Override
   public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
      checkArgument(arguments.length == 2);

      checkArgument(arguments[1] instanceof ConstantObjectInspector);
      Object constant = ((ConstantObjectInspector) arguments[1]).getWritableConstantValue();
      checkArgument(constant instanceof IntWritable);
      checkArgument(((IntWritable) constant).get() == 1);

      if (arguments[0] instanceof IntObjectInspector ||
            arguments[0] instanceof StringObjectInspector) {
         return arguments[0];
      } else {
         throw new RuntimeException("Not support argument: " + arguments[0]);
      }
   }

   @Override
   public Object evaluate(DeferredObject[] arguments) throws HiveException {
      return arguments[0].get();
   }

   @Override
   public String getDisplayString(String[] children) {
      return "TestHiveGenericUDF";
   }
}

TestHiveUDTF

package com.netease.sloth.hive.exmaples.udf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

import java.util.Collections;

import static com.google.common.base.Preconditions.checkArgument;

/**
 * Test split udtf. Registered under name 'mygenericudtf'
 */
public class TestHiveUDTF extends GenericUDTF {

   @Override
   public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
      checkArgument(argOIs.length == 2);

      // TEST for constant arguments
      checkArgument(argOIs[1] instanceof ConstantObjectInspector);
      Object constant = ((ConstantObjectInspector) argOIs[1]).getWritableConstantValue();
      checkArgument(constant instanceof IntWritable);
      checkArgument(((IntWritable) constant).get() == 1);

      return ObjectInspectorFactory.getStandardStructObjectInspector(
            Collections.singletonList("col1"),
            Collections.singletonList(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
   }

   @Override
   public void process(Object[] args) throws HiveException {
      String str = (String) args[0];
      for (String s : str.split(",")) {
         forward(s);
         forward(s);
      }
   }

   @Override
   public void close() {
   }
}

Hive Shell

hive> add jar /home/sloth/tmp_yexx/original-hive-udf-1.0.jar;
Added [/home/sloth/tmp_yexx/original-hive-udf-1.0.jar] to class path
Added resources: [/home/sloth/tmp_yexx/original-hive-udf-1.0.jar]
hive> create function myudf as 'com.netease.sloth.hive.exmaples.udf.TestHiveSimpleUDF';
OK
Time taken: 1.59 seconds
hive> create function mygenericudf as 'com.netease.sloth.hive.exmaples.udf.TestHiveGenericUDF';
OK
Time taken: 0.014 seconds
hive> create function mygenericudtf as 'com.netease.sloth.hive.exmaples.udf.TestHiveUDTF';
OK

Flink Sql

select 
    mygenericudf(myudf(name), 1) as a,
    mygenericudf(myudf(id), 1) as b,
    s
from 
    source,
    lateral table(mygenericudtf(name, 1)) as T(s)

1.14

hive 数据源

Hive Read

Properties:

key default type description
streaming-source.enable false Boolean 是否能流式读取。保证分区可见与分区内数据的原子性,否则有可能读取不完整数据
streaming-source.partition.include all String 支持all/latest/specific。all表示读取所有分区数据,latest表示读取最近(以partition-order排序最大值)分区,specific表示以partition-order排序倒序的指定区间
partition.latest.size 1 Integer streaming-source.partition.include必须为latest。表示读取的最近分区数量,默认1代表取最近(大)的一个分区。以partition-order排序
partition.specific.from 0 Integer streaming-source.partition.include必须为specific。指定分区的开始下标,以partition-order排序,从大到小,从0开始。配合partition.specific.end。必须<=end。e.g.有分区[2021-04-08,2021-04-09,2021-04-10,2021-04-11],from 1, end 3,代表:[2021-04-08,2021-04-09,2021-04-10]
partition.specific.end 0 Integer streaming-source.partition.include必须为specific。指定分区的结束下标,以partition-order排序,从大到小,从0开始。配合partition.specific.from。必须>=from。e.g.有分区[2021-04-08,2021-04-09,2021-04-10,2021-04-11],from 1, end 3,代表:[2021-04-08,2021-04-09,2021-04-10]
streaming-source.monitor-interval None Duration 支持’1 m’, ‘60 m’, ‘12 h’。 周期监控最新分区/数据文件周期
streaming-source.partition-order partition-name String 支持partition-name/create-time/partition-time
streaming-source.consume-start-offset None String 开始流读取的分区,搭配partition-order来使用

示例

create catalog music with (
'type' = 'hive',
'hive-version'='2.1.1',
'hive-site'='hive-site.xml',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'auth.method'='kerberos',
'krb.keytab'='da_music.keytab',
'krb.conf'='krb5.conf',
'krb.principal'='da_music/dev@HADOOP.HZ.NETEASE.COM',
'default-database'='music_db_test',
'streaming-source.partition.include' = 'specific',
'streaming-source.partition-order' = 'partition-time',
--     分区时间戳的抽取格式。默认的 extractor 需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。可自定义抽取器
'partition.time-extractor.timestamp-pattern' = '$dt 00:00:00',
--   'partition.latest.size' = '2'
'partition.specific.from' = '1',
'partition.specific.end' = '1'
);