JDBC SQL 连接器

JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。本文档描述了针对关系型数据库如何通过建立 JDBC 连接器来执行 SQL 查询。

支持 Mysql、Oracle 、PostgreSQL、Derby、Teradata-16.20、SQLServer、 DB2、GreenPlum

Flink-1.14/Flink-1.12

如果在 DDL 中定义了主键,JDBC sink 将以 upsert 模式与外部系统交换 UPDATE/DELETE 消息;否则,它将以 append 模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。(sqlserver仅在1.14引擎下支持) 在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:

Driver Group Id Artifact Id JAR
MySQL mysql mysql-connector-java https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/
PostgreSQL org.postgresql postgresql https://jdbc.postgresql.org/download.html
Derby org.apache.derby derby http://db.apache.org/derby/derby_downloads.html
Oracle com.oracle.database.jdbc ojdbc8 https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8
Sqlserver com.microsoft.sqlserver mssql-jdbc https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc
DB2 com.ibm.db2.jcc db2jcc https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4

如何创建 JDBC 表

JDBC table 可以按如下定义:

--  Flink SQL 中注册一张 MySQL  'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users'
);

-- 从另一张表 "T" 将数据写入到 JDBC 表中
INSERT INTO MyUserTable
SELECT id, name, age, status FROM T;

-- 查看 JDBC 表中的数据
SELECT id, name, age, status FROM MyUserTable;

-- JDBC 表在时态表关联中作为维表
SELECT * FROM myTopic
LEFT JOIN MyUserTable FOR SYSTEM_TIME AS OF myTopic.proctime
ON myTopic.key = MyUserTable.id;

clickhouse表

CREATE TABLE sink (
`id` bigint,
  name    varchar(100),
   age    string,
 primary key(id) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:clickhouse://xxx:8123/',
   'database-name' = 'default',
  'username' = 'default',
  'password' = 'xx',
   'table-name' = 't1',
   'sink.distribute-table-write-local' = 'true',
   'sink.ignore-delete' = 'false',
   'sink.partition-strategy' = 'balanced',
   'sink.partition-key' = 'id'
);

db2表

create table target (
    `ID`  bigint primary key,
    `NAME` string
) WITH (
    'connector' = 'db2',
    'url' = 'jdbc:db2://xxx:50000/testdb1:currentSchema=MYSCHEMA;',
    'table-name' = 'T1',
    'username'= 'xxx',
    'password'= 'xxx'
);

sqlserver表(1.14引擎下支持)

create table source (
    id  bigint,
    name string,
    age int,
    fid bigint,
    proctime as proctime(),
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc', -- required: specify this table type is sqlserver-cdc
    'hostname' = 'xxx',
    'port' = '1433',
    'table-name' = 'table_8',  -- required: jdbc table name
    'username' = 'xx', -- optional: jdbc user name and password
    'password' = 'xxx',
    'database-name' = 'dtstest',
    'schema-name' = 'dbo'
);

create table dim (
    id  bigint,
    address string,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver',
    'url' = 'jdbc:sqlserver://xxx:1433;databaseName=dtstest',
    'driver'= 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
    'table-name' = 'table_8_dim',
    'username'= 'xx',
    'password'= 'xxx'
);

create table target (
    id  bigint,
    name string,
    age int,
    address string,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver',
    'url' = 'jdbc:sqlserver://xxx:1433;databaseName=dts2',
    'driver'= 'com.microsoft.sqlserver.jdbc.SQLServerDriver',
    'table-name' = 'table_8_target',
    'username'= 'xx',
    'password'= 'xxx'
);

insert into target
select s.id,s.name,s.age,d.address
from source as s left join dim FOR SYSTEM_TIME AS OF s.proctime as d on s.fid = d.id;

hana连接器

create table t2 (
    `ID`  int,
    `NAME` string
) WITH (
    'connector' = 'hana',
    'url' = 'jdbc:sap://xxx:39017?currentSchema=TEST',
    'table-name' = 'T1',
    'username'= 'xx',
    'password'= 'xx'
);

create table t1 (id int, addr string) with ('connector'='datagen','rows-per-second'='2');
create view v1 as select id % 3 as id, addr, proctime() as proc from t1;

create table print (id int, name string, addr string) with ('connector' = 'print');

insert into print
select a.id, a.addr, b.`NAME` from v1 as a left join t2 for system_time as of a.proc as b on a.id = b.`ID`;

