INFO-实时同步相关架构简介

简介
Flink CDC 是基于 Apache Flink 构建的变更数据捕获(Change Data Capture)
用于实时捕获数据库的增量变更(INSERT, UPDATE, DELETE)并将其作为流接入 Flink 作业进行处理
相关模块

一、核心模块
flink-cdc-connectors (核心连接器库)

功能:提供与各种数据库的 CDC 连接器实现。


工作模式:
单表同步:捕获单张表的变更。

整库同步:捕获整个数据库(或指定库)所有表的变更(Flink CDC 2.0+ 支持)。


特性:
自动生成 Flink SQL DDL 语句。

支持 Schema 变更同步(如 MySQL ALTER TABLE)。

简化分库分表合并同步。


debezium-core (底层引擎)

依赖:Flink CDC 多数连接器基于 Debezium 实现。

原理:通过数据库的 Binlog (MySQL)、WAL (PostgreSQL) 等日志实时解析变更事件。


二、核心功能
全量 + 增量一体化同步

自动初始化:首次启动时先做全量快照(Snapshot),再无缝切换至增量 Binlog 读取。

断点续传:依赖 Flink Checkpoint 机制保存读取位置,故障恢复后继续同步。


Exactly-Once 语义

通过 Flink Checkpoint + Binlog 位置持久化 保证数据不重复、不丢失。


Schema 变更自动同步

可捕获源库的 DDL 变更(如新增列),动态更新 Flink 作业的 Table Schema(需启用 scan.startup.mode = latest-offset)。


并行读取优化

分片策略:全量阶段自动拆分表为多个 Split 并行读取,加速初始化。

无锁读取:多数连接器(如 MySQL)无需全局锁表,不影响源库写入。


格式转换

将 Debezium 的 JSON 变更事件转换为 Flink Table 的 RowData 结构,可直接用于计算。


三、适用场景
实时数仓同步
MySQL/Oracle → Kafka/Hudi/Iceberg

微服务解耦
数据库变更 → 通知下游服务(如订单状态更新)

多活数据中心同步
双向数据复制(需解决循环同步问题)

搜索引擎索引更新
数据库变更 → Elasticsearch/OpenSearch


四、flink cdc jar开发:

1. Flink SQL 示例(MySQL  Flink
sql
-- 创建 MySQL CDC 
CREATE TABLE mysql_source (
    id INT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'user',
    'password' = 'pass',
    'database-name' = 'test_db',
    'table-name' = 'user_table'
);

-- 创建 Kafka Sink 
CREATE TABLE kafka_sink (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_events',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json'
);

-- 将变更数据写入 Kafka
INSERT INTO kafka_sink SELECT * FROM mysql_source;

2. DataStream API 示例
java
SourceFunction<SourceRecord> source = MySQLSource.<SourceRecord>builder()
    .hostname("localhost")
    .port(3306)
    .username("user")
    .password("pass")
    .databaseList("test_db")
    .tableList("test_db.user_table")
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(source, "MySQL CDC Source")
    .print();
env.execute();


五、注意事项  
数据库权限:需开启 Binlog 并授予复制权限(如 MySQL  REPLICATION SLAVE, REPLICATION CLIENT)。  

性能影响:增量读取对源库压力较小,但全量阶段可能触发 I/O 峰值(建议分库分表并行)。  

事务一致性:Binlog 延迟可能导致短暂数据延迟(通常 < 1s)。  

DDL 兼容性:Schema 变更同步对复杂 DDL(如修改主键)支持有限。

作者:华柄印