HBase SQL 连接器
更新时间: 2024-01-18 15:53:43
阅读 251
HBase 连接器
Flink-1.12/FLINK-1.14
source 表
简述
离线批的方式读取数据
示例
- HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names,可以定义多个 column family。 e.g. base_info 映射为 HBase 的 column family name,age 映射为 HBase 的 qualifier name。profile 映射为 HBase 的 column family name,name 映射为 HBase 的 qualifier name。
- 必须设置:SET 'stream_mode' = 'false';
- 以元数据的方式配置krb认证连接时,需要在set中配置相关的文件信息
- ddl 方式
SET 'stream_mode' = 'false';
CREATE TABLE source (
user_id VARCHAR,
base_info row<age VARCHAR>,
`profile` ROW<name String>
) WITH (
'connector' = 'hbase',
'connector.version' = '1.4.3',
'table-name' = 'user_dim',
'zookeeper.quorum' = 'a1.service.163.org:2181,a2.service.163.org:2181',
'zookeeper.znode.parent' = '/hbase'
);
- 元数据方式
SET 'stream_mode' = 'false';
set 'user_dim.connector.version' = '1.4.3';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
set 'user_dim.core-site' = 'core-site.xml',
set 'user_dim.hdfs-site' = 'hdfs-site.xml',
set 'user_dim.hbase-site' = 'hbase-site.xml',
set 'user_dim.krb.conf' = 'krb5.conf',
set 'user_dim.krb.keytab' = 'hbase.keytab',
set 'user_dim.krb.principal' = 'hbase/sloth-commerce-test1.jd.163.org@BDMS.163.COM',
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
set 'user_pvuv_sink.connector.version' = '1.4.3';
set 'user_pvuv_sink.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
insert into hbase_docker_one.`default`.user_pvuv_sink
select a.user_id, ROW(age) as base_info
from hbase_docker_one.`default`.user_dim a;
维表
- ddl
-- source
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts VARCHAR
) WITH (
'connector' = 'memory',
'table.name' = 'user_log'
);
--dim
CREATE TABLE dim_table (
user_id VARCHAR,
base_info row<age VARCHAR>
) WITH (
'connector' = 'hbase',
'connector.version' = '2.2.1',
'table-name' = 'user_dim',
'zookeeper.quorum' = 'ddbadmin.service.163.org:2181',
'zookeeper.znode.parent' = '/hbase',
'lookup.cache.type' = 'ALL'
,'lookup.async' = 'true'
);
-- sink
CREATE TABLE pvuvage_sink (
dt VARCHAR,
age INT,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
'table-name' = 'pvuv_age_sink_hbase',
'username' = 'sys',
'password' = 'netease'
);
INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
u.ts,
cast(w.base_info.age as int) as age,
u.user_id,
u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;
- 元数据方式
set 'user_dim.connector.version' = '2.2.1';
set 'user_dim.lookup.async' = 'true';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
-- source
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts VARCHAR
) WITH (
'connector' = 'memory',
'table.name' = 'user_log'
);
-- sink
CREATE TABLE pvuvage_sink (
dt VARCHAR,
age INT,
pv BIGINT,
uv BIGINT,
PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
'table-name' = 'pvuv_age_sink_hbase',
'username' = 'sys',
'password' = 'netease',
'sink.buffer-flush.max-rows' = '1'
);
INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
u.ts,
cast(w.base_info.age as int) as age,
u.user_id,
u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join hbase_docker_two.`default`.user_dim for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;
sink 表
ddl
-- sink CREATE TABLE pvuvage_sink ( user_id VARCHAR, base_info row<age VARCHAR> ) WITH ( 'connector' = 'hbase', 'connector.version' = '2.2.1', 'zookeeper.quorum' = 'ddbadmin.service.163.org:2181', 'zookeeper.znode.parent' = '/hbase', 'table-name' = 'user_pvuv_sink', 'sink.buffer-flush.max-size' = '10mb', 'sink.buffer-flush.max-rows' = '1000', 'sink.buffer-flush.interval' = '1s' );
元数据方式
set 'sloth_ua_one.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<item_id VARCHAR, category_id varchar, behavior varchar, ts VARCHAR>>'; set 'sloth_ua_one.connector.version' = '2.2.1'; -- source CREATE TABLE user_log ( user_id VARCHAR, item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts VARCHAR ) WITH ( 'connector' = 'memory', 'table.name' = 'user_log' ); INSERT INTO hbase_docker_two.`default`.sloth_ua_one SELECT user_id, ROW(item_id , category_id, behavior, ts) as base_info FROM user_log;
With 参数 / set 参数
- 针对某个catalog设置参数的,需要在下表的参数前添加catalog名称。如上述的: user_dim.connector.version
注意
目前对于带kerberos认证的场景只支持:
- 使用一套 kerberos配置运行任务,即连接的数据源和flink任务运行的hadoop需要相同的kerberos配置、principal等。此时数据源上无须配置krb认证信息。
- hbase与任务运行仅有一个需要kerberos。如果hbase需krb,平台任务提交运行不需要,则按下述参数配置krb认证信息。否则无须配置krb认证信息。
配置名称 | 是否必填 | 配置生效类型 | 参数值字段类型 | 参数值官方默认值 | 参数说明(用于用户手册) |
---|---|---|---|---|---|
connector | 必填 | 源表、目标表 | String | - | 必填:hbase |
table-name | 必填 | 源表、目标表 | String | - | 连接的 HBase 表名。默认该表在 "default" 命名空间下,指定命名空间下的表需要使用 "namespace:table"。 |
zookeeper.quorum | 必填 | 源表、目标表 | String | - | HBase Zookeeper 集群信息 |
zookeeper.znode.parent | 可选 | 源表、目标表 | String | /hbase | HBase 集群的 Zookeeper 根目录 |
connector.version | 可选 | 源表、目标表 | String | 2.2.1 | 指定使用的连接器版本 |
null-string-literal | 可选 | 源表、目标表 | String | null | 当字符串值为 null 时的存储形式,默认存成 "null" 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。 |
sink.buffer-flush.max-size | 可选 | 目标表 | MemorySize | 2mb | 每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.buffer-flush.max-rows | 可选 | 目标表 | Integer | 1000 | 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.buffer-flush.interval | 可选 | 目标表 | Duration | 1s | 刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。 |
sink.parallelism | 可选 | 目标表 | Integer | - | 定义 sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。 |
lookup.async | 可选 | 维表 | Boolean | false | 是否启用异步查找。注意:异步方式只支持 hbase-2.2 连接器 |
lookup.cache.max-rows | 可选 | 维表 | Long | -1 | 查找缓存的最大行数,超过这个值,最旧的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的 |
lookup.cache.ttl | 可选 | 维表 | Duration | 0s | 查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的。 |
lookup.max-retries | 可选 | 维表 | Integer | 3 | 查找数据库失败时的最大重试次数。 |
lookup.cache.type | 可选 | 维表 | CacheTypeEnum | none | 维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存) |
lookup.cache.metric.enable | 可选 | 维表 | Boolean | false | 是否启用维表缓存指标。默认关闭。 |
is.related.mammunt | 可选 | 源表、目标表 | Boolean | false | 是否关联猛犸。默认为false |
krb.conf | 可选 | 源表、目标表 | String | - | 使用flink-conf配置的krb信息登录 (包括security.kerberos.login.keytab, security.kerberos.login.principal, -Djava.security.krb5.conf) 由于语法检查无法获得上述配置信息,因此需要在connector中配置。如不使用语法检查,可不配置: krb.conf, krb.principal, krb.keytab。 |
krb.principal | 可选 | 源表、目标表 | String | - | krb.principal。带krb认证时必填。 |
krb.keytab | 可选 | 源表、目标表 | String | - | krb.leytab文件。带krb认证时必填。 |
hbase-site | 可选 | 源表、目标表 | String | - | hbase-site 文件名。带krb认证时必填。 |
core-site | 可选 | 源表、目标表 | String | - | core-site 文件名。带krb认证时必填。 |
hdfs-site | 可选 | 源表、目标表 | String | - | hdfs-site 文件名。带krb认证时必填。 |
Flink-1.10
- Flink HBase Connector 官方文档
HBase Sink
简述
Easystream 支持输出到 HBase。示例
CREATE TABLE pvuvage_sink (
hbase_rowkey_name VARCHAR,
base_info row<dt VARCHAR, age INT, behavior VARCHAR>
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.property.version' = '2',
'connector.zookeeper.quorum' = '*',
'connector.zookeeper.znode.parent' = '*',
'connector.table-name' = 'user_pvuv_sink',
'connector.write.buffer-flush.max-size' = '10mb',
'connector.write.buffer-flush.max-rows' = '1000',
'connector.write.buffer-flush.interval' = '1s'
);
- HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names。 e.g. base_info 映射为 HBase 的 column family name,dt、age、behavior 映射为 HBase 的 qualifier name。
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:hbase |
connector.version | hbase client 版本 | 必填:目前支持:1.4.3, 2.2.1 |
connector.property.version | Easystream 版本 | 必填,2 |
connector.table-name | hbase 表名 | 必填 |
connector.zookeeper.quorum | HBase 集群配置的 zk 地址,是以,分隔的主机列表 | 必填 |
connector.zookeeper.znode.parent | 集群配置在 zk 上的路径 | 必填 |
connector.write.buffer-flush.max-size | 定义多大 size 缓存在 buffer | 选填;默认2mb |
connector.write.buffer-flush.max-rows | 定义多少条缓存在 buffer,一次性 flush | 选填;没有默认值 |
connector.write.buffer-flush.interval | 定义 flush 间隔时间,'0s'表示同步 flush | 选填 |
HBase 维表
简述
Easystream 支持使用 HBase 表作为维表进行 join。
示例
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'latest-offset',
'connector.properties.zookeeper.connect' = 'xxx',
'connector.properties.bootstrap.servers' = 'xxx',
'connector.properties.group.id' = 'xxx',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
--dim
CREATE TABLE dim_table (
user_id VARCHAR,
base_info row<age VARCHAR, tall varchar>,
info row<gener VARCHAR, children varchar>
) WITH (
'connector.type' = 'hbase',
'connector.version' = '1.4.3',
'connector.property.version' = '2',
'connector.table-name' = 'user_dim',
'connector.zookeeper.quorum' = '*',
'connector.zookeeper.znode.parent' = '*',
'connector.lookup.cache.type' = 'lru'
);
-- sink
CREATE TABLE pvuvage_sink (
dt VARCHAR,
age INT,
zhibiao1 BIGINT,
zhibiao2 BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
'connector.table' = 'pvuv_age_sink_hbase',
'connector.username' = '*',
'connector.password' = '*',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO pvuvage_sink
select DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') as dt, d.age, sum(d.tall) as zhibiao1, sum(d.childNum) as zhibiao2
from (
SELECT
u.ts,
cast(w.age as int) as age,
cast(w.tall as bigint) as tall,
cast(w.children as bigint) as childNum,
u.user_id,
u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
left join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') , d.age;
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | 维表类型 | 必填:hbase |
connector.version | hbase client 版本 | 必填:目前支持:1.4.3, 2.2.1 |
connector.property.version | Easystream 版本 | 必填,2 |
connector.table-name | hbase 表名 | 必填 |
connector.zookeeper.quorum | HBase 集群配置的 zk 地址,是以,分隔的主机列表 | 必填 |
connector.zookeeper.znode.parent | 集群配置在 zk 上的路径 | 必填 |
connector.lookup.cache.type | 缓存类型 | 选填,默认'none',支持:'all', 'lru', 'none' |
connector.lookup.cache.max-rows | 最大缓存条数 | 选填 ,默认10000条 |
connector.lookup.cache.ttl | 当选择'lru'表示缓存失效时间,默认不过期;当选择'all'表示 reload 间隔时间,默认不 reload | 选填 |
connector.lookup.max-retries | dim 表获取失败时最大重试次数,默认3次 | 选填 |
connector.lookup.async | 作为维表时查询的同异步配置 | 选填。true/false。只有在2.2.1时true才生效,默认false |
- 带 krb 认证的配置同版本1.12
文档反馈
以上内容对您是否有帮助?