Redis Sink

简述

Easystream 支持输出到 Redis。

示例

  1. - source
  2. CREATE TABLE user_log (
  3. user_id VARCHAR,
  4. item_id VARCHAR,
  5. category_id VARCHAR,
  6. behavior VARCHAR,
  7. ts VARCHAR
  8. ) WITH (
  9. 'connector.type' = 'kafka',
  10. 'connector.version' = 'universal',
  11. 'connector.topic' = '***',
  12. 'connector.startup-mode' = 'latest-offset',
  13. 'connector.properties.zookeeper.connect' = 'xxx',
  14. 'connector.properties.bootstrap.servers' = 'xxx',
  15. 'connector.properties.group.id' = 'xxx',
  16. 'update-mode' = 'append',
  17. 'format.type' = 'json',
  18. 'format.derive-schema' = 'true'
  19. );
  20. -- sink
  21. CREATE TABLE pvuvage_sink (
  22. user_id VARCHAR,
  23. item_id VARCHAR
  24. ) WITH (
  25. 'connector.type' = 'redis',
  26. 'primary.key' = 'user_id',
  27. 'host' = '***',
  28. 'port' = '6379',
  29. 'mode' = 'single',
  30. 'db.index' = '1',
  31. 'password' = '***',
  32. 'connector.command' = 'LPUSH'
  33. );
  34. INSERT INTO pvuvage_sink
  35. select user_id, item_id from user_log;

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:redis
primary.key redis 主键 必填
host redis servers 必填,多个直接用个逗号分隔
port redis servers 服务端口 选填,默认值为‘6379’
mode redis 集群模式 选填;默认值为single;支持single, sentinel, cluster 3种集群模式;cluster及sentinel模式中,host用逗号分隔,如host,host,host…;sentinel模式需要设置master
master master 地址 当 mode = ‘sentinel’ 时才需要
password redis 密码 当 mode 为 single、sentinel 可选填
db.index 对应 redis 的 db 选填;默认为0;
connector.command 操作 redis 的类型 必填:RPUSH、LPUSH、SADD、SET、PFADD、PUBLISH、ZADD、ZREM、HSET、INCRBY、DECRBY