HBase 连接器

官方文档

source 表

简述

离线批的方式读取数据

示例

  • HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names。 e.g. base_info 映射为 HBase 的 column family name,dt、age、behavior 映射为 HBase 的 qualifier name。
  • 必须设置:SET 'stream_mode' = 'false';
  • 以元数据的方式配置krb认证连接时,需要在set中配置相关的文件信息
  • ddl 方式
-- HBase as a batch source requires stream_mode = false.
SET 'stream_mode' = 'false';

-- DDL-style source table over the HBase table 'user_dim'.
-- base_info maps to an HBase column family; age maps to a qualifier name.
CREATE TABLE source (
    user_id VARCHAR,
    base_info row<age VARCHAR>
) WITH (
    'connector' = 'hbase',
    'connector.version' = '2.2.1',
    'table-name' = 'user_dim',
    'zookeeper.quorum' = 'a1.service.163.org:2181,a2.service.163.org:2181',
    'zookeeper.znode.parent' = '/hbase'
);
  • 元数据方式
-- HBase as a batch source requires stream_mode = false.
SET 'stream_mode' = 'false';

-- Metadata-style configuration: properties are prefixed with the catalog
-- table name (user_dim). Each SET statement must end with ';' (the original
-- example wrongly terminated several statements with ',').
set 'user_dim.connector.version' = '1.4.3';
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';
-- Kerberos-related file names for the connection (see the parameter table below).
set 'user_dim.core-site' = 'core-site.xml';
set 'user_dim.hdfs-site' = 'hdfs-site.xml';
set 'user_dim.hbase-site' = 'hbase-site.xml';
set 'user_dim.krb.conf' = 'krb5.conf';
set 'user_dim.krb.keytab' = 'hbase.keytab';
set 'user_dim.krb.principal' = 'hbase/sloth-commerce-test1.jd.163.org@BDMS.163.COM';

set 'user_pvuv_sink.connector.version' = '1.4.3';
set 'user_pvuv_sink.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';

-- Copy user_dim into user_pvuv_sink, wrapping age back into the base_info row.
insert into hbase_docker_one.`default`.user_pvuv_sink
select a.user_id, ROW(age) as base_info
from hbase_docker_one.`default`.user_dim a;

维表

  • ddl
-- source: in-memory table providing user behavior events
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'memory',
    'table.name' = 'user_log'
);

-- dim: HBase lookup (dimension) table; cache type ALL with async lookup enabled
CREATE TABLE dim_table (
    user_id VARCHAR,
    base_info ROW<age VARCHAR>
) WITH (
    'connector' = 'hbase',
    'connector.version' = '2.2.1',
    'table-name' = 'user_dim',
    'zookeeper.quorum' = 'ddbadmin.service.163.org:2181',
    'zookeeper.znode.parent' = '/hbase',
    'lookup.cache.type' = 'ALL',
    'lookup.async' = 'true'
);

-- sink: JDBC (MySQL) result table keyed by (dt, age)
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    pv BIGINT,
    uv BIGINT,
    PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'table-name' = 'pvuv_age_sink_hbase',
    'username' = 'sys',
    'password' = 'netease'
);

-- Temporal-join each user_log row against the HBase dim table on user_id,
-- keep rows with age > 10, then count pv/uv per (ts, age).
INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  -- base_info.age is VARCHAR in the dim table; cast to INT before use
  cast(w.base_info.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
-- processing-time temporal (lookup) join
join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
-- NOTE(review): compares the VARCHAR qualifier with an int literal; relies on implicit casting
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;
  • 元数据方式 因元数据中心不支持 row 形式的 schema 配置,需要在set中定义schema
-- Metadata-style dim config: properties are prefixed with the catalog table name.
set 'user_dim.connector.version' = '2.2.1';
-- async lookup only takes effect with client version 2.2.1
set 'user_dim.lookup.async' = 'true';
-- ROW-typed schema must be declared via SET (metadata center cannot express ROW)
set 'user_dim.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<age VARCHAR>>';

-- source: in-memory table providing user behavior events
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts VARCHAR
) WITH (
    'connector' = 'memory',
    'table.name' = 'user_log'
);

