HBase Dimension Table
Updated: 2021-08-26 20:42:29
Overview
Easystream supports using an HBase table as a dimension table in a join.
Example
    CREATE TABLE user_log (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP
    ) WITH (
        'connector.type' = 'kafka',
        'connector.version' = 'universal',
        'connector.topic' = 'user_behavior',
        'connector.startup-mode' = 'latest-offset',
        'connector.properties.zookeeper.connect' = 'xxx',
        'connector.properties.bootstrap.servers' = 'xxx',
        'connector.properties.group.id' = 'xxx',
        'update-mode' = 'append',
        'format.type' = 'json',
        'format.derive-schema' = 'true'
    );

    -- dim
    CREATE TABLE dim_table (
        user_id VARCHAR,
        base_info ROW<age VARCHAR, tall VARCHAR>,
        info ROW<gener VARCHAR, children VARCHAR>
    ) WITH (
        'connector.type' = 'hbase',
        'connector.version' = '1.4.3',
        'connector.property.version' = '2',
        'connector.table-name' = 'user_dim',
        'connector.zookeeper.quorum' = '*',
        'connector.zookeeper.znode.parent' = '*',
        'connector.lookup.cache.type' = 'lru'
    );

    -- sink
    CREATE TABLE pvuvage_sink (
        dt VARCHAR,
        age INT,
        zhibiao1 BIGINT,
        zhibiao2 BIGINT
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
        'connector.table' = 'pvuv_age_sink_hbase',
        'connector.username' = '*',
        'connector.password' = '*',
        'connector.write.flush.max-rows' = '1'
    );

    INSERT INTO pvuvage_sink
    SELECT
        DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') AS dt,
        d.age,
        SUM(d.tall) AS zhibiao1,
        SUM(d.childNum) AS zhibiao2
    FROM (
        SELECT
            u.ts,
            CAST(w.age AS INT) AS age,
            CAST(w.tall AS BIGINT) AS tall,
            CAST(w.children AS BIGINT) AS childNum,
            u.user_id,
            u.behavior
        FROM (
            SELECT user_id, item_id, category_id, behavior, ts, PROCTIME() AS proc
            FROM user_log
        ) AS u
        LEFT JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
        ON u.user_id = w.user_id
        WHERE w.age > 10
    ) AS d
    GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00'), d.age;
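
The core of the example is the processing-time lookup join. Stripped of the aggregation, it can be sketched as the query below, which reuses `user_log` and `dim_table` from the example. The alias `dim_user_id` is introduced here only for illustration. Because this is a LEFT JOIN, `dim_user_id` would be NULL for events whose `user_id` has no matching row in HBase.

```sql
-- Minimal sketch of the lookup join only (no aggregation, no sink).
-- dim_user_id is a hypothetical alias, not part of the original example.
SELECT
    u.user_id,
    u.behavior,
    u.ts,
    w.user_id AS dim_user_id   -- NULL when the HBase lookup finds no row
FROM (
    -- PROCTIME() adds the processing-time attribute the lookup join needs
    SELECT user_id, behavior, ts, PROCTIME() AS proc
    FROM user_log
) AS u
LEFT JOIN dim_table FOR SYSTEM_TIME AS OF u.proc AS w
ON u.user_id = w.user_id;
```

In the full example the filter `w.age > 10` discards those unmatched rows, since a comparison against NULL is never true, so the result there behaves like an inner join.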
WITH parameters
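| Parameter | Description | Notes |
|---|---|---|
| connector.type | Dimension table type | Required: hbase |
| connector.version | HBase client version | Required. Currently supported: 1.4.3, 2.2.1 |
| connector.property.version | Easystream version | Required: 2 |
| connector.table-name | HBase table name | Required |
| connector.zookeeper.quorum | ZooKeeper address configured for the HBase cluster, given as a comma-separated list of hosts | Required |
| connector.zookeeper.znode.parent | Path under which the cluster is configured in ZooKeeper | Required |
| connector.lookup.cache.type | Cache type | Optional. Default: 'none'. Supported: 'all', 'lru', 'none' |
| connector.lookup.cache.max-rows | Maximum number of cached rows | Optional. Default: 10000 |
| connector.lookup.cache.ttl | With 'lru': the cache expiration time (by default entries never expire). With 'all': the reload interval (by default no reload) | Optional |
| connector.lookup.max-retries | Maximum number of retries when a dimension table lookup fails. Default: 3 | Optional |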
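
As a sketch of how the optional lookup settings in the table fit together, the dimension table from the example could be declared with an explicit LRU cache size and retry count. The values 50000 and 5 are illustrative assumptions, not recommendations. `connector.lookup.cache.ttl` is left out because this page does not state its unit or value format, so with 'lru' the entries keep the documented default of never expiring.

```sql
-- Sketch only: the example's dim_table with optional lookup settings added.
CREATE TABLE dim_table (
    user_id VARCHAR,
    base_info ROW<age VARCHAR, tall VARCHAR>,
    info ROW<gener VARCHAR, children VARCHAR>
) WITH (
    -- required settings, copied from the example above
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.property.version' = '2',
    'connector.table-name' = 'user_dim',
    'connector.zookeeper.quorum' = '*',
    'connector.zookeeper.znode.parent' = '*',
    -- optional settings; the values below are assumed for illustration
    'connector.lookup.cache.type' = 'lru',
    'connector.lookup.cache.max-rows' = '50000',  -- documented default: 10000
    'connector.lookup.max-retries' = '5'          -- documented default: 3
);
```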