GreenPlum,仅Flink-1.14支持

  • 注意
    • Flink DECIMAL(XX, XX)精度仅支持0-38,当GP的numeric(XX,XX)精度大于38位时,读不到数据
    • 当前 Flink 的JDBC connector 仅支持无时区的时间数据,因此不支持timetz等带时区类型
CREATE TABLE gp (
  a VARCHAR,
  b BIGINT,
  c INT,
  d char(1),
  e DOUBLE,
  f boolean,
  g float,
  h TINYINT,
 i DECIMAL
) WITH (
  'connector' = 'jdbc', -- required: specify this table type is jdbc
  'url' = 'jdbc:postgresql://{ip}:{port}/{database}', -- required: JDBC DB url
  'table-name' = '{schemaName}.test',  -- required: jdbc table name
  'username' = 'xx',
  'password' = 'xx'
);

参数配置

配置名称 是否必填 配置生效类型 参数值字段类型 参数默认值 参数说明(用于用户手册)
connector 必填 源表、目标表 String - 指定使用什么类型的连接器,这里应该是'jdbc'
url 必填 源表、目标表 String - JDBC 数据库 url。
table-name 必填 源表、目标表 String - 连接到 JDBC 表的名称。
username 可选 源表、目标表 String - JDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。
password 可选 源表、目标表 String - JDBC 密码。
driver 可选 源表、目标表 String - 用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
scan.partition.column 可选 源表 String - 用于将输入进行分区的列名。
scan.partition.lower-bound 可选 源表 Long - 第一个分区的最小值。
scan.partition.upper-bound 可选 源表 Long - 最后一个分区的最大值。
scan.partition.num 可选 源表 Integer - 分区数。
scan.fetch-size 可选 源表 Integer 0 每次循环读取时应该从数据库中获取的行数。如果指定的值为 '0',则该配置项会被忽略。
scan.auto-commit 可选 源表 Boolean true 在 JDBC 驱动程序上设置 auto-commit 标志, 它决定了每个语句是否在事务中自动提交。
lookup.cache.max-rows 可选 维表 Long 10000 lookup cache 的最大行数,若超过该值,则最老的行记录将会过期。
lookup.cache.ttl 可选 维表 Duration - lookup cache 中每一行记录的最大存活时间,若超过该时间,则最老的行记录将会过期。
lookup.max-retries 可选 维表 Integer 3 查询数据库失败的最大重试时间。
lookup.cache.metric.enable 可选 维表 Boolean false 是否开启缓存监控。
lookup.cache.empty 可选 维表 Boolean false 是否缓存空值。
sink.buffer-flush.max-rows 可选 目标表 Integer 100 flush 前缓存记录的最大值,可以设置为 '0' 来禁用它。
sink.buffer-flush.interval 可选 目标表 Duration 1s flush 间隔时间,超过该时间后异步线程将 flush 数据。
sink.max-retries 可选 目标表 Integer 3 写入记录到数据库失败后的最大重试次数。
sink.parallelism 可选 目标表 Integer - 用于定义 JDBC sink 算子的并行度。
isCDC 可选 源表 Boolean false 标记源表是否是CDC类型

clickhouse 特有参数

sink.distribute-table-write-local 分布式表场景中,是否开启分布式写入优化
sink.partition-strategy 开启分布式写入优化时,分区策略:balanced(round-robin), hash(partition key), shuffle(random)
sink.partition-key 开启分布式写入优化时,指定分区键

特性

键处理

当写入数据到外部数据库时,Flink 会使用 DDL 中定义的主键。如果定义了主键,则连接器将以 upsert 模式工作,否则连接器将以 append 模式工作。

在 upsert 模式下,Flink 将根据主键判断插入新行或者更新已存在的行,这种方式可以确保幂等性。为了确保输出结果是符合预期的,推荐为表定义主键并且确保主键是底层数据库中表的唯一键或主键。在 append 模式下,Flink 会把所有记录解释为 INSERT 消息,如果违反了底层数据库中主键或者唯一约束,INSERT 插入可能会失败。

有关 PRIMARY KEY 语法的更多详细信息,请参见 CREATE TABLE DDL

分区扫描

为了在并行 Source task 实例中加速读取数据,Flink 为 JDBC table 提供了分区扫描的特性。

