CDC SQL Connectors
Last updated: 2023-04-24 20:23:27
Flink-1.14-cdc
Based on Flink 1.14 and Flink CDC 2.2.1. Supported connectors: mysql-cdc, oracle-cdc, sqlserver-cdc, and tidb-cdc.
mysql-cdc
Example 1 (native SQL)
CREATE TABLE sink (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_double DOUBLE,
PRIMARY KEY(test_int) NOT ENFORCED
) WITH (
'connector' = 'jdbc', -- required: specify this table type is jdbc
'url' = 'jdbc:mysql://localhost:3306/qa_slothdata', -- required: JDBC DB url
'table-name' = 'alltype1', -- required: jdbc table name
'username' = 'root', -- required: jdbc user name and password
'password' = 'root'
);
CREATE TABLE source (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
-- testDecimal DECIMAL(38, 18),
test_double DOUBLE,
PRIMARY KEY(test_int) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'database-name' = 'qa_slothdata',
'table-name' = 'alltype', -- required: name of the MySQL table to capture
'username' = 'root', -- required: MySQL user name and password
'password' = 'root'
);
INSERT INTO sink select * FROM source;
Example 1 parameters
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'mysql-cdc' . |
hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
username | required | (none) | String | Name of the MySQL user to use when connecting to the MySQL database server. |
password | required | (none) | String | Password to use when connecting to the MySQL database server. |
database-name | required | (none) | String | Database name of the MySQL server to monitor. database-name also supports regular expressions, to monitor multiple databases that match the expression. |
table-name | required | (none) | String | Table name of the MySQL database to monitor. table-name also supports regular expressions, to monitor multiple tables that match the expression. |
port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
server-id | optional | (none) | Integer | A numeric ID or numeric ID range for this database client. A single ID looks like '5400'; a range looks like '5400-5408'. A range is recommended when 'scan.incremental.snapshot.enabled' is enabled. Every ID must be unique across all currently running database processes in the MySQL cluster. The connector joins the MySQL cluster as another server (with this unique ID) so that it can read the binlog. By default, a random number between 5400 and 6400 is generated, though setting an explicit value is recommended. |
scan.incremental.snapshot.enabled | optional | true | Boolean | Incremental snapshot is a new mechanism for reading a table snapshot. Compared to the old snapshot mechanism, it has several advantages: (1) the source can read the snapshot in parallel, (2) the source can checkpoint at chunk granularity during snapshot reading, (3) the source does not need to acquire a global read lock (FLUSH TABLES WITH READ LOCK) before snapshot reading. To run the source in parallel, each parallel reader needs a unique server ID, so 'server-id' must be a range such as '5400-6400', and the range must be larger than the parallelism. See the Incremental Snapshot Reading section for details. |
scan.incremental.snapshot.chunk.size | optional | 8096 | Integer | The chunk size (number of rows) of the table snapshot. Captured tables are split into multiple chunks when the table snapshot is read. |
scan.snapshot.fetch.size | optional | 1024 | Integer | The maximum fetch size per poll when reading the table snapshot. |
scan.startup.mode | optional | initial | String | Optional startup mode for the MySQL CDC consumer. Valid values are "initial" and "latest-offset". See the Startup Reading Position section for details. |
server-time-zone | optional | UTC | String | The session time zone of the database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MySQL is converted to STRING. |
debezium.min.row.count.to.stream.result | optional | 1000 | Integer | During a snapshot operation, the connector queries each included table to produce a read event for every row in that table. This parameter determines whether the MySQL connection pulls all results for a table into memory (fast, but requires a large amount of memory) or streams the results instead (can be slower, but works for very large tables). The value specifies the minimum number of rows a table must contain before the connector streams results, and defaults to 1,000. Set this parameter to '0' to skip all table size checks and always stream all results during a snapshot. |
connect.timeout | optional | 30s | Duration | The maximum time that the connector should wait after trying to connect to the MySQL database server before timing out. |
connect.max-retries | optional | 3 | Integer | The maximum number of times the connector retries establishing a connection to the MySQL database server. |
connection.pool.size | optional | 20 | Integer | The connection pool size. |
jdbc.properties.* | optional | (none) | String | Option to pass custom JDBC URL properties. For example, 'jdbc.properties.useSSL' = 'false'. |
heartbeat.interval | optional | 30s | Duration | The interval of sending heartbeat event for tracing the latest available binlog offsets. |
debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which captures data changes from the MySQL server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium MySQL Connector properties for more. |
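The server-id and incremental snapshot rows above interact, so a small sketch may help. It assumes a job parallelism of 4 and therefore reserves a server-id range of nine IDs, which is larger than the parallelism as the table requires. The table name orders, its columns, and the two SET statements (standard Flink SQL client settings; this platform may expose parallelism and checkpointing elsewhere) are illustrative assumptions, not part of the original examples.

```sql
-- Sketch only: host, credentials, table name, and columns are placeholders.
-- Assumed job settings; on this platform they may be configured elsewhere.
SET 'parallelism.default' = '4';
SET 'execution.checkpointing.interval' = '10s';

CREATE TABLE orders_source (
  order_id BIGINT,
  customer_name STRING,
  amount DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'qa_slothdata',
  'table-name' = 'orders',
  -- nine IDs (5400..5408), larger than the parallelism of 4
  'server-id' = '5400-5408',
  'scan.incremental.snapshot.enabled' = 'true',
  -- read the existing rows first, then continue from the binlog
  'scan.startup.mode' = 'initial',
  -- pass-through JDBC URL property, as described in the table
  'jdbc.properties.useSSL' = 'false'
);
```

Checkpointing is enabled in the sketch because the incremental snapshot checkpoints at chunk granularity; the 10s interval is an arbitrary choice.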
Example 2 (no DDL, metahub mode)
The isCDC parameter is required. Other parameters can be added as needed; see the Example 1 parameter table.
set 'alltype.isCDC' = 'true'; -- required
set 'alltype.scan.startup.mode' = 'latest-offset'; -- optional
set 'alltype1.primary.keys'='test_int';
INSERT INTO sloth_test_qa_mysql_167.qa_slothdata.alltype1 select * FROM sloth_test_qa_mysql_167.qa_slothdata.alltype;
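As a hedged illustration of adding further parameters in metahub mode, the sketch below prefixes two options from the Example 1 parameter table with the source table name. It assumes that any option in that table can be set in the `<table>.<option>` form used for isCDC and scan.startup.mode above; the specific values are placeholders.

```sql
-- Sketch only: assumes every Example 1 option accepts the <table>.<option> form.
set 'alltype.isCDC' = 'true'; -- required
set 'alltype.server-id' = '5400-5408'; -- optional: explicit server ID range
set 'alltype.server-time-zone' = 'Asia/Shanghai'; -- optional: session time zone
set 'alltype1.primary.keys' = 'test_int';
INSERT INTO sloth_test_qa_mysql_167.qa_slothdata.alltype1 select * FROM sloth_test_qa_mysql_167.qa_slothdata.alltype;
```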
oracle-cdc
Example 1 (native SQL)
create table source (
id decimal(38,0),
col_char string,
col_varchar2 string,
col_nchar string,
col_nvarchar2 string,
col_number decimal(10,5),
col_number_bigint decimal(19,0),
col_date timestamp,
col_decimal decimal(20,5),
col_smallint decimal(22,0),
col_integer decimal(22,0),
col_binary_float float,
col_binary_double double,
col_real float,
col_float float,
col_timestamp timestamp,
col_decimal_bigint decimal(19,0),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc', -- required: specify this table type is oracle-cdc
'hostname' = 'localhost',
'port' = '11521',
'table-name' = 'ALL_FIELD', -- required: name of the Oracle table to capture
'username' = 'root', -- required: Oracle user name and password
'password' = 'password',
'database-name' = 'orcl',
'schema-name' = 'NDC',
'debezium.database.tablename.case.insensitive' = 'false',
'debezium.database.connection.adapter' = 'logminer',
'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'
);
create table target (
id decimal(38,0),
col_char string,
col_varchar2 string,
col_nchar string,
col_nvarchar2 string,
col_number decimal(10,5),
col_number_bigint decimal(19,0),
col_date timestamp,
col_decimal decimal(20,5),
col_smallint decimal(22,0),
col_integer decimal(22,0),
col_binary_float float,
col_binary_double double,
col_real float,
col_float float,
col_timestamp timestamp,
col_decimal_bigint decimal(19,0),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'oracle_xstream_all_field',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'canal-json'
);
insert into target select * from source;
Example 1 parameters
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'oracle-cdc' . |
hostname | required | (none) | String | IP address or hostname of the Oracle database server. |
username | required | (none) | String | Name of the Oracle user to use when connecting to the Oracle database server. |
password | required | (none) | String | Password to use when connecting to the Oracle database server. |
database-name | required | (none) | String | Database name of the Oracle server to monitor. |
schema-name | required | (none) | String | Schema name of the Oracle database to monitor. |
table-name | required | (none) | String | Table name of the Oracle database to monitor. |
port | optional | 1521 | Integer | Integer port number of the Oracle database server. |
scan.startup.mode | optional | initial | String | Optional startup mode for the Oracle CDC consumer. Valid values are "initial" and "latest-offset". See the Startup Reading Position section for details. |
debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which captures data changes from the Oracle server. For example: 'debezium.snapshot.mode' = 'never'. See the Debezium Oracle Connector properties for more. |
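To illustrate the optional rows of the table, here is a minimal sketch of an Oracle source that skips the initial snapshot and reads only new changes. It reuses the connection values from Example 1 but assumes the default port 1521 and a reduced, hypothetical column list.

```sql
-- Sketch only: connection values are placeholders; the column list is shortened.
CREATE TABLE oracle_source_latest (
  id DECIMAL(38, 0),
  col_varchar2 STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oracle-cdc',
  'hostname' = 'localhost',
  'port' = '1521', -- the default from the table above
  'username' = 'root',
  'password' = 'password',
  'database-name' = 'orcl',
  'schema-name' = 'NDC',
  'table-name' = 'ALL_FIELD',
  -- start from the latest offset instead of taking an initial snapshot
  'scan.startup.mode' = 'latest-offset',
  -- Debezium pass-through property, same as in Example 1
  'debezium.log.mining.strategy' = 'online_catalog'
);
```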
Example 2 (no DDL statements, metahub mode)
The isCDC parameter is required. Other parameters can be added as needed; see the Example 1 parameter table.
-- Source: oracle-cdc
set 'WH_TABLE1.isCDC' = 'true'; -- required
set 'WH_TABLE1.schema-name' = 'NDC';
set 'WH_TABLE1.scan.startup.mode' = 'latest-offset';
set 'WH_TABLE1.debezium.database.tablename.case.insensitive' = 'false';
set 'WH_TABLE1.debezium.database.connection.adapter' = 'logminer';
set 'WH_TABLE1.debezium.log.mining.strategy' = 'online_catalog';
set 'WH_TABLE1.debezium.log.mining.continuous.mine' = 'true';
-- Target: Kafka configuration
set 'target_table.topic' = 'ingestion_wanghua_01';
set 'target_table.key.format' = 'csv';
set 'target_table.key.fields' = 'ID';
set 'target_table.format' = 'canal-json';
INSERT INTO poc_kafka_x_catalog.mem.target_table select * FROM dts.NDC.WH_TABLE1;
sqlserver-cdc
Example 1 (native SQL)
create table source (
id bigint,
name string,
age int,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc', -- required: specify this table type is sqlserver-cdc
'hostname' = 'localhost',
'port' = '1433',
'table-name' = 'table_9', -- required: name of the SQL Server table to capture
'username' = 'root', -- required: SQL Server user name and password
'password' = 'root',
'database-name' = 'dtstest',
'schema-name' = 'dbo'
);
create table target (
id bigint,
name string,
age int
) WITH (
'connector' = 'kafka',
'topic' = 'lyb1',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'canal-json'
);
insert into target select * from source;
Example 1 parameters
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'sqlserver-cdc' . |
hostname | required | (none) | String | IP address or hostname of the SQLServer database. |
username | required | (none) | String | Username to use when connecting to the SQLServer database. |
password | required | (none) | String | Password to use when connecting to the SQLServer database. |
database-name | required | (none) | String | Database name of the SQLServer database to monitor. |
schema-name | required | (none) | String | Schema name of the SQLServer database to monitor. |
table-name | required | (none) | String | Table name of the SQLServer database to monitor. |
port | optional | 1433 | Integer | Integer port number of the SQLServer database. |
server-time-zone | optional | UTC | String | The session time zone in database server, e.g. "Asia/Shanghai". |
debezium.* | optional | (none) | String | Passes Debezium properties through to the Debezium Embedded Engine, which captures data changes from SQLServer. For example: 'debezium.snapshot.mode' = 'initial_only'. See the Debezium SQLServer Connector properties for more. |
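The two optional rows above can be combined as in the following sketch. It reuses the Example 1 connection values and adds server-time-zone plus the debezium.snapshot.mode value mentioned in the table. In Debezium, initial_only takes the snapshot and then does not move on to streaming changes, so it is shown here only to illustrate the pass-through syntax; the table name source_snapshot_only is hypothetical.

```sql
-- Sketch only: connection values are placeholders taken from Example 1.
CREATE TABLE source_snapshot_only (
  id BIGINT,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'sqlserver-cdc',
  'hostname' = 'localhost',
  'port' = '1433',
  'username' = 'root',
  'password' = 'root',
  'database-name' = 'dtstest',
  'schema-name' = 'dbo',
  'table-name' = 'table_9',
  -- session time zone of the database server
  'server-time-zone' = 'Asia/Shanghai',
  -- Debezium pass-through: snapshot only, no change streaming afterwards
  'debezium.snapshot.mode' = 'initial_only'
);
```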
Example 2 (no DDL statements, metahub mode)
The isCDC parameter is required. Other parameters can be added as needed; see the Example 1 parameter table.
set 'table_polling_1.isCDC' = 'true'; -- required
set 'table_polling_1.schema-name' = 'dbo';
set 'table_polling_1.scan.startup.mode' = 'latest-offset';
set 'target_table.topic' = 'lyb1';
set 'target_table.key.format' = 'csv';
set 'target_table.key.fields' = 'id';
set 'target_table.format' = 'canal-json';
INSERT INTO poc_kafka_x_catalog.mem.target_table select * FROM poc_sqlserver.dbo.table_polling_1;
tidb-cdc
Example 1 (native SQL)
CREATE TABLE sink (
id INT,
age INT,
name VARCHAR,
score double,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:4000/test', -- required: JDBC DB url
'table-name' = 'easyds_test1', -- required: jdbc table name
'username' = 'root', -- required: jdbc user name and password
'password' = 'root'
);
CREATE TABLE source (
id INT,
age INT,
name VARCHAR,
score double,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'tidb-cdc', -- required: specify this table type is tidb-cdc
'pd-addresses' = 'localhost:2390', -- required: pd-addresses
'table-name' = 'root', -- required: table name
'database-name' = 'root' -- required: database
);
INSERT INTO sink select * FROM source;
Example 1 parameters
Option | Required | Default | Type | Description |
---|---|---|---|---|
connector | required | (none) | String | Specify what connector to use, here should be 'tidb-cdc' . |
database-name | required | (none) | String | Database name of the TiDB server to monitor. |
table-name | required | (none) | String | Table name of the TiDB database to monitor. |
scan.startup.mode | optional | initial | String | Optional startup mode for TiDB CDC consumer, valid enumerations are "initial" and "latest-offset". |
pd-addresses | required | (none) | String | TiKV cluster's PD address. |
tikv.grpc.timeout_in_ms | optional | (none) | Long | TiKV GRPC timeout in ms. |
tikv.grpc.scan_timeout_in_ms | optional | (none) | Long | TiKV GRPC scan timeout in ms. |
tikv.batch_get_concurrency | optional | 20 | Integer | TiKV GRPC batch get concurrency. |
tikv.* | optional | (none) | String | Pass-through TiDB client's properties. |
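A short sketch of the optional TiDB rows: it starts from the latest offset and sets both TiKV gRPC timeouts. The PD address follows Example 1; the database and table names are borrowed from the sink and metahub examples, and the 20000 ms timeouts are arbitrary placeholder values.

```sql
-- Sketch only: addresses, names, and timeout values are placeholders.
CREATE TABLE tidb_source_latest (
  id INT,
  age INT,
  name VARCHAR,
  score DOUBLE,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'tidb-cdc',
  'pd-addresses' = 'localhost:2390',
  'database-name' = 'test',
  'table-name' = 'easyds_test',
  -- skip the initial snapshot and read only new changes
  'scan.startup.mode' = 'latest-offset',
  -- TiKV client pass-through options from the table above
  'tikv.grpc.timeout_in_ms' = '20000',
  'tikv.grpc.scan_timeout_in_ms' = '20000'
);
```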
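Example 2 (no DDL statements, metahub mode)
- The isCDC parameter is required. Other parameters can be added as needed; see the Example 1 parameter table.
- In metahub mode, the "pd-addresses" parameter must be added to the custom parameters when the TiDB data source is registered; otherwise this mode will not work.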
set 'easyds_test.isCDC' = 'true';
set 'tikv.grpc.timeout_in_ms' = '20000';
set 'easyds_test1.primary.keys'='id';
INSERT INTO test_tidb.test.easyds_test1 select * FROM test_tidb.test.easyds_test;