首页 > 其他分享 >Flink:CEP

Flink:CEP

时间:2023-01-04 17:00:43浏览次数:44  
标签:匹配 LoginEvent Flink 模式 事件 CEP new public

基本概念

CEP

CEP 就是复杂事件处理(Complex Event Processing)的缩写。而 Flink CEP 就是 Flink 实现的一个用于复杂事件处理的库。

具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是复杂事件,然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。

  1. 定义一个匹配规则
  2. 将匹配规则应用到事件流上,检测满足规则的复杂事件
  3. 检测到的复杂事件进行处理,得到结果进行输出

模式

CEP 的第一步所定义的匹配规则,也就是模式。模式的定义主要为两部分内容:

  • 每个简单事件的特征
  • 简单事件之间的组合关系

CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件,模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时。

应用场景

  • 风险控制:可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付,就可以向用户发送通知信息,或是进行报警提示。
  • 用户画像:利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有 特定行为习惯的一些用户,做出相应的用户画像。
  • 运维监控:利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。

快速上手

引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

简单示例

需求:检测用户行为,如果连续三次登录失败,就输出报警信息。

首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单独定义一个登录事件 POJO 类。

public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent() {
    }

    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
// 1.获取登录数据流
SingleOutputStreamOperator<LoginEvent> loginEventStream = env.fromElements(
        new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
        new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
        new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
        new LoginEvent("user_1", "172.56.23.10", "fail", 5000L),
        new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
        new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
        new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
            @Override
            public long extractTimestamp(LoginEvent loginEvent, long l) {
                return loginEvent.timestamp;
            }
        }));

// 2.定义模式,连续三次失败
Pattern<LoginEvent, LoginEvent> failPattern = Pattern.<LoginEvent>begin("first")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })
        .next("second")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })
        .next("third")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        });

// 3.将模式应用到数据流上,检测复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(loginEvent -> loginEvent.userId), failPattern);

// 4.将检测到的复杂事件提取出来,进行处理得到报警信息输出
SingleOutputStreamOperator<String> warningStream = patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
    @Override
    public String select(Map<String, List<LoginEvent>> map) throws Exception {
        // 提取复杂事件中的三次登录失败事件
        LoginEvent firstLoginEvent = map.get("first").get(0);
        LoginEvent secondLoginEvent = map.get("second").get(0);
        LoginEvent thirdLoginEvent = map.get("third").get(0);
        return firstLoginEvent.userId + "连续三次登录失败!登录时间:" +
                firstLoginEvent.timestamp + "," +
                secondLoginEvent.timestamp + "," +
                thirdLoginEvent.timestamp;
    }
});

// 5.打印输出
warningStream.print();

模式 API

个体模式

每一个简单事件并不是任意选取的,也需要有一定的条件规则。所以我们就把每个简单事件的匹配规则叫做个体模式。

Pattern.<LoginEvent>begin("first")  // 以第一个登录失败事件开始
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })

每个个体模式都以一个连接词开始定义的,比如 beginnext 等等,这是 Pattern 对 的一个方法,返回的还是一个 Pattern

个体模式需要一个过滤条件,用来指定具体的匹配规则。这个条件一般是通过调用 where() 方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition 内的 filter() 方法来定义。

量词

个体模式后面可以跟一个量词,用来指定循环的次数。个体模式可以包括单例模式和循环模式。默认情況下,个体模式是単例模式,匹配接收一个事件,当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。

在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:

  • oneOrMore():匹配事件出现一次或多次。
  • times(times):匹配事件发生特定次数 times。
  • times(fromTimes, toTimes):指定匹配事件出现的次数范围,最小次数为 fromTimes,最大次数为 toTimes。
  • greedy():只能用在循环模式后,使当前循坏模式变得贪心,就是总是尽可能多地去匹配。
  • optional():使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
// 匹配事件出现4次
pattern.times(4);

// 匹配事件出现4次,或不出现(0次)
pattern.times(4).optional();

// 匹配事件出现2,3或者4次
pattern.times(2, 4);

// 匹配事件出现2,3或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();

// 匹配事件出现2,3或者4次,或不出现
pattern.times(2, 4).optional();

// 匹配事件出现2,3或者4次,或不出现,并且尽可能多的匹配
pattern.times(2, 4).optional().greedy();

// 匹配事件出现1次或多次
pattern.oneOrMore();

// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().optional();

// 匹配事件出现1次或多次,或不出现,并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();

// 匹配事件出现2次或多次
pattern.timesOrMore(2);

正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以事件的检测接收才会用 Map 中的一个列表来保存。

