java flink(二十六) 实战之电商黑名单过滤 Flink CEP编程实现、什么是CEP、CEP组合模式demo、CEP循环模式demo
什么是CEP:
1、复杂事件处理
2、Flink中用于实现复杂事件处理的库
3、CEP允许在无休止的事件中检测事件模式,让我们有机会掌握数据中的重要部分
4、一个或多个由简单事件构成的事件通过一定的规则匹配,然后输出用户想要的数据。
CEP API:
begin 第一个事件定义
where 条件
next 后一个紧跟着事件定义
subtype 子类型判断
followedBy 后边的事件,不一定紧跟着
CEP 个体模式 :
CEP 模式序列
CEP 超时处理
CEP组合模式demo
题目:根据上篇的黑名单过滤功能,我们进行优化,如果两条失败之间穿插了一条乱序的成功登录,那么这两条失败不会被检测。
我们利用CEP,首先第一个事件是检测第一条失败登录,然后第二个事件是检测第二个失败登录,进行筛选打印报警信息。
1、引入jar包:
2、代码展示
- package Project;
-
- import Beans.LoginEvent;
- import Beans.LoginFailWarning;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternSelectFunction;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.cep.pattern.conditions.SimpleCondition;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
- import org.apache.flink.streaming.api.windowing.time.Time;
-
- import java.net.URL;
- import java.util.List;
- import java.util.Map;
-
- /**
-  * Flink CEP job that emits a warning when the same user produces two
-  * strictly consecutive failed logins within 2 seconds.
-  */
- public class LoginFailWithCep {
-     /**
-      * Builds and executes the job: reads /LoginLog.csv from the classpath,
-      * assigns event-time timestamps, applies the two-step "fail, fail"
-      * pattern per user and prints the resulting warnings.
-      *
-      * @param args unused
-      * @throws Exception if the job fails, or IllegalStateException if the
-      *                   input resource is missing from the classpath
-      */
-     public static void main(String[] args) throws Exception {
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-         env.setParallelism(1);
-         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-         // Read the input data; fail with a clear message instead of an
-         // opaque NullPointerException when the resource is missing.
-         URL resource = LoginFailWithCep.class.getResource("/LoginLog.csv");
-         if (resource == null) {
-             throw new IllegalStateException("classpath resource /LoginLog.csv not found");
-         }
-         DataStream<LoginEvent> logEventStream = env.readTextFile(resource.getPath())
-                 .map(line -> {
-                     String[] fields = line.split(",");
-                     // Long.valueOf replaces the deprecated new Long(String) constructor.
-                     return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
-                 })
-                 // Tolerate events arriving up to 3 seconds out of order.
-                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
-                     @Override
-                     public long extractTimestamp(LoginEvent loginEvent) {
-                         // Presumably the log timestamp is in seconds; Flink expects milliseconds.
-                         return loginEvent.getTimestamp() * 1000;
-                     }
-                 });
-
-         // Define the pattern: a failed login immediately followed (next = strict
-         // contiguity) by another failed login, both within 2 seconds.
-         Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail")
-                 .where(new SimpleCondition<LoginEvent>() {
-                     @Override
-                     public boolean filter(LoginEvent loginEvent) throws Exception {
-                         return "fail".equals(loginEvent.getLoginState());
-                     }
-                 })
-                 .next("secondFail")
-                 .where(new SimpleCondition<LoginEvent>() {
-                     @Override
-                     public boolean filter(LoginEvent loginEvent) throws Exception {
-                         return "fail".equals(loginEvent.getLoginState());
-                     }
-                 })
-                 .within(Time.seconds(2));
-
-         // Apply the pattern to the stream, keyed by user id.
-         PatternStream<LoginEvent> patternStream = CEP.pattern(logEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);
-         // Convert every match into a warning record.
-         SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());
-         warningStream.print();
-         env.execute("login fail detect with cep job");
-     }
-
-     /**
-      * Pattern select function that turns one "firstFail"/"secondFail" match
-      * into a {@code LoginFailWarning}.
-      */
-     public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
-         /**
-          * @param map matched events, keyed by pattern name
-          * @return a warning carrying the user id and the timestamps of both failures
-          */
-         @Override
-         public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
-             LoginEvent firstFailEvent = map.get("firstFail").get(0);
-             LoginEvent secondFailEvent = map.get("secondFail").get(0);
-             return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), secondFailEvent.getTimestamp(), "login fail 2 times");
-         }
-     }
- }
题目二:如果是检测连续三条失败,那么我们还要继续编写组合模式的条件,这样比较麻烦,我们利用CEP循环模式进行实现
- package Project;
-
- import Beans.LoginEvent;
- import Beans.LoginFailWarning;
- import org.apache.flink.cep.CEP;
- import org.apache.flink.cep.PatternSelectFunction;
- import org.apache.flink.cep.PatternStream;
- import org.apache.flink.cep.pattern.Pattern;
- import org.apache.flink.cep.pattern.conditions.SimpleCondition;
- import org.apache.flink.streaming.api.TimeCharacteristic;
- import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
- import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
- import org.apache.flink.streaming.api.windowing.time.Time;
-
- import java.net.URL;
- import java.util.List;
- import java.util.Map;
-
- /**
-  * Flink CEP job that emits a warning when the same user produces three
-  * strictly consecutive failed logins within 5 seconds, expressed as a
-  * looping pattern instead of three chained steps.
-  */
- public class LoginFailWithCep {
-     /**
-      * Builds and executes the job: reads /LoginLog.csv from the classpath,
-      * assigns event-time timestamps, applies the looping "fail x3" pattern
-      * per user and prints the resulting warnings.
-      *
-      * @param args unused
-      * @throws Exception if the job fails, or IllegalStateException if the
-      *                   input resource is missing from the classpath
-      */
-     public static void main(String[] args) throws Exception {
-         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-         env.setParallelism(1);
-         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-         // Read the input data; fail with a clear message instead of an
-         // opaque NullPointerException when the resource is missing.
-         URL resource = LoginFailWithCep.class.getResource("/LoginLog.csv");
-         if (resource == null) {
-             throw new IllegalStateException("classpath resource /LoginLog.csv not found");
-         }
-         DataStream<LoginEvent> logEventStream = env.readTextFile(resource.getPath())
-                 .map(line -> {
-                     String[] fields = line.split(",");
-                     // Long.valueOf replaces the deprecated new Long(String) constructor.
-                     return new LoginEvent(Long.valueOf(fields[0]), fields[1], fields[2], Long.valueOf(fields[3]));
-                 })
-                 // Tolerate events arriving up to 3 seconds out of order.
-                 .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
-                     @Override
-                     public long extractTimestamp(LoginEvent loginEvent) {
-                         // Presumably the log timestamp is in seconds; Flink expects milliseconds.
-                         return loginEvent.getTimestamp() * 1000;
-                     }
-                 });
-
-         // Define the pattern: three failed logins within 5 seconds.
-         // times(3) on its own uses relaxed contiguity (other events may be
-         // interleaved); consecutive() requires the three failures to be
-         // strictly adjacent.
-         Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("failEvents")
-                 .where(new SimpleCondition<LoginEvent>() {
-                     @Override
-                     public boolean filter(LoginEvent loginEvent) throws Exception {
-                         return "fail".equals(loginEvent.getLoginState());
-                     }
-                 })
-                 .times(3).consecutive()
-                 .within(Time.seconds(5));
-
-         // Apply the pattern to the stream, keyed by user id.
-         PatternStream<LoginEvent> patternStream = CEP.pattern(logEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);
-         // Convert every match into a warning record.
-         SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());
-         warningStream.print();
-         env.execute("login fail detect with cep job");
-     }
-
-     /**
-      * Pattern select function that turns one "failEvents" match into a
-      * {@code LoginFailWarning}.
-      */
-     public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning> {
-         /**
-          * @param map matched events, keyed by pattern name; all events of the
-          *            looping pattern are in the single "failEvents" list
-          * @return a warning carrying the user id and the timestamps of the
-          *         first and last matched failures
-          */
-         @Override
-         public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
-             List<LoginEvent> failEvents = map.get("failEvents");
-             LoginEvent firstFailEvent = failEvents.get(0);
-             // Index by the size of the event list, not of the map: the map has
-             // one entry per pattern name, so map.size() - 1 would always be 0.
-             LoginEvent lastFailEvent = failEvents.get(failEvents.size() - 1);
-             return new LoginFailWarning(firstFailEvent.getUserId(), firstFailEvent.getTimestamp(), lastFailEvent.getTimestamp(), "login fail 3 times");
-         }
-     }
- }
原文链接:https://blog.csdn.net/qq_40771567/article/details/117109794 标签:LoginEvent,demo,org,flink,CEP,apache,import,电商 From: https://www.cnblogs.com/sunny3158/p/17999980