-- sink: JDBC (MySQL) result table keyed by (dt, age)
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    pv BIGINT,
    uv BIGINT,
    PRIMARY KEY(dt,age) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'table-name' = 'pvuv_age_sink_hbase',
    'username' = 'sys',
    'password' = 'netease',
    -- flush on every row (no write buffering)
    'sink.buffer-flush.max-rows' = '1'
);

-- Same aggregation as the DDL example, but the dim table is referenced
-- through its catalog path (hbase_docker_two.`default`.user_dim).
INSERT INTO pvuvage_sink
select d.ts as dt, d.age, count(*) as pv, count(distinct d.user_id) as uv
from (
SELECT
  u.ts,
  cast(w.base_info.age as int)  as age,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
-- processing-time temporal (lookup) join against the catalog-registered dim table
join hbase_docker_two.`default`.user_dim for system_time as of u.proc as w
on u.user_id = w.user_id
where w.base_info.age > 10
) as d
GROUP BY d.ts, d.age;

sink 表

  • ddl

    -- sink: HBase table; base_info is the column family, age the qualifier
    CREATE TABLE pvuvage_sink (
      user_id VARCHAR,
      base_info row<age VARCHAR>
    ) WITH (
      'connector' = 'hbase',
      'connector.version' = '2.2.1',
      'zookeeper.quorum' = 'ddbadmin.service.163.org:2181',
      'zookeeper.znode.parent' = '/hbase',
      'table-name' = 'user_pvuv_sink',
      -- write-buffer tuning: flush at 10mb, 1000 rows, or every 1s
      'sink.buffer-flush.max-size' = '10mb',
      'sink.buffer-flush.max-rows' = '1000',
      'sink.buffer-flush.interval' = '1s'
    );
  • 元数据方式

    -- Metadata-style sink config: ROW schema for the catalog table sloth_ua_one.
    set 'sloth_ua_one.format.schema' = 'ROW<user_id VARCHAR, base_info ROW<item_id VARCHAR, category_id varchar, behavior varchar, ts VARCHAR>>';
    set 'sloth_ua_one.connector.version' = '2.2.1';
    -- source: in-memory table providing user behavior events
    CREATE TABLE user_log (
      user_id VARCHAR,
      item_id VARCHAR,
      category_id VARCHAR,
      behavior VARCHAR,
      ts VARCHAR
    ) WITH (
      'connector' = 'memory',
      'table.name' = 'user_log'
    );
    -- Pack the event fields into the base_info column family of the HBase sink.
    INSERT INTO
    hbase_docker_two.`default`.sloth_ua_one
    SELECT
    user_id,
    ROW(item_id , category_id, behavior, ts) as base_info
    FROM
    user_log;

    With 参数 / set 参数

  • 针对某个catalog设置参数的,需要在下表的参数前添加catalog名称。如上述的: user_dim.connector.version

注意

目前对于带kerberos认证的场景只支持:

  1. 使用一套 kerberos配置运行任务,即连接的数据源和flink任务运行的hadoop需要相同的kerberos配置、principal等。此时数据源上无须配置krb认证信息。
  2. hbase与任务运行仅有一个需要kerberos。如果hbase需krb,平台任务提交运行不需要,则按下述参数配置krb认证信息。否则无须配置krb认证信息。
