数据传输支持实时同步任务的创建、管理。

使用须知

当前实时同步支持的数据源类型如下所示:

数据源类型 Reader(读) Writer(写) 支持的版本及认证方式
Kafka 支持 不支持 版本:2.3.1,认证方式:无认证
版本:2.3.1,认证方式:SASL_Plaintext
版本:2.7.1,认证方式:无认证
版本:2.7.1,认证方式:SASL_Plaintext
Hive 不支持 支持 平台内置集群Hive

任务配置流程

1. 在数据传输页面,选择左侧导航栏“实时同步任务”,点击“新建任务”进入任务创建页面。
image.png

2. 首先配置数据来源,在选择数据来源环节配置任务基本信息和选择数据来源。

任务基本信息:
image.png

配置项 描述
任务名称 最多64个字符,仅允许包含中文、字母、数字、“-”和“_”,仅能以中文和字母开头
(同一项目-集群下不允许存在同名任务)
负责人 可选择项目下任意成员,默认为创建任务的用户
引擎版本 当前仅支持FLINK-1.13版本
任务描述 选填,最多可填写128个字符

选择数据来源:
实时同步任务创建 - 图3

配置项 描述
数据源 仅可选择类型为Kafka的当前项目组下数据源
Topic 请输入Topic名称。如需读取多个Topic,请点击“+添加Topic”输入多个需读取的Topic名称
序列化方式 可选择Json、canal-json、debezium-json。
1)如果选择json,解析Topic schema时仅解析首层字段。
示例:
{"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}}
则字段映射中来源Topic字段为:table、op_type、op_ts、current_ts、pos、after。
2)如果选择canal json,解析Topic schema时除data字段外仅解析首层字段,data嵌套字段内则子字段作为来源Topic字段。
示例:
insert
{"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"INSERT"}
update
{"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"DELETE"}
{"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"INSERT"}
delete
{"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"DELETE"}
则字段映射中来源Topic字段为:id、name、age、address、type。
3)如果选择debezium-json,解析Topic schema时除before/after字段外仅解析首层字段,以before/after嵌套字段内的子字段作为来源Topic字段。
示例:
insert
{"before":null,"after":{"id":22,"name":"opt1","age":null},"op":"c"}
update
{"before":{"id":22,"name":"opt1","age":null},"after":null,"op":"d"}
{"before":null,"after":{"id":22,"name":"opt1123","age":null},"op":"c"}
delete
{"before":{"id":22,"name":"opt1123","age":null},"after":null,"op":"d"}
则字段映射中来源Topic字段解析为:id、name、age、op。
格式转换 序列化格式为json时,支持格式转换。
如勾选首层嵌套JSON平铺,会将首层中类型为row或map的嵌套字段解析为row类型并分解为单层结构,字段名称为“首层字段名称.次层字段名称”。
示例Topic:
{ "obj": { "time1": "12:12:43Z", "str": "sfasfafs", "lg": { "name": "1" }, "num": 3 }, "arr": [ { "f1": "f1str11", "f2": 134 }, { "f1": "f1str22", "f2": 555 } ], "map": { "key1": 234, "key2": 456 } }
则字段映射中来源Topic字段解析为:obj.time1、obj.str、obj.lg、num、arr、map.key1、map.key2。
消费起始位点 支持从最新数据、最早数据、指定时间戳开始读取。
如从指定时间戳开始读取,请按“yyyy-MM-dd HH:mm:ss.SSS”格式填写,其中“.SSS”可不填。
注意事项 如勾选首层嵌套JSON平铺,除解析出的“首层字段名称.次层字段名称”的列名外,首层和次层的字段名称不允许再包含“.”。如果首层或次层名称包含“.”,例如,首层字段名称:a.bc,此处字段名称:d.ef,则字段映射时请使用自定义表达式:`a.bc`.`d.ef` as 列名 表示该字段。

3. 点击“下一步”进行数据去向配置。

选择数据去向:
image.png

配置项 描述
数据源 仅可选择类型为Hive的当前项目组下数据源。
导入方式 支持选择导入到单表或导入到多表。(区别为:导入到单表时,直接选择去向库表即可;导入到多表时,会提供设置去向表映射规则功能,批量建立多表间的映射
设置去向表映射规则 去向库名批量映射:此处选中的库名在点击“批量应用映射规则”后,选中的数据库将批量填入到下方的“设置来源Topic和去向表映射”的去向库名。
去向表名映射规则:支持根据内置变量映射去向表名,点击“批量应用映射规则”后批量填入下方“设置来源Topic和去向表映射”处的去向表名。如去向库下不存在表名匹配的表,则无法自动填入,需手动选择。
支持变量如下:
1)${source topic}:表示来源Topic名称
2)${source datasource}:表示来源数据源名称
设置来源Topic与去向表映射 选择各来源Topic需写入的Hive库表
写入分区 如导入表为分区表,需配置写入分区。支持根据数据写入时间分区和根据字段内容动态分区。
1) 按照数据写入时间分区:支持填写内置变量:yyyy、MM、dd、HH 、mm、ss、SSS,分别表示数据写入时间的年、月、日、时、分、秒、毫秒,支持多个内置变量组合使用,如:yyyy-MM-dd,表示数据写入时间的当日,示例:2022-01-02;
2)根据字段内容动态分区:支持选择来源表字段并选择字段类型,会将源端对应字段所在数据行写入到Hive表对应的分区中。示例:选择来源表字段为A,当A字段值为aa时,实时同步会将数据写入到Hive表对应的aa分区中,当A字段值为bb时,实时同步会将数据写入到Hive表对应的bb分区中。
(注意:如写入已有Hive表时,请保证表的分区结构与此处分区结构保持一致,否则任务可能运行失败)
写入规则 支持的写入规则为:insert into(追加)。
注意事项 1)导入到多表时,写入分区取首张表的分区结构,请保持多表的分区结构一致;
2)支持的表格式为:Text、CSV、SequenceFile、ORC、Parquet;
3)由于Flink Hive Writer仅可支持decimal(38,18),故来源Topic的字段类型需与去向表字段类型保持一致,定义为decimal(38,18)。

