Kudu SQL 连接器
更新时间: 2023-04-24 20:24:46
阅读 239
Kudu 连接器
Flink-1.12/FLINK-1.14
简述
Easystream 支持输出到 Kudu,包括upsert、insert。支持CDC
示例
- ddl
create table kuduTable(
v_string VARCHAR,
v_boolean BOOLEAN,
v_byte TINYINT,
v_short SMALLINT,
v_int int,
v_long BIGINT,
v_float FLOAT,
v_double DOUBLE,
v_decimal DECIMAL(38,18),
v_unixTime TIMESTAMP(3),
v_unixTime1 TIMESTAMP(9) with local time zone,
v_binary VARBINARY
) with(
'connector' = 'kudu',
'update-mode' = 'upsert',
'masters' = 'innosql1.dg.163.org:7051,innosql2.dg.163.org:7051,innosql3.dg.163.org:7051',
'table.name'='sloth-kudu-sink-test',
'flush.mode'='autoFlushSync',
'primary.keys'='v_int'
,'create.table.auto'='true'
,'ignore.delete'='true'
,'ignore.delete.not.exist.error'='true'
);
- 元数据方式
SET 'a.primary.keys' = 'v_string';
SET 'a.update-mode' = 'upsert';
INSERT INTO sloth_kudu_yxx.sloth_kudu_db.a
(v_string, v_long )
SELECT v2.v_string as v_string, cast(v2.v_long as bigint) as v_long FROM v2;
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector | Sink 数据源 | 必填:kudu |
masters | kudu 地址 | 必填:kudu 链接地址 |
table.name | kudu 表名 | 必填 |
primary.keys | kudu 表主键 | 必填;多个用逗号隔开 |
replicas | table 副本数 | 选填;默认:1 |
flush.mode | flush 方式 | 选填;默认 'autoFlushBackground',枚举: 'autoFlushSync' 每次 apply 都同步执行 flush,'autoFlushBackground' 异步批量 flush |
update-mode | sink 方式 | 选填;默认'upsert',枚举:'append'(如果主键重复会抛异常),'upsert' 。两种模式都支持CDC |
admin.operation.timeout.ms | 设置 creatTable 等操作的 timeout | 选填;默认'30000',单位 ms |
operation.timeout.ms | 设置sessions等操作的timeout | 选填;默认'30000',单位 ms |
boss.count | 设置最大boss线程数 | 选填; 默认'1' |
worker.count | 设置最大worker线程数 | 选填; 默认:jvm中有效线程数的两倍,2 * Runtime.getRuntime().availableProcessors() |
flush.interval.millis | 每隔多长时间 flush 数据到 kudu | 选填; 默认'1000',单位ms,flush.mode=autoFlushBackground 时生效 |
mutation.buffer.max.ops | 当 buffer 数据量达到多少时 flush 数据到 kudu | 选填; 默认'1000',flush.mode=autoFlushBackground 时生效 |
ignore.delete | 是否忽略删除操作 | 选填;默认 false,为 true 时表示忽略删除操作 |
create.table.auto | 设置表不存在时自动建表 | 选填; 默认'true',元数据方式时无效 |
ignore.delete.not.exist.error | 忽略删除不存在数据的异常 | 选填; 默认'true',若不忽略,且有脏数据要删除不存在的数据,则会一直报错,任务无法继续 |
Flink-1.10
简述
Easystream 支持输出到 Kudu。
示例
- DDL方式
create table sink(
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_decimal DECIMAL(38,18),
test_double DOUBLE,
test_timestamp TIMESTAMP(3),
test_varbinary VARBINARY
) with (
'connector.type'='kudu',
'masters'='kudu1.lt.163.org:7051,kudu2.lt.163.org:7051,kudu3.lt.163.org:7051',
'update-mode'='append',
'primary.keys'='test_varchar',
'flush.mode'='autoFlushSync',
'connector.property-version'='1',
'table.name'='kudu_table_name'
);
- 元数据方式
目标表不需要在数仓定义表 schema 信息,只需定义数据源即可
insert into [catalog].[database].[table] select * from source;
数据源类型 | Catalog | database | table |
---|---|---|---|
Kudu | 数据源登记的名称 | kudu 表名规范:impala::[database].[tableName] | kudu 表名规范:impala::[database].[tableName] |
-- music_useraction_queue_3表的配置信息
set 'music_useraction_queue_3.connections.group.id' = 'test_group_id';
set 'music_useraction_queue_3.connections.startup-mode' = 'latest-offset';
-- kudu表的配置信息
SET 'sloth_kudu_ua.primary.keys' = 'v_string';
SET 'sloth_kudu_ua.update-mode' = 'upsert';
CREATE VIEW v2 AS
SELECT os, userid, appver,logtime
from music_useraction_queue_3;
-- 因为kudu没有database的概念,所以在表名中区分database。kudu表名规范:impala::[database].[tableName]
INSERT INTO sloth_kudu_yxx.sloth_kudu_db.sloth_kudu_ua
(v_string, v_long )
SELECT v2.userid as v_string, cast(v2.userid as bigint) as v_long FROM v2;
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.type | Sink 数据源 | 必填:kudu |
masters | kudu 地址 | 必填:kudu 链接地址 |
connector.property-version | connector 配置的版本 | 必填 :1 |
table.name | kudu 表名 | 必填 |
primary.keys | kudu 表主键 | 必填;多个用逗号隔开 |
replicas | table 副本数 | 选填;默认:1 |
flush.mode | flush 方式 | 选填;默认 'autoFlushBackground',枚举: 'autoFlushSync' 每次 apply 都同步执行 flush,'autoFlushBackground' 异步批量 flush |
update-mode | sink 方式 | 选填;默认'upsert',枚举:'append'(如果主键重复会抛异常),'upsert' ,'retract' |
admin.operation.timeout.ms | 设置 creatTable 等操作的 timeout | 选填;默认'30000',单位 ms |
operation.timeout.ms | 设置sessions等操作的timeout | 选填;默认'30000',单位 ms |
boss.count | 设置最大boss线程数 | 选填; 默认'1' |
worker.count | 设置最大worker线程数 | 选填; 默认:jvm中有效线程数的两倍,2 * Runtime.getRuntime().availableProcessors() |
flush.interval.millis | 每隔多长时间 flush 数据到 kudu | 选填; 默认'1000',单位ms,flush.mode=autoFlushBackground 时生效 |
mutation.buffer.max.ops | 当 buffer 数据量达到多少时 flush 数据到 kudu | 选填; 默认'1000',flush.mode=autoFlushBackground 时生效 |
ignore.delete | 是否忽略删除操作 | 选填;默认 false,为 true 时仅在 sink.mode=upsert 有效,表示忽略删除操作 |
create.table.auto | 设置表不存在时自动建表 | 选填; 默认'true',元数据方式时无效 |
文档反馈
以上内容对您是否有帮助?