JDBC SQL 连接器
JDBC SQL 连接器
JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。
支持 Mysql、Oracle 、PostgreSQL、Derby、Teradata-16.20、SQLServer、 DB2、GreenPlum
Flink-1.14/Flink-1.12
如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。(sqlserver仅在1.14引擎下支持) 在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
Driver | Group Id | Artifact Id | JAR |
---|---|---|---|
MySQL | mysql |
mysql-connector-java |
https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/ |
PostgreSQL | org.postgresql |
postgresql |
https://jdbc.postgresql.org/download.html |
Derby | org.apache.derby |
derby |
http://db.apache.org/derby/derby_downloads.html |
Oracle | com.oracle.database.jdbc |
ojdbc8 |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
Sqlserver | com.microsoft.sqlserver |
mssql-jdbc |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
DB2 | com.ibm.db2.jcc |
db2jcc |
https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 |
如何创建 JDBC 表
JDBC table 可以按如下定义:
-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;
-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
clickhouse表
CREATE TABLE sink (
`id` bigint,
name varchar(100),
age string,
primary key(id) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:clickhouse://xxx:8123/',
'database-name' = 'default',
'username' = 'default',
'password' = 'xx',
'table-name' = 't1',
'sink.distribute-table-write-local' = 'true',
'sink.ignore-delete' = 'false',
'sink.partition-strategy' = 'balanced',
'sink.partition-key' = 'id'
);
db2表
create table target (
`ID` bigint primary key,
`NAME` string
) WITH (
'connector' = 'db2',
'url' = 'jdbc:db2://xxx:50000/testdb1:currentSchema=MYSCHEMA;',
'table-name' = 'T1',
'username'= 'xxx',
'password'= 'xxx'
);
sqlserver表(1.14引擎下支持)
create table source (
id bigint,
name string,
age int,
fid bigint,
proctime as proctime(),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc', -- required: specify this table type is sqlserver-cdc
'hostname' = 'xxx',
'port' = '1433',
'table-name' = 'table_8', -- required: jdbc table name
'username' = 'xx', -- optional: jdbc user name and password
'password' = 'xxx',
'database-name' = 'dtstest',
'schema-name' = 'dbo'
);
create table dim (
id bigint,
address string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver',
'url' = 'jdbc:sqlserver://xxx:1433;databaseName=dtstest',
'driver'= 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'table-name' = 'table_8_dim',
'username'= 'xx',
'password'= 'xxx'
);
create table target (
id bigint,
name string,
age int,
address string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver',
'url' = 'jdbc:sqlserver://xxx:1433;databaseName=dts2',
'driver'= 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'table-name' = 'table_8_target',
'username'= 'xx',
'password'= 'xxx'
);
insert into target
select s.id,s.name,s.age,d.address
from source as s left join dim FOR SYSTEM_TIME AS OF s.proctime as d on s.fid = d.id;
hana连接器
create table t2 (
`ID` int,
`NAME` string
) WITH (
'connector' = 'hana',
'url' = 'jdbc:sap://xxx:39017?currentSchema=TEST',
'table-name' = 'T1',
'username'= 'xx',
'password'= 'xx'
);
create table t1 (id int, addr string) with ('connector'='datagen','rows-per-second'='2');
create view v1 as select id % 3 as id, addr, proctime() as proc from t1;
create table print (id int, name string, addr string) with ('connector' = 'print');
insert into print
select a.id, a.addr, b.`NAME` from v1 as a left join t2 for system_time as of a.proc as b on a.id = b.`ID`;
达梦连接器
create table t1
(city_id string,
city_name string,
region_id int)
with (
'connector' = 'dm',
'url' = 'jdbc:dm://127.0.0.1:5236',
'table-name' = 'DMHR.CITY',
'username' = 'SYSDBA',
'password' = 'xxxxx'
);
tdsql连接器
按jdbc连接器使用方法
CREATE TABLE sink (
a int,
b int,
c string
) WITH (
'connector' = 'jdbc', -- required: specify this table type is jdbc
'url' = 'jdbc:mysql://10.196.73.157:15005/test_db', -- required: JDBC DB url
'table-name' = 'lyb_t1', -- required: jdbc table name
'driver' = 'com.mysql.jdbc.Driver',
'username' = 'hanlon',
'password' = 'xxxxx',
'sink.buffer-flush.max-rows' = '5000', -- optional, flush max size (includes all append, upsert and delete records),
-- over this number of records, will flush data. The default value is "5000".
'sink.buffer-flush.interval' = '2s', -- optional, flush interval mills, over this time, asynchronous threads will flush data.
-- The default value is "0s", which means no asynchronous flush thread will be scheduled.
'sink.max-retries' = '3' -- optional, max retry times if writing records to database failed
);
GreenPlum,仅Flink-1.14支持
- 注意:
- Flink DECIMAL(XX, XX)精度仅支持0-38,当GP的numeric(XX,XX)精度大于38位时,读不到数据
- 当前 Flink 的JDBC connector 仅支持无时区的时间数据,因此不支持timetz等带时区类型
CREATE TABLE gp (
a VARCHAR,
b BIGINT,
c INT,
d char(1),
e DOUBLE,
f boolean,
g float,
h TINYINT,
i DECIMAL
) WITH (
'connector' = 'jdbc', -- required: specify this table type is jdbc
'url' = 'jdbc:postgresql://{ip}:{port}/{database}', -- required: JDBC DB url
'table-name' = '{schemaName}.test', -- required: jdbc table name,
'username' = 'xx',
'password' = 'xx'
);
Oracle
ddl写法:
不支持小写列名
create table t(
ID int, NAME string
) with(
'connector' ='jdbc',
'url' ='jdbc:oracle:thin:@xxx:11521:orcl',
'table-name'='TEST.T1',
'username'='xxx',
'password'='xxx'
);
metahub写法:
若在实时数仓注册流表,库名、表名含小写、特殊字符都可用,列名不可小写
若用catalog.db.table三元组,库名、表名含小写、特殊字符(加号、点号、单引号之外)时,需要用如下写法,列名也不可小写 select - from catalog.
"autotest_%_test"
."autotest_%_test2"
create table t1 with ('connector' = 'print') like `oracle_11_metahub_autotest_336_49161_17`.`"autotest_#_test"`.`t1` (excluding all);
insert into t1 select - from `oracle_11_metahub_autotest_336_49161_17`.`"autotest_#_test"`.`"t1"`
参数配置
配置名称 | 是否必填 | 配置生效类型 | 参数值字段类型 | 参数默认值 | 参数说明(用于用户手册) |
---|---|---|---|---|---|
connector | 必填 | 源表、目标表 | String | - | 指定使用什么类型的连接器,这里应该是'jdbc' |
url | 必填 | 源表、目标表 | String | - | JDBC 数据库 url。 |
table-name | 必填 | 源表、目标表 | String | - | 连接到 JDBC 表的名称。 |
username | 可选 | 源表、目标表 | String | - | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
password | 可选 | 源表、目标表 | String | - | JDBC 密码。 |
driver | 可选 | 源表、目标表 | String | - | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
scan.partition.column | 可选 | 源表 | String | - | 用于将输入进行分区的列名。 |
scan.partition.lower-bound | 可选 | 源表 | Long | - | 第一个分区的最小值。 |
scan.partition.upper-bound | 可选 | 源表 | Long | - | 最后一个分区的最大值。 |
scan.partition.num | 可选 | 源表 | Integer | - | 分区数。 |
scan.fetch-size | 可选 | 源表 | Integer | 0 | 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0',则该配置项会被忽略。 |
scan.auto-commit | 可选 | 源表 | Boolean | true | 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。 |
lookup.cache.max-rows | 可选 | 维表 | Long | 10000 | lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。 |
lookup.cache.ttl | 可选 | 维表 | Duration | - | lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。 |
lookup.max-retries | 可选 | 维表 | Integer | 3 | 查询数据库失败的最大重试时间。 |
lookup.cache.metric.enable | 可选 | 维表 | Boolean | false | 是否开启缓存监控。 |
lookup.cache.empty | 可选 | 维表 | Boolean | false | 是否缓存空值。 |
sink.buffer-flush.max-rows | 可选 | 目标表 | Integer | 100 | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
sink.buffer-flush.interval | 可选 | 目标表 | Duration | 1s | flush 间隔时间,超过该时间后异步线程将 flush 数据。 |
sink.max-retries | 可选 | 目标表 | Integer | 3 | 写入记录到数据库失败后的最大重试次数。 |
sink.parallelism | 可选 | 目标表 | Integer | - | 用于定义 JDBC sink 算子的并行度。 |
isCDC | 可选 | 源表 | Boolean | false | 标记源表是否是CDC类型 |
clickhouse 特有参数
sink.distribute-table-write-local | 分布式表场景中,是否开启分布式写入优化 |
sink.partition-strategy | 开启分布式写入优化时,分区策略:balanced(round-robin), hash(partition key), shuffle(random) |
sink.partition-key | 开启分布式写入优化时,指定分区键 |
特性
键处理
当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。
在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。
有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL
分区扫描
为了在并行 Source
task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。
如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。
scan.partition.column
必须是相关表中的数字、日期或时间戳列。注意,scan.partition.lower-bound
和 scan.partition.upper-bound
用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。
scan.partition.column
:输入用于进行分区的列名。scan.partition.num
:分区数。scan.partition.lower-bound
:第一个分区的最小值。scan.partition.upper-bound
:最后一个分区的最大值。
Lookup Cache
JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。
默认情况下,lookup cache 是未启用的,你可以设置 lookup.cache.max-rows
and lookup.cache.ttl
参数来启用。
lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。
当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。
当缓存命中最大缓存行 lookup.cache.max-rows
或当行超过最大存活时间 lookup.cache.ttl
时,缓存中最老的行将被设置为已过期。
缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl
设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。
幂等写入
如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。
如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。
除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。
由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:
Database | Upsert Grammar |
---|---|
MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
数据类型映射
Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、PostgreSQL、Derby、Teradata 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
Flink-1.12-cdc
示例
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'isCDC'='true',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');
如果是元数据任务只需要用set语句设置isCDC参数为true即可
参数
Option | Required | Default | Type | Description |
---|---|---|---|---|
isCDC | optional | (none) | Boolean | default false |
connector | required | (none) | String | Specify what connector to use, here should be 'jdbc' . |
hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
username | required | (none) | String | Name of the MySQL database to use when connecting to the MySQL database server. |
password | required | (none) | String | Password to use when connecting to the MySQL database server. |
database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple tables matches the regular expression. |
table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables matches the regular expression. |
port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value. |
scan.startup.specific-offset.file | optional | true | Boolean | Optional offsets used in case of "specific-offset" startup mode |
scan.startup.specific-offset.pos | optional | 8096 | Integer | Optional offsets used in case of "specific-offset" startup mode |
scan.startup.timestamp-millis | optional | 1024 | Integer | Optional timestamp used in case of "timestamp" startup mode. |
scan.startup.mode | optional | initial | String | Optional startup mode for MySQL CDC consumer, valid enumerations are "initial" and "latest-offset". Please see Startup Reading Positionsection for more detailed information. |
server-time-zone | optional | UTC | String | The session time zone in database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MYSQL converted to STRING. See more here. |
Flink-1.10
示例
CREATE TABLE kafka_source (item_id INT, pv INT) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'group-offsets',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.properties.group.id' = 'xxx',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
CREATE TABLE dim (
item_id INT,
item_name VARCHAR,
price INT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3331/test',
'connector.table' = 'test_join',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'sloth',
'connector.password' = 'password',
'connector.lookup.cache.strategy' = 'all',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '60s',
'connector.lookup.max-retries' = '3',
);
-- sink
CREATE TABLE sink (
item_id INT,
item_name VARCHAR,
price INT,
pv INT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3331/test',
'connector.table' = 'test_join_sink',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'sloth',
'connector.password' = 'password',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO
sink
SELECT
s.item_id as item_id,
d.item_name as item_name,
d.price as price,
sum(s.pv) AS pv
FROM
(
SELECT
item_id,
pv,
PROCTIME() as proc
from
kafka_source
) as s
join dim FOR SYSTEM_TIME AS OF s.proc as d on s.item_id = d.item_id
GROUP BY
s.item_id,
d.item_name,
d.price;
Sink With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:jdbc |
connector.url | 数据库 jdbc url | 必填 |
connector.table | 数据库表名 | 必填 |
connector.driver | jdbc 驱动 | 必填 |
connector.username | 数据库连接用户名 | 必填 |
connector.password | 数据库连接密码 | 必填 |
connector.write.flush.max-rows | 数据刷新到数据库的最大条数(包括append,upsert和delete) | 选填,默认5000 |
connector.write.flush.interval | 数据定时异步刷新的时间间隔 | 选填,默认0 |
connector.write.max-retries | 数据输出异常后连接尝试次数 | 选填,默认3 |
维表 With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:jdbc |
connector.url | 数据库 jdbc url | 必填 |
connector.table | 数据库表名 | 必填 |
connector.driver | jdbc 驱动 | 必填 |
connector.username | 数据库连接用户名 | 必填 |
connector.password | 数据库连接密码 | 必填 |
connector.read.partition.column | 分区column名字 | 选填 |
connector.read.partition.num | 分区数 | 选填 |
connector.read.max-partition.lower-bound | 第一个分区的最小值 | 选填 |
connector.read.max-partition.upper-bound | 最后一个分区的最大值 | 选填 |
connector.lookup.cache.max-rows | 维表缓存数据的最大数量 | 选填 |
connector.lookup.cache.ttl | 维表缓存数据的过期时间 | 选填 |
connector.lookup.max-retries | 维表数据查询异常的尝试次数 | 选填,默认3 |
缓存说明:
connector.lookup.cache.max-rows
connector.lookup.cache.ttl
指定 以上参数后表示需支持缓存,每次维表 JOIN 操作都会先从缓存中匹配,如果匹配不上再执行查询语句。
以上内容对您是否有帮助?