1.14

简述

  • 无缝衔接混合数据源。用户在使用Flink引擎处理binlog CDC数据、机器样本学习、离线任务实时化等场景时,需要考虑先加载历史存量数据,然后无缝衔接增量数据进行计算。
  • 用户可以配置多个不同数据源。
  • 注意:使用时用户需要保证前几个数据源是 bounded,只有最后一个数据源可以是 unbounded。建表时,hybrid表的库名default

  • 下表为数据源对 bounded、unbounded 的支持情况

Name Source
Filesystem Bounded and Unbounded Scan, Lookup
ES Not supported
Kafka Unbounded Scan
JDBC Bounded Scan, Lookup
Hbase Bounded Scan, Lookup
Hive Bounded and Unbounded Scan, Lookup

示例

示例1-(hive ddl、kafka 元数据中心配置流表)

  • 历史全量数据存在hive中,每个分区都是一个全量数据快照,需要利用根据分区名定义的时间排序最近的一个分区的数据进行状态初始化。
  • 再读kafka中指定offset的实时增量数据,继续任务
  • 其中: hive与kafka中存在部分字段名称不同

-- 创建hive catalog
create catalog hive_test with (
'type' = 'hive',
'hive-version'='2.1.1',
'hive-site'='hive-site.xml',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'auth.method'='simple',
'simple.user.name'='hdfs',
'default-database'='default'
);

-- 在元数据中心定义kafka流表,并在此配置表启动相关参数
set 'user_action.properties.group.id' = 'test';
set 'user_action.scan.startup.mode' = 'timestamp';
set 'user_action.scan.startup.timestamp-millis' = '1636354739003';
set 'user_action.value.json.ignore-parse-errors' = 'true';
set 'user_action.format' = 'json';

-- 创建hybridcatalog
create catalog hybrid with ('type' = 'hybrid');

-- hybrid
create table hybrid.`default`.hybrid_table
(
    user_id     VARCHAR,
    itemId     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR
) with (
      'connector' = 'hybrid',
-- 子表不允许为空,子表list,用","分隔,会按此配置顺序读子数据源
      'tables' = 'hive_test.`default`.user_action, sloth_autotest_kafka_eleven.ztes.user_action',
-- 数字对应tables中表list的下标,必须按顺序对应,从1开始;或者直接填表全名,如'hive_test.`default`.user_action.connector'='XXX'。通过指定表名配置不同表的具体配置,该配置与对应的connector配置一致
-- streaming-source.partition.include:是hive connector配置,latest表示读取最新的分区,all表示读取所有分区数据。
      '1.streaming-source.partition.include' = 'latest',
-- 可选,读取最近分区数,默认为1,仅在streaming-source.partition.include=latest时有效。
      '1.partition.latest.size' = '1',
      -- 可选。三种顺序配置:create-time代表数据的最近修改时间, partition-time表示分区时间, partition-name表示分区名称
      '1.streaming-source.partition-order' = 'partition-time',
--     分区时间戳的抽取格式。默认的 extractor 需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。可自定义抽取器
      '1.partition.time-extractor.timestamp-pattern' = '$da $hr:00:00',
-- 可选。不填以hybrid `hybrid_table` 表的 schema作为子表schema。定义子schema,配置子表schemahybrid_table schema映射关系,必须与上方hybrid_table schema的字段配置顺序一致,子表必须包含hybrid_table表所有字段,允许名称不同,字段类型必须要相同。
      '1.fields' = 'user_id,item_id,category_id,behavior'
      );

CREATE TABLE print_table WITH ('connector' = 'print')
    LIKE `hybrid`.`default`.`hybrid_table`
(
    EXCLUDING ALL
);

insert into print_table
select *
from hybrid.`default`.hybrid_table;

