Redis

Redis连接器支持流式写,维表join

FLINK-1.12

DDL写法

1、建表

当redis表作为sink时,需要配置connector.command

CREATE TABLE dim_table (
    user_id VARCHAR,
    age varchar
) WITH (
    'connector' = 'redis',
    'host' = '10.122.173.131',
    'port' = '6379',
    'mode' = 'single',
    'db.index' = '0',
    'password' = 'xxx',
    'lookup.cache.type' = 'all',
    'lookup.cache.ttl' = '3000',
    'primary.key' = 'user_id'
);

CREATE TABLE sink (
    item_id VARCHAR,
    item_type varchar
) WITH (
    'connector' = 'redis',
    'host' = '10.122.173.131',
    'port' = '6379',
    'mode' = 'single',
    'db.index' = '0',
    'password' = 'xxx',
    'lookup.cache.type' = 'all',
    'lookup.cache.ttl' = '3000',
    'primary.key' = 'item_id',
    'connector.command' = 'SET'
);

2、维表join

INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY d.ts, d.age;

Metahub写法

1、建表

redis命令名作为属性头

set 'SADD.db.index' ='4';
set 'SADD.primary.key' = 'item_id';

2、维表join

使用metahub方式的redis作为维表时,redis命令使用GET,关联键名是k,值名是v

INSERT INTO sloth_redis_test.mem.`HSET`
select d.ts as item_id, cast(d.age as string) as item_type, cast(count(*) as string) as `redis.value`
from (
SELECT
  u.ts,
  cast(w.v as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join sloth_redis_test.`default`.`GET` for system_time as of u.proc as w
on u.user_id = w.k
where w.v > 10
) as d
GROUP BY d.ts, d.age;

支持的命令

数据类型 支持的命令
LIST
LPUSH、RPUSH
SET
SADD
STRING
SET、GET、SETEX、INCRBY、DECRBY、DESCRBY_EX
SORTED_SET
ZADD、ZREM
HASH
HSET
HYPER_LOG_LOG
PFADD
PUBSUB
PUBLISH

属性

Key Default Type Description
host
(none) String redis服务端主机名,可以配置成ip1:port1,ip2:port2,此时port属性无效,也可以配置成ip1,ip2,此时使用port属性里的端口号
port
(none) Integer redis服务端的缺省端口号
password
(none) String redis服务端用户密码
mode
(none) String redis服务端模式,比如single、cluster、sentinel
db.index
(none) Integer 使用的redis库的索引值
primary.key
(none) String 指定某字段映射成redis键
connector.command
(none) String redis命令类型
ttl
(none) Integer 存活时长
master
(none) String 主库
lookup.cache.max-rows
(none) Integer 最多缓存键的个数
lookup.cache.ttl
(none) Integer 配置缓存类型为lru后,可配置缓存保留时长
lookup.max-retries
(none) Integer 当没找到缓存后,最多从redis查找的次数
lookup.cache.type
(none) String 缓存类型,可配置none、lru、all

FLINK-1.10

示例

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id' = 'xxx',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
--dim
CREATE TABLE dim_table (
    user_id VARCHAR,
    age varchar
) WITH (
    'connector.type' = 'redis',
    'host' = '*',
    'port' = '*',
    'mode' = 'single',
    'db.index' = '0',
    'password' = '*',
    'connector.lookup.cache.type' = 'all',
    'connector.lookup.cache.ttl' = '3000'
);


-- sink
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'connector.table' = 'pvuv_age_sink_redis',
    'connector.username' = '*',
    'connector.password' = '*',
    'connector.write.flush.max-rows' = '1'
);


INSERT INTO pvuvage_sink
select DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
left join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') , d.age;

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:redis
host redis servers 必填,多个直接用个逗号分隔
port redis servers 服务端口 选填,默认值为‘6379’
mode redis 集群模式 选填;默认值为single;支持single, sentinel, cluster 3种集群模式;cluster及sentinel模式中,host用逗号分隔,如host,host,host...;sentinel模式需要设置master
master master 地址 当 mode = 'sentinel' 时才需要
password redis 密码 当 mode 为 single、sentinel 可选填
db.index 对应 redis 的 db 选填;默认为0;
connector.lookup.cache.type 缓存类型 选填,默认'none',支持:'all', 'lru', 'none'
connector.lookup.cache.max-rows 最大缓存条数 ,默认10000条, type 为 lru 时有效 选填
connector.lookup.cache.ttl 当选择'lru'表示缓存失效时间,默认不过期;当选择'all'表示 reload 间隔时间 选填,默认不 reload
connector.lookup.max-retries dim 表获取失败时最大重试次数 选填,默认3次