在SQL任务中使用Flink CEP
更新时间: 2021-12-09 19:46:03
阅读 1211
在SQL任务中使用Flink CEP
复合事件处理(Complex Event Processing,CEP)是一种基于动态环境中事件流的分析技术,事件在这里通常是有意义的状态变化,通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的时序关系和聚合关系制定检测规则,持续地从事件流中查询出符合要求的事件序列,最终分析得到更复杂的复合事件。
Flink 提供了专门的Flink CEP 库,它包含如下组件:Event Stream、Pattern定义、Pattern检测和生成Alert。
Flink CEP 的特征如下
- 目标:从有序的简单事件流中发现一些高阶特征;
- 输入:一个或多个简单事件构成的事件流;
- 处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件;
- 输出:满足规则的复杂事件。
场景 CEP 可用于
① 输入的流数据,尽快产生结果;
② 在2个事件流上,基于时间进行聚合类的计算;
③ 提供实时/准实时的警告和通知;
④ 在多样的数据源中产生关联分析模式;
⑤ 高吞吐、低延迟的处理
详细请参考Flink CEP
需求: 检测一个用户在3秒内连续登陆失败。
测试数据
{"userId":5402,"ip":"83.149.11.115","eventType":"fail","eventTime":1558430815000}
{"userId":5402,"ip":"83.149.11.115","eventType":"fail","eventTime":1558430817000}
{"userId":23064,"ip":"66.249.3.15","eventType":"fail","eventTime":1558430826000}
{"userId":5692,"ip":"80.149.25.29","eventType":"fail","eventTime":1558430833000}
{"userId":7233,"ip":"86.226.15.75","eventType":"fail","eventTime":1558430832000}
{"userId":5692,"ip":"80.149.25.29","eventType":"fail","eventTime":1558430840000}
{"userId":29607,"ip":"66.249.73.135","eventType":"fail","eventTime":1558430849000}
{"userId":29607,"ip":"66.249.73.135","eventType":"success","eventTime":1558430859000}
完整sql:
create table appLog(
userId bigint,
`ip` varchar,
eventType varchar,
eventTime bigint,
ts as PROCTIME()
)with(
'connector' = 'kafka',
'topic' = 'app_log',
'properties.bootstrap.servers' = 'sloth-test1.dg.163.org:9092',
'properties.group.id' = 'logs_moudle',
'scan.startup.mode' = 'latest-offset',
'parallelism' = '1',
'format' = 'json'
);
create table warning_sink(
userId bigint,
firstFailTime timestamp,
lastFailTime timestamp,
warningMsg varchar
)with(
'connector' = 'print'
);
insert into warning_sink
select
userId,
firstFailTime,
lastFailTime,
'连续3s 内登陆失败' as warningMsg
from appLog
match_recognize(
partition by userId
order by ts
MEASURES
FIRST(A.ts) AS firstFailTime,
LAST(A.ts) as lastFailTime
one row per match
AFTER MATCH SKIP TO LAST
PATTERN(A+?) WITHIN INTERVAL '3' SECOND
DEFINE
A as A.eventType = 'fail'
);
-- PATTERN 子句指定了 该需求对以下模式感兴趣:具有开始事件start_row,然后是一个或多个登陆失败typeFail ,
并以typeSuccess结束。 如 AFTER MATCH SKIP TO LAST 子句所示,则从最后一个 typeSuccess 事件开始寻找下一个模式匹配
数据结果:
+I(5402,2021-12-09T11:35:22.242,2021-12-09T11:35:22.242,连续3s 内登陆失败)
+I(5402,2021-12-09T11:35:22.244,2021-12-09T11:35:22.244,连续3s 内登陆失败)
+I(23064,2021-12-09T11:35:22.245,2021-12-09T11:35:22.245,连续3s 内登陆失败)
+I(5692,2021-12-09T11:35:22.245,2021-12-09T11:35:22.245,连续3s 内登陆失败)
+I(7233,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)
+I(5692,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)
+I(29607,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)
文档反馈
以上内容对您是否有帮助?