条件

对于条件的定义,主要是通过调用 Pattern 对象的 where() 方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern 对象的 subtype() 方法来限定匹配事件的子类型。

  • 限定子类型:调用 subtype() 方法可以为当前模式增加子类型限制条件。
  • 简单条件:只根据当前事件的特征来快定是否接受它。
  • 迭代条件:需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。

基本概念

CEP

CEP 就是复杂事件处理(Complex Event Processing)的缩写。而 Flink CEP 就是 Flink 实现的一个用于复杂事件处理的库。

具体的处理过程是,把事件流中的一个个简单事件,通过一定的规则匹配组合起来,这就是复杂事件,然后基于这些满足规则的一组组复杂事件进行转换处理,得到想要的结果进行输出。

  1. 定义一个匹配规则
  2. 将匹配规则应用到事件流上,检测满足规则的复杂事件
  3. 检测到的复杂事件进行处理,得到结果进行输出

模式

CEP 的第一步所定义的匹配规则,也就是模式。模式的定义主要为两部分内容:

  • 每个简单事件的特征
  • 简单事件之间的组合关系

CEP 做的事其实就是在流上进行模式匹配。根据模式的近邻关系条件不同,可以检测连续的事件或不连续但先后发生的事件,模式还可能有时间的限制,如果在设定时间范围内没有满足匹配条件,就会导致模式匹配超时。

应用场景

  • 风险控制:可以对用户的异常行为进行实时检测。当一个用户行为符合了异常行为模式,比如短时间内频繁登录并失败、大量下单却不支付,就可以向用户发送通知信息,或是进行报警提示。
  • 用户画像:利用 CEP 可以用预先定义好的规则,对用户的行为轨迹进行实时跟踪,从而检测出具有 特定行为习惯的一些用户,做出相应的用户画像。
  • 运维监控:利用 CEP 灵活配置多指标、多依赖来实现更复杂的监控模式。

快速上手

引入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-cep_2.12</artifactId>
    <version>1.12.1</version>
</dependency>

简单示例

需求:检测用户行为,如果连续三次登录失败,就输出报警信息。

首先定义数据的类型。这里的用户行为不再是之前的访问事件 Event 了,所以应该单独定义一个登录事件 POJO 类。

public class LoginEvent {
    public String userId;
    public String ipAddress;
    public String eventType;
    public Long timestamp;

    public LoginEvent() {
    }

    public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
        this.userId = userId;
        this.ipAddress = ipAddress;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "LoginEvent{" +
                "userId='" + userId + '\'' +
                ", ipAddress='" + ipAddress + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
// 1.获取登录数据流
SingleOutputStreamOperator<LoginEvent> loginEventStream = env.fromElements(
        new LoginEvent("user_1", "192.168.0.1", "fail", 2000L),
        new LoginEvent("user_1", "192.168.0.2", "fail", 3000L),
        new LoginEvent("user_2", "192.168.1.29", "fail", 4000L),
        new LoginEvent("user_1", "172.56.23.10", "fail", 5000L),
        new LoginEvent("user_2", "192.168.1.29", "success", 6000L),
        new LoginEvent("user_2", "192.168.1.29", "fail", 7000L),
        new LoginEvent("user_2", "192.168.1.29", "fail", 8000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<LoginEvent>() {
            @Override
            public long extractTimestamp(LoginEvent loginEvent, long l) {
                return loginEvent.timestamp;
            }
        }));

// 2.定义模式,连续三次失败
Pattern<LoginEvent, LoginEvent> failPattern = Pattern.<LoginEvent>begin("first")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })
        .next("second")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })
        .next("third")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        });

// 3.将模式应用到数据流上,检测复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(loginEvent -> loginEvent.userId), failPattern);

// 4.将检测到的复杂事件提取出来,进行处理得到报警信息输出
SingleOutputStreamOperator<String> warningStream = patternStream.select(new PatternSelectFunction<LoginEvent, String>() {
    @Override
    public String select(Map<String, List<LoginEvent>> map) throws Exception {
        // 提取复杂事件中的三次登录失败事件
        LoginEvent firstLoginEvent = map.get("first").get(0);
        LoginEvent secondLoginEvent = map.get("second").get(0);
        LoginEvent thirdLoginEvent = map.get("third").get(0);
        return firstLoginEvent.userId + "连续三次登录失败!登录时间:" +
                firstLoginEvent.timestamp + "," +
                secondLoginEvent.timestamp + "," +
                thirdLoginEvent.timestamp;
    }
});

// 5.打印输出
warningStream.print();

模式 API

个体模式

