Redis SQL 连接器
更新时间: 2022-01-10 10:25:44
阅读 1500
Redis
Redis 连接器支持流式写入(作为 sink)和维表 join(作为 lookup 维表)。
FLINK-1.12
DDL写法
1、建表
当 Redis 表作为 sink 时,需要配置 connector.command,用于指定写入时使用的 Redis 命令(如 SET)。
-- Redis lookup (dimension) table.
-- 'primary.key' maps user_id to the Redis key; age presumably holds the stored value.
-- 'lookup.cache.type' = 'all' caches the lookup results (options: none / lru / all).
CREATE TABLE dim_table (
    user_id VARCHAR,
    age     VARCHAR
) WITH (
    'connector'         = 'redis',
    'host'              = '10.122.173.131',
    'port'              = '6379',
    'mode'              = 'single',
    'db.index'          = '0',
    'password'          = 'xxx',
    'lookup.cache.type' = 'all',
    'lookup.cache.ttl'  = '3000',
    'primary.key'       = 'user_id'
);
-- Redis sink table: 'connector.command' selects the Redis command used for writes
-- (required when the table is used as a sink); 'primary.key' maps item_id to the Redis key.
-- NOTE(review): the lookup.cache.* options appear to matter only for lookup joins and are
-- likely unused for a write-only table - confirm before relying on them here.
CREATE TABLE sink (
    item_id   VARCHAR,
    item_type VARCHAR
) WITH (
    'connector'         = 'redis',
    'host'              = '10.122.173.131',
    'port'              = '6379',
    'mode'              = 'single',
    'db.index'          = '0',
    'password'          = 'xxx',
    'lookup.cache.type' = 'all',
    'lookup.cache.ttl'  = '3000',
    'primary.key'       = 'item_id',
    'connector.command' = 'SET'
);
2、维表join
-- PV/UV per hour and age, with age looked up from the Redis dimension table.
-- user_log and pvuvage_sink are the tables defined in the FLINK-1.10 example;
-- pvuvage_sink.dt is VARCHAR, so the timestamp is formatted into an hourly bucket string
-- instead of being inserted as a raw TIMESTAMP.
INSERT INTO pvuvage_sink
SELECT
    DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') AS dt,
    d.age,
    COUNT(*)                  AS pv,
    COUNT(DISTINCT d.user_id) AS uv
FROM (
    SELECT
        u.ts,
        CAST(w.age AS INT) AS age,
        u.user_id,
        u.behavior
    FROM (
        -- PROCTIME() provides the processing-time attribute used by FOR SYSTEM_TIME AS OF.
        SELECT user_id, item_id, category_id, behavior, ts, PROCTIME() AS proc
        FROM user_log
    ) AS u
    JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
        ON u.user_id = w.user_id
    -- age is VARCHAR in the dimension table: compare it numerically, not as a string.
    WHERE CAST(w.age AS INT) > 10
) AS d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00'), d.age;
Metahub写法
1、建表
以 Redis 命令名作为属性前缀(例如 SADD.db.index):
-- Metahub: connector options are prefixed with the Redis command name (here SADD).
SET 'SADD.db.index'    = '4';
SET 'SADD.primary.key' = 'item_id';
2、维表join
使用 Metahub 方式将 Redis 作为维表时,Redis 命令使用 GET,关联键的字段名为 k,值的字段名为 v。
-- Metahub variant: Redis tables are addressed as catalog.database.`COMMAND`.
-- The GET lookup table exposes key column k and value column v; results are written via HSET.
-- All written columns are cast to STRING explicitly (item_id included, since ts is a TIMESTAMP).
INSERT INTO sloth_redis_test.mem.`HSET`
SELECT
    CAST(d.ts AS STRING)     AS item_id,
    CAST(d.age AS STRING)    AS item_type,
    CAST(COUNT(*) AS STRING) AS `redis.value`
