CDC任务开发
CDC 任务是Easystream用于从不同的数据源库摄取数据变更作业的任务类型。目前支持页面向导模式配置,底层整合NDC订阅能力基于Flink CDC 2.0版本研发,支持多种异构数据源的变更数据捕获,同时也支持历史数据的全量导入。
使用限制
- 引擎版本:Flink 1.13。
- 在LTS7.0-Update1.1版本(对应实时计算-v3.9.17版本)更新后, CDC 任务中源端为 MySQL、Oracle、Kafka,目标端为Kafka的创建入口关闭,用户需前往数据传输-实时同步任务模块创建对应数据源类型的任务。
操作步骤
1. 创建CDC作业
- 登录 EasyStream实时计算平台 。
- 快捷导航栏 选择 实时开发 ,若 快捷导航栏 中无相关选项可参考 快速导航栏 进行操作。
- 切换至目标 工作空间 。
- 单击 新建CDC任务 ,在弹出窗口中填写任务配置信息。
- 单击 确定。
2.配置Source
以下配置操作将以MySQl为例,其他数据源类型操作过程相似,具体配置不同可参考 CDC任务数据源配置说明 。
选择CDC任务Source端右上角,单击下拉列表,切换CDC Source端数据源类型。
根据表单内容完成 CDC Source 部分配置。
表单参数 | 说明 | |
---|---|---|
查找库表 | - 库表选择:通过手动方式选择采集库表,支持多选; - 正则匹配:通过输入正则表达式匹配指定数据源和数据库下的批量数据表。 |
|
订阅类型 | 支持采集 插入(Insert)、更新(Update)、删除(Delete) 操作数据,默认全部勾选。 |
|
增量读取方式 | - 日志读取:CDC任务默认采用日志采集的方式进行实时数据采集,根据数据源类型不同各有差异,比如MySQL binlog日志,Oracle Logminer日志等。 - 间隔轮询:支持用户对间隔轮询周期、数据量进行设置从而控制采集速度,支持手动设定间隔轮询传输位点。 |
|
传输起始位点 | - 最新数据:当任务启动时开始进行数据捕获。 - 全量初始化:先全量读取表数据,读取完成后从自动切换任务,开始从任务启动时间记录的点位进行变更数据捕获。
|
|
流量限制 | 默认不限制,如果源端库存在业务压力,可以配置适当限制,缓解源库压力。 | |
读取并行度 | 除 MySQL、Kafka 数据源外,其余数据源目前均为单并行度读取数据(包括全量初始化过程)。 |
3. 配置sink
单击sink页面右上角下拉框选择相应的数据源,以mysql 为例。 在此之前必须提前在数据源管理处引入数据源,kafka数据源引入后,可以在数据库创建数仓表,引入时直接选择topic 对应的数仓表即可。详情请参考数仓管理
- 目标数据源
选择需要导入的sql server 数据源名称
- 导入方式
支持单表和多表导入,选择单表导入只需要选择相应的库表即可,多表导入,需要为源端的每一张表选择对应的目标端库表,或者通过“批量设置”进行配置,目前批量设置只支持导入到同一个库表中,如果是分库分表,在批量应用后,还需要手动修改对应库表。
- 写入方式
支持“insert”和“upsert”更新写入,注意更新写入必须配置主键,否则相当于“insert”
- 异常处理
为了防止脏数据导致任务失败,提供了忽略错误记录的选项,可以根据实际需求进行配置
- 批量写入
对数据进行分批插入,建议开启,同时配置批量的写入规则
- 写入并行度
当前所有sink 端数据源默认均支持多并行度写入,可以按需求调整作业运行配置的并行数量
![](/documents/uploads/projects/easystream-v4.8.0/202201/16c8cad20d3dee00.png)
字段映射
CDC 作业会根据sink 和souce 的配置生成对应的字段映射关系。默认按照字段名称映射,如果源端与目标表字段名称不一致,源端字段默认为不导入。如果不符合按照字段名称映射的需求,需要手动调整字段映射关系,否则可能出现字段未同步情况。 如果有需求对输出字段进行调整,可以在源端表字段进行手动指定。新增了不导入字段以及自定义表达式,支持在同步过程中对数据进行转换。- 不导入
选择不导入,该字段内容不会写入结果表,结果表字段输出为字段默认值或null - 自定义表达式
CDC 作业增加了自定义表达式功能,可以对字段使用函数进行数据转换。目前支持Flink 的标量(一进一出)内置函数,暂时不支持自定义函数。配置方式选择自定义表达式,书写函数即可。
如图:
- 不导入
运行配置检查
CDC 作业实际运行的是Flink 作业,可以按需对任务资源进行调整配置,以达到期望的性能要求。 作业资源配置可参考Flink 作业推荐配置保存并启动
OGG配置
若企业已有OGG(Oracle GoldenGate)产品,平台支持基于OGG实现对应实时采集链路。此方案要求业务方主动将OGG采集任务启动并写入到目标Kafka Topic,Flink CDC 将会从 Kafka中以OGG标准格式对数据内容进行解析,并转写至目标端。
在CDC任务中,选择Source端为Oracle时,用户选择增量读取方式为OGG
。
此时需要选择一个已经注册在项目中的消息中间件(目前仅支持Kafka)作为数据来源。手动输入Topic名称时,当输入已存在的Topic名称时任务可正常执行;若输入的Topic不存在,系统将自动创建Topic并执行任务。Topic输入名称对大小写敏感。
以上内容对您是否有帮助?