HBase Dimension Table

Overview

Easystream supports using an HBase table as a dimension table in joins.

Example

    -- source
    CREATE TABLE user_log (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'xxx',
        'connector.properties.bootstrap.servers' = 'xxx',
        'connector.properties.group.id' = 'xxx',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );

    -- dimension table
    CREATE TABLE dim_table (
        user_id VARCHAR,
        base_info ROW<age VARCHAR, tall VARCHAR>,
        info ROW<gener VARCHAR, children VARCHAR>
    ) WITH (
        'connector.type' = 'hbase',
        'connector.version' = '1.4.3',
        'connector.property.version' = '2',
        'connector.table-name' = 'user_dim',
        'connector.zookeeper.quorum' = '*',
        'connector.zookeeper.znode.parent' = '*',
        'connector.lookup.cache.type' = 'lru'
    );

    -- sink
    CREATE TABLE pvuvage_sink (
        dt VARCHAR,
        age INT,
        zhibiao1 BIGINT,
        zhibiao2 BIGINT
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
        'connector.table' = 'pvuv_age_sink_hbase',
        'connector.username' = '*',
        'connector.password' = '*',
        'connector.write.flush.max-rows' = '1'
    );

    -- Enrich each log record with the user's dimension data, then aggregate per hour and age.
    INSERT INTO pvuvage_sink
    SELECT
        DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') AS dt,
        d.age,
        SUM(d.tall) AS zhibiao1,
        SUM(d.childNum) AS zhibiao2
    FROM (
        SELECT
            u.ts,
            CAST(w.age AS INT) AS age,
            CAST(w.tall AS BIGINT) AS tall,
            CAST(w.children AS BIGINT) AS childNum,
            u.user_id,
            u.behavior
        FROM (
            SELECT user_id, item_id, category_id, behavior, ts, PROCTIME() AS proc
            FROM user_log
        ) AS u
        -- Lookup join: the dimension row is fetched as of the record's processing time.
        LEFT JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
            ON u.user_id = w.user_id
        WHERE w.age > 10
    ) AS d
    GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00'), d.age;

WITH Parameters

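| Parameter | Description | Remarks |
| --- | --- | --- |
| connector.type | Dimension table type | Required; must be hbase |
| connector.version | HBase client version | Required; currently supported: 1.4.3, 2.2.1 |
| connector.property.version | Easystream version | Required; 2 |
| connector.table-name | HBase table name | Required |
| connector.zookeeper.quorum | ZooKeeper address configured for the HBase cluster, given as a comma-separated list of hosts | Required |
| connector.zookeeper.znode.parent | Path under which the cluster is registered in ZooKeeper | Required |
| connector.lookup.cache.type | Cache type | Optional; default 'none'; supported: 'all', 'lru', 'none' |
| connector.lookup.cache.max-rows | Maximum number of cached rows | Optional; default 10000 |
| connector.lookup.cache.ttl | With 'lru': how long a cache entry stays valid (by default entries never expire). With 'all': the reload interval (by default the table is never reloaded) | Optional |
| connector.lookup.max-retries | Maximum number of retries when a dimension table lookup fails | Optional; default 3 |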
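
As an illustration of how the optional parameters combine with the required ones, the following is a minimal sketch of a dimension table with an explicit LRU cache size and retry count. The table name dim_table_cached, the ZooKeeper hosts, the '/hbase' znode path, and the values 50000 and 5 are placeholders chosen for this example, not recommended settings. Whether the quorum entries need a port depends on the cluster setup. connector.lookup.cache.ttl is left out because its value format is not specified here, so the default (entries never expire) applies.

```sql
-- Hypothetical variant of dim_table with explicit cache and retry settings.
CREATE TABLE dim_table_cached (
    user_id VARCHAR,
    base_info ROW<age VARCHAR, tall VARCHAR>,
    info ROW<gener VARCHAR, children VARCHAR>
) WITH (
    -- required parameters
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.property.version' = '2',
    'connector.table-name' = 'user_dim',
    'connector.zookeeper.quorum' = 'zk1.example.com,zk2.example.com,zk3.example.com',  -- placeholder hosts
    'connector.zookeeper.znode.parent' = '/hbase',                                     -- placeholder path
    -- optional parameters
    'connector.lookup.cache.type' = 'lru',          -- cache looked-up rows, evicting the least recently used
    'connector.lookup.cache.max-rows' = '50000',    -- overrides the default of 10000
    'connector.lookup.max-retries' = '5'            -- overrides the default of 3
);
```

With 'lru' and no TTL, a cached row is refreshed only after it has been evicted, so this setup suits dimension data that changes rarely. If the HBase table is updated often, 'none' or a configured TTL is likely the safer choice.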