INFO - Real-time development: referencing a CDC plugin from SQL for real-time computation

Use case
Users can develop real-time tasks in SQL by declaring a dependency on a specific CDC plugin.
Example
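I. Prepare the PostgreSQL database
1. Enable logical replication, set the max_wal_senders and max_replication_slots parameters, then restart PostgreSQL so the parameters take effect.
-- Enable logical replication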
alter system set wal_level = logical;  
-- Set max_wal_senders to 20 (recommended value)
alter system set max_wal_senders = 20;  
-- Set max_replication_slots to 20 (recommended value)
alter system set max_replication_slots = 20;
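A minimal check, run after the restart, that the three parameters above are in effect. It uses only built-in PostgreSQL commands and the pg_settings catalog view; the expected values in the comments assume the settings shown above were applied as written.

```sql
-- Should report "logical" once the restart has happened
show wal_level;

-- Should each report 20 if the recommended values above were used
show max_wal_senders;
show max_replication_slots;

-- pending_restart = true means the new value was written by
-- ALTER SYSTEM but the server has not been restarted yet
select name, setting, pending_restart
from pg_settings
where name in ('wal_level', 'max_wal_senders', 'max_replication_slots');
```

If pending_restart is still true for any row, the restart in step 1 has not taken effect and the CDC connector will not be able to use logical decoding.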
2. Create the CDC user and grant it privileges. The required privileges are REPLICATION, CREATE ON DATABASE, and SELECT.
-- Create the role
create role demo_admin nologin;
-- Create the user
create user dsc_demo with password 'xxx' replication login;
-- Grant the role to the user
grant demo_admin to dsc_demo;
-- Grant privileges to the role
grant usage on schema  public to demo_admin;
grant create on database dsc_demo to demo_admin;
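-- Note: "all tables in schema" only covers tables that already exist; re-run this grant after the tables in step 3 have been created
grant select,insert,update,delete on all tables in schema public to demo_admin;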
3. Create the tables in the database and set the source table's replica identity to FULL.
-- Create the PostgreSQL source table
create table pg_demo_src (
                             id INT PRIMARY KEY,
                             name varchar(20),
                             event_time TIMESTAMP
);

-- Create the sink table
create table pg_demo_sink (
                             id INT PRIMARY KEY,
                             name varchar(20),
                             event_time TIMESTAMP
);

-- Set the source table's replica identity to FULL
alter table   pg_demo_src   replica identity full; 
-- Create the publication
create publication cdc_pg_demo for table public.pg_demo_src;
-- To add another table to cdc_pg_demo later
alter publication cdc_pg_demo add table public.pg_demo_sink;
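Before moving on to the platform side, the database preparation can be sanity-checked with a few catalog queries. This is a sketch that assumes the object names used in this example (dsc_demo, pg_demo_src, pg_demo_sink, cdc_pg_demo); it relies only on standard PostgreSQL catalogs and functions.

```sql
-- The CDC user should have rolreplication = true and rolcanlogin = true
select rolname, rolreplication, rolcanlogin
from pg_roles
where rolname = 'dsc_demo';

-- relreplident = 'f' means REPLICA IDENTITY FULL, 'd' means the default (primary key)
select relname, relreplident
from pg_class
where relname in ('pg_demo_src', 'pg_demo_sink');

-- Lists the tables that belong to the publication
select pubname, schemaname, tablename
from pg_publication_tables
where pubname = 'cdc_pg_demo';

-- Should return true; if false, re-run the table grant from step 2,
-- because it only applies to tables that existed when it was executed
select has_table_privilege('dsc_demo', 'public.pg_demo_src', 'select');
```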
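II. Prepare real-time development
1. Download the CDC plugin: https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/2.4.2
2. Upload the plugin to the ED platform via Real-time Development --> File Management --> Upload File; see the screenshot below.
3. Create an SQL task and, under Dependency Configuration on the right, select the uploaded CDC plugin.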

Figure 1

4. Develop the SQL; see the code below.

DEMO - Real-time development: referencing a CDC plugin from SQL for real-time computation - Figure 2

--SQL
--********************************************************************--
--Author: Simon
--CreateTime: 2026-01-22 17:35:22
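--Comment: SQL plus the CDC plugin package consumes the source database's log directly in real time; intended for data sources that are not yet supported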
--********************************************************************--

-- Map the PostgreSQL source table, i.e. the table whose changes are consumed in real time
CREATE TABLE flink_demo_src (
  id INT,
  name STRING,
  event_time TIMESTAMP,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'changelog-mode' = 'all',
  'hostname' = 'dsc-demo18.jdlt.163.org',
  'port' = '55432',
  'username' = 'dsc_demo',
  'password' = 'encrypted_content_1',
  'database-name' = 'dsc_demo',
  'schema-name' = 'public',
  'table-name' = 'pg_demo_src',
  'decoding.plugin.name' = 'pgoutput',
  'slot.name' = 'flink_pg_cdc_01', 
  'scan.incremental.snapshot.enabled' = 'true',
  'scan.incremental.snapshot.chunk.size' = '8096',
  'scan.snapshot.fetch.size' = '1024',
  'debezium.publication.name' = 'cdc_pg_demo', 
  'debezium.publication.autocreate.mode' = 'disabled'
);

-- Map the PostgreSQL sink table, i.e. the target table the consumed data is written to
CREATE TABLE flink_demo_sink (
  id INT,
  name STRING,
  event_time TIMESTAMP,
   PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc', 
  'url' = 'jdbc:postgresql://dsc-demo18.jdlt.163.org:55432/dsc_demo', 
  'table-name' = 'public.pg_demo_sink',  
  'username' = 'dsc_demo',
  'password' = 'encrypted_content_2'
);

insert into flink_demo_sink
select * from flink_demo_src;
5. Save the task and submit it to go online, then start it from the operations page.

DEMO - Real-time development: referencing a CDC plugin from SQL for real-time computation - Figure 3

DEMO - Real-time development: referencing a CDC plugin from SQL for real-time computation - Figure 4
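Once the task is running, one way to confirm the pipeline end to end is to write to the source table and read the sink table on the PostgreSQL side. The sample row below is hypothetical test data. The LIKE pattern on the slot name is an assumption: with incremental snapshot enabled the connector may create additional slots whose names are derived from slot.name.

```sql
-- The slot configured in the Flink job should be listed and active
select slot_name, plugin, active, confirmed_flush_lsn
from pg_replication_slots
where slot_name like 'flink_pg_cdc_01%';

-- Hypothetical test row: an insert followed by an update on the source table
insert into public.pg_demo_src (id, name, event_time)
values (1, 'alice', localtimestamp);

update public.pg_demo_src
set name = 'alice_v2'
where id = 1;

-- After a short delay the sink table should contain the updated row
select id, name, event_time
from public.pg_demo_sink
where id = 1;
```

If the slot is inactive or the sink stays empty, the task logs on the operations page are the first place to look.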


Author: Simon