Watermark

Overview

Syntax

SET '[TABLENAME].schema.watermark.[FIELDNAME]'='[FIELDNAME] [WATERMARK STRATEGIES]'

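TABLENAME is the name of the table used in the Easystream job.

FIELDNAME is the name of a field in that table.

WATERMARK STRATEGIES is the watermark strategy. For the available strategies, see the official Flink documentation.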
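For comparison, the sketch below shows how a watermark strategy is declared in plain Flink SQL DDL. The table definition and the datagen connector are hypothetical and serve only as illustration. It is an assumption, not something this page confirms, that the value of the SET statement corresponds to the expression after AS in the WATERMARK clause.

```sql
-- Hypothetical table, for illustration only; not an Easystream table definition.
CREATE TABLE user_behavior (
  user_id STRING,
  ts TIMESTAMP(3),
  -- Bounded out-of-orderness: the watermark trails the largest ts seen so far by 5 seconds.
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);
```

With this strategy, events arriving up to 5 seconds out of order can still be assigned to their window before it fires.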

Example

  set 'user_behavior.connections.group.id' = 'test_group_id';
  set 'user_behavior.connections.startup-mode' = 'latest-offset';
  -- Watermark on ts: the event time minus 5 seconds.
  set 'user_behavior.schema.watermark.ts' = '`ts` - INTERVAL '5' SECOND';
  set 'watermark.connector.write.flush.max-rows' = '1';
  -- Count rows per user_id in 5-second tumbling windows on ts.
  INSERT INTO sloth_mysql_test.sloth_wangtao.`watermark`
  SELECT
    mod(cast(v2.user_id as bigint), 10) as user_id,
    count(v2.user_id) as pv,
    cast(TUMBLE_START(v2.ts, INTERVAL '5' SECOND) as timestamp) as water_start,
    cast(TUMBLE_END(v2.ts, INTERVAL '5' SECOND) as timestamp) as water_end
  FROM wt_kafka_test.user_behavior v2
  group by v2.user_id, tumble(v2.ts, INTERVAL '5' SECOND);