Redis Sink

简述

Easystream 支持输出到 Redis。

示例

- source
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = '***',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id' = 'xxx',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);

-- sink
CREATE TABLE pvuvage_sink (
    user_id VARCHAR,
    item_id VARCHAR
) WITH (
    'connector.type' = 'redis',
    'primary.key' = 'user_id',
    'host' = '***',
    'port' = '6379',
    'mode' = 'single',
    'db.index' = '1',
    'password' = '***',
    'connector.command' = 'LPUSH'
);


INSERT INTO pvuvage_sink
select user_id, item_id from user_log;

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:redis
primary.key redis 主键 必填
host redis servers 必填,多个直接用个逗号分隔
port redis servers 服务端口 选填,默认值为‘6379’
mode redis 集群模式 选填;默认值为single;支持single, sentinel, cluster 3种集群模式;cluster及sentinel模式中,host用逗号分隔,如host,host,host…;sentinel模式需要设置master
master master 地址 当 mode = ‘sentinel’ 时才需要
password redis 密码 当 mode 为 single、sentinel 可选填
db.index 对应 redis 的 db 选填;默认为0;
connector.command 操作 redis 的类型 必填:RPUSH、LPUSH、SADD、SET、PFADD、PUBLISH、ZADD、ZREM、HSET、INCRBY、DECRBY