如果下述分区扫描参数中的任一项被指定,则下述所有的分区扫描参数必须都被指定。这些参数描述了在多个 task 并行读取数据时如何对表进行分区。 scan.partition.column 必须是相关表中的数字、日期或时间戳列。注意,scan.partition.lower-boundscan.partition.upper-bound 用于决定分区的起始位置和过滤表中的数据。如果是批处理作业,也可以在提交 flink 作业之前获取最大值和最小值。

  • scan.partition.column:输入用于进行分区的列名。
  • scan.partition.num:分区数。
  • scan.partition.lower-bound:第一个分区的最小值。
  • scan.partition.upper-bound:最后一个分区的最大值。

Lookup Cache

JDBC 连接器可以用在时态表关联中作为一个可 lookup 的 source (又称为维表),当前只支持同步的查找模式。

默认情况下,lookup cache 是未启用的,你可以设置 lookup.cache.max-rows and lookup.cache.ttl 参数来启用。

lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性能。默认情况下,lookup cache 不开启,所以所有请求都会发送到外部数据库。 当 lookup cache 被启用时,每个进程(即 TaskManager)将维护一个缓存。Flink 将优先查找缓存,只有当缓存未查找到时才向外部数据库发送请求,并使用返回的数据更新缓存。 当缓存命中最大缓存行 lookup.cache.max-rows 或当行超过最大存活时间 lookup.cache.ttl 时,缓存中最老的行将被设置为已过期。 缓存中的记录可能不是最新的,用户可以将 lookup.cache.ttl 设置为一个更小的值以获得更好的刷新数据,但这可能会增加发送到数据库的请求数。所以要做好吞吐量和正确性之间的平衡。

幂等写入

如果在 DDL 中定义了主键,JDBC sink 将使用 upsert 语义而不是普通的 INSERT 语句。upsert 语义指的是如果底层数据库中存在违反唯一性约束,则原子地添加新行或更新现有行,这种方式确保了幂等性。

如果出现故障,Flink 作业会从上次成功的 checkpoint 恢复并重新处理,这可能导致在恢复过程中重复处理消息。强烈推荐使用 upsert 模式,因为如果需要重复处理记录,它有助于避免违反数据库主键约束和产生重复数据。

除了故障恢复场景外,数据源(kafka topic)也可能随着时间的推移自然地包含多个具有相同主键的记录,这使得 upsert 模式是用户期待的。

由于 upsert 没有标准的语法,因此下表描述了不同数据库的 DML 语法:

Database Upsert Grammar
MySQL INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL INSERT .. ON CONFLICT .. DO UPDATE SET ..

数据类型映射

Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、PostgreSQL、Derby、Teradata 等。其中,Derby 通常是用于测试目的。下表列出了从关系数据库数据类型到 Flink SQL 数据类型的类型映射,映射表可以使得在 Flink 中定义 JDBC 表更加简单。

MySQL type PostgreSQL type Teradata type Flink SQL type}}">
TINYINT BYTEINT TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INTEGER
SERIAL
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGSERIAL
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNED BIGINT DECIMAL(20, 0)
BIGINT BIGINT BIGINT BIGINT
FLOAT REAL
FLOAT4
FLOAT
DOUBLE
DOUBLE PRECISION
FLOAT8
DOUBLE PRECISION
FLOAT
DOUBLE
DOUBLE PRECISION
DOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
DECIMAL(p, s)
BOOLEAN
TINYINT(1)
BOOLEAN BOOLEAN
DATE DATE DATE DATE
TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE] TIME [(p)] TIME [(p)] [WITHOUT TIMEZONE]
DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE]
CHAR(n)
VARCHAR(n)
TEXT
CHAR(n)
CHARACTER(n)
VARCHAR(n)
CHARACTER VARYING(n)
TEXT
CHAR(n)
VARCHAR(n)
TEXT
STRING
BINARY
VARBINARY
BLOB
BYTEA 不支持 BYTES
ARRAY 不支持 ARRAY

Flink-1.12-cdc

示例

CREATE TABLE orders (
     order_id INT,
     order_date TIMESTAMP(0),
     customer_name STRING,
     price DECIMAL(10, 5),
     product_id INT,
     order_status BOOLEAN,
     PRIMARY KEY(order_id) NOT ENFORCED
     ) WITH (
     'connector' = 'jdbc',
     'isCDC'='true',
     'url' = 'jdbc:mysql://localhost:3306/mydatabase',
     'username' = 'root',
     'password' = '123456',
     'database-name' = 'mydb',
     'table-name' = 'orders');
     如果是元数据任务只需要用set语句设置isCDC参数为true即可

