TSDB Sink

简述

Easystream 支持输出到 TSDB。

示例

--SQL
--********************************************************************--
--Author: 孙婷婷
--CreateTime: 2021-11-17 11:06:01
--Comment: 请输入业务注释信息
--********************************************************************--
CREATE TABLE source (
  test_int INT,
  test_varchar VARCHAR,
  test_boolean BOOLEAN,
  test_bigint BIGINT,
  test_float FLOAT,
  --  testDecimal DECIMAL(38, 18),
  test_double DOUBLE,
  PRIMARY KEY(test_int) NOT ENFORCED
) WITH ('connector' = 'datagen');

CREATE TABLE sink (
  test_int INT,
  test_varchar VARCHAR,
  test_boolean BOOLEAN,
  test_bigint BIGINT,
  test_float FLOAT,
  --  testDecimal DECIMAL(38, 18),
  test_double DOUBLE
) WITH (
  'connector' = 'tsdb',
  -- required: specify this table type is jdbc
  'master' = 'xxxx:xx',
  -- required: JDBC DB url
  'username' = 'xxxx',
  -- required: jdbc table name
  'password' = 'xxxxxx',
  -- optional: jdbc user name and password
  'database' = 'xxxxx',
  'measurement' = 'xxxxxx',
  'tags' = 'test_varchar',
  'time-field' = 'test_bigint'
);
INSERT INTO
  sink
select
  test_int,
  test_varchar,
  test_boolean,
  1111111 as test_bigint,
  test_float,
  --  testDecimal DECIMAL(38, 18),
  test_double
FROM
  source;

With 参数

参数 是否必填 默认值 类型 描述
connector
必填 (none) String 指定使用什么类型的连接器,这里应该是‘tsdb’
master
必填 (none) String tsdb的链接地址,多个用逗号隔开
username
必填 (none) String 用户名
database
必填 (none) String databases
time-field
必填 (none) String 指定数据的时间字段
measurement
可选 (none) String 指定所有数据的measurement
measurementField
可选 0 String 数据进入哪个measurement由数据的measurementField指定的字段确定
username
可选 (none) String JDBC 用户名。如果指定了 ‘username’‘password’ 中的任一参数,则两者必须都被指定。
password
可选 (none) String JDBC 密码。
batch-actions
可选 1000 Integer 批量操作大小
flush-duration
可选 1 Integer 数据落盘的间隔,单位秒
enable-gzip
可选 false Boolean 是否开启gzip压缩
create-database
可选 true Boolean 如果database不存在的时候是否自带创建
tags
可选 是tag的列,多个用逗号隔开 String 例如tag1,tag2

示例

create table sink(
  test_int INT,
  test_varchar VARCHAR,
  test_boolean BOOLEAN,
  test_tinyint TINYINT,
  test_smallint SMALLINT,
  test_bigint BIGINT,
  test_float FLOAT,
  test_decimal DECIMAL(38,18),
  test_double DOUBLE,
  `tag.tag_field1` VARCHAR,
  `time.updateTime` TIMESTAMP(3)
) with(
  'connector.type'='tsdb',
  'url' = 'sloth-tsdb0.dg.163.org:8086',
  'username' = 'flink-metrics',
  'password' = '***',
  'database' = 'test-yxx',
  'measurement' = 'testtable',
  'mode' = 'cluster',
  'batch.enabled'.' = 'true',
  'actions' = '1000',
  'buffer.limit' = '10000',
  'flush.duration' = '1000',
  'jitter.duration' = '0',
  'consistency.level' = 'one',
  'enable.gzip' = 'false',
  'create.database' = 'false',
  'retention.policy' = 'autogen'
);

TSDB key 说明

  • tsdb tag key :e.g. tag.tag_field1 使用 tag. 作为前缀表明 tag_field1 是 tag key;

  • tsdb time key:默认使用 time 字段存储于 tsdb;如果自行指定 point 的时间戳,请使用 time. Timestamp,使用 time. 作为前缀,且为 Timestamp 类型

  • tsdb field key:除了以上两种字段,其他都为 filed key,且必须含有一个 field key。

With 参数

参数 注释说明 备注
connector.property-version connector 配置的版本。 必填:1
connector.type Sink 数据源 必填:tsdb
url tsdb url,ip:port 必填,tsdb 实例连接,port:tsdb HTTP 端口, 集群模式多个 url 之间用逗号隔开
username tsdb 用户名 必填
password 数据库密码 选填
database tsdb database 必填
measurement tsdb 表名 必填
mode 连接tsdb模式,single:单点,cluster:集群 选填,默认cluster
update-mode sink 方式 必填;append、upsert,当设置为 upsert时,根据 time、tag fields 作为主键
batch.enabled 是否批量插入 选填;默认 true
actions 批量 flush 的条数 选填;默认1000,单位 ms,batch.enabled=true 时生效
buffer.limit 插入失败的队列长度,系统会对队列数据进行重试,当队列满了之后,最先进入队列的数据将会丢失 选填; 默认10000,单位 ms,batch.enabled=true 时生效
flush.duration 周期性 flush 时间间隔 选填; 默认1000,单位 ms,batch.enabled=true 时生效
jitter.duration 周期性 flush 时间间隔波动 选填; 默认0,单位 ms,a jitter of 5s and flush duration 10s means flushes will happen every 10-15s,batch.enabled=true 时生效
consistency.level 集群一致性设置,设置多少 node 存储了 points 才算插入成功 选填; 默认’one’,枚举:’one’ ‘all’ ‘any’ ‘quorum’,batch.enabled=true 时生效
enable.gzip 当 http 请求体是否使用 gzip 压缩 选填;默认 false
create.database 当 database 不存在时是否自动创建 database 选填;默认 false
retention.policy 数据保存时长 选填;tsdb 默认的 rp