示例2-(hive ddl、kafka 在hybrid表中通过ddl创建)

  1. 需先创建子catalog
    create catalog myHive with (
    'type' = 'hive',
    'hive-version'='2.1.1');
  2. 创建 hybridCatalog,可以初始化一个默认的
    create catalog myHybrid with ('type' = 'hybrid');
  3. 仅有一个默认的库default,必须带上,或者先 "use catalog myHybrid;" 后,直接使用表名 ``sql create tablemyHybrid.default.hybrid_table` ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR) with ( 'connector' = 'hybrid',

-- 子表不允许为空,子表list,用","分隔,会按此配置顺序读子数据源。这里的ua_kafka为任意自定义表名,将表的相关配置放在下方的 2.XXX 中,由此通过ddl定义子表。 'tables' = 'myHive.default.user_action, ua_kafka',

-- 数字对应tables中表list的下标,必须按顺序对应,从1开始;或者直接填表全名,如'myHive.default.user_action.connector'='XXX'。通过指定表名配置不同表的具体配置,该配置与对应的connector配置一致

-- myHive.default.user_action表配置 -- 可选。配置读取分区,latest、all,latest根据partition-order配置的顺序取值top n(即latest.partition.size配置的值)分区 '1.streaming-source.partition.include' = 'latest',

-- 可选。默认为1 '1.partition.latest.size' = '2',

-- 可选。三种顺序配置:create-time代表最近修改时间, partition-time, partition-name '1.streaming-source.partition-order' = 'partition-time',

-- 可选。分区时间戳的抽取格式。默认的 extractor 需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段名(e.g. da, hr)做占位符替换。可自定义抽取器 'myHive.default.user_action.partition.time-extractor.timestamp-pattern' = '$da $hr:00:00',

-- 可选。不填以hybrid hybrid_table 表的 schema作为子表schema。定义子schema,配置子表schema与hybrid_table schema映射关系,必须与上方hybrid_table schema的字段配置顺序一致,子表必须包含hybrid_table表所有字段,允许名称不同,字段类型必须要相同。 '1.fields' = 'userId,item_id,categoryid,behavior',

'2.connector' = 'kafka',

'2.topic' = 'user_action',

-- 指定消费的起点,默认latest-offset。e.g. ’earliest-offset’,’latest-offset’,’group-offsets’,’timestamp’ 和 ‘specific-offsets’. -- 未来增加continue:从上个数据源结束的时间开始。 如何定义数据源结束时间待定 '2.scan.startup.mode' = 'earliest-offset',

'2.properties.bootstrap.servers' = '10.122.173.110:9292',

'2.format' = 'json' );

4. 将其视为普通的source表使用,只能用于source
```sql
insert into XXX
select user_id, item_id from `


`.`default`.`hybrid_table` ;

示例3-(hive ddl、kafka ddl独立定义)

也可以先提前建好子表,再组合起来

create catalog hive_test with (
'type' = 'hive',
'hive-version'='2.1.1',
'hive-site'='hive-site.xml',
'hdfs-site'='hdfs-site.xml',
'core-site'='core-site.xml',
'auth.method'='simple',
'simple.user.name'='hdfs',
'default-database'='default'
);

--     {"user_id":"12","item_id":"18","category_id":"ad","behavior":"kfk1"}
create table `ua_kafka`
(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR
) with (
  'connector' = 'kafka',
  'topic' = 'user_action',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = '10.122.173.110:9292',
  'format' = 'json'
);

create catalog hybrid with ('type' = 'hybrid');

create table hybrid.`default`.hybrid_table
(
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR
) with (
      'connector' = 'hybrid',
      'tables' = 'hive_test.`default`.user_action, ua_kafka',
      '1.streaming-source.partition.include' = 'latest',
      '1.streaming-source.partition-order' = 'partition-time',
--     分区时间戳的抽取格式。默认的 extractor 需要写成 yyyy-MM-dd HH:mm:ss 的形式,并用 Hive 表中相应的分区字段做占位符替换。可自定义抽取器
      '1.partition.time-extractor.timestamp-pattern' = '$da $hr:00:00',
      '1.partition.latest.size' = '1'
);

CREATE TABLE print_table WITH ('connector' = 'print')
    LIKE `hybrid`.`default`.`hybrid_table`
(
    EXCLUDING ALL
);

insert into print_table
select *
from hybrid.`default`.hybrid_table;

参数

Key 默认值 类型 是否必填 描述
connector
String hybrid
tables
String 该混合表包含的真实子表,不允许为空,子表 list 用","分隔,会按此配置顺序读子数据源
{n|table-name}.XXX
String 配置子表with参数,数字{n}对应tables参数中表list的下标,必须按tables参数中配置顺序对应,从1开始;

或者直接填表全名,如'myHive.default.user_action.connector'='XXX'。通过指定表名配置不同表的具体配置,该配置与对应的connector配置一致,请参阅 其它connector的配置
{n|table-name}.fields
String 定义子表schema。不填以 hybrid_table 表的 schema作为子数据源schema。

当子表与hybrid父表的字段名不同时,可以通过这里配置父表与子表的字段映射关系。只允许名称不同,字段类型必须要一致。

如填hive表里的字段名,顺序必须与hybrid表定义中的schema字段配置顺序一致,子表必须包含所有hybrid表的字段。
  • 建表时,hybrid表的库名只能是 default
  • 使用时用户需要保证前几个数据源是 bounded,只有最后一个数据源可以是 unbounded