Watermark
更新时间: 2021-08-26 20:42:29
阅读 1272
Watermark
简述
语法定义
SET '[TABLENAME].schema.watermark.[FIELDNAME]'='[FIELDNAME] [WATERMARK STRATEGIES]'
TABLENAME 是 Easystream 任务的具体表名
FIELDNAME 是表的字段名称
WATERMARK STRATEGIES 是水印策略,具体可以参考 Flink 官网文档
示例
set 'user_behavior.connections.group.id' = 'test_group_id';
set 'user_behavior.connections.startup-mode' = 'latest-offset';
set 'user_behavior.schema.watermark.ts' = '`ts` - INTERVAL '5' SECOND';
set 'watermark.connector.write.flush.max-rows' = '1';
INSERT INTO sloth_mysql_test.sloth_wangtao.`watermark`
SELECT
mod(cast(v2.user_id as bigint), 10) as user_id,
count(v2.user_id) as pv,
cast(TUMBLE_START(v2.ts, INTERVAL '5' SECOND ) as timestamp )as water_start,
cast(TUMBLE_END(v2.ts, INTERVAL '5' SECOND ) as timestamp )as water_end
FROM wt_kafka_test.user_behavior v2
group by v2.user_id, tumble(v2.ts, INTERVAL '5' SECOND);
文档反馈
以上内容对您是否有帮助?