HBase SQL 连接器
更新时间: 2022-11-24 13:47:41
阅读 768
HBase 连接器
Flink-1.12
source 表
简述
离线批的方式读取数据
示例
- HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names。 e.g. base_info 映射为 HBase 的 column family name,dt、age、behavior 映射为 HBase 的 qualifier name。
- 必须设置:SET ‘stream_mode’ = ‘false’;
- 以元数据的方式配置krb认证连接时,需要在set中配置相关的文件信息
- ddl 方式
SET 'stream_mode' = 'false';
CREATE TABLE source (
user_id VARCHAR,
base_info row<age VARCHAR>
) WITH (
'connector' = 'hbase',
'connector.version' = '2.2.1',
'table-name' = 'user_dim',
'zookeeper.quorum' = 'a1.service.163.org:2181,a2.service.163.org:2181',
'zookeeper.znode.parent' = '/hbase'
);
- 元数据方式
SET 'stream_mode' = 'false';
set 'user_dim.connector.version' = '1.4.3';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
set 'user_dim.core-site' = 'core-site.xml',
set 'user_dim.hdfs-site' = 'hdfs-site.xml',
set 'user_dim.hbase-site' = 'hbase-site.xml',
set 'user_dim.krb.conf' = 'krb5.conf',
set 'user_dim.krb.keytab' = 'hbase.keytab',
set 'user_dim.krb.principal' = 'hbase/sloth-commerce-test1.jd.163.org@BDMS.163.COM',
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
set 'user_pvuv_sink.connector.version' = '1.4.3';
set 'user_pvuv_sink.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
insert into hbase_docker_one.`default`.user_pvuv_sink
select a.user_id, ROW(age) as base_info
from hbase_docker_one.`default`.user_dim a;
维表
- ddl
-- source
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts VARCHAR
) WITH (
'connector' = 'memory',
'table.name' = 'user_log'
);
--dim
CREATE TABLE dim_table (
user_id VARCHAR,
base_info row<age VARCHAR>
) WITH (
'connector' = 'hbase',
'connector.version' = '2.2.1',
'table-name' = 'user_dim',
'zookeeper.quorum' = 'ddbadmin.service.163.org:2181',
'zookeeper.znode.parent' = '/hbase',
'lookup.cache.type' = 'ALL'
,'lookup.async' = 'true'
);
-- sink
CREATE TABLE pvuvage_sink (
dt VARCHAR,
age INT,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
'table-name' = 'pvuv_age_sink_hbase',
'username' = 'sys',
'password' = 'netease'
);
INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
u.ts,
cast(w.base_info.age as int) as age,
u.user_id,
u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;
- 元数据方式 因元数据中心不支持 row 形式的 schema 配置,需要在set中定义schema
set 'user_dim.connector.version' = '2.2.1';
set 'user_dim.lookup.async' = 'true';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
-- source
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts VARCHAR
) WITH (
'connector' = 'memory',
'table.name' = 'user_log'
);
-- sink
CREATE TABLE pvuvage_sink (
dt VARCHAR,
age INT,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
'table-name' = 'pvuv_age_sink_hbase',
'username' = 'sys',
'password' = 'netease',
'sink.buffer-flush.max-rows' = '1'
);
INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
u.ts,
cast(w.base_info.age as int) as age,
u.user_id,
u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join hbase_docker_two.`default`.user_dim for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;
sink 表
ddl
-- sink CREATE TABLE pvuvage_sink ( user_id VARCHAR, base_info row<age VARCHAR> ) WITH ( 'connector' = 'hbase', 'connector.version' = '2.2.1', 'zookeeper.quorum' = 'ddbadmin.service.163.org:2181', 'zookeeper.znode.parent' = '/hbase', 'table-name' = 'user_pvuv_sink', 'sink.buffer-flush.max-size' = '10mb', 'sink.buffer-flush.max-rows' = '1000', 'sink.buffer-flush.interval' = '1s' );
元数据方式
set 'sloth_ua_one.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<item_id VARCHAR, category_id varchar, behavior varchar, ts VARCHAR>>'; set 'sloth_ua_one.connector.version' = '2.2.1'; -- source CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts VARCHAR ) WITH ( 'connector' = 'memory', 'table.name' = 'user_log' ); INSERT INTO hbase_docker_two.`default`.sloth_ua_one SELECT user_id, ROW(item_id , category_id, behavior, ts) as base_info FROM user_log;
With 参数 / set 参数
- 针对某个catalog设置参数的,需要在下表的参数前添加catalog名称。如上述的: user_dim.connector.version
注意
目前对于带kerberos认证的场景只支持:
- 使用一套 kerberos配置运行任务,即连接的数据源和flink任务运行的hadoop需要相同的kerberos配置、principal等。此时数据源上无须配置krb认证信息。
- hbase与任务运行仅有一个需要kerberos。如果hbase需krb,平台任务提交运行不需要,则按下述参数配置krb认证信息。否则无须配置krb认证信息。
参数 | 注释说明 | 备注 |
---|---|---|
stream_mode | set 的参数,任务类型 | 选填,作为source时必填,且值为false |
connector | source 数据源 | 必填:hbase |
connector.version | hbase client 版本 | 选填:目前支持:1.4.3, 2.2.1,默认2.2.1 |
table-name | hbase 表名 | 必填 |
zookeeper.quorum | HBase 集群配置的 zk 地址,是以,分隔的主机列表加端口号 | 必填 |
zookeeper.znode.parent | 集群配置在 zk 上的路径 | 选填,默认:/hbase |
sink.buffer-flush.max-size | 定义多大 size 缓存在 buffer | 选填:默认2mb。e.g. 3b/k/kb/m/mb/g/gb/t/tb |
sink.buffer-flush.max-rows | 定义多少条缓存在 buffer,一次性 flush | 选填;默认1000 |
sink.buffer-flush.interval | 定义 flush 时间间隔 | 选填;默认1s。’0s’表示同步 flush。e.g. 3d/day/h/hour/min/s/ms |
lookup.async | 作为维表时查询的同异步配置 | 选填。true/false。只有在2.2.1时true才生效,默认false |
lookup.cache.type | 作为维表时查询的缓存策略 | 选填。不区分大小写,all/none/lru。默认none |
lookup.max-retries | 作为维表时查询的缓存最大失败重试次数 | 选填。默认3 |
lookup.cache.ttl | 作为维表时查询的缓存ttl | 选填。默认常驻,不会失效 |
lookup.cache.max-rows | 作为维表时查询的缓存最大行数 | 选填。默认10000 |
lookup.cache.metric.enable | 是否开启缓存监控。监控指标有:XXX.cache.hit-rate:命中率=hit-count/(hit-count+miss-count);XXX.cache.hit-count:命中数;XXX.cache.miss-count:未命中数,=加载成功数+加载异常数;XXX.cache.average-load-cost:每条记录平均耗时,单位ms,total-load-time/(load-success-count+load-exception-count);XXX.cache.load-success-count:缓存加载成功数;XXX.cache.load-exception-count:缓存加载异常数,外部数据库未匹配到join key;XXX.cache.load-exception-rate:缓存加载异常率,当异常率很高时建议开启缓存空值;XXX.cache.total-load-time:总的缓存加载耗时,单位s;XXX.cache.cache-record-count:缓存的记录数 | 选填。默认false |
null-string-literal | 空串存储方式 | 选填。默认为 null字符串 |
sink.parallelism | sink并行度 | 整数,选填。默认为1 |
hbase-site | hbase-site文件名 | 选填,带 krb 认证时必填 |
core-site | core-site文件名 | 选填,带 krb 认证时必填 |
hdfs-site | hdfs-site文件名 | 选填,带 krb 认证时必填 |
krb.conf | krb5.conf文件名 | 选填,带 krb 认证时必填 |
krb.keytab | krb.keytab文件名 | 选填,带 krb 认证时必填 |
krb.principal | krb.principal | 选填,带 krb 认证时必填 |
Flink-1.10
- Flink HBase Connector 官方文档
HBase Sink
简述
Easystream 支持输出到 HBase。示例
CREATE TABLE pvuvage_sink (
hbase_rowkey_name VARCHAR,
base_info row<dt VARCHAR, age INT, behavior VARCHAR>
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.property.version' = '2',
'connector.zookeeper.quorum' = '*',
'connector.zookeeper.znode.parent' = '*',
'connector.table-name' = 'user_pvuv_sink',
'connector.write.buffer-flush.max-size' = '10mb',
'connector.write.buffer-flush.max-rows' = '1000',
'connector.write.buffer-flush.interval' = '1s'
);
- HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names。 e.g. base_info 映射为 HBase 的 column family name,dt、age、behavior 映射为 HBase 的 qualifier name。
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:hbase |
connector.version | hbase client 版本 | 必填:目前支持:1.4.3, 2.2.1 |
connector.property.version | Easystream 版本 | 必填,2 |
connector.table-name | hbase 表名 | 必填 |
connector.zookeeper.quorum | HBase 集群配置的 zk 地址,是以,分隔的主机列表 | 必填 |
connector.zookeeper.znode.parent | 集群配置在 zk 上的路径 | 必填 |
connector.write.buffer-flush.max-size | 定义多大 size 缓存在 buffer | 选填;默认2mb |
connector.write.buffer-flush.max-rows | 定义多少条缓存在 buffer,一次性 flush | 选填;没有默认值 |
connector.write.buffer-flush.interval | 定义 flush 间隔时间,’0s’表示同步 flush | 选填 |
HBase 维表
简述
Easystream 支持使用 HBase 表作为维表进行 join。
示例
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'latest-offset',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.properties.group.id' = 'xxx',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
--dim
CREATE TABLE dim_table (
user_id VARCHAR,
base_info row<age VARCHAR, tall varchar>,
info row<gener VARCHAR, children varchar>
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.property.version' = '2',
'connector.table-name' = 'user_dim',
'connector.zookeeper.quorum' = '*',
'connector.zookeeper.znode.parent' = '*',
'connector.lookup.cache.type' = 'lru'
);
-- sink
CREATE TABLE pvuvage_sink (
dt VARCHAR,
age INT,
zhibiao1 BIGINT,
zhibiao2 BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
'connector.table' = 'pvuv_age_sink_hbase',
'connector.username' = '*',
'connector.password' = '*',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO pvuvage_sink
select DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') as dt, d.age, sum(d.tall) as zhibiao1, sum(d.childNum) as zhibiao2
from (
SELECT
u.ts,
cast(w.age as int) as age,
cast(w.tall as bigint) as tall,
cast(w.children as bigint) as childNum,
u.user_id,
u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
left join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') , d.age;
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:hbase |
connector.version | hbase client 版本 | 必填:目前支持:1.4.3, 2.2.1 |
connector.property.version | Easystream 版本 | 必填,2 |
connector.table-name | hbase 表名 | 必填 |
connector.zookeeper.quorum | HBase 集群配置的 zk 地址,是以,分隔的主机列表 | 必填 |
connector.zookeeper.znode.parent | 集群配置在 zk 上的路径 | 必填 |
connector.lookup.cache.type | 缓存类型 | 选填,默认’none’,支持:’all’, ‘lru’, ‘none’ |
connector.lookup.cache.max-rows | 最大缓存条数 | 选填 ,默认10000条 |
connector.lookup.cache.ttl | 当选择’lru’表示缓存失效时间,默认不过期;当选择’all’表示 reload 间隔时间,默认不 reload | 选填 |
connector.lookup.max-retries | dim 表获取失败时最大重试次数,默认3次 | 选填 |
connector.lookup.async | 作为维表时查询的同异步配置 | 选填。true/false。只有在2.2.1时true才生效,默认false |
- 带 krb 认证的配置同版本1.12
文档反馈
以上内容对您是否有帮助?