4. 完成数据去向配置后,点击“下一步”进行字段映射配置,在映射配置中分为全局映射配置和单个Topic映射配置。
image.png

配置项 描述
字段映射全局映射 点击“同名映射”或“同行映射”,则所有Topic-表映射的字段映射均按照此规则去配置字段映射。
同名映射:根据去向表字段名称匹配同名的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。
同行映射:根据去向表字段序号匹配序号相同的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。
来源Topic-去向表 支持根据来源Topic、去向库名、去向表名搜索符合搜索条件的Topic-表映射。
在页面左侧切换Topic-表映射时,可针对选中的Topic-表映射配置字段映射。

对于单个Topic映射配置支持“一键解析Topic字段”、“同名映射”、“同行映射”、“设置自定义表达式字段类型”。
image.png

配置项 描述
一键解析Topic字段 进入字段映射模块时默认会解析各Topic的字段名称和字段类型,点击“一键映射Topic字段”会重新解析一次最新的Topic字段名称和字段类型,在Topic schema更新、首次解析不符合预期等情况下可点击此按钮。
同名映射 根据去向表字段名称匹配同名的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。
同行映射 根据去向表字段序号匹配序号相同的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。
设置自定义表达式字段类型 如果存在自定义表达式引用来源Topic字段且字段映射未指定类型时,请点击“设置自定义表达式字段类型”按钮设置引用字段的类型,否则任务会运行失败。
来源Topic字段 支持选择Topic字段或自定义表达式。选中Topic字段时,支持输入或选择Topic字段名称,如果来源Topic为空,此处支持手动填写字段名称;如果来源Topic非空,此处默认会自动解析Topic首层字段名称。选中自定义表达式时,请按照格式填写:“表达式” as 列名,支持填写常量、Flink函数、内置变量current_timestamp(表示数据写入时间),示例:current_timestamp as 列名。
字段类型 系统支持推断Topic字段的字段类型。选中某个Topic字段时,会展示该字段的推断类型,如与字段实际类型存在差异,可手动调整切换字段类型。

关于字段类型,规则如下:

  • string定义为string
  • int、bigint、tinyint、smallint统一定义为bigint
  • float、double、decimal统一定义为decimal
  • boolean定义为boolean
  • row、map统一定义为row
  • array定义为array
  • time、timestamp、date、byte、varbinary无法解析,统一定义为string

此外,点击复制图标可将来源Topic字段类型批量映射为去向Hive表字段类型,映射规则如下。在推断类型准确性欠佳时可考虑使用此功能。
image.png
其映射规则如下:

来源Topic字段类型 Hive字段类型
int int
tinyint tinyint
smallint smallint
bigint bigint
float float
double double
decimal(38,18) decimal(p,s)
timestamp timestamp
date date
不映射字段类型 char(n)
string varchar(n)
string string
boolean boolean
varbinary binary
map map
array array
不映射字段类型 struct
不映射字段类型 union
说明:数据来源的序列化格式为canal-json、debezium-json、json的时候,支持SQL、ISO_8601、RFC3339三种timestamp格式。 一个任务只支持一类timestamp格式 ,默认为SQL,可通过自定义参数调整:json.timestamp-format.standard= SQL\ISO_8601\RFC3339。

5. 完成字段映射配置后,点击“下一步”进入高级配置页面,该页面支持参数的填写。参数名称填写格式:source/target.参数名称,source.表示数据来源端参数,target.表示数据去向端参数。示例:source.json.timestamp-format.standard。
image.png

参数类型 数据源类型 参数名称 参数说明
数据来源 Kafka source.json.timestamp-format.standard 任务支持的timestamp格式。同一任务仅支持一类timestamp格式,默认值为SQL,可填写值为:SQL、ISO_8601、RFC3339。

6. 配置完成后,点击“保存”按钮即可完成任务配置。