每一个简单事件并不是任意选取的,也需要有一定的条件规则。所以我们就把每个简单事件的匹配规则叫做个体模式。

Pattern.<LoginEvent>begin("first")  // 以第一个登录失败事件开始
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        })

每个个体模式都以一个连接词开始定义的,比如 beginnext 等等,这是 Pattern 对 的一个方法,返回的还是一个 Pattern

个体模式需要一个过滤条件,用来指定具体的匹配规则。这个条件一般是通过调用 where() 方法来实现的,具体的过滤逻辑则通过传入的 SimpleCondition 内的 filter() 方法来定义。

量词

个体模式后面可以跟一个量词,用来指定循环的次数。个体模式可以包括单例模式和循环模式。默认情況下,个体模式是単例模式,匹配接收一个事件,当定义了量词之后,就变成了循环模式,可以匹配接收多个事件。

在 Flink CEP 中,可以使用不同的方法指定循环模式,主要有:

  • oneOrMore():匹配事件出现一次或多次。
  • times(times):匹配事件发生特定次数 times。
  • times(fromTimes, toTimes):指定匹配事件出现的次数范围,最小次数为 fromTimes,最大次数为 toTimes。
  • greedy():只能用在循环模式后,使当前循坏模式变得贪心,就是总是尽可能多地去匹配。
  • optional():使当前模式成为可选的,也就是说可以满足这个匹配条件,也可以不满足。
// 匹配事件出现4次
pattern.times(4);

// 匹配事件出现4次,或不出现(0次)
pattern.times(4).optional();

// 匹配事件出现2,3或者4次
pattern.times(2, 4);

// 匹配事件出现2,3或者4次,并且尽可能多地匹配
pattern.times(2, 4).greedy();

// 匹配事件出现2,3或者4次,或不出现
pattern.times(2, 4).optional();

// 匹配事件出现2,3或者4次,或不出现,并且尽可能多的匹配
pattern.times(2, 4).optional().greedy();

// 匹配事件出现1次或多次
pattern.oneOrMore();

// 匹配事件出现1次或多次,并且尽可能多地匹配
pattern.oneOrMore().optional();

// 匹配事件出现1次或多次,或不出现,并且尽可能多地匹配
pattern.oneOrMore().optional().greedy();

// 匹配事件出现2次或多次
pattern.timesOrMore(2);

正是因为个体模式可以通过量词定义为循环模式,一个模式能够匹配到多个事件,所以事件的检测接收才会用 Map 中的一个列表来保存。

条件

对于条件的定义,主要是通过调用 Pattern 对象的 where() 方法来实现的,主要可以分为简单条件、迭代条件、复合条件、终止条件几种类型。此外,也可以调用 Pattern 对象的 subtype() 方法来限定匹配事件的子类型。

  • 限定子类型:调用 subtype() 方法可以为当前模式增加子类型限制条件。
  • 简单条件:只根据当前事件的特征来快定是否接受它。
  • 迭代条件:需要将当前事件跟之前的事件做对比,才能判断出要不要接受当前事件。
failPattern.oneOrMore()
        .where(new IterativeCondition<LoginEvent>() { // 迭代条件
            @Override
            public boolean filter(LoginEvent loginEvent, Context<LoginEvent> context) throws Exception {
                // 事件的user必须以A开头
                if (!loginEvent.userId.startsWith("A")) {
                    return false;
                }

                Long sum = loginEvent.timestamp;
                // 获取当前模式之前已经匹配的事件,求所有事件timestamp之和
                for (LoginEvent event: context.getEventsForPattern("failPattern")) {
                    sum += event.timestamp;
                }
                // 总时间小于10000时,当前事件满足匹配规则,可以匹配成功
                return sum < 10000;
            }
        });
  • 终止条件:表示遇到某个特定事件时,当前模式就不再继续循环匹配了。终止条件的定义是通过调用模式对象的 until() 方法来实现的,同样传入一个 IterativeCondition 作为参数。

组合模式

将多个个体模式组合起来的完整模式,就叫作组合模式。

Pattern<Event, ?> pattern = Pattern
  .<Event>begin("start").where(...)
  .next("next").where(...)
  .followedBy("follow").where(...)
  ...

初始模式

所有的组合模式,都必须以一个初始模式开头,而初始模式必须通过调用 Pattern 的静态方法 begin() 来创建。

Pattern<Event, ?> pattern = Pattern.<Event>begin("start");

传入的 String 类型的参数就是模式的名称,而 begin() 方法需要传入一个类型参数,就是模式要检测流中事件的基本类型, 这里定义的是 Event

