JDBC Dimension Table

Overview

Easystream supports using a JDBC table as a dimension table in a join.

Example

-- source
CREATE TABLE kafka_source (item_id INT, pv INT) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.startup-mode' = 'group-offsets',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);
-- dimension table (JDBC lookup)
CREATE TABLE dim (
  item_id INT,
  item_name VARCHAR,
  price INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.lookup.cache.strategy' = 'all',
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '60s',
  'connector.lookup.max-retries' = '3'
);
-- sink
CREATE TABLE sink (
  item_id INT,
  item_name VARCHAR,
  price INT,
  pv INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join_sink',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.write.flush.max-rows' = '1'
);
INSERT INTO
  sink
SELECT
  s.item_id AS item_id,
  d.item_name AS item_name,
  d.price AS price,
  SUM(s.pv) AS pv
FROM
  (
    SELECT
      item_id,
      pv,
      PROCTIME() AS proc
    FROM
      kafka_source
  ) AS s
  JOIN dim FOR SYSTEM_TIME AS OF s.proc AS d ON s.item_id = d.item_id
GROUP BY
  s.item_id,
  d.item_name,
  d.price;
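
The join in the example is an inner join, so source rows whose item_id has no match in dim are dropped. The sketch below shows a variant that keeps such rows. It assumes Easystream accepts LEFT JOIN in a dimension-table join, as Apache Flink SQL does; this document does not confirm that, so treat it as something to verify rather than as documented behavior. It reuses the kafka_source and dim tables defined above.

```sql
-- Sketch only: assumes LEFT JOIN is accepted for dimension-table joins.
SELECT
  s.item_id,
  d.item_name,  -- NULL when dim has no row for this item_id
  d.price,      -- NULL when dim has no row for this item_id
  s.pv
FROM
  (
    SELECT
      item_id,
      pv,
      PROCTIME() AS proc
    FROM
      kafka_source
  ) AS s
  LEFT JOIN dim FOR SYSTEM_TIME AS OF s.proc AS d ON s.item_id = d.item_id;
```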

WITH Parameters

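Parameter | Description | Remarks
connector.type | Dimension table type | Required: jdbc
connector.url | Database JDBC URL | Required
connector.table | Database table name | Required
connector.driver | JDBC driver | Required
connector.username | Database connection username | Required
connector.password | Database connection password | Required
connector.read.partition.column | Name of the partition column | Optional
connector.read.partition.num | Number of partitions | Optional
connector.read.max-partition.lower-bound | Minimum value of the first partition | Optional
connector.read.max-partition.upper-bound | Maximum value of the last partition | Optional
connector.lookup.cache.max-rows | Maximum number of rows cached for the dimension table | Optional
connector.lookup.cache.ttl | Expiration time of cached dimension-table rows | Optional
connector.lookup.max-retries | Number of retries when a dimension-table lookup fails | Optional, default 3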
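
As an illustration of how the optional partition parameters might be combined, here is a minimal sketch. The parameter names are the ones listed in the table above; the table name dim_partitioned, the choice of item_id as the partition column, the partition count, and the bounds are all hypothetical values. It also assumes that the four partition parameters are meant to be set together, which is how the Apache Flink JDBC connector treats its equivalent options; this document does not state that explicitly.

```sql
-- Sketch only: all values below are hypothetical.
CREATE TABLE dim_partitioned (
  item_id INT,
  item_name VARCHAR,
  price INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  -- Partitioned read: assumed to split item_id values 1..10000 into 4 ranges.
  'connector.read.partition.column' = 'item_id',
  'connector.read.partition.num' = '4',
  'connector.read.max-partition.lower-bound' = '1',
  'connector.read.max-partition.upper-bound' = '10000'
);
```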

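Cache notes:

connector.lookup.cache.max-rows

connector.lookup.cache.ttl

Setting the parameters above enables caching: each dimension-table JOIN first looks up the key in the cache, and runs the query against the database only on a cache miss.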
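
A minimal sketch of a dimension table with caching enabled is shown below. It reuses the connection settings and the cache values ('5000' and '60s') from the example earlier in this document; the table name dim_cached is hypothetical. The comments describe the behavior implied by the parameter descriptions, not a verified implementation detail.

```sql
-- Sketch only: dim_cached is a hypothetical table name.
CREATE TABLE dim_cached (
  item_id INT,
  item_name VARCHAR,
  price INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.lookup.cache.max-rows' = '5000',  -- cache at most 5000 rows
  'connector.lookup.cache.ttl' = '60s'         -- cached rows expire after 60 seconds
);
```

With a cache in place, a change made to a row in the database may not be visible to the join until the cached copy of that row expires, so the ttl is a trade-off between database load and data freshness.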