Kudu 连接器

简述

Easystream 支持输出到 Kudu,包括upsert、insert。支持CDC

示例

  • ddl
create table kuduTable(
  v_string VARCHAR,
  v_boolean BOOLEAN,
  v_byte TINYINT,
  v_short SMALLINT,
  v_int int,
  v_long BIGINT,
  v_float FLOAT,
  v_double DOUBLE,
  v_decimal DECIMAL(38,18),
  v_unixTime TIMESTAMP(3),
  v_unixTime1 TIMESTAMP(9) with local time zone,
  v_binary VARBINARY
) with(
    'connector' = 'kudu',
    'update-mode' = 'upsert',
    'masters' = 'innosql1.dg.163.org:7051,innosql2.dg.163.org:7051,innosql3.dg.163.org:7051',
    'table.name'='sloth-kudu-sink-test',
    'flush.mode'='autoFlushSync',
    'primary.keys'='v_int'
    ,'create.table.auto'='true'
    ,'ignore.delete'='true'
    ,'ignore.delete.not.exist.error'='true'

);
  • 元数据方式
SET 'a.primary.keys' = 'v_string';
SET 'a.update-mode' = 'upsert';

INSERT INTO sloth_kudu_yxx.sloth_kudu_db.a
 (v_string, v_long )
 SELECT v2.v_string as v_string, cast(v2.v_long as bigint) as v_long FROM v2;

配置参数

配置名称 是否必填 配置生效类型 参数值字段类型 参数默认值 参数说明
connector 必填 目标表 String - 必填:kudu
masters 必填 目标表 String - kudu地址
table.name 必填 目标表 String - kudu表名
primary.keys 必填 目标表 String - kudu表主键,多个使用逗号分隔
update-mode 可选 目标表 String upsert sink 方式,支持的可选项为:append, upsert。
admin.operation.timeout.ms 可选 目标表 Long 30000 设置 creatTable 等操作的 timeout。
operation.timeout.ms 可选 目标表 Long 30000 设置sessions等操作的timeout。
boss.count 可选 目标表 Integer 1 设置最大boss线程数。
worker.count 可选 目标表 Integer - 设置最大worker线程数。默认:jvm中有效线程数的两倍
flush.interval.millis 可选 目标表 Integer 1000 每隔多长时间 flush 数据到 kudu
mutation.buffer.max.ops 可选 目标表 Integer 1000 当 buffer 数据量达到多少时 flush 数据到 kudu
ignore.delete 可选 目标表 Boolean false 是否忽略删除操作
ignore.delete.not.exist.error 可选 目标表 Boolean true 忽略删除不存在数据的异常
flush.mode 可选 目标表 String autoFlushBackground flush 方式,支持到可选项为:autoFlushSync, autoFlushBackground。
create.table.auto 可选 目标表 Boolean true 设置表不存在时自动建表
replicas 可选 目标表 Integer 1 table 副本数

简述

Easystream 支持输出到 Kudu。

示例

  • DDL方式
create table sink(
  test_int INT,
  test_varchar VARCHAR,
  test_boolean BOOLEAN,
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_bigint BIGINT,
  test_float FLOAT,
  test_decimal DECIMAL(38,18),
  test_double DOUBLE,
  test_timestamp TIMESTAMP(3),
  test_varbinary VARBINARY
) with (
  'connector.type'='kudu',
  'masters'='kudu1.lt.163.org:7051,kudu2.lt.163.org:7051,kudu3.lt.163.org:7051',
  'update-mode'='append',
  'primary.keys'='test_varchar',
  'flush.mode'='autoFlushSync',
  'connector.property-version'='1',
  'table.name'='kudu_table_name'
);
  • 元数据方式

目标表不需要在数仓定义表 schema 信息,只需定义数据源即可

insert into [catalog].[database].[table] select * from source;

数据源类型 Catalog database table
Kudu 数据源登记的名称 kudu 表名规范:impala::[database].[tableName] kudu 表名规范:impala::[database].[tableName]
-- music_useraction_queue_3表的配置信息
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- kudu表的配置信息
SET 'sloth_kudu_ua.primary.keys' = 'v_string';
SET 'sloth_kudu_ua.update-mode' = 'upsert';
CREATE VIEW v2 AS
    SELECT os, userid, appver,logtime
    from music_useraction_queue_3;
-- 因为kudu没有database的概念,所以在表名中区分databasekudu表名规范:impala::[database].[tableName]
INSERT INTO sloth_kudu_yxx.sloth_kudu_db.sloth_kudu_ua
 (v_string, v_long )
 SELECT v2.userid as v_string, cast(v2.userid as bigint) as v_long FROM v2;

With 参数

参数 注释说明 备注
connector.type Sink 数据源 必填:kudu
masters kudu 地址 必填:kudu 链接地址
connector.property-version connector 配置的版本 必填 :1
table.name kudu 表名 必填
primary.keys kudu 表主键 必填;多个用逗号隔开
replicas table 副本数 选填;默认:1
flush.mode flush 方式 选填;默认 'autoFlushBackground',枚举: 'autoFlushSync' 每次 apply 都同步执行 flush,'autoFlushBackground' 异步批量 flush
update-mode sink 方式 选填;默认'upsert',枚举:'append'(如果主键重复会抛异常),'upsert' ,'retract'
admin.operation.timeout.ms 设置 creatTable 等操作的 timeout 选填;默认'30000',单位 ms
operation.timeout.ms 设置sessions等操作的timeout 选填;默认'30000',单位 ms
boss.count 设置最大boss线程数 选填; 默认'1'
worker.count 设置最大worker线程数 选填; 默认:jvm中有效线程数的两倍,2 * Runtime.getRuntime().availableProcessors()
flush.interval.millis 每隔多长时间 flush 数据到 kudu 选填; 默认'1000',单位ms,flush.mode=autoFlushBackground 时生效
mutation.buffer.max.ops 当 buffer 数据量达到多少时 flush 数据到 kudu 选填; 默认'1000',flush.mode=autoFlushBackground 时生效
ignore.delete 是否忽略删除操作 选填;默认 false,为 true 时仅在 sink.mode=upsert 有效,表示忽略删除操作
create.table.auto 设置表不存在时自动建表 选填; 默认'true',元数据方式时无效