HBase 连接器

官方文档

source 表

简述

离线批的方式读取数据

示例

  • HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names,可以定义多个 column family。 e.g. base_info 映射为 HBase 的 column family name,age 映射为 HBase 的 qualifier name。profile 映射为 HBase 的 column family name,name 映射为 HBase 的 qualifier name。
  • 必须设置:SET 'stream_mode' = 'false';
  • 以元数据的方式配置krb认证连接时,需要在set中配置相关的文件信息
  • ddl 方式
SET 'stream_mode' = 'false';

CREATE TABLE source (
    user_id VARCHAR,
    base_info row<age VARCHAR>,
  `profile` ROW<name String>
) WITH (
    'connector' = 'hbase',
    'connector.version' = '1.4.3',
    'table-name' = 'user_dim',
    'zookeeper.quorum' = 'a1.service.163.org:2181,a2.service.163.org:2181',
    'zookeeper.znode.parent' = '/hbase'
);
  • 元数据方式
SET 'stream_mode' = 'false';

set 'user_dim.connector.version' = '1.4.3';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
set 'user_dim.core-site' = 'core-site.xml',
set 'user_dim.hdfs-site' = 'hdfs-site.xml',
set 'user_dim.hbase-site' = 'hbase-site.xml',
set 'user_dim.krb.conf' = 'krb5.conf',
set 'user_dim.krb.keytab' = 'hbase.keytab',
set 'user_dim.krb.principal' = 'hbase/sloth-commerce-test1.jd.163.org@BDMS.163.COM',
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';

set 'user_pvuv_sink.connector.version' = '1.4.3';
set 'user_pvuv_sink.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';

insert into hbase_docker_one.`default`.user_pvuv_sink
select a.user_id, ROW(age) as base_info
from  hbase_docker_one.`default`.user_dim a;

维表

  • ddl
-- source
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'memory',
    'table.name' = 'user_log'
);

--dim
CREATE TABLE dim_table (
    user_id VARCHAR,
    base_info row<age VARCHAR>
) WITH (
    'connector' = 'hbase',
    'connector.version' = '2.2.1',
    'table-name' = 'user_dim',
    'zookeeper.quorum' = 'ddbadmin.service.163.org:2181',
    'zookeeper.znode.parent' = '/hbase',
    'lookup.cache.type' = 'ALL'
    ,'lookup.async' = 'true'
);

-- sink
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    pv BIGINT,
    uv BIGINT,
    PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'table-name' = 'pvuv_age_sink_hbase',
    'username' = 'sys',
    'password' = 'netease'
);

INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.base_info.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;
  • 元数据方式
set 'user_dim.connector.version' = '2.2.1';
set 'user_dim.lookup.async' = 'true';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';

-- source
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'memory',
    'table.name' = 'user_log'
);

-- sink
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    pv BIGINT,
    uv BIGINT,
    PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'table-name' = 'pvuv_age_sink_hbase',
    'username' = 'sys',
    'password' = 'netease',
    'sink.buffer-flush.max-rows' = '1'
);

INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.base_info.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
join hbase_docker_two.`default`.user_dim for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;

sink 表

  • ddl

    -- sink
    CREATE TABLE pvuvage_sink (
      user_id VARCHAR,
      base_info row<age VARCHAR>
    ) WITH (
      'connector' = 'hbase',
      'connector.version' = '2.2.1',
      'zookeeper.quorum' = 'ddbadmin.service.163.org:2181',
      'zookeeper.znode.parent' = '/hbase',
      'table-name' = 'user_pvuv_sink',
      'sink.buffer-flush.max-size' = '10mb',
      'sink.buffer-flush.max-rows' = '1000',
      'sink.buffer-flush.interval' = '1s'
    );
  • 元数据方式

    set 'sloth_ua_one.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<item_id VARCHAR, category_id varchar, behavior varchar, ts VARCHAR>>';
    set 'sloth_ua_one.connector.version' = '2.2.1';
    -- source
    CREATE TABLE user_log (
      user_id VARCHAR,
      item_id VARCHAR,
      category_id VARCHAR,
      behavior VARCHAR,
      ts VARCHAR
    ) WITH (
      'connector' = 'memory',
      'table.name' = 'user_log'
    );
    INSERT INTO
    hbase_docker_two.`default`.sloth_ua_one
    SELECT
    user_id,
    ROW(item_id , category_id, behavior, ts) as base_info
    FROM
    user_log;

    With 参数 / set 参数

  • 针对某个catalog设置参数的,需要在下表的参数前添加catalog名称。如上述的: user_dim.connector.version

注意

目前对于带kerberos认证的场景只支持:

  1. 使用一套 kerberos配置运行任务,即连接的数据源和flink任务运行的hadoop需要相同的kerberos配置、principal等。此时数据源上无须配置krb认证信息。
  2. hbase与任务运行仅有一个需要kerberos。如果hbase需krb,平台任务提交运行不需要,则按下述参数配置krb认证信息。否则无须配置krb认证信息。

