Flink-1.14-cdc

Based on Flink 1.14 and Flink CDC 2.4.1. Supports mysql-cdc, oracle-cdc, sqlserver-cdc, and tidb-cdc.

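Important: the Oracle technology stack is closed source, so the CDC connector's support for it is very limited. Production availability is not guaranteed; use it for testing only.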

mysql-cdc

Example 1 (native SQL)

CREATE TABLE sink (
  test_int INT,
  test_varchar VARCHAR,
  test_boolean BOOLEAN,
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_bigint BIGINT,
  test_float FLOAT,
  test_double DOUBLE,
  PRIMARY KEY(test_int) NOT ENFORCED
) WITH (
    'connector' = 'jdbc', -- required: specify this table type is jdbc
    'url' = 'jdbc:mysql://localhost:3306/qa_slothdata', -- required: JDBC DB url
    'table-name' = 'alltype1',  -- required: jdbc table name
    'username' = 'root', -- required: jdbc user name and password
    'password' = 'root'
);

CREATE TABLE source (
  test_int INT,
  test_varchar VARCHAR,
  test_boolean BOOLEAN,
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_bigint BIGINT,
  test_float FLOAT,
--  testDecimal DECIMAL(38, 18),
  test_double DOUBLE,
  PRIMARY KEY(test_int) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'database-name' = 'qa_slothdata',
    'table-name' = 'alltype',  -- required: MySQL table name
    'username' = 'root', -- required: MySQL user name and password
    'password' = 'root'
);

INSERT INTO sink select * FROM source;

Example 1 parameters


| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use, here should be 'mysql-cdc'. |
| hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
| username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server. |
| password | required | (none) | String | Password to use when connecting to the MySQL database server. |
| database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions, so multiple tables that match the expression can be monitored. |
| table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions, so multiple tables that match the expression can be monitored. |
| port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
| server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client. The numeric ID syntax is like '5400'; the numeric ID range syntax is like '5400-5408'. The range syntax is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number between 5400 and 6400 is generated, though we recommend setting an explicit value. |
| scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism to read the snapshot of a table. Compared to the old snapshot mechanism, it has several advantages: (1) the source can read the snapshot in parallel, (2) the source can perform checkpoints at chunk granularity during snapshot reading, (3) the source doesn't need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. To run the source in parallel, each parallel reader needs a unique server ID, so 'server-id' must be a range like '5400-6400', and the range must be larger than the parallelism. See the Incremental Snapshot Reading section for details. |
| scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the table snapshot is read. |
| scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading the table snapshot. |
| scan.startup.mode | optional | initial | String | Optional startup mode for the MySQL CDC consumer. Valid enumerations are "initial" and "latest-offset". See the Startup Reading Position section for details. |
| server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. |
| debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector queries each included table to produce a read event for all rows in that table. This parameter determines whether the MySQL connection pulls all results for a table into memory (fast, but requires large amounts of memory) or streams the results instead (can be slower, but works for very large tables). The value specifies the minimum number of rows a table must contain before the connector streams results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot. |
| connect.timeout | optional | 30s | Duration | The maximum time the connector waits after trying to connect to the MySQL database server before timing out. |
| connect.max-retries | optional | 3 | Integer | The maximum number of times the connector retries building the MySQL database server connection. |
| connection.pool.size | optional | 20 | Integer | The connection pool size. |
| jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties, for example 'jdbc.properties.useSSL' = 'false'. |
| heartbeat.interval | optional | 30s | Duration | The interval for sending heartbeat events that trace the latest available binlog offsets. |
| debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the MySQL server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium MySQL connector properties for more. |
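
The server-id and scan.incremental.snapshot.enabled entries above interact: with incremental snapshot reading enabled, each parallel reader needs its own server ID. The following is a minimal sketch of a parallel source under stated assumptions. The table name source_parallel, the parallelism of 4, and the ID range 5400-5408 are illustrative values chosen here, not settings taken from this document; only two of the Example 1 columns are declared to keep the sketch short.

```sql
-- Assumed for illustration: run the job with 4 parallel subtasks.
SET 'parallelism.default' = '4';

CREATE TABLE source_parallel (
  test_int INT,
  test_varchar VARCHAR,
  PRIMARY KEY (test_int) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'database-name' = 'qa_slothdata',
  'table-name' = 'alltype',
  'username' = 'root',
  'password' = 'root',
  -- 9 IDs (5400 through 5408), which is larger than the parallelism of 4
  'server-id' = '5400-5408',
  -- already the default; written out to make the dependency visible
  'scan.incremental.snapshot.enabled' = 'true',
  -- custom JDBC URL property, as in the jdbc.properties.* entry
  'jdbc.properties.useSSL' = 'false'
);
```

If the parallelism is raised later, the server-id range has to grow with it, which is one reason to set the range explicitly rather than rely on the random default.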

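Example 2 (no DDL, metahub mode)

The isCDC parameter is required. Add other parameters as needed; see the Example 1 parameter table for the available options.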

set 'alltype.isCDC' = 'true'; -- required
set 'alltype.scan.startup.mode' = 'latest-offset';  -- optional
set 'alltype1.primary.keys'='test_int';

INSERT INTO sloth_test_qa_mysql_167.qa_slothdata.alltype1 select * FROM sloth_test_qa_mysql_167.qa_slothdata.alltype;
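
The set statements above follow a '<table>.<option>' pattern. Assuming metahub passes the other Example 1 options through in the same way (this document does not list which options it accepts), further source options might be supplied as sketched below. The server-id range and time zone are illustrative values, not recommendations.

```sql
set 'alltype.isCDC' = 'true';                      -- required
set 'alltype.scan.startup.mode' = 'latest-offset'; -- optional, as in the example above
-- Assumption: any Example 1 option can be prefixed with the table name.
set 'alltype.server-id' = '5400-5408';
set 'alltype.server-time-zone' = 'Asia/Shanghai';
set 'alltype.connect.timeout' = '30s';
set 'alltype1.primary.keys' = 'test_int';

INSERT INTO sloth_test_qa_mysql_167.qa_slothdata.alltype1 select * FROM sloth_test_qa_mysql_167.qa_slothdata.alltype;
```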

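oracle-cdc (testing only)

Important: the Oracle technology stack is closed source, so the CDC connector's support for it is very limited. Production availability is not guaranteed; use it for testing only.

Example 1 (native SQL)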

create table source (
    id  decimal(38,0),
    col_char    string,
    col_varchar2    string,
    col_nchar    string,
    col_nvarchar2    string,
    col_number    decimal(10,5),
    col_number_bigint    decimal(19,0),
    col_date    timestamp,
    col_decimal    decimal(20,5),
    col_smallint    decimal(22,0),
    col_integer    decimal(22,0),
    col_binary_float    float,
    col_binary_double    double,
    col_real    float,
    col_float    float,
    col_timestamp    timestamp,
    col_decimal_bigint    decimal(19,0),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc', -- required: specify this table type is oracle-cdc
    'hostname' = 'localhost',
    'port' = '11521',
    'table-name' = 'ALL_FIELD',  -- required: Oracle table name
    'username' = 'root', -- required: Oracle user name and password
    'password' = 'password',
    'database-name' = 'orcl',
    'schema-name' = 'NDC',
    'debezium.database.tablename.case.insensitive' = 'false',
    'debezium.database.connection.adapter' = 'logminer',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true'
);

create table target (
    id    decimal(38,0),
    col_char    string,
    col_varchar2    string,
    col_nchar    string,
    col_nvarchar2    string,
    col_number    decimal(10,5),
    col_number_bigint    decimal(19,0),
    col_date    timestamp,
    col_decimal    decimal(20,5),
    col_smallint    decimal(22,0),
    col_integer    decimal(22,0),
    col_binary_float    float,
    col_binary_double    double,
    col_real    float,
    col_float    float,
    col_timestamp    timestamp,
    col_decimal_bigint    decimal(19,0),
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'oracle_xstream_all_field',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'
);

insert into target select * from source;

Example 1 parameters


| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use, here should be 'oracle-cdc'. |
| hostname | required | (none) | String | IP address or hostname of the Oracle database server. |
| username | required | (none) | String | Name of the Oracle user to use when connecting to the Oracle database server. |
| password | required | (none) | String | Password to use when connecting to the Oracle database server. |
| database-name | required | (none) | String | Database name of the Oracle server to monitor. |
| schema-name | required | (none) | String | Schema name of the Oracle database to monitor. |
| table-name | required | (none) | String | Table name of the Oracle database to monitor. |
| port | optional | 1521 | Integer | Integer port number of the Oracle database server. |
| scan.startup.mode | optional | initial | String | Optional startup mode for the Oracle CDC consumer. Valid enumerations are "initial" and "latest-offset". See the Startup Reading Position section for details. |
| debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the Oracle server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium Oracle connector properties for more. |

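Example 2 (no DDL statement, metahub mode)

The isCDC parameter is required. Add other parameters as needed; see the Example 1 parameter table for the available options.

-- Source: oracle-cdc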
set 'WH_TABLE1.isCDC' = 'true';  -- required
set 'WH_TABLE1.schema-name' = 'NDC';
set 'WH_TABLE1.scan.startup.mode' = 'latest-offset';
set 'WH_TABLE1.debezium.database.tablename.case.insensitive' = 'false';
set 'WH_TABLE1.debezium.database.connection.adapter' = 'logminer';
set 'WH_TABLE1.debezium.log.mining.strategy' = 'online_catalog';
set 'WH_TABLE1.debezium.log.mining.continuous.mine' = 'true';

-- Target: Kafka configuration
set 'target_table.topic' = 'ingestion_wanghua_01';
set 'target_table.key.format' = 'csv';
set 'target_table.key.fields' = 'ID';
set 'target_table.format' = 'canal-json';

INSERT INTO poc_kafka_x_catalog.mem.target_table select * FROM dts.NDC.WH_TABLE1;
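
For comparison, the Kafka target settings above correspond to ordinary Flink Kafka connector options. The sketch below shows what a native DDL with the same key and value formats might look like. The column list and the bootstrap server address are assumptions made for illustration, since in Example 2 the metahub catalog supplies them.

```sql
CREATE TABLE target_table (
  ID DECIMAL(38, 0),
  COL_VARCHAR2 STRING,   -- hypothetical column, stands in for the real schema
  PRIMARY KEY (ID) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'ingestion_wanghua_01',
  'properties.bootstrap.servers' = 'localhost:9092',  -- assumed address
  -- the Kafka message key is the ID column, serialized as CSV
  'key.format' = 'csv',
  'key.fields' = 'ID',
  -- the message value carries the change events as canal-json
  'value.format' = 'canal-json'
);
```

Keying the messages by ID keeps all changes for one row in the same Kafka partition, so consumers see them in order.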

sqlserver-cdc

Example 1 (native SQL)

create table source (
    id  bigint,
    name string,
    age int,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc', -- required: specify this table type is sqlserver-cdc
    'hostname' = 'localhost',
    'port' = '1433',
    'table-name' = 'table_9',  -- required: SQL Server table name
    'username' = 'root', -- required: SQL Server user name and password
    'password' = 'root',
    'database-name' = 'dtstest',
    'schema-name' = 'dbo'
);

create table target (
    id  bigint,
    name string,
    age int
) WITH (
  'connector' = 'kafka',
  'topic' = 'lyb1',
  'properties.bootstrap.servers' = 'localhost:9092',
  'format' = 'canal-json'
);

insert into target select * from source;

Example 1 parameters


| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use, here should be 'sqlserver-cdc'. |
| hostname | required | (none) | String | IP address or hostname of the SQLServer database. |
| username | required | (none) | String | Username to use when connecting to the SQLServer database. |
| password | required | (none) | String | Password to use when connecting to the SQLServer database. |
| database-name | required | (none) | String | Database name of the SQLServer database to monitor. |
| schema-name | required | (none) | String | Schema name of the SQLServer database to monitor. |
| table-name | required | (none) | String | Table name of the SQLServer database to monitor. |
| port | optional | 1433 | Integer | Integer port number of the SQLServer database. |
| server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. "Asia/Shanghai". |
| debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from SQLServer. For example: 'debezium.snapshot.mode' = 'initial_only'. See the Debezium SQLServer connector properties for more. |
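
The two optional entries above, server-time-zone and debezium.*, do not appear in Example 1. A minimal sketch of a source that sets both follows. The table name source_tz and the time zone value are illustrative assumptions, and the snapshot mode is simply the value quoted in the table.

```sql
CREATE TABLE source_tz (
  id BIGINT,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'sqlserver-cdc',
  'hostname' = 'localhost',
  'port' = '1433',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'dtstest',
  'schema-name' = 'dbo',
  'table-name' = 'table_9',
  -- session time zone of the database server (assumed value)
  'server-time-zone' = 'Asia/Shanghai',
  -- passed through to Debezium with the 'debezium.' prefix removed
  'debezium.snapshot.mode' = 'initial_only'
);
```

Per Debezium's SQL Server connector documentation, initial_only takes the initial snapshot and does not go on to stream later changes, so it suits one-off loads rather than continuous sync.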

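Example 2 (no DDL statement, metahub mode)

The isCDC parameter is required. Add other parameters as needed; see the Example 1 parameter table for the available options.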

set 'table_polling_1.isCDC' = 'true'; -- required
set 'table_polling_1.schema-name' = 'dbo';
set 'table_polling_1.scan.startup.mode' = 'latest-offset';

set 'target_table.topic' = 'lyb1';
set 'target_table.key.format' = 'csv';
set 'target_table.key.fields' = 'id';
set 'target_table.format' = 'canal-json';


INSERT INTO poc_kafka_x_catalog.mem.target_table select * FROM poc_sqlserver.dbo.table_polling_1;

tidb-cdc

Example 1 (native SQL)

CREATE TABLE sink (
  id INT,
  age INT,
  name VARCHAR,
  score double,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:4000/test', -- required: JDBC DB url
    'table-name' = 'easyds_test1',  -- required: jdbc table name
    'username' = 'root', -- required: jdbc user name and password
    'password' = 'root'
);

CREATE TABLE source (
  id INT,
  age INT,
  name VARCHAR,
  score double,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc', -- required: specify this table type is tidb-cdc
    'pd-addresses' = 'localhost:2390', -- required: pd-addresses
    'table-name' = 'root',  -- required:  table name
    'database-name' = 'root' -- required:  database
);

INSERT INTO sink select * FROM source;

Example 1 parameters

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use, here should be 'tidb-cdc'. |
| database-name | required | (none) | String | Database name of the TiDB server to monitor. |
| table-name | required | (none) | String | Table name of the TiDB database to monitor. |
| scan.startup.mode | optional | initial | String | Optional startup mode for the TiDB CDC consumer. Valid enumerations are "initial" and "latest-offset". |
| pd-addresses | required | (none) | String | TiKV cluster's PD address. |
| tikv.grpc.timeout_in_ms | optional | (none) | Long | TiKV GRPC timeout in ms. |
| tikv.grpc.scan_timeout_in_ms | optional | (none) | Long | TiKV GRPC scan timeout in ms. |
| tikv.batch_get_concurrency | optional | 20 | Integer | TiKV GRPC batch get concurrency. |
| tikv.* | optional | (none) | String | Passes properties through to the TiDB client. |
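
The optional TiKV entries above are not used in Example 1. A minimal sketch of a source that sets them follows. The table name source_tuned and the scan timeout value are assumptions made for illustration; the 20000 ms GRPC timeout is the value used in the metahub example in this section.

```sql
CREATE TABLE source_tuned (
  id INT,
  age INT,
  name VARCHAR,
  score DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'tidb-cdc',
  'pd-addresses' = 'localhost:2390',
  'database-name' = 'root',
  'table-name' = 'root',
  -- skip the initial snapshot and read only new changes
  'scan.startup.mode' = 'latest-offset',
  'tikv.grpc.timeout_in_ms' = '20000',
  'tikv.grpc.scan_timeout_in_ms' = '20000',  -- assumed value
  'tikv.batch_get_concurrency' = '20'        -- the documented default
);
```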

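Example 2 (no DDL statement, metahub mode)

  • The isCDC parameter is required. Add other parameters as needed; see the Example 1 parameter table for the available options.
  • In metahub mode, add the "pd-addresses" parameter to the custom parameters when registering the TiDB data source; otherwise this mode does not work.
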
set 'easyds_test.isCDC' = 'true';
set 'tikv.grpc.timeout_in_ms' = '20000';
set 'easyds_test1.primary.keys'='id';

INSERT INTO test_tidb.test.easyds_test1 select * FROM test_tidb.test.easyds_test;