Watermark

watermark 是flink处理数据延迟及乱序的手段,具体细节参考flink中的的watermark的详细设计及作用

如何在sql中使用watermark

DDL任务

CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  -- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  ...
);

SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);

在ddl中添加watermark定义的字段即可使用watermark

Metahub任务

SET table_name.schema.watermark.field_name = 'watermark_strategy_expression';
-- table_name是定义的流表名
-- field_name 是流表中字段名称,字段类型必须是timestamp(3)
-- watermark_strategy_expression watermark定义表达式,例如field_name - INTERVAL '5' SECOND

如果流表没有timestamp(3)类型的字段,又需要将该字段定义为event_time,需要使用计算列方式转为timestamp(3)。

SET 'table_name.computed.field.expression.tsudf' = 'GetTs(ts_str)';
-- table_name是定义的流表名
-- ts_str是流表中字段名称,通过GetTs udf函数返回tsudf字段
-- tsudf是计算后返回的字段名
SET 'table_name.computed.field.type.tsudf' = 'TIMESTAMP';
-- 定义tsudf字段类型为timestamp

SET table_name.schema.watermark.tsudf = 'tsudf - INTERVAL '5' SECOND';
-- 定义watermark 允许延迟5秒钟。

create function GetTs as 'com.netease.udf.GetTsColumns';
-- 定义udf函数,传入string字段,返回timestamp类型字段。

----以上sql相当于执行了:
create table table_name (
    ts_str string,
    tsudf as GetTs(ts_str)
) WITH (...);

create function GetTs as 'com.netease.udf.GetTsColumns';

Watermark 使用问题

  • watermark 推进问题
    在作业中配置了watermark ,web ui 的算子却显示 no watermark
    一般情况下,kafka topic 多分区,实际有分区没有数据的情况会引发该问题 需要在sql 中加如下配置,以保证在分区没有数据的情况下,不继续等待,而是更新watermark
    table.exec.source.idle-timeout = 10s
    参考文档 https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/config.html