TSDB Sink

简述

Easystream 支持输出到 TSDB。

示例

  1. create table sink(
  2. test_int INT,
  3. test_varchar VARCHAR,
  4. test_boolean BOOLEAN,
  5. test_tinyint TINYINT,
  6. test_smallint SMALLINT,
  7. test_bigint BIGINT,
  8. test_float FLOAT,
  9. test_decimal DECIMAL(38,18),
  10. test_double DOUBLE,
  11. `tag.tag_field1` VARCHAR,
  12. `time.updateTime` TIMESTAMP(3)
  13. ) with(
  14. 'connector.type'='tsdb',
  15. 'url' = 'sloth-tsdb0.dg.163.org:8086',
  16. 'username' = 'flink-metrics',
  17. 'password' = '***',
  18. 'database' = 'test-yxx',
  19. 'measurement' = 'testtable',
  20. 'mode' = 'cluster',
  21. 'batch.enabled'.' = 'true',
  22. 'actions' = '1000',
  23. 'buffer.limit' = '10000',
  24. 'flush.duration' = '1000',
  25. 'jitter.duration' = '0',
  26. 'consistency.level' = 'one',
  27. 'enable.gzip' = 'false',
  28. 'create.database' = 'false',
  29. 'retention.policy' = 'autogen'
  30. );

TSDB key 说明

  • tsdb tag key :e.g. tag.tag_field1 使用 tag. 作为前缀表明 tag_field1 是 tag key;

  • tsdb time key:默认使用 time 字段存储于 tsdb;如果自行指定 point 的时间戳,请使用 time. Timestamp,使用 time. 作为前缀,且为 Timestamp 类型

  • tsdb field key:除了以上两种字段,其他都为 filed key,且必须含有一个 field key。

With 参数

参数 注释说明 备注
connector.property-version connector 配置的版本。 必填:1
connector.type Sink 数据源 必填:tsdb
url tsdb url,ip:port 必填,tsdb 实例连接,port:tsdb HTTP 端口, 集群模式多个 url 之间用逗号隔开
username tsdb 用户名 必填
password 数据库密码 选填
database tsdb database 必填
measurement tsdb 表名 必填
mode 连接tsdb模式,single:单点,cluster:集群 选填,默认cluster
update-mode sink 方式 必填;append、upsert,当设置为 upsert时,根据 time、tag fields 作为主键
batch.enabled 是否批量插入 选填;默认 true
actions 批量 flush 的条数 选填;默认1000,单位 ms,batch.enabled=true 时生效
buffer.limit 插入失败的队列长度,系统会对队列数据进行重试,当队列满了之后,最先进入队列的数据将会丢失 选填; 默认10000,单位 ms,batch.enabled=true 时生效
flush.duration 周期性 flush 时间间隔 选填; 默认1000,单位 ms,batch.enabled=true 时生效
jitter.duration 周期性 flush 时间间隔波动 选填; 默认0,单位 ms,a jitter of 5s and flush duration 10s means flushes will happen every 10-15s,batch.enabled=true 时生效
consistency.level 集群一致性设置,设置多少 node 存储了 points 才算插入成功 选填; 默认’one’,枚举:’one’ ‘all’ ‘any’ ‘quorum’,batch.enabled=true 时生效
enable.gzip 当 http 请求体是否使用 gzip 压缩 选填;默认 false
create.database 当 database 不存在时是否自动创建 database 选填;默认 false
retention.policy 数据保存时长 选填;tsdb 默认的 rp