4_Flink CEP
Flink CEP
1、何为CEP?
CEP,全称为复杂事件处理(Complex Event Processing),是一种用于实时监测和分析数据流的技术。
CEP详细讲解:
CEP是基于动态环境的事件流的分析技术,事件是状态变化(持续生成数据)的。通过分析事件间的关系,利用过滤、关联、聚合等技术,根据事件间的【时序关系和聚合关系】制定检测规则,持续地从事件流中查询出【符合规则要求】的事件序列,最终分析得到更复杂的复合事件。
简单概括:CEP就是对于复杂事件制定规则,筛选所需的事件。
复杂事件的定义:
复杂事件是由一个或多个简单事件按照特定的模式、关系和逻辑组合而成的高级事件,比如说“连续登录失败”,或者“订单支付超时”等等【通常会对这些事件进行警报处理】。
在“连续登录失败”这个案例中,每个单独的登录失败事件都可以被视为一个简单事件,而当这些简单事件在特定时间窗口内连续发生时(例如,连续两次或三次登录失败),它们就构成了一个复杂事件。
2、核心组件
Flink为 CEP 提供了专门的Flink CEP library,其中主要包含了以下的核心组件:Pattern Event Stream
、Pattern Definition(模式定义)
、Pattern Detection(模式检测)
和生成Alert
。
1. 模式定义【规则的具体化过程】
根据业务需求,在Flink的DataStream API上定义出复杂事件的模式条件。
2. 模式检测【规则的执行】
一旦模式条件被定义并应用到DataStream上,Flink CEP引擎就会开始对这些数据流(Event Stream)中的事件进行实时检测。引擎会监听输入流中的事件,并根据之前定义的模式条件来匹配和识别复杂事件。
3. 生成警告(或其他处理逻辑)
当Flink CEP引擎**【检测到】符合模式条件的事件序列时,它会根据定义的【处理逻辑】来执行相应的动作**。其中通常会进行警告处理,如”连续登录失败“这个事件,考虑到会是黑客入侵等操作,从而生成警报(Alter Generation),并通知(Notification)相关人员进行处理。当然也会进行其他处理逻辑,如数据聚合、统计计算、事件转换等。
3、CEP基本流程
目标:从有序的简单事件流中发现一些高阶特征。
复杂事件处理(CEP)的流程可以分成三个步骤:
(1) 定义一个匹配规则
(2) 将匹配规则应用到事件流(由一个或多个简单事件构成)上,检测满足规则的复杂事件
(3) 对检测到的复杂事件进行处理,得到结果进行输出
1.目标:从有序的简单事件流中发现一些高阶特征。
2.输入:一个或多个由简单事件构成的事件流。
3.处理:识别简单事件之间的内在联系,多个符合一定规则的简单事件构成复杂事件。
4.输出:满足规则的复杂事件。
如图所示,输入是不同形状的事件流,我们可以定义一个匹配规则:连续两个圆。那么将这个规则应用到输入流上,就可以检测到两组匹配的复杂事件。它们构成了一个新的“复杂事件流”,流中的数据就变成了一组一组的复杂事件,每个数据都包含了两个圆。接下来,我们就可以针对检测到的复杂事件,处理之后输出一个提示或报警信息了。
4、CEP相关依赖
<properties><flink.version>1.13.2</flink.version>
</properties><!-- flink-CEP(复杂事件规则引擎):制定规则,筛选数据 -->
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
5、Pattern API(模拟API)
实际操作:一般使用Java来编写CEP规则,因为CEP对Scala编写不友好。
模式API可以让你定义想从输入流中抽取的复杂模式序列。
1.1. 基本含义
含义:处理事件的【规则】通常由【多个相互关联的模式(Pattern)组成】。Flink CEP提供了Pattern API用于对输入流数据进行复杂事件规则【制定规则】的定义,用来提取符合规则的事件序列。
一般模式大致分为三类:单个模式,组合模式,模式组。
1.2. 三种模式讲解
1)子事件
/**UserAction 为主事件UserLogin 继承自 UserAction 为子事件
*/
Pattern.<UserAction>begin(...).subType(UserLogin.class).where(new SimpleCondition<UserLogin>() {@Overridepublic boolean filter(UserLogin userAction) throws Exception {return userAction.actType().equals(UserAction.LOGIN());}})...
2)个体模式(Individual Patterns)
“个体模式”实际上是单一对象(单一规则)或简单模式。
定义:个体模式就是组成复杂规则的每一个单独的模式。
特点:个体模式可以是单例模式或者循环模式。
- 单例模式:只接受一个事件。
- 循环模式:可以接受多个事件,并通过指定循环次数来定义接收事件的数量。
个体模式默认为单例模式,可以使用【量词】转换成【循环模式】,每个模式可以有一个或者多个条件来决定它接受哪些事件。
一:量词
含义:量词是给定符合特定条件的事件出现的次数。比如:定义个体模式为“匹配形状为三角形的事件”,再让它循环多次,就变成了“匹配连续多个三角形的事件”。注意这里的“连续”,只要保证前后顺序即可,中间可以有其他事件,所以是“宽松近邻”关系。
在Flink CEP中,你可以通过这些方法指定循环模式(有以下四种基本形式):
pattern.times(#ofTimes)
,指定期望一个给定事件出现特定次数的模式;pattern.times(#fromTimes, #toTimes)
,指定期望一个给定事件出现次数在一个最小值和最大值间;pattern.oneOrMore()
,指定期望一个给定事件出现一次或者多次的模式【无上限】;pattern.timesOrMore(#ofTimes)
,指定一个给定事件出现特定次数或多次的模式;应用范围:量词(如
.oneOrMore()
)是应用于整个由begin()
开始到某个结束条件(如果有的话,比如.until()
)之间的事件序列的。如:当你看到oneOrMore()
应用于某个next
定义的模式时,它意味着该模式(即next
指向的事件类型)可以在其前面的模式(可能是begin
或另一个next
)之后重复出现一次或多次。
下面以 start 为名字的模式对象,解释所涉及到的量词意义【举例】。
greedy():指定期望一个给定事件出现尽可能多次的模式;
optional():指定期望一个给定事件出现0或1次的模式;
// 期望出现4次
start.times(4)// 期望出现0次或者4次
start.times(4).optional()// 期望出现2次、3次或者4次
start.times(2, 4)// 期望出现2次、3次或者4次,并且尽可能多【但不得超过4次】
start.times(2, 4).greedy()// 期望出现0次、2次、3次或者4次
start.times(2, 4).optional()// 期望出现0、2、3或者4次,并且尽可能多【但不得超过4次】
start.times(2, 4).optional().greedy()// 期望出现1到多次
start.oneOrMore()// 期望出现1到多次,并且尽可能多【无上限】
start.oneOrMore().greedy()// 期望出现0到多次
start.oneOrMore().optional()// 期望出现0到多次,并且尽可能多【无上限】
start.oneOrMore().optional().greedy()// 期望出现2到多次
start.timesOrMore(2)// 期望出现2到多次,并且尽可能多【无上限】
start.timesOrMore(2).greedy()// 期望出现0、2或多次
start.timesOrMore(2).optional()// 期望出现0、2或多次,并且尽可能多【无上限】
start.timesOrMore(2).optional().greedy()
二:条件
对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,指定判断事件属性的条件可以通过
pattern.where()
【与】、pattern.or()
【或】或者pattern.until()
【终止】方法。
-
迭代条件【默认】
含义:这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件【上一个模式流入此处的事件】的属性,或者它们的一个子集的统计数据来决定是否接受时间序列的条件。
应用场景:将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。
分析:迭代条件能够获取已经匹配的事件,如果自身又是循环模式(比如量词oneOrMore),那么两者结合就可以捕获自身之前接收的数据,据此来判断是否接受当前事件。这个功能非常强大,我们可以由此实现更加复杂的需求,比如可以要求“只有大于之前数据的平均值,才接受当前事件”。
案例讲解:模式名称为“middle”的循环模式,其可以接受事件发生一次或多次。因此在下面的迭代条件中,我们通过
ctx.getEventsForPattern(“middle”)
获取当前模式已经接受的事件,计算它们的总价(getMoney()
)之和;再加上当前事件中的数量,如果总和小于100,就接受当前事件,否则就不匹配。最终我们的匹配规则就是:循环匹配的所有事件价格(getMoney()
)之和并且总和必须小于 100。middle.oneOrMore().where(new IterativeCondition<Event>() { @Overridepublic boolean filter(Event value, Context<Event> ctx) throws Exception {int sum = value.getMoney();// 获取当前模式之前已经匹配的事件,求所有事件 amount 之和// middle为已定义的模式名称for (Event event : ctx.getEventsForPattern("middle")) { sum += event.getMoney();}// 在总数量小于 100 时,当前事件满足匹配规则,可以匹配成功return sum < 100;}});
-
简单条件
这种类型的条件扩展了前面提到的IterativeCondition类
它决定是否接受一个事件只取决于事件自身的属性。
// 规则定义【简单条件】:2~4个的温度都在40度以上的事件 Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 锁定泛型 => 一个事件.times(2,4) // 量词:2~4个.where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});
-
组合条件
定义:对同一个事件而言,可将当前条件和其他的条件(
where()
,or()
,until()
)结合起来使用。注意:这适用于任何条件,你可以通过依次调用
where()
来组合条件,最终的结果是每个单一条件的结果的逻辑and。如果想使用or来组合条件,你可以像下面这样使用or()方法。// 组合条件:2个[温度都在大于等于40度 or 温度小于等于20度]的事件【非连续】 // or:其中两个判断条件满足其一即可 Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 锁定泛型 => 一个事件.times(2) // 量词:2个.where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40; // 判断条件:温度都在大于等于40度}}).or(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature()<=20; // 判断条件:温度小于等于20度}});
-
停止条件
如果使用循环模式(
oneOrMore()
和oneOrMore().optional()
),可以指定一个停止条件,建议使用.until()
作为停止条件,以便清理状态。until在Flink CEP中的工作方式:一旦until条件被满足,则当前匹配的模式序列将结束,并且不会包含导致until条件满足的那个事件。之后,CEP引擎会开始尝试匹配新的模式序列。
// 终止条件:一旦出现满足温度小于20的事件,则之前所有连续的温度事件的序列结束,开始匹配新的模式序列。 // 注意:不包括满足条件的哪个事件本身。 Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0") // 模式 + 锁定泛型 => 一个事件.oneOrMore().until(new SimpleCondition<TemperatureEvent>() {/*until(condition); 为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。只适用于和oneOrMore()同时使用。另外:在基于事件的条件中,它可用于清理对应模式的状态。*/@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() < 20;}});
3)组合模式(Combining Patterns)
“组合模式”实际是多对象(多规则)组合。
组合模式由一个初始模式作为开头,如下所示:
val start : Pattern[Event, _] = Pattern.begin("start")
严格连续(严格紧邻)
next()
:期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
// 严格连续: 【连续】3个温度在40度以上的事件
Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).next("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).next("stage-2").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});
松散连续(宽松近邻)
followedBy()
:忽略匹配的事件之间的不匹配的事件。理解:当且仅当数据为a,c,b,b时,对于followedBy模式而言命中的为{a,b}
// 模拟数据
new TemperatureEvent("江宁1", 33.0, 1723626529138L),
new TemperatureEvent("江宁2", 36.0, 1723626529569L),
new TemperatureEvent("江宁3", 43.0, 1723626531234L),
new TemperatureEvent("江宁4", 18.0, 1723626532685L),
new TemperatureEvent("江宁5", 18.0, 1723626532996L),// 松散连续: 两个都大于40度的温度【非连续,允许忽略某些元素】
Pattern<TemperatureEvent, TemperatureEvent> rule3 = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).followedBy("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}});最终结果:
--------------------
江宁3 江宁4
--------------------
非确定的松散连续(非确定性宽松近邻)
followedByAny()
:更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。理解:当且仅当数据为a,c,b,b时,对于followedByAny而言会有两次命中{a,b},{a,b}
// 模拟数据
new TemperatureEvent("江宁1", 33.0, 1723626529138L),
new TemperatureEvent("江宁2", 36.0, 1723626529569L),
new TemperatureEvent("江宁3", 43.0, 1723626531234L),
new TemperatureEvent("江宁4", 18.0, 1723626532685L),
new TemperatureEvent("江宁5", 18.0, 1723626532996L),// 不确定的松散连续: 两个都大于40度的温度【非连续,允许忽略某些元素】
Pattern<TemperatureEvent, TemperatureEvent> rule3 = Pattern.<TemperatureEvent>begin("stage-0").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() >= 40;}}).followedByAny("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() == 18;}});最终结果:
--------------------
江宁3 江宁4
江宁3 江宁5
--------------------
除了以上模式序列外,还可以定义“不希望出现某种近邻关系”:
notNext()
:希望后面不紧接着是某个特定事件。notFollowedBy()
:不希望某个特定事件发生在两个事件之间的任何地方。
【温馨提示】①所有模式序列必须以.begin()开始;②模式序列不能以.notFollowedBy()结束;③“not”类型的模式不能被optional所修饰;④可以为模式指定时间约束,用来要求在多长时间内匹配有效。
3)模式组(Pattern Groups)
使用模式组的目的:复用性。
也可以定义一个模式序列作为begin,followedBy,followedByAny和next的条件。这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个
GroupPattern
,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。
// start
val start: Pattern[Event, _] = Pattern.begin(Pattern.begin[Event]("start").where(...).followedBy("start_middle").where(...)
)// 严格连续:通过start生成strict
val strict: Pattern[Event, _] = start.next(Pattern.begin[Event]("next_start").where(...).followedBy("next_middle").where(...)
).times(3)// 松散连续:通过start生成relaxed
val relaxed: Pattern[Event, _] = start.followedBy(Pattern.begin[Event]("followedby_start").where(...).followedBy("followedby_middle").where(...)
).oneOrMore()// 不确定松散连续:通过start生成nonDetermin
val nonDetermin: Pattern[Event, _] = start.followedByAny(Pattern.begin[Event]("followedbyany_start").where(...).followedBy("followedbyany_middle").where(...)
).optional()
1.3. 匹配后跳过策略
对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。 有五种跳过策略,如下:
-
NO_SKIP
: 每个成功的匹配都会被输出,不跳过任何环节【默认】。 -
SKIP_TO_NEXT
: 保留相同事件开始的第一条数据。目的:避免重复处理相同起点的多个匹配,从而节省计算资源和提高处理效率。
具体体现:如果检测到一个匹配,并且该匹配以某个特定事件(事件A)为起始点,那么系统将会丢弃(即不再考虑)所有其他以该事件(事件A)开始但尚未完成或正在进行的其他部分匹配。
举例:在(A1,A2,A3,C1,A4,A5,C2,C3,C4)中,条件是两个连续A事件,后面跟随C事件[followedByAny()]。若不使用
SKIP_TO_NEXT
跳过策略,输出结果为:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2),(A4,A5,C3),(A4,A5,C4)。若使用SKIP_TO_NEXT
跳过策略,输出结果为:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2)。 -
SKIP_PAST_LAST_EVENT
: 只保留第一条数据。目的:减少冗余匹配,明确匹配边界。
具体体现:在成功匹配一个复杂事件模式后,系统将丢弃(不再考虑)从当前匹配的开始事件到结束事件之间的所有其他部分匹配或未完成的匹配序列。
举例:在(A1,A2,A3,C1,A4,A5,C2,C3,C4)中,条件是两个连续A事件,后面跟随C事件[followedByAny()]。若不使用
SKIP_PAST_LAST_EVENT
跳过策略,则输出的结果为:(A1,A2,C1),(A2,A3,C1),(A4,A5,C2),(A4,A5,C3),(A4,A5,C4)。若使用SKIP_PAST_LAST_EVENT
跳过策略,则输出的结果为:(A1,A2,C1),(A4,A5,C2)。 -
SKIP_TO_FIRST
: 当匹配到某个模式的开始事件时,如果在匹配过程中遇到无法匹配的事件,它会跳到模式中第一个匹配事件之后继续处理。目的:处理和过滤事件流,修改起始点
具体体现:一旦模式匹配到定义了
SKIP_TO_FIRST
的相应阶段,就会跳过之前的阶段,直接从指定的模式处(a2
)开始处理后续事件,但已经匹配到的序列仍然被认为是有效的。举例:
假设事件序列是
(a1 a2 a3 b)
,并且你使用SKIP_TO_FIRST
策略,以a2
为SKIP_TO_FIRST
指定的开始事件。若中途遇到无法匹配的,它会跳到模式中第一个匹配事件(也就是a2
)之后继续处理,相当于只留下 a2 开始的匹配【初始时,已经匹配到的序列(a1 a2 a3 b)中a1仍然被认为是有效的。遇到无法匹配后,起始点从a1
变为a2
,此后就从a2
开始,前面的a1
就不在考虑了】。最终得到的匹配顺序是(a1 a2 a3 b),(a2 a3 b),(a2 b)。若未遇到不匹配现象,则顺序为(a1 a2 a3 b)。 -
SKIP_TO_LAST
: 跳过模式的开始事件的所有匹配,从模式的最后一个事件之后的事件开始新的匹配。目的:避免重复匹配
具体体现:跳过所有从模式开始的事件,而从模式中的最后一个事件之后的位置继续处理后续事件。
举例:
假设事件序列是
(a1 a2 a3 b)
,并且你使用SKIP_TO_LAST
策略,找到 a1 开始的匹配(a1 a2 a3 b)后,跳过所有 a1、a2 开始的匹配,跳到以最后一个 a(也就是 a3)为开始的匹配。最终得到(a1 a2 a3 b),(a3 b)。其中若未出现不匹配现象,则最终得到(a1 a2 a3 b)。
【温馨提示】当使用SKIP_TO_FIRST
和SKIP_TO_LAST
策略时,需要指定一个合法的PatternName。
应用基本格式:
// 定义跳过策略
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("stage-2");Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-0",skipStrategy) // 添加跳过策略.where(...)
1.4. 有效时间约束(within)
可以为模式定义一个有效时间约束。 例如,你可以通过pattern.within()
方法指定一个模式应该在10秒内发生。 这种时间模式支持处理时间和事件时间。
【温馨提示】一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。
next.within(Time.seconds(10))
错误总结
①
Option not applicable to NOT pattern
: 量词不和not连用②
Optional pattern cannot be preceded by greedy pattern
:greedy 之后不能接 optional③
NotFollowedBy is not supported as a last part of a Pattern
:notFollowedBy不能是规则的最后部分
2、模式的检测处理
一、Pattern检测(将模式应用于流上)
含义:将匹配规则应用到事件流(由一个或多个简单事件构成)上,检测满足规则的复杂事件
为了在事件流上运行所创建的模式(rule),需要创建一个patStream。 给定一个输入流input,一个模式rule, 可以通过调用如下方法来创建patStream:
PatternStream<UserAction> patStream = CEP.pattern(stream, rule);
二、处理匹配事件
含义:创建 patStream 之后,就可以应用 select 或者 flatselect 方法,从检测到的事件序列中提取事件。
- select() 方法需要输入一个
PatternSelectFunction
作为参数,返回一个结果。 - flatselect() 方法需要输入一个
PatternFlatSelectFunction
作为参数,返回任意多个结果。
select() 以一个 Map[String,Iterable [IN]] 来接收匹配到的事件序列,其中 【key】 就是每个【模式名称】,而 【value】 就是所有接收到的【所有事件】的 Iterable 类型。
patStream.select(new PatternSelectFunction<TemperatureEvent, String>() {// key:模式;value:事件@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {List<TemperatureEvent> events = pattern.get("stage-0"); // 根据key来获取value// 业务:获取事件的第一条和最后一条List<TemperatureEvent> sortedEvents = events.stream().sorted((a, b) -> (int) (a.getTimestamp() - b.getTimestamp())) // 排序.collect(Collectors.toList());TemperatureEvent first = sortedEvents.get(0); // 第一条事件TemperatureEvent last = sortedEvents.get(sortedEvents.size() - 1); // 最后一条事件double diff = (last.getTimestamp() - first.getTimestamp()) / 1000.0;return String.format("%s %s %.2f",first.getDistrict(),last.getDistrict(),diff);}
})
三、超时事件提取
应用原因:当一个模式通过within关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度被丢弃,为了能够处理这些超时的部分匹配,select和flatSelectAPI调用允许指定超时处理程序。
Pattern<UserLoginEvent, ?> pattern = Pattern.<UserLoginEvent>begin("start") .where(event -> !event.isSuccess()) // 第一个事件是登录失败 .next("middle").where(new SimpleCondition<UserLoginEvent>() { @Override public boolean filter(UserLoginEvent event) { return !event.isSuccess(); // 中间事件也是登录失败 } }) .oneOrMore() // 表示“middle”模式可以重复一次或多次 .consecutive() // 强调这些事件必须是连续的 .next("end").where(new SimpleCondition<UserLoginEvent>() { @Override public boolean filter(UserLoginEvent event) { return !event.isSuccess(); // 最后一个事件也是登录失败 } }) .within(Time.seconds(10)); // 所有这些事件必须在10秒内发生(时间的限制)
解决方案:超时处理程序会被接收到目前为止由模式匹配到的所有事件,由一个OutputTag定义接收到的超时事件序列(超时事件会放到另外【侧输出流】中)。
// Pattern检测
PatternStream<Event> patStream = CEP.pattern(stream, rule);// 侧输出流
OutputTag<String> outputTag = new OutputTag<String>("late-data"){};// 处理模式流并捕获迟到数据
SingleOutputStreamOperator<ComplexEvent> result = patStream.sideOutputLateData(outputTag).select(new PatternSelectFunction<Event, ComplexEvent>() {...});// 获取侧输出流
DataStream<String> lateData = result.getSideOutput(outputTag);
3、实际应用
实例1:监测某地的温度并告警
监测某地的温度在一小时内连续三次大于设定温度,则进行告警处理。
步骤一:定义事件:温度数据
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
import java.io.Serializable;@Data
@AllArgsConstructor
@ToString
// 模拟天气数据
public class TemperatureEvent implements Serializable {private String mechineName; // 区域private Double temperature; // 温度private Long timestamp; // 数据生成时间
}
步骤二:创建Flink环境并配置CEP
import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.util.Arrays;
import java.util.List;
import java.util.Map;// 监测某地的温度在一小时内连续三次大于设定温度,则进行告警处理
public class TempCEP {public static void main(String[] args) throws Exception {// 创建环境StreamExecutionEnvironment see = new Environments().build().enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1).enableRetries(3, 1).enableStateBackend("hashmap", true, false).finish(RuntimeExecutionMode.STREAMING, 1, 3);// 模拟事件SingleOutputStreamOperator<TemperatureEvent> stream = see.fromCollection(Arrays.asList(new TemperatureEvent("江宁1", 33.0, 1723626529138L),new TemperatureEvent("江宁2", 36.0, 1723626529569L),new TemperatureEvent("江宁3", 43.0, 1723626531234L),new TemperatureEvent("江宁4", 18.0, 1723626532996L),new TemperatureEvent("江宁5", 39.0, 1723626533247L),new TemperatureEvent("江宁6", 41.0, 1723626533888L),new TemperatureEvent("江宁7", 41.0, 1723626534096L),new TemperatureEvent("江宁8", 42.0, 1723626535011L),new TemperatureEvent("江宁9", 42.0, 1723626535011L),new TemperatureEvent("江宁10", 41.0, 1723626535011L),new TemperatureEvent("江宁11", 52.0, 1723626535555L))).assignTimestampsAndWatermarks(// 生成单调递增的水印,其中10 表示最大允许的延迟时间(即水印的间隔时间)【毫秒】Timer.monotonous(60));// 设定预警温度double TEMPERATURE_SETTING = 40;// 定义CEP模式Pattern<TemperatureEvent, TemperatureEvent> rule = Pattern.<TemperatureEvent>begin("stage-1").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).next("stage-2").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).next("stage-3").where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent event) throws Exception {return event.getTemperature() > TEMPERATURE_SETTING;}}).within(Time.hours(1));// 在模式匹配时执行操作PatternStream<TemperatureEvent> patStream = CEP.pattern(stream, rule);patStream.select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {String district1 = pattern.get("stage-1").get(0).getDistrict();String district2 = pattern.get("stage-2").get(0).getDistrict();String district3 = pattern.get("stage-3").get(0).getDistrict();Double first = pattern.get("stage-1").get(0).getTemperature();Double second = pattern.get("stage-2").get(0).getTemperature();Double third = pattern.get("stage-3").get(0).getTemperature();return String.format("告警: 地点 %s -> %s -> %s,温度在一小时内连续三次超过 40 度!,分别是%.2f度,%.2f度,%.2f度",district1,district2,district3,first,second,third);}}).print();see.execute("CEP_TEMPERATURE");}
}
结果展示:
案例2:异常检测 - 机箱温度检测
需求:同一个机箱连续两次温度超标,报警
拓展需求:锅炉房温度检测;信用卡反欺诈:连续大额消费;反作弊:同一个用户短时间内连续登陆失败
- flink cep
- pattern定义
- pattern匹配
- select 选出匹配到的事件,报警
步骤一:定义事件:机箱数据
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;import java.io.Serializable;@Data
@AllArgsConstructor
@ToString
// 模拟机箱数据
public class TemperatureEvent implements Serializable {private String chassis; // 机箱private Double temperature; // 温度
}
步骤二:具体情况分析
1)连续三次高于26度
package cases.test05;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Detection {public static void main(String[] args) throws Exception {// 环境配置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟机箱的数据DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),new TemperatureEvent("xyz", 20.1),new TemperatureEvent("xyz", 21.1),new TemperatureEvent("xyz", 22.2),new TemperatureEvent("xyz", 22.1),new TemperatureEvent("xyz", 26.3),new TemperatureEvent("xyz", 28.1),new TemperatureEvent("xyz", 27.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 26.7),new TemperatureEvent("xyz", 27.0))// 设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TemperatureEvent>() {@Overridepublic long extractTimestamp(TemperatureEvent temperatureEvent, long l) {return System.currentTimeMillis();}}));// 连续三次高于26度Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("fgt261").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}}).next("fgt262").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}}).next("fgt263").where(new SimpleCondition<TemperatureEvent>() {private static final long serialVersionUID = 1L;@Overridepublic boolean filter(TemperatureEvent value) throws Exception {return value.getTemperature()>=26;}});DataStream<String> patternStream = CEP.pattern(inputEventStream,warningPattern).select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {Double first = pattern.get("fgt261").get(0).getTemperature();Double second = pattern.get("fgt262").get(0).getTemperature();Double third = pattern.get("fgt263").get(0).getTemperature();return String.format("告警: 机箱连续三次高于26度,分别是%.2f度,%.2f度,%.2f度",first,second,third);}});patternStream.print();env.execute("Detection");}
}
结果展示:
2)3秒内3次以上平均温度超过26度
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;import java.time.Duration;
import java.util.List;
import java.util.Map;public class Detection {public static void main(String[] args) throws Exception {// 环境配置StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 模拟机箱的数据DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0),new TemperatureEvent("xyz", 20.1),new TemperatureEvent("xyz", 21.1),new TemperatureEvent("xyz", 22.2),new TemperatureEvent("xyz", 22.1),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 25.1),new TemperatureEvent("xyz", 27.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 26.7),new TemperatureEvent("xyz", 27.0),new TemperatureEvent("xyz", 24.4),new TemperatureEvent("xyz", 22.3),new TemperatureEvent("xyz", 21.0))// 设置水位线.assignTimestampsAndWatermarks(WatermarkStrategy.<TemperatureEvent>forBoundedOutOfOrderness(Duration.ofSeconds(2)).withTimestampAssigner(new SerializableTimestampAssigner<TemperatureEvent>() {@Overridepublic long extractTimestamp(TemperatureEvent temperatureEvent, long l) {return System.currentTimeMillis();}}));// 3秒内3次以上平均温度超过26度Pattern<TemperatureEvent,?> warningPattern = Pattern.<TemperatureEvent>begin("fgt261") // 3秒内至少3次.timesOrMore(3).where(new SimpleCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent temperatureEvent) throws Exception {return temperatureEvent.getTemperature()>26;}}).within(Time.seconds(3)).next("fgt262") // 平均温度超过26度.where(new IterativeCondition<TemperatureEvent>() {@Overridepublic boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {int count = 0;double avg = 0;for (TemperatureEvent te : ctx.getEventsForPattern("fgt261")) {count++;avg += te.getTemperature();}return count>=3 && avg/count>=26;}});DataStream<String> patternStream = CEP.pattern(inputEventStream,warningPattern).select(new PatternSelectFunction<TemperatureEvent, String>() {@Overridepublic String select(Map<String, List<TemperatureEvent>> pattern) throws Exception {return pattern.toString();}});patternStream.print();env.execute("Detection");}
}
结果展示:
案例3:登录事件异常检测
同一个用户连续两次登陆失败,报警
- flink cep
- pattern定义
- pattern匹配
- select输出报警事件
//需求: 如果同一个userid在三秒之内连续两次登陆失败,报警。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);// 这里mock了事件流,这个事件流一般从Kafka过来
DataStream<LoginEvent> loginEventStream = env.fromCollection(Arrays.asList( //自定义一个pojo类:userId、ip、typenew LoginEvent("1", "192.168.0.1", "fail"),new LoginEvent("1", "192.168.0.2", "fail"),new LoginEvent("1", "192.168.0.3", "fail"),new LoginEvent("2", "192.168.10.10", "success")
));Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("start")//泛型类或泛型接口上的泛型形参是不能用于静态成员的,那么当静态方法需要用到泛型时,只能用泛型方法。.where(new IterativeCondition<LoginEvent>() { // 模式开始事件的匹配条件为事件类型为fail, 为迭代条件@Overridepublic boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {return loginEvent.getType().equals("fail");}}).next("next").where(new IterativeCondition<LoginEvent>() { // 事件的匹配条件为事件类型为fail@Overridepublic boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {return loginEvent.getType().equals("fail");}}).within(Time.seconds(3));// 要求紧邻的两个事件发生的时间间隔不能超过3秒钟// 以userid分组,形成keyedStream,然后进行模式匹配 ::方法引用
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);DataStream<LoginWarning> loginFailDataStream = patternStream.select((Map<String, List<LoginEvent>> pattern) -> {List<LoginEvent> first = pattern.get("start");List<LoginEvent> second = pattern.get("next");return new LoginWarning(first.get(0).getUserId(), first.get(0).getIp(), first.get(0).getType());}
);loginFailDataStream.print();
env.execute();