CDC 任务数据源配置说明

数据源自定义参数配置

数据源类型 作用域 自定义参数配置说明
MySQL/Oracle/SQLServer/PostgreSQL/db2 source 基于flink-cdc datastream api实现,自定义参数可以填写对应数据源的debezium相关配置参数
kafka source 基于flink kafka datastream api实现,自定义参数可以填写kafka官方原生的参数配置
MySQL/Oracle/SQLServer sink 基于flink sql jdbc连接器实现,自定义参数可以填写flink sql jdbc连接器配置参数
kafka sink 基于flink kafka datastream api实现,自定义参数可以填写kafka官方原生的配置参数
流表 sink 基于flink kafka sql api实现,自定义参数可以参考flink sql kafka连接器的配置参数

作用域 source 是指来源端数据源,sink是指去向端数据源

额外新增自定义参数说明

自定义参数名称 默认值 支持版本 作用域 数据源类型 描述
useSid true 2.0.0+ source Oracle true表示oracle使用sid,false表示oracle使用service name
server-time-zone utc 2.0.0+ source MySQL mysql的timestamp字段类型存储是的UTC时间,会出现时区问题,需要配置为'Asia/Shanghai'
fields.convert-tinyint-one-boolean.enabled true 2.1.5+ source/sink MySQL true时候表示mysql的tinyint(1)类型转为flink的boolean类型, false时候表示mysql的tinyint(1)类型转为flink的tinyint类型
json.ignore-parse-errors false 2.1.7+ source kafka以及增量读取方式为ogg的Oracle、SQLServer 默认false不忽略kafka消息解析错误并抛出异常终止程序异常信息中带有消息明细,设置为true会忽略该消息
json.parse-errors.print false 2.1.7+ source kafka以及增量读取方式为ogg的Oracle、SQLServer false表示kafka消息出现解析异常被忽略的时候不会打印消息日志,设置为true息出现解析异常被忽略的时候会打印该消息日志级别为warn
json.ignore-key-null false 2.1.7+ source 增量读取方式为ogg的Oracle、SQLServer 默认false不忽略kafka消息中主键key为null并会抛出异常并打印主键为null的消息,设置为true会忽略该消息
json.key-null.print false 2.1.7+ source 增量读取方式为ogg的Oracle、SQLServer 默认false不忽略kafka消息中主键key为null并会抛出异常并打印主键为null的消息,设置为true会忽略该消息
database.schema.ignore-unique-key false 2.1.7+ source Oracle 默认false任务初始化schema时候不忽略唯一索引作为key,为true的时候忽略唯一索引作为key,解决oracle表无主键且联合唯一索引首个字段为函数索引报错问题。
库名.表名.primary.key 2.1.7+ sink MySQL/Oracle/SQLServer 例如schema.table.primary.key=a,b,c 用于定义目标端逻辑主键,目标端表无主键且只有唯一索引可以通过自定义参数用唯一索引列作为逻辑主键
json.timestamp-format.standard RFC3339 2.1.8+ sink kafka timestamp类型字段JSON序列化方式,默认RFC3339可选项SQL\ISO_8601\RFC3339,例如RFC3339序列化为"2022-01-10T10:30:41Z",SQL序列化为"2022-01-10 10:30:41",ISO_8601序列化为"2022-01-10T10:30:41"
json.key.is-table-name true 2.1.9+ source oracle ture表示ogg发往kafka中的key为表名称,false表示可以为非表名称
json.fields.event-ts.enabled false 2.1.9+ source mysql/oracle 默认false表示不包含事件时间,true表示包含事件时间
fields.type.auto-conversion.enabled false 2.1.9+ source all 默认false表示不自动将source字段转换成sink字段类型, true表示自动转换字段类型
ogg-json.downstream-key.is-table-name-pk false 2.1.12+ source Oracle/SQLServer 源端为oggjson 目标端为kakfa时候,是否指定目标端kafka的key为表名称+主键,false表示不是,true表示是

其他补充说明

流表选择

source为"kafka"或则sink端为"流表"表示流表,流表的列表下拉框中定义的流表名称仅仅显示canal-json、debezium-json、maxwell-json三种序列化类型,使用的时候需要注意

kafka 数据源作为cdc 目标表时

kafka 对应的表序列化方式只支持选择debezium-json、canal-json、 maxwell-json这3种之一。 这3种不同的序列化方式输出的数据结构各不相同。具体如下,请根据数据使用情况进行选择。

  • debezium-json

    insert

    {"before":null,"after":{"id":22,"name":"opt1","age":null},"op":"c"}

    update

    {"before":{"id":22,"name":"opt1","age":null},"after":null,"op":"d"}
    {"before":null,"after":{"id":22,"name":"opt1123","age":null},"op":"c"}

    delete

    {"before":{"id":22,"name":"opt1123","age":null},"after":null,"op":"d"}
  • canal-json

    insert

    {"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"INSERT"}

    update

    {"data":[{"id":22,"name":"opt","age":123,"address":"北京市"}],"type":"DELETE"}
    {"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"INSERT"}

    delete

    {"data":[{"id":22,"name":"opt11","age":123,"address":"北京市"}],"type":"DELETE"}
  • maxwell-json

    insert

    {"data":{"id":6,"salary":88.8,"gmt_created":"2015-01-01 11:15:10"},"type":"insert"}

    update

    {"data":{"id":6,"salary":88.8,"gmt_created":"2015-01-01 11:15:10"},"type":"delete"}
    {"data":{"id":6,"salary":111.8,"gmt_created":"2015-01-01 11:15:10"},"type":"insert"}

    delete

    {"data":{"id":6,"salary":111.8,"gmt_created":"2015-01-01 11:15:10"},"type":"delete"}
MySQL 作为cdc 源表
  • mysql 表字段timestamp 类型时区问题

如果出现时区问题需要在自定义参数里 额外增加参数'server-time-zone' = 'Asia/Shanghai'