基本概念
CEP
CEP 就是复杂事件处理(Complex Event Processing)的缩写。而 Flink CEP 就是 Flink 实现的一个用于复杂事件处理的库。
具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是复杂事件,然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。
- 定义一个匹配规则
- 将匹配规则应用到事件流上,检测满足规则的复杂事件
- 检测到的复杂事件进行处理,得到结果进行输出
模式
CEP 的第一步所定义的匹配规则,也就是模式。模式的定义主要为两部分内容:
- 每个简单事件的特征
- 简单事件之间的组合关系
CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件,模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时。
应用场景
- 风险控制:可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付,就可以向用户发送通知信息,或是进行报警提示。
- 用户画像:利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有 特定行为习惯的一些用户,做出相应的用户画像。
- 运维监控:利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。
快速上手
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>1.12.1</version>
</dependency>
简单示例
需求:检测用户行为,如果连续三次登录失败,就输出报警信息。
首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单独定义一个登录事件 POJO 类。
public class LoginEvent {
public String userId;
public String ipAddress;
public String eventType;
public Long timestamp;
public LoginEvent() {
}
public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ipAddress='" + ipAddress + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
// 1.获取登录数据流
SingleOutputStreamOperator<LoginEvent> loginEventStream = env.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "172.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent loginEvent, long l) {
return loginEvent.timestamp;
}
}));
// 2.定义模式,连续三次失败
Pattern<LoginEvent, LoginEvent> failPattern = Pattern.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("second")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("third")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
});
// 3.将模式应用到数据流上,检测复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(loginEvent -> loginEvent.userId), failPattern);
// 4.将检测到的复杂事件提取出来,进行处理得到报警信息输出
SingleOutputStreamOperator<String> warningStream = patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
// 提取复杂事件中的三次登录失败事件
LoginEvent firstLoginEvent = map.get("first").get(0);
LoginEvent secondLoginEvent = map.get("second").get(0);
LoginEvent thirdLoginEvent = map.get("third").get(0);
return firstLoginEvent.userId + "连续三次登录失败!登录时间:" +
firstLoginEvent.timestamp + "," +
secondLoginEvent.timestamp + "," +
thirdLoginEvent.timestamp;
}
});
// 5.打印输出
warningStream.print();
模式 API
个体模式
每一个简单事件并不是任意选取的,也需要有一定的条件规则。所以我们就把每个简单事件的匹配规则叫做个体模式。
Pattern.<LoginEvent>begin("first") // 以第一个登录失败事件开始
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
每个个体模式都以一个连接词开始定义的,比如 begin
、next
等等,这是 Pattern
对 的一个方法,返回的还是一个 Pattern
。
个体模式需要一个过滤条件,用来指定具体的匹配规则。这个条件一般是通过调用 where()
方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition
内的 filter()
方法来定义。
量词
个体模式后面可以跟一个量词,用来指定循环的次数。个体模式可以包括单例模式和循环模式。默认情況下,个体模式是単例模式,匹配接收一个事件,当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。
在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:
oneOrMore()
:匹配事件出现一次或多次。times(times)
:匹配事件发生特定次数 times。times(fromTimes, toTimes)
:指定匹配事件出现的次数范围,最小次数为 fromTimes,最大次数为 toTimes。greedy()
:只能用在循环模式后,使当前循坏模式变得贪心,就是总是尽可能多地去匹配。optional()
:使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
// 匹配事件出现4次
pattern.times(4);
// 匹配事件出现4次,或不出现(0次)
pattern.times(4).optional();
// 匹配事件出现2,3或者4次
pattern.times(2, 4);
// 匹配事件出现2,3或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();
// 匹配事件出现2,3或者4次,或不出现
pattern.times(2, 4).optional();
// 匹配事件出现2,3或者4次,或不出现,并且尽可能多的匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现1次或多次
pattern.oneOrMore();
// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().optional();
// 匹配事件出现1次或多次,或不出现,并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现2次或多次
pattern.timesOrMore(2);
正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以事件的检测接收才会用 Map
中的一个列表来保存。
条件
对于条件的定义,主要是通过调用 Pattern
对象的 where()
方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern
对象的 subtype()
方法来限定匹配事件的子类型。
- 限定子类型:调用
subtype()
方法可以为当前模式增加子类型限制条件。 - 简单条件:只根据当前事件的特征来快定是否接受它。
- 迭代条件:需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。
基本概念
CEP
CEP 就是复杂事件处理(Complex Event Processing)的缩写。而 Flink CEP 就是 Flink 实现的一个用于复杂事件处理的库。
具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是复杂事件,然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。
- 定义一个匹配规则
- 将匹配规则应用到事件流上,检测满足规则的复杂事件
- 检测到的复杂事件进行处理,得到结果进行输出
模式
CEP 的第一步所定义的匹配规则,也就是模式。模式的定义主要为两部分内容:
- 每个简单事件的特征
- 简单事件之间的组合关系
CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件,模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时。
应用场景
- 风险控制:可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付,就可以向用户发送通知信息,或是进行报警提示。
- 用户画像:利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有 特定行为习惯的一些用户,做出相应的用户画像。
- 运维监控:利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。
快速上手
引入依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.12</artifactId>
<version>1.12.1</version>
</dependency>
简单示例
需求:检测用户行为,如果连续三次登录失败,就输出报警信息。
首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单独定义一个登录事件 POJO 类。
public class LoginEvent {
public String userId;
public String ipAddress;
public String eventType;
public Long timestamp;
public LoginEvent() {
}
public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "LoginEvent{" +
"userId='" + userId + '\'' +
", ipAddress='" + ipAddress + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
// 1.获取登录数据流
SingleOutputStreamOperator<LoginEvent> loginEventStream = env.fromElements(
new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
new LoginEvent("user_1", "172.56.23.10", "fail", 5000L),
new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
.withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
@Override
public long extractTimestamp(LoginEvent loginEvent, long l) {
return loginEvent.timestamp;
}
}));
// 2.定义模式,连续三次失败
Pattern<LoginEvent, LoginEvent> failPattern = Pattern.<LoginEvent>begin("first")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("second")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
.next("third")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
});
// 3.将模式应用到数据流上,检测复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(loginEvent -> loginEvent.userId), failPattern);
// 4.将检测到的复杂事件提取出来,进行处理得到报警信息输出
SingleOutputStreamOperator<String> warningStream = patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
@Override
public String select(Map<String, List<LoginEvent>> map) throws Exception {
// 提取复杂事件中的三次登录失败事件
LoginEvent firstLoginEvent = map.get("first").get(0);
LoginEvent secondLoginEvent = map.get("second").get(0);
LoginEvent thirdLoginEvent = map.get("third").get(0);
return firstLoginEvent.userId + "连续三次登录失败!登录时间:" +
firstLoginEvent.timestamp + "," +
secondLoginEvent.timestamp + "," +
thirdLoginEvent.timestamp;
}
});
// 5.打印输出
warningStream.print();
模式 API
个体模式
每一个简单事件并不是任意选取的,也需要有一定的条件规则。所以我们就把每个简单事件的匹配规则叫做个体模式。
Pattern.<LoginEvent>begin("first") // 以第一个登录失败事件开始
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
})
每个个体模式都以一个连接词开始定义的,比如 begin
、next
等等,这是 Pattern
对 的一个方法,返回的还是一个 Pattern
。
个体模式需要一个过滤条件,用来指定具体的匹配规则。这个条件一般是通过调用 where()
方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition
内的 filter()
方法来定义。
量词
个体模式后面可以跟一个量词,用来指定循环的次数。个体模式可以包括单例模式和循环模式。默认情況下,个体模式是単例模式,匹配接收一个事件,当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。
在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:
oneOrMore()
:匹配事件出现一次或多次。times(times)
:匹配事件发生特定次数 times。times(fromTimes, toTimes)
:指定匹配事件出现的次数范围,最小次数为 fromTimes,最大次数为 toTimes。greedy()
:只能用在循环模式后,使当前循坏模式变得贪心,就是总是尽可能多地去匹配。optional()
:使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
// 匹配事件出现4次
pattern.times(4);
// 匹配事件出现4次,或不出现(0次)
pattern.times(4).optional();
// 匹配事件出现2,3或者4次
pattern.times(2, 4);
// 匹配事件出现2,3或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();
// 匹配事件出现2,3或者4次,或不出现
pattern.times(2, 4).optional();
// 匹配事件出现2,3或者4次,或不出现,并且尽可能多的匹配
pattern.times(2, 4).optional().greedy();
// 匹配事件出现1次或多次
pattern.oneOrMore();
// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().optional();
// 匹配事件出现1次或多次,或不出现,并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();
// 匹配事件出现2次或多次
pattern.timesOrMore(2);
正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以事件的检测接收才会用 Map
中的一个列表来保存。
条件
对于条件的定义,主要是通过调用 Pattern
对象的 where()
方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern
对象的 subtype()
方法来限定匹配事件的子类型。
- 限定子类型:调用
subtype()
方法可以为当前模式增加子类型限制条件。 - 简单条件:只根据当前事件的特征来快定是否接受它。
- 迭代条件:需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。
failPattern.oneOrMore()
.where(new IterativeCondition<LoginEvent>() { // 迭代条件
@Override
public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
// 事件的user必须以A开头
if (!loginEvent.userId.startsWith("A")) {
return false;
}
Long sum = loginEvent.timestamp;
// 获取当前模式之前已经匹配的事件,求所有事件timestamp之和
for (LoginEvent event: context.getEventsForPattern("failPattern")) {
sum += event.timestamp;
}
// 总时间小于10000时,当前事件满足匹配规则,可以匹配成功
return sum < 10000;
}
});
- 终止条件:表示遇到某个特定事件时,当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的
until()
方法来实现的,同样传入一个IterativeCondition
作为参数。
组合模式
将多个个体模式组合起来的完整模式,就叫作组合模式。
Pattern<Event, ?> pattern = Pattern
.<Event>begin("start").where(...)
.next("next").where(...)
.followedBy("follow").where(...)
...
初始模式
所有的组合模式,都必须以一个初始模式开头,而初始模式必须通过调用 Pattern
的静态方法 begin()
来创建。
Pattern<Event, ?> pattern = Pattern.<Event>begin("start");
传入的 String
类型的参数就是模式的名称,而 begin()
方法需要传入一个类型参数,就是模式要检测流中事件的基本类型, 这里定义的是 Event
。
Pattern
有两个泛型参数,第一个就是检测事件的基本类型 Event
,跟 begin()
指定的类型一致,第二个则是当前模式里事件的子类型,由子类型限制条件指定。
近邻条件
模式之间的组合是通过一些连接词方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系。
- 严格近邻:匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件,对应的
next()
方法。 - 宽松近邻:只关心事件发生的顺序,而放宽了对匹配事件的距离要求,对应的
followedBy()
方法。 - 非确定下宽松近邻:非确定性是指可以重复使用之前已经匹配过的事件。这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多,对应的
followedByAny()
方法。 - 其他限制条件
notNext()
:前一个模式匹配到的事件后面不能紧跟着某种事件。notFollowedBy()
:前一个模式匹配到的事件后面,不会出现某种事件。within()
:第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。
模式组
Flink CEP 允许我们以嵌套的方式来定义模式。
在模式组中,每一个模式序列就被当作了某一阶段的匹配条件,返回的类型是一个 GroupPattern
而 GroupPattern
本身是 Pattern
的子类。所以个体模式和组合模式能调用的方法,比如 times()
、oneOrMore()
、optional()
之类的量词,模式组一般也是可以用的。
// 以模式序列作为初始模式
Pattern<Event, ?> start = Pattern.begin(
Pattern.<Event>begin("start_start").where(...)
.followedBy("start_middle").where(...)
);
// 在start后定义严格近邻的模式序列,并重复匹配两次
Pattern<Event, ?> strict = start.next(
Pattern.<Event>begin("next_start").where(...)
.followedBy("next_middle").where(...)
).times(2);
// 在start后定义宽松近邻的模式序列,并重复匹配一次或多次
Pattern<Event, ?> relaxed = start.followedBy(
Pattern.<Event>begin("followedBy_start").where(...)
.followedBy("followedBy_middle").where(...)
).oneOrMore();
// 在start后定义非确定性宽松近邻的模式序列,可以匹配一次,也可以不匹配
Pattern<Event, ?> nonDeterminRelaxed = start.followedByAny(
Pattern.<Event>begin("followedByAny_start").where(...)
.followedBy("followedByAny_middle").where(...)
).optional();
跳过策略
我们如果输入事件序列 a a a b
,那么应该检测到 6 个匹配结果:(a1 a2 a3 b)
、(a1 a2 b)
、(a1 b)
、(a2 a3 b)
、(a2 b)
、(a3 b)
。
- 不跳过:调用
AfterMatchSkipStrategy.noSkip()
。这是默认策略,所有可能的匹配都会输出。 - 跳至下一个:调用
AfterMatchSkipStrategy.skipToNext()
。找到一个a1
开始的最大匹配之后,跳过a1
开始的所有其他匹配,直接从下一个a2
开始匹配。最终结果:(a1 a2 a3 b)
、(a2 a3 b)
、(a3 b)
。 - 跳过所有子匹配:调用
AfterMatchSkipStrategy.skipPastLastEvent()
。找到a1
开始的匹配(a1 a2 a3 b)
之后,直接跳过所有a1
直到a3
开头的匹配,相当于把这些子匹配都跳过了。最终结果:(a1 a2 a3 b)
。 - 跳至第一个:调用
AfterMatchSkipStrategy.skipToFirst("a")
。指明跳至哪个模式的第一个匹配事件。最终结果:(a1 a2 a3 b)
、(a1 a2 b)
、(a1 b)
。 - 跳至最后一个:调用
AfterMatchSkipStrategy.skipToLast("a")
。指明跳至哪个模式的最后一个匹配事件。最终结果:(a1 a2 a3 b)
、(a3 b)
。
检测处理
将模式应用到流上
只要调用 CEP 类的静态方法 pattern()
,将数据流和模式作为两个参数传入。最终得到的是一个 PatternStream
。
// 2.定义模式,连续三次失败
Pattern<LoginEvent, LoginEvent> failPattern = Pattern.<LoginEvent>begin("fail")
.where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals("fail");
}
}).times(3).consecutive(); // 指定严格近邻的三次登录失败
// 3.将模式应用到数据流上,检测复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(loginEvent -> loginEvent.userId), failPattern);
// 4.将检测到的复杂事件提取出来,进行处理得到报警信息输出
SingleOutputStreamOperator<String> warningStream = patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
@Override
public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
// 提取三次登录失败事件
LoginEvent firstFailEvent = map.get("fail").get(0);
LoginEvent secondFailEvent = map.get("fail").get(1);
LoginEvent thirdFailEvent = map.get("fail").get(2);
collector.collect(firstFailEvent.userId + "连续三次登录失败!登录时间:" +
firstFailEvent.timestamp + "," +
secondFailEvent.timestamp + "," +
thirdFailEvent.timestamp);
}
});
处理超时事件
用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后, 用户支付的意愿会降低。电商网站往往会对订单状态进行监控,设置一个失效时间,如果下单后一段时间仍未支付,订单就会被取消。
首先定义出要处理的数据类型。要包括用户对订单的下单和支付两种行为。可以定义 POJO 类 OrderEvent
如下。
public class OrderEvent {
public String userId;
public String orderId;
public String eventType;
public Long timestamp;
public OrderEvent() {
}
public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {
this.userId = userId;
this.orderId = orderId;
this.eventType = eventType;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "OrderEvent{" +
"userId='" + userId + '\'' +
", orderId='" + orderId + '\'' +
", eventType='" + eventType + '\'' +
", timestamp=" + timestamp +
'}';
}
}
// 1.获取数据流
SingleOutputStreamOperator<OrderEvent> stream = env.fromElements(
new OrderEvent("user_1", "order_1", "create", 1000L),
new OrderEvent("user_2", "order_2", "create", 2000L),
new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
@Override
public long extractTimestamp(OrderEvent orderEvent, long l) {
return orderEvent.timestamp;
}
}));
// 2.定义模式
Pattern<OrderEvent, OrderEvent> pattern = Pattern.<OrderEvent>begin("create")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return orderEvent.eventType.equals("create");
}
})
.followedBy("pay")
.where(new SimpleCondition<OrderEvent>() {
@Override
public boolean filter(OrderEvent orderEvent) throws Exception {
return orderEvent.eventType.equals("pay");
}
})
.within(Time.minutes(15));
// 3.将模式应用到数据流上
PatternStream<OrderEvent> patternStream = CEP.pattern(stream.keyBy(orderEvent -> orderEvent.orderId), pattern);
// 4.定义一个侧输出流标签
OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
};
// 5.将完全匹配和超时部分匹配的复杂事件提取出来,进行处理
SingleOutputStreamOperator<String> result = patternStream.process(new OrderPayMatch());
result.print("payed: ");
result.getSideOutput(timeoutTag).print("timeout: ");
// 自定义PatternProcessFunction
public static class OrderPayMatch extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {
@Override
public void processMatch(Map<String, List<OrderEvent>> map, Context context, Collector<String> collector) throws Exception {
// 获取当前的支付事件
OrderEvent payEvent = map.get("pay").get(0);
collector.collect("用户" + payEvent.userId + "订单" + payEvent.orderId + "已支付");
}
@Override
public void processTimedOutMatch(Map<String, List<OrderEvent>> map, Context context) throws Exception {
OrderEvent createEvent = map.get("create").get(0);
OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {};
context.output(timeoutTag, "用户" + createEvent.userId + "订单" + createEvent.orderId + "超时");
}
}
标签:匹配,LoginEvent,Flink,模式,事件,CEP,new,public
From: https://www.cnblogs.com/fireonfire/p/17026368.html