DEMO-实时开发通过SQL引用CDC插件实现实时计算
更新时间: 2026-01-22 22:05:02
阅读 3
INFO-实时开发通过SQL引用CDC插件实现实时计算
使用场景
支持用户通过SQL依赖指定cdc插件进行实时任务开发;使用示例
一、postgres数据库准备
1. 设置数据库并开启逻辑复制,并设置max_wal_senders和max_replication_slots参数,并重启postgres数据库让参数生效;-- 开启逻辑复制
alter system set wal_level = logical;
-- 设置wal_sender为20(推荐值)
alter system set max_wal_senders = 20;
-- 设置slot数量为20(推荐值)
alter system set max_replication_slots = 20;
2. 创建cdc用户并授予权限,需要授予权限包括REPLICATION、CREATE ON DATABASE、SELECT权限;create role demo_admin nologin;
-- 创建用户
create user dsc_demo with password 'xxx' replication login;
-- 将角色授予用户
grant demo_admin to dsc_demo;
-- 为角色授权
grant usage on schema public to demo_admin;
grant create on database dsc_demo to demo_admin;
grant select,insert,update,delete on all tables in schema public to demo_admin;
-- 创建PUBLICATION
create publication cdc_pg_demo for table public.pg_demo_src;
-- 需为cdc_pg_demo增加表时
alter publication cdc_pg_demo add table public.pg_demo_sink;
3.创建数据库及表,并设置表的复制格式为full;-- 创建pg src表
create table pg_demo_src (
id INT PRIMARY KEY,
name varchar(20),
event_time TIMESTAMP
);
--创建sink表
create table pg_demo_sink (
id INT PRIMARY KEY,
name varchar(20),
event_time TIMESTAMP
);
--设置表的复制格式为full
alter table pg_demo_src replica identity full;
-- 创建PUBLICATION
create publication cdc_pg_demo for table public.pg_demo_src;
-- 需为cdc_pg_demo增加表时
alter publication cdc_pg_demo add table public.pg_demo_sink;
二、实时开发准备
1. CDC插件下载:https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc/2.4.22. 插件上传到ED平台,上传路径为: 实时开发-->文件管理 --> 上传文件,详见下方截图
3. 创建SQL任务,在右侧依赖配置中选中上传到cdc插件;
--SQL
--********************************************************************--
--Author: Simon
--CreateTime: 2026-01-22 17:35:22
--Comment: SQL+CDC插件包直接实时消费源端数据库日志,用于当前未支持数据源场景
--********************************************************************--
-- 映射pg源端表,指定实时消费数据来源表
CREATE TABLE flink_demo_src (
id INT,
name STRING,
event_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'changelog-mode' = 'all',
'hostname' = 'dsc-demo18.jdlt.163.org',
'port' = '55432',
'username' = 'dsc_demo',
'password' = '加密内容1',
'database-name' = 'dsc_demo',
'schema-name' = 'public',
'table-name' = 'pg_demo_src',
'decoding.plugin.name' = 'pgoutput',
'slot.name' = 'flink_pg_cdc_01',
'scan.incremental.snapshot.enabled' = 'true',
'scan.incremental.snapshot.chunk.size' = '8096',
'scan.snapshot.fetch.size' = '1024',
'debezium.publication.name' = 'cdc_pg_demo',
'debezium.publication.autocreate.mode' = 'disabled'
);
-- 映射pg去向表,消费数据后写入的目标表
CREATE TABLE flink_demo_sink (
id INT,
name STRING,
event_time TIMESTAMP,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://dsc-demo18.jdlt.163.org:55432/dsc_demo',
'table-name' = 'public.pg_demo_sink',
'username' = 'dsc_demo',
'password' = '加密内容2'
);
insert into flink_demo_sink
select * from flink_demo_src;
5. 保存并提交上线后,进入运维界面启动任务即可;

作者:Simon
文档反馈
以上内容对您是否有帮助?