Watermark

Overview

Syntax

SET '[TABLENAME].schema.watermark.[FIELDNAME]'='[FIELDNAME] [WATERMARK STRATEGIES]'

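TABLENAME is the name of the table used in the Easystream job.

FIELDNAME is the name of the table field that the watermark is defined on.

WATERMARK STRATEGIES is the watermark strategy expression. See the official Flink documentation for the available strategies.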
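
For reference, the Flink documentation describes three common watermark strategies. The sketch below writes each of them in the SET form shown above, using the table user_behavior and the field ts as sample names. It assumes that Easystream accepts the same strategy expressions as the Flink WATERMARK FOR ... AS ... clause, which this page does not confirm, and it follows the quoting used elsewhere in this document. Only one of the three statements would be used for a given field.

```sql
-- Assumption: the value after the field name is passed through as a Flink
-- watermark expression. Pick ONE of the alternatives below.

-- Strictly ascending timestamps: the watermark is the maximum timestamp seen
-- so far; only rows with a larger timestamp are not late.
SET 'user_behavior.schema.watermark.ts' = '`ts`';

-- Ascending timestamps: the watermark is the maximum timestamp minus 1 ms,
-- so rows with a timestamp equal to the maximum are not late.
SET 'user_behavior.schema.watermark.ts' = '`ts` - INTERVAL '0.001' SECOND';

-- Bounded out-of-orderness: rows may arrive up to 5 seconds out of order.
SET 'user_behavior.schema.watermark.ts' = '`ts` - INTERVAL '5' SECOND';
```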

Example

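-- Consumer group used when reading the source table user_behavior
SET 'user_behavior.connections.group.id' = 'test_group_id';

-- Start reading the source table from the latest offset
SET 'user_behavior.connections.startup-mode' = 'latest-offset';

-- Define a watermark on the ts field that tolerates 5 seconds of out-of-order data
SET 'user_behavior.schema.watermark.ts' = '`ts` - INTERVAL '5' SECOND';

-- Flush every row written to the sink table named watermark
SET 'watermark.connector.write.flush.max-rows' = '1';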

INSERT INTO sloth_mysql_test.sloth_wangtao.`watermark`
SELECT
  MOD(CAST(v2.user_id AS BIGINT), 10) AS user_id,
  COUNT(v2.user_id) AS pv,
  CAST(TUMBLE_START(v2.ts, INTERVAL '5' SECOND) AS TIMESTAMP) AS water_start,
  CAST(TUMBLE_END(v2.ts, INTERVAL '5' SECOND) AS TIMESTAMP) AS water_end
FROM wt_kafka_test.user_behavior v2
GROUP BY v2.user_id, TUMBLE(v2.ts, INTERVAL '5' SECOND);
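
For comparison, the following is a rough sketch of how the same source table could be declared in plain Flink SQL, where the watermark is part of the table definition instead of a SET statement. The column types, topic name, broker address, and format are hypothetical placeholders that do not come from this page. The group id, startup mode, and watermark expression mirror the SET statements in the example.

```sql
-- Sketch of a plain Flink SQL equivalent of the source table configuration.
-- Assumptions: user_id is a STRING, ts is TIMESTAMP(3), and the topic,
-- broker address, and format below are placeholders.
CREATE TABLE user_behavior (
  user_id STRING,
  ts TIMESTAMP(3),
  -- Same strategy as 'user_behavior.schema.watermark.ts' in the example
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',                          -- hypothetical topic
  'properties.bootstrap.servers' = 'localhost:9092',  -- hypothetical broker
  'properties.group.id' = 'test_group_id',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'                                   -- hypothetical format
);
```

With this strategy the watermark trails the largest ts seen so far by 5 seconds, so a 5-second tumbling window emits its result only after the watermark has passed the end of that window.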