MySQL Sink

简述

Easystream 支持输出到 Kafka,支持多个 Kafka 版本。

示例

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
  'connector.table' = 'jdbc_table_name',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'name',
  'connector.password' = 'password',
  'connector.write.flush.max-rows' = '5000',
  'connector.write.flush.interval' = '2s',
  'connector.write.max-retries' = '3'
)

默认情况下,connector.write.flush.intervalis 0s,connector.write.flush.max-rowsis 5000,这意味着对于低流量的输出行可能长时间不会刷新到数据库。因此,建议设置间隔配置。

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:jdbc
connector.url 数据库 jdbc url 必填
connector.table 数据库表名 必填
connector.driver jdbc 驱动 必填
connector.username 数据库连接用户名 必填
connector.password 数据库连接密码 必填
connector.write.flush.max-rows 数据刷新到数据库的最大条数(包括append,upsert和delete) 选填,默认5000
connector.write.flush.interval 数据定时异步刷新的时间间隔 选填,默认0
connector.write.max-retries 数据输出异常后连接尝试次数 选填,默认3
connector.key-fields 制定sink时候的主键,多个用逗号隔开 选填