Metahub任务

支持的版本插件版本 1.12.4-1.1.4 版本

对于 kafka/cdc format 的场景下,我们可能需要读取kafka 消息的元信息、cdc 格式数据里的元信息的场景下, 我们无法通过表登记来定义这些字段,因此,我们通过 set 语句来补充定义这些字段。

:这些字段会追加到 流表定义的尾部 ,追加的顺序和流表定义的尾部,追加的顺序和 set 语句的定义的顺序保持一致。

使用语法:

set '{tbl}.metadata.field.expression.{column-name}' = '{expression}';
set '{tbl}.metadata.field.type.{column-name}'' = '{type}';

: 为流表名为 canal_metadata 添加一个 metadata 字段,读取 canal-json 里的 database 字段,功能与 origin_database STRING METADATA FROM 'value.database' VIRTUAL 相同。

set 'canal_metadata.metadata.field.expression.origin_database' = 'value.database VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_database' = 'String';

完整用例: canal_metadata 流表的定义是:

`id` BIGINT,
 `user` STRING,
  description STRING,
  weight float
--  1、在canal_metadata 表的尾部定义 metadata 字段。
set 'canal_metadata.value.format' = 'canal-json';
set 'canal_metadata.metadata.field.expression.origin_database' = 'value.database VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_database' = 'String';

set 'canal_metadata.metadata.field.expression.origin_table' = 'value.table VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_table' = 'String';

set 'canal_metadata.metadata.field.expression.origin_sql_type' = 'value.sql-type VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_sql_type' = 'MAP<STRING, INT>';

set 'canal_metadata.metadata.field.expression.origin_pk_names' = 'value.pk-names VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_pk_names' = 'ARRAY<STRING>';

set 'canal_metadata.metadata.field.expression.origin_ts' = 'value.ingestion-timestamp VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_ts' = 'TIMESTAMP(3)';

set 'canal_metadata.metadata.field.expression.origin_event_ts' = 'value.event-timestamp VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_event_ts' = 'TIMESTAMP(3)';

set 'canal_metadata.metadata.field.expression.event_time' = 'timestamp';
set 'canal_metadata.metadata.field.type.event_time' = 'TIMESTAMP(3)';

set 'canal_metadata.metadata.field.expression.partition' = 'VIRTUAL';
set 'canal_metadata.metadata.field.type.partition' = 'bigint';

set 'canal_metadata.metadata.field.expression.offset' = 'VIRTUAL';
set 'canal_metadata.metadata.field.type.offset' = 'bigint';

set 'canal_metadata.properties.group.id' = 'xxxxxx';
set 'canal_metadata.scan.startup.mode' = 'latest-offset';

--  2、目标表定义。

create table kafkaSink (
  `id` BIGINT,
  `user` STRING,
  description STRING,
  weight float,
  origin_database STRING,
  origin_table STRING,
  origin_sql_type MAP<STRING, INT>,
  origin_pk_names ARRAY<STRING>,
  origin_ts TIMESTAMP,
  origin_event_ts TIMESTAMP,
  `event_time` TIMESTAMP,
    `partition` BIGINT,
    `offset` BIGINT,
    `ts` TIMESTAMP(3) METADATA FROM 'timestamp',  //sink 写 kafka 的 timestamp metadata 字段。
  primary key(`id`) not enforced
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_canal_sink',
  'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:39092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json',
  'value.canal-json.fail-on-missing-field' = 'false'
);

--  3dml 读写
insert into  kafkaSink select *, origin_event_ts as ts from cdc_test.canal_metadata;