Watermark
更新时间: 2023-11-13 14:58:32
阅读 45
Watermark
watermark 是flink处理数据延迟及乱序的手段,具体细节参考flink中的的watermark的详细设计及作用
如何在sql中使用watermark
DDL任务
CREATE TABLE user_actions (
user_name STRING,
data STRING,
user_action_time TIMESTAMP(3),
-- declare user_action_time as event time attribute and use 5 seconds delayed watermark strategy
WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
...
);
SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT user_name)
FROM user_actions
GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
在ddl中添加watermark定义的字段即可使用watermark
Metahub任务
SET table_name.schema.watermark.field_name = 'watermark_strategy_expression';
-- table_name是定义的流表名
-- field_name 是流表中字段名称,字段类型必须是timestamp(3)
-- watermark_strategy_expression 是watermark定义表达式,例如field_name - INTERVAL '5' SECOND
如果流表没有timestamp(3)类型的字段,又需要将该字段定义为event_time,需要使用计算列方式转为timestamp(3)。
SET 'table_name.computed.field.expression.tsudf' = 'GetTs(ts_str)';
-- table_name是定义的流表名
-- ts_str是流表中字段名称,通过GetTs udf函数返回tsudf字段
-- tsudf是计算后返回的字段名
SET 'table_name.computed.field.type.tsudf' = 'TIMESTAMP';
-- 定义tsudf字段类型为timestamp。
SET table_name.schema.watermark.tsudf = 'tsudf - INTERVAL '5' SECOND';
-- 定义watermark 允许延迟5秒钟。
create function GetTs as 'com.netease.udf.GetTsColumns';
-- 定义udf函数,传入string字段,返回timestamp类型字段。
----以上sql相当于执行了:
create table table_name (
ts_str string,
tsudf as GetTs(ts_str)
) WITH (...);
create function GetTs as 'com.netease.udf.GetTsColumns';
Watermark 使用问题
- watermark 推进问题
在作业中配置了watermark ,web ui 的算子却显示 no watermark
一般情况下,kafka topic 多分区,实际有分区没有数据的情况会引发该问题 需要在sql 中加如下配置,以保证在分区没有数据的情况下,不继续等待,而是更新watermark
参考文档 https://nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/config.htmltable.exec.source.idle-timeout = 10s
文档反馈
以上内容对您是否有帮助?