FROM (
    SELECT
        u.ts,
        CAST(w.v AS INT) AS age,
        u.user_id,
        u.behavior
    FROM (
        -- PROCTIME() provides the processing-time attribute used by FOR SYSTEM_TIME AS OF.
        SELECT user_id, item_id, category_id, behavior, ts, PROCTIME() AS proc
        FROM user_log
    ) AS u
    JOIN sloth_redis_test.`default`.`GET` FOR SYSTEM_TIME AS OF u.proc AS w
        ON u.user_id = w.k
    -- v is the raw Redis value: compare it numerically, not as a string.
    WHERE CAST(w.v AS INT) > 10
) AS d
GROUP BY d.ts, d.age;
支持的命令
数据类型 | 支持的命令 |
---|---|
LIST | LPUSH、RPUSH |
SET | SADD |
STRING | SET、GET、SETEX、INCRBY、DECRBY、DESCRBY_EX |
SORTED_SET | ZADD、ZREM |
HASH | HSET |
HYPER_LOG_LOG | PFADD |
PUBSUB | PUBLISH |
属性
Key | Default | Type | Description |
---|---|---|---|
host | (none) | String | Redis 服务端主机名。可配置为 ip1:port1,ip2:port2,此时 port 属性无效;也可配置为 ip1,ip2,此时使用 port 属性中的端口号 |
port | (none) | Integer | Redis 服务端的默认端口号 |
password | (none) | String | Redis 服务端用户密码 |
mode | (none) | String | Redis 服务端模式,如 single、cluster、sentinel |
db.index | (none) | Integer | 使用的 Redis 库的索引值 |
primary.key | (none) | String | 指定映射为 Redis 键的字段 |
connector.command | (none) | String | Redis 命令类型(作为 sink 时需要配置) |
ttl | (none) | Integer | 存活时长 |
master | (none) | String | 主库 |
lookup.cache.max-rows | (none) | Integer | 最多缓存的键个数 |
lookup.cache.ttl | (none) | Integer | 缓存类型配置为 lru 时,可配置缓存保留时长 |
lookup.max-retries | (none) | Integer | 缓存未命中时,从 Redis 查询的最大次数 |
lookup.cache.type | (none) | String | 缓存类型,可配置 none、lru、all |
FLINK-1.10
示例
-- Source table: user behavior events read from Kafka as JSON (legacy connector.* options).
CREATE TABLE user_log (
    user_id     VARCHAR,
    item_id     VARCHAR,
    category_id VARCHAR,
    behavior    VARCHAR,
    ts          TIMESTAMP
) WITH (
    'connector.type'                         = 'kafka',
    'connector.version'                      = 'universal',
    'connector.topic'                        = 'user_behavior',
    'connector.startup-mode'                 = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id'          = 'xxx',
    'update-mode'                            = 'append',
    'format.type'                            = 'json',
    'format.derive-schema'                   = 'true'
);
-- dim: Redis lookup table (legacy connector.* option style); host/port/password are placeholders.
CREATE TABLE dim_table (
    user_id VARCHAR,
    age     VARCHAR
) WITH (
    'connector.type'              = 'redis',
    'host'                        = '*',
    'port'                        = '*',
    'mode'                        = 'single',
    'db.index'                    = '0',
    'password'                    = '*',
    'connector.lookup.cache.type' = 'all',
    'connector.lookup.cache.ttl'  = '3000'
);
-- sink: MySQL table receiving PV/UV per dt and age through the legacy JDBC connector.
-- 'connector.write.flush.max-rows' = '1' flushes after every row.
CREATE TABLE pvuvage_sink (
    dt  VARCHAR,
    age INT,
    pv  BIGINT,
    uv  BIGINT
) WITH (
    'connector.type'                 = 'jdbc',
    'connector.url'                  = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'connector.table'                = 'pvuv_age_sink_redis',
    'connector.username'             = '*',
    'connector.password'             = '*',
    'connector.write.flush.max-rows' = '1'
);
-- Hourly PV/UV per age, with age looked up from the Redis dimension table.
-- The original LEFT JOIN was followed by a WHERE on the right-hand column, which discards
-- unmatched (NULL) rows anyway; an explicit inner JOIN states that behavior directly.
INSERT INTO pvuvage_sink
SELECT
    DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') AS dt,
    d.age,
    COUNT(*)                  AS pv,
    COUNT(DISTINCT d.user_id) AS uv
FROM (
    SELECT
        u.ts,
        CAST(w.age AS INT) AS age,
        u.user_id,
        u.behavior
    FROM (
        -- PROCTIME() provides the processing-time attribute used by FOR SYSTEM_TIME AS OF.
        SELECT user_id, item_id, category_id, behavior, ts, PROCTIME() AS proc
        FROM user_log
    ) AS u
    JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
        ON u.user_id = w.user_id
    -- age is VARCHAR in the dimension table: compare it numerically, not as a string.
    WHERE CAST(w.age AS INT) > 10
) AS d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00'), d.age;
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:redis |
host | redis servers | 必填;多个 host 用逗号分隔 |
port | redis servers 服务端口 | 选填,默认值为 '6379' |
mode | redis 集群模式 | 选填;默认值为 single;支持 single、sentinel、cluster 3 种集群模式;cluster 及 sentinel 模式中,host 用逗号分隔,如 host,host,host…;sentinel 模式需要设置 master |
master | master 地址 | 仅当 mode = 'sentinel' 时需要 |
password | redis 密码 | mode 为 single、sentinel 时选填 |
db.index | 对应 redis 的 db | 选填;默认为 0 |
connector.lookup.cache.type | 缓存类型 | 选填,默认 'none',支持 'all'、'lru'、'none' |
connector.lookup.cache.max-rows | 最大缓存条数,默认 10000 条,type 为 lru 时有效 | 选填 |
connector.lookup.cache.ttl | 选择 'lru' 时表示缓存失效时间,默认不过期;选择 'all' 时表示 reload 间隔时间 | 选填,默认不 reload |
connector.lookup.max-retries | dim 表获取失败时的最大重试次数 | 选填,默认 3 次 |
文档反馈
以上内容对您是否有帮助?