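Complex Event Processing (CEP) is a technique for analyzing event streams in a dynamic environment, where an event is usually a meaningful change of state. CEP filters, correlates, and aggregates events. It expresses detection rules in terms of their temporal and aggregate relationships, continuously queries the stream for event sequences that satisfy those rules, and from them derives higher-level composite (complex) events.
Flink ships a dedicated library for this, Flink CEP, made up of the following components: the event stream, pattern definition, pattern detection, and alert generation.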

Flink CEP has the following characteristics:

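  • Goal: discover higher-level features in an ordered stream of simple events;
  • Input: an event stream made up of one or more simple events;
  • Processing: identify the relationships among simple events, so that several simple events satisfying given rules form a complex event;
  • Output: the complex events that satisfy the rules.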

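Scenarios: CEP can be used to
① produce results from streaming input as quickly as possible;
② perform time-based aggregations over two event streams;
③ deliver real-time or near-real-time alerts and notifications;
④ derive correlation patterns across diverse data sources;
⑤ process data with high throughput and low latency.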

For details, see the Flink CEP documentation.
Requirement: detect users who fail to log in several times in a row within 3 seconds.

Test data:


{"userId":5402,"ip":"83.149.11.115","eventType":"fail","eventTime":1558430815000}
{"userId":5402,"ip":"83.149.11.115","eventType":"fail","eventTime":1558430817000}
{"userId":23064,"ip":"66.249.3.15","eventType":"fail","eventTime":1558430826000}
{"userId":5692,"ip":"80.149.25.29","eventType":"fail","eventTime":1558430833000}
{"userId":7233,"ip":"86.226.15.75","eventType":"fail","eventTime":1558430832000}
{"userId":5692,"ip":"80.149.25.29","eventType":"fail","eventTime":1558430840000}
{"userId":29607,"ip":"66.249.73.135","eventType":"fail","eventTime":1558430849000}
{"userId":29607,"ip":"66.249.73.135","eventType":"success","eventTime":1558430859000}
Complete SQL:

create table appLog(
    userId bigint,
    `ip` varchar,
    eventType varchar,
    eventTime bigint,
    ts as PROCTIME()    -- processing-time attribute: when the row is read, not eventTime
)with(
  'connector' = 'kafka',
  'topic' = 'app_log',
  'properties.bootstrap.servers' = 'sloth-test1.dg.163.org:9092',
  'properties.group.id' = 'logs_moudle',
  'scan.startup.mode' = 'latest-offset',
  'parallelism' = '1',    -- likely platform-specific; not an option of the open-source Flink Kafka connector
  'format' = 'json'
);

create table warning_sink(
    userId bigint,
    firstFailTime timestamp, 
    lastFailTime timestamp, 
    warningMsg varchar
)with(
    'connector' = 'print'
);

insert into warning_sink
select 
userId,
firstFailTime,
lastFailTime,
'连续3s 内登陆失败' as warningMsg    -- "consecutive failed logins within 3s"
from appLog
    match_recognize(
        partition by userId
        order by ts
        MEASURES
            FIRST(A.ts) AS firstFailTime,
            LAST(A.ts) as lastFailTime
        one row per match
        AFTER MATCH SKIP TO LAST
        PATTERN(A+?) WITHIN INTERVAL '3' SECOND
        DEFINE
            A as A.eventType = 'fail'
    );

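Here A is the only pattern variable, defined as a failed login (eventType = 'fail'). Rows are partitioned by userId and ordered by ts, the processing-time attribute declared in appLog, and WITHIN INTERVAL '3' SECOND limits each match to rows at most 3 seconds apart. `A+?` means one or more A rows, but the quantifier is reluctant (`?`), so a match ends as soon as its first failed login is found. Each match therefore contains a single row and FIRST(A.ts) equals LAST(A.ts), which is why the output reports every failed login, including users that failed only once. AFTER MATCH SKIP TO LAST controls where matching resumes after a match; in standard MATCH_RECOGNIZE syntax it names a pattern variable (for example SKIP TO LAST A), while SKIP PAST LAST ROW resumes at the row after the match.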
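If the goal is to alert only when a user fails at least twice in a row, one option is to spell out two consecutive failed logins as separate pattern variables instead of the reluctant `A+?`. The query below is a minimal sketch under that assumption: it reuses the appLog and warning_sink tables defined above, and the variable B and the message text are illustrative. Because `ts` in appLog is a processing-time attribute, the 3-second limit still applies to arrival time rather than to eventTime.

```sql
insert into warning_sink
select
    userId,
    firstFailTime,
    lastFailTime,
    '2 consecutive failed logins within 3s' as warningMsg
from appLog
    match_recognize(
        partition by userId               -- evaluate each user's events separately
        order by ts                       -- time attribute that defines row order
        MEASURES
            A.ts AS firstFailTime,        -- first failed login of the pair
            B.ts AS lastFailTime          -- failed login that immediately follows it
        one row per match
        AFTER MATCH SKIP PAST LAST ROW    -- resume after B; no row is reused in a later match
        PATTERN (A B) WITHIN INTERVAL '3' SECOND
        DEFINE
            A as A.eventType = 'fail',
            B as B.eventType = 'fail'
    );
```

Since `(A B)` requires B to be the row directly after A within the partition, a success event between two failures breaks the match.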


Output:

+I(5402,2021-12-09T11:35:22.242,2021-12-09T11:35:22.242,连续3s 内登陆失败)
+I(5402,2021-12-09T11:35:22.244,2021-12-09T11:35:22.244,连续3s 内登陆失败)
+I(23064,2021-12-09T11:35:22.245,2021-12-09T11:35:22.245,连续3s 内登陆失败)
+I(5692,2021-12-09T11:35:22.245,2021-12-09T11:35:22.245,连续3s 内登陆失败)
+I(7233,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)
+I(5692,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)
+I(29607,2021-12-09T11:35:22.246,2021-12-09T11:35:22.246,连续3s 内登陆失败)
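The timestamps in this output are processing times produced by `ts as PROCTIME()`, not the `eventTime` values carried in the test data, so the 3-second window is measured on arrival time. A sketch of an event-time source, assuming Flink 1.13 or later (for `TO_TIMESTAMP_LTZ`), derives `ts` from the epoch-millisecond `eventTime` and declares a watermark on it. The table name `appLogEventTime` and the 2-second out-of-orderness bound are assumptions; the bound only needs to exceed the 1-second disorder in the test data, where the userId 7233 record arrives after a record stamped one second later. The connector options are copied from the original DDL, minus `'parallelism'`.

```sql
create table appLogEventTime(
    userId bigint,
    `ip` varchar,
    eventType varchar,
    eventTime bigint,
    -- convert epoch milliseconds to a timestamp and use it as the event-time attribute
    ts as TO_TIMESTAMP_LTZ(eventTime, 3),
    -- assumed bound: tolerate events arriving up to 2 seconds out of order
    WATERMARK FOR ts AS ts - INTERVAL '2' SECOND
)with(
    'connector' = 'kafka',
    'topic' = 'app_log',
    'properties.bootstrap.servers' = 'sloth-test1.dg.163.org:9092',
    'properties.group.id' = 'logs_moudle',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);
```

The MATCH_RECOGNIZE query can then read `from appLogEventTime` with `order by ts` unchanged, since `ts` is now a rowtime attribute. Two caveats: with event time, matches are emitted only once the watermark passes them, so a topic that stops receiving data can hold back the last alerts; and because `ts` is now `TIMESTAMP_LTZ(3)`, the sink's `timestamp` columns may need that type or an explicit CAST, depending on the Flink version.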