JDBC Dimension Table
Last updated: 2021-11-23 10:32:36
Overview
Easystream supports joining a stream against a JDBC table used as a dimension (lookup) table.
Example
```sql
-- Source: Kafka topic carrying user behavior events
CREATE TABLE kafka_source (item_id INT, pv INT) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'user_behavior',
  'connector.startup-mode' = 'group-offsets',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'xxx',
  'update-mode' = 'append',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
);

-- Dimension table: MySQL table test_join, queried through JDBC with a lookup cache
CREATE TABLE dim (
  item_id INT,
  item_name VARCHAR,
  price INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.lookup.cache.strategy' = 'all',
  'connector.lookup.cache.max-rows' = '5000',
  'connector.lookup.cache.ttl' = '60s',
  'connector.lookup.max-retries' = '3'
);

-- Sink: MySQL table test_join_sink; flush after every record
CREATE TABLE sink (
  item_id INT,
  item_name VARCHAR,
  price INT,
  pv INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join_sink',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.write.flush.max-rows' = '1'
);

-- Enrich each Kafka record with the dim row valid at its processing time,
-- then sum pv per item
INSERT INTO sink
SELECT
  s.item_id AS item_id,
  d.item_name AS item_name,
  d.price AS price,
  SUM(s.pv) AS pv
FROM (
  SELECT item_id, pv, PROCTIME() AS proc
  FROM kafka_source
) AS s
JOIN dim FOR SYSTEM_TIME AS OF s.proc AS d
  ON s.item_id = d.item_id
GROUP BY
  s.item_id,
  d.item_name,
  d.price;
```
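The example reads from `test_join` and writes to `test_join_sink` in the MySQL database `test`, but it does not show their schemas. Below is a minimal sketch of matching MySQL tables. The column sizes and keys are assumptions, not something Easystream prescribes. The query aggregates with GROUP BY, so the sink receives updated results for the same (item_id, item_name, price) group. A primary key on those columns lets MySQL overwrite the previous result instead of accumulating duplicate rows. This assumes the JDBC sink writes upserts the way upstream Flink's MySQL dialect does (`INSERT ... ON DUPLICATE KEY UPDATE`).

```sql
-- Hypothetical MySQL schemas for the tables used in the example (database `test`).
CREATE TABLE test_join (
  item_id   INT PRIMARY KEY,   -- key the dimension lookup filters on
  item_name VARCHAR(255),
  price     INT
);

CREATE TABLE test_join_sink (
  item_id   INT NOT NULL,
  item_name VARCHAR(255) NOT NULL,
  price     INT NOT NULL,
  pv        INT,
  -- same columns as the GROUP BY, so a newer result for a group replaces the older row
  PRIMARY KEY (item_id, item_name, price)
);
```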
WITH parameters
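Parameter | Description | Notes |
---|---|---|
connector.type | Dimension table type | Required: jdbc |
connector.url | JDBC URL of the database | Required |
connector.table | Database table name | Required |
connector.driver | JDBC driver class | Required |
connector.username | Database username | Required |
connector.password | Database password | Required |
connector.read.partition.column | Name of the column used for partitioning | Optional |
connector.read.partition.num | Number of partitions | Optional |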
connector.read.partition.lower-bound | Smallest value of the first partition | Optional |
connector.read.partition.upper-bound | Largest value of the last partition | Optional |
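connector.lookup.cache.max-rows | Maximum number of rows held in the lookup cache | Optional |
connector.lookup.cache.ttl | Time after which cached rows expire | Optional |
connector.lookup.max-retries | Number of retries when a lookup query fails | Optional, default 3 |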
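The four `connector.read.partition.*` options split a full read of the table into parallel range queries over one column. In upstream Flink's legacy JDBC connector, they apply when the table is scanned as a source, not to per-key lookups. They must also be set together. The following is a sketch with hypothetical values, using upstream Flink's option names. The table name `dim_scan` and the bounds are illustrative only.

```sql
-- Hypothetical: scan test_join in 4 parallel ranges of item_id between 1 and 10000
CREATE TABLE dim_scan (
  item_id INT,
  item_name VARCHAR,
  price INT
) WITH (
  'connector.type' = 'jdbc',
  'connector.url' = 'jdbc:mysql://localhost:3331/test',
  'connector.table' = 'test_join',
  'connector.driver' = 'com.mysql.jdbc.Driver',
  'connector.username' = 'sloth',
  'connector.password' = 'password',
  'connector.read.partition.column' = 'item_id',   -- column the ranges are built on
  'connector.read.partition.num' = '4',            -- number of parallel range queries
  'connector.read.partition.lower-bound' = '1',    -- smallest value of the first range
  'connector.read.partition.upper-bound' = '10000' -- largest value of the last range
);
```

With these values, each range covers roughly 2,500 consecutive `item_id` values.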
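Lookup cache:
connector.lookup.cache.max-rows
connector.lookup.cache.ttl
Setting both of the parameters above enables the lookup cache. Each dimension-table JOIN first looks up the key in the cache and queries the database only on a cache miss.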
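On a cache miss, or when no cache is configured, the join runs a point query keyed on the join column. For the example's `dim` table, that query is roughly equivalent to the statement below. This sketch assumes upstream Flink's JDBC lookup behavior, where `?` is bound to the current record's `s.item_id`. The exact SQL the connector generates may differ, for example in identifier quoting.

```sql
-- Approximate per-key lookup issued against MySQL for one incoming record
SELECT item_id, item_name, price
FROM test_join
WHERE item_id = ?;
```

Rows returned by this query are stored in the cache. A later record with the same `item_id` is served from memory until the entry expires after `connector.lookup.cache.ttl` or is evicted once `connector.lookup.cache.max-rows` is reached.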