配置名称 是否必填 配置生效类型 参数值字段类型 参数值官方默认值 参数说明(用于用户手册)
connector 必填 源表、目标表 String - 必填:hbase
table-name 必填 源表、目标表 String - 连接的 HBase 表名。默认该表在 "default" 命名空间下,指定命名空间下的表需要使用 "namespace:table"。
zookeeper.quorum 必填 源表、目标表 String - HBase Zookeeper 集群信息
zookeeper.znode.parent 可选 源表、目标表 String /hbase HBase 集群的 Zookeeper 根目录
connector.version 可选 源表、目标表 String 2.2.1 指定使用的连接器版本
null-string-literal 可选 源表、目标表 String null 当字符串值为 null 时的存储形式,默认存成 "null" 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。
sink.buffer-flush.max-size 可选 目标表 MemorySize 2mb 每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。
sink.buffer-flush.max-rows 可选 目标表 Integer 1000 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。
sink.buffer-flush.interval 可选 目标表 Duration 1s 刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 "0" 关闭此选项。
sink.parallelism 可选 目标表 Integer - 定义 sink 算子的并行度。默认情况下,并行度由框架定义为与上游串联的算子相同。
lookup.async 可选 维表 Boolean false 是否启用异步查找。注意:异步方式只支持 hbase-2.2 连接器
lookup.cache.max-rows 可选 维表 Long -1 查找缓存的最大行数,超过这个值,最旧的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的
lookup.cache.ttl 可选 维表 Duration 0s 查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:"lookup.cache.max-rows" 和 "lookup.cache.ttl" 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.max-retries 可选 维表 Integer 3 查找数据库失败时的最大重试次数。
lookup.cache.type 可选 维表 CacheTypeEnum none 维表的缓存策略。 目前支持 NONE(不缓存)和 PARTIAL(只在外部数据库中查找数据时缓存)
lookup.cache.metric.enable 可选 维表 Boolean false 是否启用维表缓存指标。默认关闭。
is.related.mammunt 可选 源表、目标表 Boolean false 是否关联猛犸。默认为false
krb.conf 可选 源表、目标表 String - 使用flink-conf配置的krb信息登录 (包括security.kerberos.login.keytab, security.kerberos.login.principal, -Djava.security.krb5.conf) 由于语法检查无法获得上述配置信息,因此需要在connector中配置。如不使用语法检查,可不配置: krb.conf, krb.principal, krb.keytab。
krb.principal 可选 源表、目标表 String - krb.principal。带krb认证时必填。
krb.keytab 可选 源表、目标表 String - krb.leytab文件。带krb认证时必填。
hbase-site 可选 源表、目标表 String - hbase-site 文件名。带krb认证时必填。
core-site 可选 源表、目标表 String - core-site 文件名。带krb认证时必填。
hdfs-site 可选 源表、目标表 String - hdfs-site 文件名。带krb认证时必填。

CREATE TABLE pvuvage_sink (
    hbase_rowkey_name VARCHAR,
    base_info row<dt VARCHAR, age INT, behavior VARCHAR>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.property.version' = '2',
    'connector.zookeeper.quorum' = '*',
    'connector.zookeeper.znode.parent' = '*',
    'connector.table-name' = 'user_pvuv_sink',
    'connector.write.buffer-flush.max-size' = '10mb',
    'connector.write.buffer-flush.max-rows' = '1000',
    'connector.write.buffer-flush.interval' = '1s'
);
  • HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names。 e.g. base_info 映射为 HBase 的 column family name,dt、age、behavior 映射为 HBase 的 qualifier name。

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:hbase
connector.version hbase client 版本 必填:目前支持:1.4.3, 2.2.1
connector.property.version Easystream 版本 必填,2
connector.table-name hbase 表名 必填
connector.zookeeper.quorum HBase 集群配置的 zk 地址,是以,分隔的主机列表 必填
connector.zookeeper.znode.parent 集群配置在 zk 上的路径 必填
connector.write.buffer-flush.max-size 定义多大 size 缓存在 buffer 选填;默认2mb
connector.write.buffer-flush.max-rows 定义多少条缓存在 buffer,一次性 flush 选填;没有默认值
connector.write.buffer-flush.interval 定义 flush 间隔时间,'0s'表示同步 flush 选填

HBase 维表

简述

Easystream 支持使用 HBase 表作为维表进行 join。

示例

CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id' = 'xxx',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
--dim
CREATE TABLE dim_table (
    user_id VARCHAR,
    base_info row<age VARCHAR, tall varchar>,
    info row<gener VARCHAR, children varchar>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.property.version' = '2',
    'connector.table-name' = 'user_dim',
    'connector.zookeeper.quorum' = '*',
    'connector.zookeeper.znode.parent' = '*',
    'connector.lookup.cache.type' = 'lru'
);

-- sink
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    zhibiao1 BIGINT,
    zhibiao2 BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'connector.table' = 'pvuv_age_sink_hbase',
    'connector.username' = '*',
    'connector.password' = '*',
    'connector.write.flush.max-rows' = '1'
);

INSERT INTO pvuvage_sink
select DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') as dt, d.age, sum(d.tall) as zhibiao1, sum(d.childNum) as zhibiao2
from (
SELECT
  u.ts,
  cast(w.age as int)  as age,
  cast(w.tall as bigint) as tall,
  cast(w.children as bigint) as childNum,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
left join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
where w.age > 10
) as d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') , d.age;

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:hbase
connector.version hbase client 版本 必填:目前支持:1.4.3, 2.2.1
connector.property.version Easystream 版本 必填,2
connector.table-name hbase 表名 必填
connector.zookeeper.quorum HBase 集群配置的 zk 地址,是以,分隔的主机列表 必填
connector.zookeeper.znode.parent 集群配置在 zk 上的路径 必填
connector.lookup.cache.type 缓存类型 选填,默认'none',支持:'all', 'lru', 'none'
connector.lookup.cache.max-rows 最大缓存条数 选填 ,默认10000条
connector.lookup.cache.ttl 当选择'lru'表示缓存失效时间,默认不过期;当选择'all'表示 reload 间隔时间,默认不 reload 选填
connector.lookup.max-retries dim 表获取失败时最大重试次数,默认3次 选填
connector.lookup.async 作为维表时查询的同异步配置 选填。true/false。只有在2.2.1时true才生效,默认false
  • 带 krb 认证的配置同版本1.12