实时同步任务创建
更新时间: 2023-06-06 14:00:12
阅读 182
数据传输支持实时同步任务的创建、管理。
使用须知
当前实时同步支持的数据源类型如下所示:
数据源类型 | Reader(读) | Writer(写) | 支持的版本及认证方式 |
---|---|---|---|
Kafka | 支持 | 不支持 | 版本:2.3.1,认证方式:无认证 版本:2.3.1,认证方式:SASL_Plaintext 版本:2.7.1,认证方式:无认证 版本:2.7.1,认证方式:SASL_Plaintext |
Hive | 不支持 | 支持 | 平台内置集群Hive |
任务配置流程
1. 在数据传输页面,选择左侧导航栏“实时同步任务”,点击“新建任务”进入任务创建页面。
2. 首先配置数据来源,在选择数据来源环节配置任务基本信息和选择数据来源。
任务基本信息:
配置项 | 描述 |
---|---|
任务名称 | 最多64个字符,仅允许包含中文、字母、数字、“-”和“_”,仅能以中文和字母开头 (同一项目-集群下不允许存在同名任务) |
负责人 | 可选择项目下任意成员,默认为创建任务的用户 |
引擎版本 | 当前仅支持FLINK-1.13版本 |
任务描述 | 选填,最多可填写128个字符 |
选择数据来源:
配置项 | 描述 |
---|---|
数据源 | 仅可选择类型为Kafka的当前项目组下数据源 |
Topic | 请输入Topic名称。如需读取多个Topic,请点击“+添加Topic”输入多个需读取的Topic名称 |
序列化方式 | 可选择Json、canal-json、debezium-json。 1)如果选择json,解析Topic schema时仅解析首层字段。 示例: {"table":"OGG_TEST.ORDERS","op_type":"I","op_ts":"2020-05-29 10:01:27.000067","current_ts":"2020-05-29T18:01:33.062000","pos":"00000002790849451348","after":{"OID":"1","PID":"2","NUM":"3"}} 则字段映射中来源Topic字段为:table、op_type、op_ts、current_ts、pos、after。 2)如果选择canal json,解析Topic schema时除data字段外仅解析首层字段,data嵌套字段内则子字段作为来源Topic字段。 示例: insert {"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"INSERT"} update {"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"DELETE"} {"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"INSERT"} delete {"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"DELETE"} 则字段映射中来源Topic字段为:id、name、age、address、type。 3)如果选择debezium-json,解析Topic schema时除before/after字段外仅解析首层字段,以before/after嵌套字段内的子字段作为来源Topic字段。 示例: insert {"before":null,"after":{"id":22,"name":"opt1","age":null},"op":"c"} update {"before":{"id":22,"name":"opt1","age":null},"after":null,"op":"d"} {"before":null,"after":{"id":22,"name":"opt1123","age":null},"op":"c"} delete {"before":{"id":22,"name":"opt1123","age":null},"after":null,"op":"d"} 则字段映射中来源Topic字段解析为:id、name、age、op。 |
格式转换 | 序列化格式为json时,支持格式转换。 如勾选首层嵌套JSON平铺,会将首层中类型为row或map的嵌套字段解析为row类型并分解为单层结构,字段名称为“首层字段名称.次层字段名称”。 示例Topic: { "obj": { "time1": "12:12:43Z", "str": "sfasfafs", "lg": { "name": "1" }, "num": 3 }, "arr": [ { "f1": "f1str11", "f2": 134 }, { "f1": "f1str22", "f2": 555 } ], "map": { "key1": 234, "key2": 456 } } 则字段映射中来源Topic字段解析为:obj.time1、obj.str、obj.lg、num、arr、map.key1、map.key2。 |
消费起始位点 | 支持从最新数据、最早数据、指定时间戳开始读取。 如从指定时间戳开始读取,请按“yyyy-MM-dd HH:mm:ss.SSS”格式填写,其中“.SSS”可不填。 |
注意事项 | 如勾选首层嵌套JSON平铺,除解析出的“首层字段名称.次层字段名称”的列名外,首层和次层的字段名称不允许再包含“.”。如果首层或次层名称包含“.”,例如,首层字段名称:a.bc,此处字段名称:d.ef,则字段映射时请使用自定义表达式:`a.bc`.`d.ef` as 列名 表示该字段。 |
3. 点击“下一步”进行数据去向配置。
选择数据去向:
配置项 | 描述 |
---|---|
数据源 | 仅可选择类型为Hive的当前项目组下数据源。 |
导入方式 | 支持选择导入到单表或导入到多表。(区别为:导入到单表时,直接选择去向库表即可;导入到多表时,会提供设置去向表映射规则功能,批量建立多表间的映射 |
设置去向表映射规则 | 去向库名批量映射:此处选中的库名在点击“批量应用映射规则”后,选中的数据库将批量填入到下方的“设置来源Topic和去向表映射”的去向库名。 去向表名映射规则:支持根据内置变量映射去向表名,点击“批量应用映射规则”后批量填入下方“设置来源Topic和去向表映射”处的去向表名。如去向库下不存在表名匹配的表,则无法自动填入,需手动选择。 支持变量如下: 1)${source topic}:表示来源Topic名称 2)${source datasource}:表示来源数据源名称 |
设置来源Topic与去向表映射 | 选择各来源Topic需写入的Hive库表 |
写入分区 | 如导入表为分区表,需配置写入分区。支持根据数据写入时间分区和根据字段内容动态分区。 1) 按照数据写入时间分区:支持填写内置变量:yyyy、MM、dd、HH 、mm、ss、SSS,分别表示数据写入时间的年、月、日、时、分、秒、毫秒,支持多个内置变量组合使用,如:yyyy-MM-dd,表示数据写入时间的当日,示例:2022-01-02; 2)根据字段内容动态分区:支持选择来源表字段并选择字段类型,会将源端对应字段所在数据行写入到Hive表对应的分区中。示例:选择来源表字段为A,当A字段值为aa时,实时同步会将数据写入到Hive表对应的aa分区中,当A字段值为bb时,实时同步会将数据写入到Hive表对应的bb分区中。 (注意:如写入已有Hive表时,请保证表的分区结构与此处分区结构保持一致,否则任务可能运行失败) |
写入规则 | 支持的写入规则为:insert into(追加)。 |
注意事项 | 1)导入到多表时,写入分区取首张表的分区结构,请保持多表的分区结构一致; 2)支持的表格式为:Text、CSV、SequenceFile、ORC、Parquet; 3)由于Flink Hive Writer仅可支持decimal(38,18),故来源Topic的字段类型需与去向表字段类型保持一致,定义为decimal(38,18)。 |
4. 完成数据去向配置后,点击“下一步”进行字段映射配置,在映射配置中分为全局映射配置和单个Topic映射配置。
配置项 | 描述 |
---|---|
字段映射全局映射 | 点击“同名映射”或“同行映射”,则所有Topic-表映射的字段映射均按照此规则去配置字段映射。 同名映射:根据去向表字段名称匹配同名的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。 同行映射:根据去向表字段序号匹配序号相同的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。 |
来源Topic-去向表 | 支持根据来源Topic、去向库名、去向表名搜索符合搜索条件的Topic-表映射。 在页面左侧切换Topic-表映射时,可针对选中的Topic-表映射配置字段映射。 |
对于单个Topic映射配置支持“一键解析Topic字段”、“同名映射”、“同行映射”、“设置自定义表达式字段类型”。
配置项 | 描述 |
---|---|
一键解析Topic字段 | 进入字段映射模块时默认会解析各Topic的字段名称和字段类型,点击“一键映射Topic字段”会重新解析一次最新的Topic字段名称和字段类型,在Topic schema更新、首次解析不符合预期等情况下可点击此按钮。 |
同名映射 | 根据去向表字段名称匹配同名的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。 |
同行映射 | 根据去向表字段序号匹配序号相同的来源Topic字段名称,若无法匹配则来源Topic字段置为自定义表达式。 |
设置自定义表达式字段类型 | 如果存在自定义表达式引用来源Topic字段且字段映射未指定类型时,请点击“设置自定义表达式字段类型”按钮设置引用字段的类型,否则任务会运行失败。 |
来源Topic字段 | 支持选择Topic字段或自定义表达式。选中Topic字段时,支持输入或选择Topic字段名称,如果来源Topic为空,此处支持手动填写字段名称;如果来源Topic非空,此处默认会自动解析Topic首层字段名称。选中自定义表达式时,请按照格式填写:“表达式” as 列名,支持填写常量、Flink函数、内置变量current_timestamp(表示数据写入时间),示例:current_timestamp as 列名。 |
字段类型 | 系统支持推断Topic字段的字段类型。选中某个Topic字段时,会展示该字段的推断类型,如与字段实际类型存在差异,可手动调整切换字段类型。 |
关于字段类型,规则如下:
- string定义为string
- int、bigint、tinyint、smallint统一定义为bigint
- float、double、decimal统一定义为decimal
- boolean定义为boolean
- row、map统一定义为row
- array定义为array
- time、timestamp、date、byte、varbinary无法解析,统一定义为string
此外,点击复制图标可将来源Topic字段类型批量映射为去向Hive表字段类型,映射规则如下。在推断类型准确性欠佳时可考虑使用此功能。
其映射规则如下:
来源Topic字段类型 | Hive字段类型 |
---|---|
int | int |
tinyint | tinyint |
smallint | smallint |
bigint | bigint |
float | float |
double | double |
decimal(38,18) | decimal(p,s) |
timestamp | timestamp |
date | date |
不映射字段类型 | char(n) |
string | varchar(n) |
string | string |
boolean | boolean |
varbinary | binary |
map | map |
array | array |
不映射字段类型 | struct |
不映射字段类型 | union |
说明:数据来源的序列化格式为canal-json、debezium-json、json的时候,支持SQL、ISO_8601、RFC3339三种timestamp格式。 一个任务只支持一类timestamp格式 ,默认为SQL,可通过自定义参数调整:json.timestamp-format.standard= SQL\ISO_8601\RFC3339。 |
5. 完成字段映射配置后,点击“下一步”进入高级配置页面,该页面支持参数的填写。参数名称填写格式:source/target.参数名称,source.表示数据来源端参数,target.表示数据去向端参数。示例:source.json.timestamp-format.standard。
参数类型 | 数据源类型 | 参数名称 | 参数说明 |
---|---|---|---|
数据来源 | Kafka | source.json.timestamp-format.standard | 任务支持的timestamp格式。同一任务仅支持一类timestamp格式,默认值为SQL,可填写值为:SQL、ISO_8601、RFC3339。 |
6. 配置完成后,点击“保存”按钮即可完成任务配置。
文档反馈
以上内容对您是否有帮助?