参数 注释说明 备注
stream_mode set 的参数,任务类型 选填,作为source时必填,且值为false
connector source 数据源 必填:hbase
connector.version hbase client 版本 选填:目前支持:1.4.3, 2.2.1,默认2.2.1
table-name hbase 表名 必填
zookeeper.quorum HBase 集群配置的 zk 地址,是以逗号分隔的"主机:端口"列表 必填
zookeeper.znode.parent 集群配置在 zk 上的路径 选填,默认:/hbase
sink.buffer-flush.max-size 定义多大 size 缓存在 buffer 选填:默认2mb。e.g. 3b/k/kb/m/mb/g/gb/t/tb
sink.buffer-flush.max-rows 定义多少条缓存在 buffer,一次性 flush 选填;默认1000
sink.buffer-flush.interval 定义 flush 时间间隔 选填;默认1s。'0s'表示同步 flush。e.g. 3d/day/h/hour/min/s/ms
lookup.async 作为维表时查询的同异步配置 选填。true/false。只有在2.2.1时true才生效,默认false
lookup.cache.type 作为维表时查询的缓存策略 选填。不区分大小写,all/none/lru。默认none
lookup.max-retries 作为维表时查询的缓存最大失败重试次数 选填。默认3
lookup.cache.ttl 作为维表时查询的缓存ttl 选填。默认常驻,不会失效
lookup.cache.max-rows 作为维表时查询的缓存最大行数 选填。默认10000
lookup.cache.metric.enable 是否开启缓存监控。监控指标有:XXX.cache.hit-rate:命中率=hit-count/(hit-count+miss-count);XXX.cache.hit-count:命中数;XXX.cache.miss-count:未命中数,=加载成功数+加载异常数;XXX.cache.average-load-cost:每条记录平均耗时,单位ms,total-load-time/(load-success-count+load-exception-count);XXX.cache.load-success-count:缓存加载成功数;XXX.cache.load-exception-count:缓存加载异常数,外部数据库未匹配到join key;XXX.cache.load-exception-rate:缓存加载异常率,当异常率很高时建议开启缓存空值;XXX.cache.total-load-time:总的缓存加载耗时,单位s;XXX.cache.cache-record-count:缓存的记录数 选填。默认false
null-string-literal 空串存储方式 选填。默认为 null字符串
sink.parallelism sink并行度 整数,选填。默认为1
hbase-site hbase-site文件名 选填,带 krb 认证时必填
core-site core-site文件名 选填,带 krb 认证时必填
hdfs-site hdfs-site文件名 选填,带 krb 认证时必填
krb.conf krb5.conf文件名 选填,带 krb 认证时必填
krb.keytab krb.keytab文件名 选填,带 krb 认证时必填
krb.principal krb.principal 选填,带 krb 认证时必填
-- Legacy (connector.property.version = '2') HBase sink DDL: options use the
-- 'connector.*' prefix instead of the newer flat keys shown above.
CREATE TABLE pvuvage_sink (
    hbase_rowkey_name VARCHAR,
    base_info row<dt VARCHAR, age INT, behavior VARCHAR>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.property.version' = '2',
    'connector.zookeeper.quorum' = '*',
    'connector.zookeeper.znode.parent' = '*',
    'connector.table-name' = 'user_pvuv_sink',
    'connector.write.buffer-flush.max-size' = '10mb',
    'connector.write.buffer-flush.max-rows' = '1000',
    'connector.write.buffer-flush.interval' = '1s'
);
  • HBase 中 column families 必须声明为 Row 类型,可以不用定义所有column families 和 qualifier names。 e.g. base_info 映射为 HBase 的 column family name,dt、age、behavior 映射为 HBase 的 qualifier name。

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:hbase
connector.version hbase client 版本 必填:目前支持:1.4.3, 2.2.1
connector.property.version Easystream 版本 必填,2
connector.table-name hbase 表名 必填
connector.zookeeper.quorum HBase 集群配置的 zk 地址,是以逗号分隔的主机列表 必填
connector.zookeeper.znode.parent 集群配置在 zk 上的路径 必填
connector.write.buffer-flush.max-size 定义多大 size 缓存在 buffer 选填;默认2mb
connector.write.buffer-flush.max-rows 定义多少条缓存在 buffer,一次性 flush 选填;没有默认值
connector.write.buffer-flush.interval 定义 flush 间隔时间,'0s'表示同步 flush 选填

HBase 维表

