TSDB SQL 连接器
更新时间: 2022-05-16 15:27:08
阅读 552
TSDB Sink
简述
Easystream 支持输出到 TSDB。
Flink-1.12
示例
--SQL
--********************************************************************--
--Author: 孙婷婷
--CreateTime: 2021-11-17 11:06:01
--Comment: 请输入业务注释信息
--********************************************************************--
CREATE TABLE source (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_bigint BIGINT,
test_float FLOAT,
-- testDecimal DECIMAL(38, 18),
test_double DOUBLE,
PRIMARY KEY(test_int) NOT ENFORCED
) WITH ('connector' = 'datagen');
CREATE TABLE sink (
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_bigint BIGINT,
test_float FLOAT,
-- testDecimal DECIMAL(38, 18),
test_double DOUBLE
) WITH (
'connector' = 'tsdb',
-- required: specify this table type is jdbc
'master' = 'xxxx:xx',
-- required: JDBC DB url
'username' = 'xxxx',
-- required: jdbc table name
'password' = 'xxxxxx',
-- optional: jdbc user name and password
'database' = 'xxxxx',
'measurement' = 'xxxxxx',
'tags' = 'test_varchar',
'time-field' = 'test_bigint'
);
INSERT INTO
sink
select
test_int,
test_varchar,
test_boolean,
1111111 as test_bigint,
test_float,
-- testDecimal DECIMAL(38, 18),
test_double
FROM
source;
With 参数
参数 | 是否必填 | 默认值 | 类型 | 描述 |
---|---|---|---|---|
connector |
必填 | (none) | String | 指定使用什么类型的连接器,这里应该是‘tsdb’ 。 |
master |
必填 | (none) | String | tsdb的链接地址,多个用逗号隔开 |
username |
必填 | (none) | String | 用户名 |
database |
必填 | (none) | String | databases |
time-field |
必填 | (none) | String | 指定数据的时间字段 |
measurement |
可选 | (none) | String | 指定所有数据的measurement |
measurementField |
可选 | 0 | String | 数据进入哪个measurement由数据的measurementField指定的字段确定 |
username |
可选 | (none) | String | JDBC 用户名。如果指定了 ‘username’ 和 ‘password’ 中的任一参数,则两者必须都被指定。 |
password |
可选 | (none) | String | JDBC 密码。 |
batch-actions |
可选 | 1000 | Integer | 批量操作大小 |
flush-duration |
可选 | 1 | Integer | 数据落盘的间隔,单位秒 |
enable-gzip |
可选 | false | Boolean | 是否开启gzip压缩 |
create-database |
可选 | true | Boolean | 如果database不存在的时候是否自带创建 |
tags |
可选 | 是tag的列,多个用逗号隔开 | String | 例如tag1,tag2 |
Flink-1.10
示例
create table sink(
test_int INT,
test_varchar VARCHAR,
test_boolean BOOLEAN,
test_tinyint TINYINT,
test_smallint SMALLINT,
test_bigint BIGINT,
test_float FLOAT,
test_decimal DECIMAL(38,18),
test_double DOUBLE,
`tag.tag_field1` VARCHAR,
`time.updateTime` TIMESTAMP(3)
) with(
'connector.type'='tsdb',
'url' = 'sloth-tsdb0.dg.163.org:8086',
'username' = 'flink-metrics',
'password' = '***',
'database' = 'test-yxx',
'measurement' = 'testtable',
'mode' = 'cluster',
'batch.enabled'.' = 'true',
'actions' = '1000',
'buffer.limit' = '10000',
'flush.duration' = '1000',
'jitter.duration' = '0',
'consistency.level' = 'one',
'enable.gzip' = 'false',
'create.database' = 'false',
'retention.policy' = 'autogen'
);
TSDB key 说明
tsdb tag key :e.g. tag.tag_field1 使用 tag. 作为前缀表明 tag_field1 是 tag key;
tsdb time key:默认使用 time 字段存储于 tsdb;如果自行指定 point 的时间戳,请使用 time. Timestamp,使用 time. 作为前缀,且为 Timestamp 类型
tsdb field key:除了以上两种字段,其他都为 filed key,且必须含有一个 field key。
With 参数
参数 | 注释说明 | 备注 |
---|---|---|
connector.property-version | connector 配置的版本。 | 必填:1 |
connector.type | Sink 数据源 | 必填:tsdb |
url | tsdb url,ip:port | 必填,tsdb 实例连接,port:tsdb HTTP 端口, 集群模式多个 url 之间用逗号隔开 |
username | tsdb 用户名 | 必填 |
password | 数据库密码 | 选填 |
database | tsdb database | 必填 |
measurement | tsdb 表名 | 必填 |
mode | 连接tsdb模式,single:单点,cluster:集群 | 选填,默认cluster |
update-mode | sink 方式 | 必填;append、upsert,当设置为 upsert时,根据 time、tag fields 作为主键 |
batch.enabled | 是否批量插入 | 选填;默认 true |
actions | 批量 flush 的条数 | 选填;默认1000,单位 ms,batch.enabled=true 时生效 |
buffer.limit | 插入失败的队列长度,系统会对队列数据进行重试,当队列满了之后,最先进入队列的数据将会丢失 | 选填; 默认10000,单位 ms,batch.enabled=true 时生效 |
flush.duration | 周期性 flush 时间间隔 | 选填; 默认1000,单位 ms,batch.enabled=true 时生效 |
jitter.duration | 周期性 flush 时间间隔波动 | 选填; 默认0,单位 ms,a jitter of 5s and flush duration 10s means flushes will happen every 10-15s,batch.enabled=true 时生效 |
consistency.level | 集群一致性设置,设置多少 node 存储了 points 才算插入成功 | 选填; 默认’one’,枚举:’one’ ‘all’ ‘any’ ‘quorum’,batch.enabled=true 时生效 |
enable.gzip | 当 http 请求体是否使用 gzip 压缩 | 选填;默认 false |
create.database | 当 database 不存在时是否自动创建 database | 选填;默认 false |
retention.policy | 数据保存时长 | 选填;tsdb 默认的 rp |
文档反馈
以上内容对您是否有帮助?