Pattern 有两个泛型参数,第一个就是检测事件的基本类型 Event,跟 begin() 指定的类型一致,第二个则是当前模式里事件的子类型,由子类型限制条件指定。

近邻条件

模式之间的组合是通过一些连接词方法实现的,这些连接词指明了先后事件之间有着怎样的近邻关系。

  • 严格近邻:匹配的事件严格地按顺序一个接一个出现,中间不会有任何其他事件,对应的 next() 方法。
  • 宽松近邻:只关心事件发生的顺序,而放宽了对匹配事件的距离要求,对应的 followedBy() 方法。
  • 非确定下宽松近邻:非确定性是指可以重复使用之前已经匹配过的事件。这种近邻条件下匹配到的不同复杂事件,可以以同一个事件作为开始,所以匹配结果一般会比宽松近邻更多,对应的 followedByAny() 方法。
  • 其他限制条件
    • notNext():前一个模式匹配到的事件后面不能紧跟着某种事件。
    • notFollowedBy():前一个模式匹配到的事件后面,不会出现某种事件。
    • within():第一个事件到最后一个事件之间的最大时间间隔,只有在这期间成功匹配的复杂事件才是有效的。

模式组

Flink CEP 允许我们以嵌套的方式来定义模式。

在模式组中,每一个模式序列就被当作了某一阶段的匹配条件,返回的类型是一个 GroupPatternGroupPattern 本身是 Pattern 的子类。所以个体模式和组合模式能调用的方法,比如 times()oneOrMore()optional() 之类的量词,模式组一般也是可以用的。

// 以模式序列作为初始模式
Pattern<Event, ?> start = Pattern.begin(
        Pattern.<Event>begin("start_start").where(...)
                .followedBy("start_middle").where(...)
);

// 在start后定义严格近邻的模式序列,并重复匹配两次
Pattern<Event, ?> strict = start.next(
        Pattern.<Event>begin("next_start").where(...)
                .followedBy("next_middle").where(...)
).times(2);

// 在start后定义宽松近邻的模式序列,并重复匹配一次或多次
Pattern<Event, ?> relaxed = start.followedBy(
        Pattern.<Event>begin("followedBy_start").where(...)
                .followedBy("followedBy_middle").where(...)
).oneOrMore();

// 在start后定义非确定性宽松近邻的模式序列,可以匹配一次,也可以不匹配
Pattern<Event, ?> nonDeterminRelaxed = start.followedByAny(
        Pattern.<Event>begin("followedByAny_start").where(...)
                .followedBy("followedByAny_middle").where(...)
).optional();

跳过策略

我们如果输入事件序列 a a a b,那么应该检测到 6 个匹配结果:(a1 a2 a3 b)(a1 a2 b)(a1 b)(a2 a3 b)(a2 b)(a3 b)

  • 不跳过:调用 AfterMatchSkipStrategy.noSkip()。这是默认策略,所有可能的匹配都会输出。
  • 跳至下一个:调用 AfterMatchSkipStrategy.skipToNext()。找到一个 a1 开始的最大匹配之后,跳过 a1 开始的所有其他匹配,直接从下一个 a2 开始匹配。最终结果:(a1 a2 a3 b)(a2 a3 b)(a3 b)
  • 跳过所有子匹配:调用 AfterMatchSkipStrategy.skipPastLastEvent()。找到 a1 开始的匹配 (a1 a2 a3 b) 之后,直接跳过所有 a1 直到 a3 开头的匹配,相当于把这些子匹配都跳过了。最终结果: (a1 a2 a3 b)
  • 跳至第一个:调用 AfterMatchSkipStrategy.skipToFirst("a")。指明跳至哪个模式的第一个匹配事件。最终结果:(a1 a2 a3 b)(a1 a2 b)(a1 b)
  • 跳至最后一个:调用 AfterMatchSkipStrategy.skipToLast("a")。指明跳至哪个模式的最后一个匹配事件。最终结果:(a1 a2 a3 b)(a3 b)

检测处理

将模式应用到流上

只要调用 CEP 类的静态方法 pattern(),将数据流和模式作为两个参数传入。最终得到的是一个 PatternStream

// 2.定义模式,连续三次失败
Pattern<LoginEvent, LoginEvent> failPattern = Pattern.<LoginEvent>begin("fail")
        .where(new SimpleCondition<LoginEvent>() {
            @Override
            public boolean filter(LoginEvent loginEvent) throws Exception {
                return loginEvent.eventType.equals("fail");
            }
        }).times(3).consecutive(); // 指定严格近邻的三次登录失败