参数

Option Required Default Type Description
isCDC optional (none) Boolean default false
connector required (none) String Specify what connector to use, here should be 'jdbc'.
hostname required (none) String IP address or hostname of the MySQL database server.
username required (none) String Name of the MySQL database to use when connecting to the MySQL database server.
password required (none) String Password to use when connecting to the MySQL database server.
database-name required (none) String Database name of the MySQL server to monitor. The database-name also supports regular expressions to monitor multiple tables matches the regular expression.
table-name required (none) String Table name of the MySQL database to monitor. The table-name also supports regular expressions to monitor multiple tables matches the regular expression.
port optional 3306 Integer Integer port number of the MySQL database server.
server-id optional (none) Integer A numeric ID or a numeric ID range of this database client, The numeric ID syntax is like '5400', the numeric ID range syntax is like '5400-5408', The numeric ID range syntax is recommended when 'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all currently-running database processes in the MySQL cluster. This connector joins the MySQL cluster as another server (with this unique ID) so it can read the binlog. By default, a random number is generated between 5400 and 6400, though we recommend setting an explicit value.
scan.startup.specific-offset.file optional true Boolean Optional offsets used in case of "specific-offset" startup mode
scan.startup.specific-offset.pos optional 8096 Integer Optional offsets used in case of "specific-offset" startup mode
scan.startup.timestamp-millis optional 1024 Integer Optional timestamp used in case of "timestamp" startup mode.
scan.startup.mode optional initial String Optional startup mode for MySQL CDC consumer, valid enumerations are "initial" and "latest-offset". Please see Startup Reading Positionsection for more detailed information.
server-time-zone optional UTC String The session time zone in database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in MYSQL converted to STRING. See more here.

Flink-1.10

示例

CREATE TABLE kafka_source (item_id INT, pv INT) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.startup-mode' = 'group-offsets',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);
CREATE TABLE dim (
  item_id INT,
  item_name VARCHAR,
  price INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.lookup.cache.strategy' = 'all',
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '60s',
  'connector.lookup.max-retries' = '3',
);
-- sink
CREATE TABLE sink (
  item_id INT,
  item_name VARCHAR,
  price INT,
  pv INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join_sink',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.write.flush.max-rows' = '1'
);
INSERT INTO
  sink
SELECT
  s.item_id as item_id,
  d.item_name as item_name,
  d.price as price,
  sum(s.pv) AS pv
FROM
  (
    SELECT
      item_id,
      pv,
      PROCTIME() as proc
    from
      kafka_source
  ) as s
  join dim FOR SYSTEM_TIME AS OF s.proc as d on s.item_id = d.item_id
GROUP BY
  s.item_id,
  d.item_name,
  d.price;

Sink With 参数

参数 注释说明 备注
connector.type 维表类型 必填:jdbc
connector.url 数据库 jdbc url 必填
connector.table 数据库表名 必填
connector.driver jdbc 驱动 必填
connector.username 数据库连接用户名 必填
connector.password 数据库连接密码 必填
connector.write.flush.max-rows 数据刷新到数据库的最大条数(包括append,upsert和delete) 选填,默认5000
connector.write.flush.interval 数据定时异步刷新的时间间隔 选填,默认0
connector.write.max-retries 数据输出异常后连接尝试次数 选填,默认3

维表 With 参数

参数 注释说明 备注
connector.type 维表类型 必填:jdbc
connector.url 数据库 jdbc url 必填
connector.table 数据库表名 必填
connector.driver jdbc 驱动 必填
connector.username 数据库连接用户名 必填
connector.password 数据库连接密码 必填
connector.read.partition.column 分区column名字 选填
connector.read.partition.num 分区数 选填
connector.read.max-partition.lower-bound 第一个分区的最小值 选填
connector.read.max-partition.upper-bound 最后一个分区的最大值 选填
connector.lookup.cache.max-rows 维表缓存数据的最大数量 选填
connector.lookup.cache.ttl 维表缓存数据的过期时间 选填
connector.lookup.max-retries 维表数据查询异常的尝试次数 选填,默认3

缓存说明:

connector.lookup.cache.max-rows

connector.lookup.cache.ttl

指定 以上参数后表示需支持缓存,每次维表 JOIN 操作都会先从缓存中匹配,如果匹配不上再执行查询语句。