Metadata配置
更新时间: 2023-04-24 20:24:14
阅读 114
Metahub任务
支持的版本: 插件版本 1.12.4-1.1.4 版本
对于 kafka/cdc format 的场景下,我们可能需要读取kafka 消息的元信息、cdc 格式数据里的元信息的场景下, 我们无法通过表登记来定义这些字段,因此,我们通过 set 语句来补充定义这些字段。
注:这些字段会追加到 流表定义的尾部 ,追加的顺序和流表定义的尾部,追加的顺序和 set 语句的定义的顺序保持一致。
使用语法:
set '{tbl}.metadata.field.expression.{column-name}' = '{expression}';
set '{tbl}.metadata.field.type.{column-name}'' = '{type}';
例:
为流表名为 canal_metadata 添加一个 metadata 字段,读取 canal-json 里的 database 字段,功能与 origin_database STRING METADATA FROM 'value.database' VIRTUAL
相同。
set 'canal_metadata.metadata.field.expression.origin_database' = 'value.database VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_database' = 'String';
完整用例: canal_metadata 流表的定义是:
`id` BIGINT,
`user` STRING,
description STRING,
weight float
-- 1、在canal_metadata 表的尾部定义 metadata 字段。
set 'canal_metadata.value.format' = 'canal-json';
set 'canal_metadata.metadata.field.expression.origin_database' = 'value.database VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_database' = 'String';
set 'canal_metadata.metadata.field.expression.origin_table' = 'value.table VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_table' = 'String';
set 'canal_metadata.metadata.field.expression.origin_sql_type' = 'value.sql-type VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_sql_type' = 'MAP<STRING, INT>';
set 'canal_metadata.metadata.field.expression.origin_pk_names' = 'value.pk-names VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_pk_names' = 'ARRAY<STRING>';
set 'canal_metadata.metadata.field.expression.origin_ts' = 'value.ingestion-timestamp VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_ts' = 'TIMESTAMP(3)';
set 'canal_metadata.metadata.field.expression.origin_event_ts' = 'value.event-timestamp VIRTUAL';
set 'canal_metadata.metadata.field.type.origin_event_ts' = 'TIMESTAMP(3)';
set 'canal_metadata.metadata.field.expression.event_time' = 'timestamp';
set 'canal_metadata.metadata.field.type.event_time' = 'TIMESTAMP(3)';
set 'canal_metadata.metadata.field.expression.partition' = 'VIRTUAL';
set 'canal_metadata.metadata.field.type.partition' = 'bigint';
set 'canal_metadata.metadata.field.expression.offset' = 'VIRTUAL';
set 'canal_metadata.metadata.field.type.offset' = 'bigint';
set 'canal_metadata.properties.group.id' = 'xxxxxx';
set 'canal_metadata.scan.startup.mode' = 'latest-offset';
-- 2、目标表定义。
create table kafkaSink (
`id` BIGINT,
`user` STRING,
description STRING,
weight float,
origin_database STRING,
origin_table STRING,
origin_sql_type MAP<STRING, INT>,
origin_pk_names ARRAY<STRING>,
origin_ts TIMESTAMP,
origin_event_ts TIMESTAMP,
`event_time` TIMESTAMP,
`partition` BIGINT,
`offset` BIGINT,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp', //sink 写 kafka 的 timestamp metadata 字段。
primary key(`id`) not enforced
) with (
'connector' = 'kafka',
'topic' = 'cdc_canal_sink',
'properties.bootstrap.servers' = 'sloth-test2.dg.163.org:39092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'canal-json',
'value.canal-json.fail-on-missing-field' = 'false'
);
-- 3、dml 读写
insert into kafkaSink select *, origin_event_ts as ts from cdc_test.canal_metadata;
文档反馈
以上内容对您是否有帮助?