// 3.将模式应用到数据流上,检测复杂事件
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventStream.keyBy(loginEvent -> loginEvent.userId), failPattern);

// 4.将检测到的复杂事件提取出来,进行处理得到报警信息输出
SingleOutputStreamOperator<String> warningStream = patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
    @Override
    public void processMatch(Map<String, List<LoginEvent>> map, Context context, Collector<String> collector) throws Exception {
        // 提取三次登录失败事件
        LoginEvent firstFailEvent = map.get("fail").get(0);
        LoginEvent secondFailEvent = map.get("fail").get(1);
        LoginEvent thirdFailEvent = map.get("fail").get(2);
        collector.collect(firstFailEvent.userId + "连续三次登录失败!登录时间:" +
                firstFailEvent.timestamp + "," +
                secondFailEvent.timestamp + "," +
                thirdFailEvent.timestamp);
    }
});

处理超时事件

用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后, 用户支付的意愿会降低。电商网站往往会对订单状态进行监控,设置一个失效时间,如果下单后一段时间仍未支付,订单就会被取消。

首先定义出要处理的数据类型。要包括用户对订单的下单和支付两种行为。可以定义 POJO 类 OrderEvent 如下。

public class OrderEvent {
    public String userId;
    public String orderId;
    public String eventType;
    public Long timestamp;

    public OrderEvent() {
    }

    public OrderEvent(String userId, String orderId, String eventType, Long timestamp) {
        this.userId = userId;
        this.orderId = orderId;
        this.eventType = eventType;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "OrderEvent{" +
                "userId='" + userId + '\'' +
                ", orderId='" + orderId + '\'' +
                ", eventType='" + eventType + '\'' +
                ", timestamp=" + timestamp +
                '}';
    }
}
// 1.获取数据流
SingleOutputStreamOperator<OrderEvent> stream = env.fromElements(
        new OrderEvent("user_1", "order_1", "create", 1000L),
        new OrderEvent("user_2", "order_2", "create", 2000L),
        new OrderEvent("user_1", "order_1", "modify", 10 * 1000L),
        new OrderEvent("user_1", "order_1", "pay", 60 * 1000L),
        new OrderEvent("user_2", "order_3", "create", 10 * 60 * 1000L),
        new OrderEvent("user_2", "order_3", "pay", 20 * 60 * 1000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderEvent>forMonotonousTimestamps()
        .withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
            @Override
            public long extractTimestamp(OrderEvent orderEvent, long l) {
                return orderEvent.timestamp;
            }
        }));

// 2.定义模式
Pattern<OrderEvent, OrderEvent> pattern = Pattern.<OrderEvent>begin("create")
        .where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent orderEvent) throws Exception {
                return orderEvent.eventType.equals("create");
            }
        })
        .followedBy("pay")
        .where(new SimpleCondition<OrderEvent>() {
            @Override
            public boolean filter(OrderEvent orderEvent) throws Exception {
                return orderEvent.eventType.equals("pay");
            }
        })
        .within(Time.minutes(15));

// 3.将模式应用到数据流上
PatternStream<OrderEvent> patternStream = CEP.pattern(stream.keyBy(orderEvent -> orderEvent.orderId), pattern);

// 4.定义一个侧输出流标签
OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {
};

// 5.将完全匹配和超时部分匹配的复杂事件提取出来,进行处理
SingleOutputStreamOperator<String> result = patternStream.process(new OrderPayMatch());

result.print("payed: ");
result.getSideOutput(timeoutTag).print("timeout: ");
// 自定义PatternProcessFunction
public static class OrderPayMatch extends PatternProcessFunction<OrderEvent, String> implements TimedOutPartialMatchHandler<OrderEvent> {
    @Override
    public void processMatch(Map<String, List<OrderEvent>> map, Context context, Collector<String> collector) throws Exception {
        // 获取当前的支付事件
        OrderEvent payEvent = map.get("pay").get(0);
        collector.collect("用户" + payEvent.userId + "订单" + payEvent.orderId + "已支付");
    }

    @Override
    public void processTimedOutMatch(Map<String, List<OrderEvent>> map, Context context) throws Exception {
        OrderEvent createEvent = map.get("create").get(0);
        OutputTag<String> timeoutTag = new OutputTag<String>("timeout") {};
        context.output(timeoutTag, "用户" + createEvent.userId + "订单" + createEvent.orderId + "超时");
    }
}

标签:匹配,LoginEvent,Flink,模式,事件,CEP,new,public
From: https://www.cnblogs.com/fireonfire/p/17025369.html

相关文章