简述

Easystream 支持使用 HBase 表作为维表进行 join。

示例

-- source: Kafka topic 'user_behavior' in JSON format, starting from latest offsets
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
    'connector.type' = 'kafka',
    'connector.version' = 'universal',
    'connector.topic' = 'user_behavior',
    'connector.startup-mode' = 'latest-offset',
    'connector.properties.zookeeper.connect' = 'xxx',
    'connector.properties.bootstrap.servers' = 'xxx',
    'connector.properties.group.id' = 'xxx',
    'update-mode' = 'append',
    'format.type' = 'json',
    'format.derive-schema' = 'true'
);
--dim: HBase dimension table with two column families (base_info, info), LRU cache
CREATE TABLE dim_table (
    user_id VARCHAR,
    base_info row<age VARCHAR, tall varchar>,
    info row<gener VARCHAR, children varchar>
) WITH (
    'connector.type' = 'hbase',
    'connector.version' = '1.4.3',
    'connector.property.version' = '2',
    'connector.table-name' = 'user_dim',
    'connector.zookeeper.quorum' = '*',
    'connector.zookeeper.znode.parent' = '*',
    'connector.lookup.cache.type' = 'lru'
);

-- sink: JDBC (MySQL) result table; flush.max-rows = 1 writes every row immediately
CREATE TABLE pvuvage_sink (
    dt VARCHAR,
    age INT,
    zhibiao1 BIGINT,
    zhibiao2 BIGINT
) WITH (
    'connector.type' = 'jdbc',
    'connector.url' = 'jdbc:mysql://10.122.173.167:3306/flink-test',
    'connector.table' = 'pvuv_age_sink_hbase',
    'connector.username' = '*',
    'connector.password' = '*',
    'connector.write.flush.max-rows' = '1'
);

-- Hourly metrics per age bucket from the temporal join of user_log and dim_table.
INSERT INTO pvuvage_sink
select DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') as dt, d.age, sum(d.tall) as zhibiao1, sum(d.childNum) as zhibiao2
from (
SELECT
  u.ts,
  -- NOTE(review): w.age/w.tall/w.children are referenced without the column-family
  -- prefix, unlike the ROW-field access (w.base_info.age) used elsewhere -- verify
  cast(w.age as int)  as age,
  cast(w.tall as bigint) as tall,
  cast(w.children as bigint) as childNum,
  u.user_id,
  u.behavior
FROM (select user_id,item_id,category_id,behavior,ts,PROCTIME() as proc from user_log) as u
left join dim_table for system_time as of u.proc as w
on u.user_id = w.user_id
-- NOTE(review): filtering on w.age in WHERE drops unmatched (NULL) rows, effectively
-- turning the LEFT JOIN into an INNER JOIN -- confirm intent
where w.age > 10
) as d
GROUP BY DATE_FORMAT(d.ts, 'yyyy-MM-dd HH:00') , d.age;

With 参数

参数 注释说明 备注
connector.type 维表类型 必填:hbase
connector.version hbase client 版本 必填:目前支持:1.4.3, 2.2.1
connector.property.version Easystream 版本 必填,2
connector.table-name hbase 表名 必填
connector.zookeeper.quorum HBase 集群配置的 zk 地址,是以逗号分隔的主机列表 必填
connector.zookeeper.znode.parent 集群配置在 zk 上的路径 必填
connector.lookup.cache.type 缓存类型 选填,默认'none',支持:'all', 'lru', 'none'
connector.lookup.cache.max-rows 最大缓存条数 选填 ,默认10000条
connector.lookup.cache.ttl 当选择'lru'表示缓存失效时间,默认不过期;当选择'all'表示 reload 间隔时间,默认不 reload 选填
connector.lookup.max-retries dim 表获取失败时最大重试次数,默认3次 选填
connector.lookup.async 作为维表时查询的同异步配置 选填。true/false。只有在2.2.1时true才生效,默认false
  • 带 krb 认证的配置同版本1.12