Redis 维表

简述

Easystream 支持将 Redis 表作为维表（lookup table）进行 join，并支持配置查询缓存（cache）。

示例

-- Source table: user behavior events consumed from Kafka as JSON,
-- starting from the latest offset. Replace the 'xxx' placeholders.
CREATE TABLE user_log (
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    ts          TIMESTAMP
) WITH (
    -- Kafka connection
    'connector.type'         = 'kafka',
    'connector.version'      = 'universal',
    'connector.topic'        = 'user_behavior',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id'          = 'xxx',
    -- append-only JSON records; the JSON schema is derived from the table schema
    'update-mode'          = 'append',
    'format.type'          = 'json',
    'format.derive-schema' = 'true'
);
-- dim: Redis-backed dimension table used for lookup joins.
-- Replace the '*' placeholders with real connection settings.
CREATE TABLE dim_table (
    user_id VARCHAR,
    age     VARCHAR
) WITH (
    -- Redis connection (single-node mode, db 0)
    'connector.type' = 'redis',
    'mode'           = 'single',
    'host'           = '*',
    'port'           = '*',
    'password'       = '*',
    'db.index'       = '0',
    -- cache type 'all': the whole table is cached, and ttl is the reload interval
    'connector.lookup.cache.type' = 'all',
    'connector.lookup.cache.ttl'  = '3000'
);


-- sink: PV/UV per (dt, age), written to MySQL through the JDBC connector.
-- Replace the 'xxx' / '*' placeholders with your own MySQL host and credentials.
-- NOTE(review): the job writes a GROUP BY result (rows are updated over time);
-- confirm the target table has a unique key on (dt, age).
CREATE TABLE pvuvage_sink (
    dt  VARCHAR,
    age INT,
    pv  BIGINT,
    uv  BIGINT
) WITH (
    'connector.type'     = 'jdbc',
    -- placeholder host: do not publish real internal addresses in examples
    'connector.url'      = 'jdbc:mysql://xxx:3306/flink-test',
    'connector.table'    = 'pvuv_age_sink_redis',
    'connector.username' = '*',
    'connector.password' = '*',
    -- flush after every buffered row so results appear immediately (demo setting)
    'connector.write.flush.max-rows' = '1'
);


-- Enrich each user_log event with the user's age from the Redis dimension
-- table, keep users older than 10, then count PV and UV per hour and age.
INSERT INTO pvuvage_sink
SELECT
    d.dt,
    d.age,
    COUNT(*)                  AS pv,
    COUNT(DISTINCT d.user_id) AS uv
FROM (
    SELECT
        DATE_FORMAT(u.ts, 'yyyy-MM-dd HH:00') AS dt,
        CAST(w.age AS INT)                    AS age,
        u.user_id
    FROM (
        -- FOR SYSTEM_TIME AS OF needs a processing-time attribute: PROCTIME() supplies it.
        SELECT user_id, ts, PROCTIME() AS proc
        FROM user_log
    ) AS u
    -- The age filter below discards unmatched rows anyway, so this is an inner
    -- lookup join; LEFT JOIN + WHERE on w.* would be equivalent but misleading.
    INNER JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
        ON u.user_id = w.user_id
    -- age is stored as VARCHAR: compare numerically rather than relying on
    -- implicit coercion (a string comparison would be lexicographic).
    WHERE CAST(w.age AS INT) > 10
) AS d
GROUP BY d.dt, d.age;

With 参数

参数 注释说明 备注
connector.type 维表类型 必填，固定为 redis
host Redis 服务地址 必填；多个地址用英文逗号分隔
port Redis 服务端口 选填，默认值为 '6379'
mode Redis 部署模式 选填，默认值为 'single'；支持 single、sentinel、cluster 三种模式；cluster 及 sentinel 模式下 host 用逗号分隔，如 host1,host2,host3…；sentinel 模式还需设置 master
master master 地址 仅当 mode = 'sentinel' 时需要（此时必填）
password Redis 密码 选填；mode 为 single 或 sentinel 时可设置
db.index Redis 的 db 编号 选填，默认为 0
connector.lookup.cache.type 缓存类型 选填，默认 'none'；支持 'all'、'lru'、'none'
connector.lookup.cache.max-rows 最大缓存条数 选填，默认 10000 条；仅当 cache.type 为 'lru' 时有效
connector.lookup.cache.ttl 缓存失效时间 / reload 间隔 选填；cache.type 为 'lru' 时表示缓存失效时间，默认不过期；为 'all' 时表示全量 reload 的间隔，默认不 reload
connector.lookup.max-retries 维表查询失败时的最大重试次数 选填，默认 3 次