JDBC SQL 连接器
JDBC SQL 连接器
JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。
支持 Mysql、Oracle 、PostgreSQL、Derby、Teradata-16.20、SQLServer
Flink-1.14/Flink-1.12
如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。(sqlserver仅在1.14引擎下支持) 在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
Driver | Group Id | Artifact Id | JAR |
---|---|---|---|
MySQL | mysql |
mysql-connector-java |
https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/ |
PostgreSQL | org.postgresql |
postgresql |
https://jdbc.postgresql.org/download.html |
Derby | org.apache.derby |
derby |
http://db.apache.org/derby/derby_downloads.html |
Oracle | com.oracle.database.jdbc |
ojdbc8 |
https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 |
Sqlserver | com.microsoft.sqlserver |
mssql-jdbc |
https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
如何创建 JDBC 表
JDBC table 可以按如下定义:
-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
id BIGINT,
name STRING,
age INT,
status BOOLEAN,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'table-name' = 'users'
);
-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;
-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;
-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;
clickhouse表
CREATE TABLE sink (
`id` bigint,
name varchar(100),
age string,
primary key(id) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:clickhouse://10.196.98.29:8123/',
'database-name' = 'default',
'username' = 'default',
'password' = 'xx',
'table-name' = 't1',
'sink.distribute-table-write-local' = 'true',
'sink.ignore-delete' = 'false',
'sink.partition-strategy' = 'balanced',
'sink.partition-key' = 'id'
);
sqlserver表(1.14引擎下支持)
create table source (
id bigint,
name string,
age int,
fid bigint,
proctime as proctime(),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc', -- required: specify this table type is sqlserver-cdc
'hostname' = '59.111.223.147',
'port' = '1433',
'table-name' = 'table_8', -- required: jdbc table name
'username' = 'xx', -- optional: jdbc user name and password
'password' = 'xxx',
'database-name' = 'dtstest',
'schema-name' = 'dbo'
);
create table dim (
id bigint,
address string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver',
'url' = 'jdbc:sqlserver://59.111.223.147:1433;databaseName=dtstest',
'driver'= 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'table-name' = 'table_8_dim',
'username'= 'xx',
'password'= 'xxx'
);
create table target (
id bigint,
name string,
age int,
address string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver',
'url' = 'jdbc:sqlserver://59.111.223.147:1433;databaseName=dts2',
'driver'= 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
'table-name' = 'table_8_target',
'username'= 'xx',
'password'= 'xxx'
);
insert into target
select s.id,s.name,s.age,d.address
from source as s left join dim FOR SYSTEM_TIME AS OF s.proctime as d on s.fid = d.id;
连接器参数
参数 | 是否必填 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector |
必填 | (none) | String | 指定使用什么类型的连接器,这里应该是'jdbc'
sqlserver需要指定为sqlserver 。 |
url |
必填 | (none) | String | JDBC 数据库 url。 |
table-name |
必填 | (none) | String | 连接到 JDBC 表的名称。 |
driver |
可选 | (none) | String | 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。 |
username |
可选 | (none) | String | JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。 |
password |
可选 | (none) | String | JDBC 密码。 |
connection.max-retry-timeout |
可选 | 60s | Duration | 最大重试超时时间,以秒为单位且不应该小于 1 秒。 |
scan.partition.column |
可选 | (none) | String | 用于将输入进行分区的列名。请参阅下面的分区扫描部分了解更多详情。 |
scan.partition.num |
可选 | (none) | Integer | 分区数。 |
scan.partition.lower-bound |
可选 | (none) | Integer | 第一个分区的最小值。 |
scan.partition.upper-bound |
可选 | (none) | Integer | 最后一个分区的最大值。 |
scan.fetch-size |
可选 | 0 | Integer | 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0' ,则该配置项会被忽略。 |
scan.auto-commit |
可选 | true | Boolean | 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。有些 JDBC 驱动程序,特别是 Postgres,可能需要将此设置为 false 以便流化结果。 |
lookup.cache.max-rows |
可选 | (none) | Integer | lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.cache.ttl |
可选 | (none) | Duration | lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。 默认情况下,lookup cache 是未开启的。请参阅下面的 Lookup Cache 部分了解更多详情。 |
lookup.max-retries |
可选 | 3 | Integer | 查询数据库失败的最大重试时间。 |
lookup.cache.empty |
可选 | false | Boolean | 是否缓存空值 |
lookup.cache.metric.enable |
可选 | false | Boolean | 是否开启缓存监控。可以在flink-webui->Running Jobs-> 点击要查看的任务名-> 点击要查看的chainedOperator->右边的metrics页->Add Metric框:搜索cache进行查看。注意:目前前端有bug,命中率等小数无法看到真实值,只能看到0或1,需看与后端交互的接口返回值。监控指标有: XXX.cache.hit-rate:命中率=hit-count/(hit-count+miss-count) XXX.cache.hit-count:命中数 XXX.cache.miss-count:未命中数,=加载成功数+加载异常数 XXX.cache.average-load-cost:每条记录平均耗时,单位ms,total-load-time/(load-success-count+load-exception-count) XXX.cache.load-success-count:缓存加载成功数 XXX.cache.load-exception-count:缓存加载异常数,外部数据库未匹配到join key XXX.cache.load-exception-rate:缓存加载异常率,当异常率很高时建议开启缓存空值 XXX.cache.total-load-time:总的缓存加载耗时,单位s XXX.cache.cache-record-count:缓存的记录数 |
sink.buffer-flush.max-rows |
可选 | 100 | Integer | flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。 |
sink.buffer-flush.interval |
可选 | 1s | Duration | flush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。 |
sink.max-retries |
可选 | 3 | Integer | 写入记录到数据库失败后的最大重试次数。 |
sink.parallelism |
可选 | (none) | Integer | 用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。 |
sink.distribute-table-write-local | 分布式表场景中,是否开启分布式写入优化 |
sink.partition-strategy | 开启分布式写入优化时,分区策略:balanced(round-robin), hash(partition key), shuffle(random) |
sink.partition-key | 开启分布式写入优化时,指定分区键 |
特性
键处理
当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。
在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。
有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL
分区扫描
为了在并行 Source
task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。
如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。
scan.partition.column
必须是相关表中的数字、日期或时间戳列。注意,scan.partition.lower-bound
和 scan.partition.upper-bound
用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。
scan.partition.column
:输入用于进行分区的列名。scan.partition.num
:分区数。scan.partition.lower-bound
:第一个分区的最小值。scan.partition.upper-bound
:最后一个分区的最大值。
Lookup Cache
JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。
默认情况下,lookup cache 是未启用的,你可以设置 lookup.cache.max-rows
and lookup.cache.ttl
参数来启用。
lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。
当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。
当缓存命中最大缓存行 lookup.cache.max-rows
或当行超过最大存活时间 lookup.cache.ttl
时,缓存中最老的行将被设置为已过期。
缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl
设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。
幂等写入
如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。
如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。
除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。
由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:
Database | Upsert Grammar |
---|---|
MySQL | INSERT .. ON DUPLICATE KEY UPDATE .. |
PostgreSQL | INSERT .. ON CONFLICT .. DO UPDATE SET .. |
数据类型映射
Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、PostgreSQL、Derby、Teradata 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。
MySQL type | PostgreSQL type | Teradata type | Flink SQL type}}"> |
---|---|---|---|
TINYINT |
BYTEINT |
TINYINT |
|
SMALLINT TINYINT UNSIGNED |
SMALLINT INT2 SMALLSERIAL SERIAL2 |
SMALLINT TINYINT UNSIGNED |
SMALLINT |
INT MEDIUMINT SMALLINT UNSIGNED |
INTEGER SERIAL |
INT MEDIUMINT SMALLINT UNSIGNED |
INT |
BIGINT INT UNSIGNED |
BIGINT BIGSERIAL |
BIGINT INT UNSIGNED |
BIGINT |
BIGINT UNSIGNED |
BIGINT |
DECIMAL(20, 0) |
|
BIGINT |
BIGINT |
BIGINT |
BIGINT |
FLOAT |
REAL FLOAT4 |
FLOAT |
|
DOUBLE DOUBLE PRECISION |
FLOAT8 DOUBLE PRECISION |
FLOAT DOUBLE DOUBLE PRECISION |
DOUBLE |
NUMERIC(p, s) DECIMAL(p, s) |
NUMERIC(p, s) DECIMAL(p, s) |
NUMERIC(p, s) DECIMAL(p, s) |
DECIMAL(p, s) |
BOOLEAN TINYINT(1) |
BOOLEAN |
BOOLEAN |
|
DATE |
DATE |
DATE |
DATE |
TIME [(p)] |
TIME [(p)] [WITHOUT TIMEZONE] |
TIME [(p)] |
TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
CHAR(n) VARCHAR(n) TEXT |
CHAR(n) CHARACTER(n) VARCHAR(n) CHARACTER VARYING(n) TEXT |
CHAR(n) VARCHAR(n) TEXT |
STRING |
BINARY VARBINARY BLOB |
BYTEA |
不支持 | BYTES |
ARRAY |
不支持 | ARRAY |
Flink-1.12-cdc
示例
CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'isCDC'='true',
'url' = 'jdbc:mysql://localhost:3306/mydatabase',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');
如果是元数据任务只需要用set语句设置isCDC参数为true即可
参数
Option | Required | Default | Type | Description |
---|---|---|---|---|
isCDC | optional | (none) | Boolean | default false |
connector | required | (none) | String | Specify what connector to use, here should be 'jdbc' . |
hostname | required | (none) | String | IP address or hostname of the MySQL database server. |
username | required | (none) | String | Name of the MySQL database to use when connecting to the MySQL database server. |
password | required | (none) | String | Password to use when connecting to the MySQL database server. |
database-name | required | (none) | String | Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple tables matches the regular expression. |
table-name | required | (none) | String | Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables matches the regular expression. |
port | optional | 3306 | Integer | Integer port number of the MySQL database server. |
server-id | optional | (none) | Integer | A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value. |
scan.startup.specific-offset.file | optional | true | Boolean | Optional offsets used in case of "specific-offset" startup mode |
scan.startup.specific-offset.pos | optional | 8096 | Integer | Optional offsets used in case of "specific-offset" startup mode |
scan.startup.timestamp-millis | optional | 1024 | Integer | Optional timestamp used in case of "timestamp" startup mode. |
scan.startup.mode | optional | initial | String | Optional startup mode for MySQL CDC consumer, valid enumerations are "initial" and "latest-offset". Please see Startup Reading Positionsection for more detailed information. |
server-time-zone | optional | UTC | String | The session time zone in database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MYSQL converted to STRING. See more here. |
Flink-1.10
示例
CREATE TABLE kafka_source (item_id INT, pv INT) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'group-offsets',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.properties.group.id' = 'xxx',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
CREATE TABLE dim (
item_id INT,
item_name VARCHAR,
price INT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3331/test',
'connector.table' = 'test_join',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'sloth',
'connector.password' = 'password',
'connector.lookup.cache.strategy' = 'all',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '60s',
'connector.lookup.max-retries' = '3',
);
-- sink
CREATE TABLE sink (
item_id INT,
item_name VARCHAR,
price INT,
pv INT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3331/test',
'connector.table' = 'test_join_sink',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'sloth',
'connector.password' = 'password',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO
sink
SELECT
s.item_id as item_id,
d.item_name as item_name,
d.price as price,
sum(s.pv) AS pv
FROM
(
SELECT
item_id,
pv,
PROCTIME() as proc
from
kafka_source
) as s
join dim FOR SYSTEM_TIME AS OF s.proc as d on s.item_id = d.item_id
GROUP BY
s.item_id,
d.item_name,
d.price;
Sink With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:jdbc |
connector.url | 数据库 jdbc url | 必填 |
connector.table | 数据库表名 | 必填 |
connector.driver | jdbc 驱动 | 必填 |
connector.username | 数据库连接用户名 | 必填 |
connector.password | 数据库连接密码 | 必填 |
connector.write.flush.max-rows | 数据刷新到数据库的最大条数(包括append,upsert和delete) | 选填,默认5000 |
connector.write.flush.interval | 数据定时异步刷新的时间间隔 | 选填,默认0 |
connector.write.max-retries | 数据输出异常后连接尝试次数 | 选填,默认3 |
维表 With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:jdbc |
connector.url | 数据库 jdbc url | 必填 |
connector.table | 数据库表名 | 必填 |
connector.driver | jdbc 驱动 | 必填 |
connector.username | 数据库连接用户名 | 必填 |
connector.password | 数据库连接密码 | 必填 |
connector.read.partition.column | 分区column名字 | 选填 |
connector.read.partition.num | 分区数 | 选填 |
connector.read.max-partition.lower-bound | 第一个分区的最小值 | 选填 |
connector.read.max-partition.upper-bound | 最后一个分区的最大值 | 选填 |
connector.lookup.cache.max-rows | 维表缓存数据的最大数量 | 选填 |
connector.lookup.cache.ttl | 维表缓存数据的过期时间 | 选填 |
connector.lookup.max-retries | 维表数据查询异常的尝试次数 | 选填,默认3 |
缓存说明:
connector.lookup.cache.max-rows
connector.lookup.cache.ttl
指定 以上参数后表示需支持缓存,每次维表 JOIN 操作都会先从缓存中匹配,如果匹配不上再执行查询语句。
以上内容对您是否有帮助?