首页 > 编程语言 >java flink(二十六) 实战之电商黑名单过滤 Flink CEP编程实现、什么是CEP、CEP组合模式demo、CEP循环模式demo

java flink(二十六) 实战之电商黑名单过滤 Flink CEP编程实现、什么是CEP、CEP组合模式demo、CEP循环模式demo

时间:2024-01-31 19:55:40浏览次数:34  
标签:LoginEvent demo org flink CEP apache import 电商

java flink(二十六) 实战之电商黑名单过滤 Flink CEP编程实现、什么是CEP、CEP组合模式demo、CEP循环模式demo

什么是CEP:

1、复杂事件处理

2、Flink中实现复杂事件处理库

3、CEP允许在无休止的事件中检测事件模式,让我们有机会掌握数据中的重要部分

4、一个或多个由简单事件构成的事件通过一定的规则匹配,然后输出用户想要的数据。

CEP API:

begin 第一个事件定义 

where 条件

next 后一个紧跟着事件定义

subtype 子类型判断

followedBy 后边的时间 不一定紧跟着

CEP 个体模式 :

CEP 模式序列 

 

CEP 超时处理 

CEP组合模式demo 

题目:根据上篇的黑名单过滤功能,我们进行优化,如果两条失败之间穿插了一条乱序的成功登录,那么这两条失败不会被检测。

我们利用CEP,首先第一个事件是检测第一条失败登录,然后第二个事件是检测第二个失败登录,进行筛选打印报警信息。

1、引入jar包:

2、代码展示

  1. package Project;
  2. import Beans.LoginEvent;
  3. import Beans.LoginFailWarning;
  4. import org.apache.flink.cep.CEP;
  5. import org.apache.flink.cep.PatternSelectFunction;
  6. import org.apache.flink.cep.PatternStream;
  7. import org.apache.flink.cep.pattern.Pattern;
  8. import org.apache.flink.cep.pattern.conditions.SimpleCondition;
  9. import org.apache.flink.streaming.api.TimeCharacteristic;
  10. import org.apache.flink.streaming.api.datastream.DataStream;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import java.net.URL;
  16. import java.util.List;
  17. import java.util.Map;
  18. public class LoginFailWithCep {
  19. public static void main(String[] args) throws Exception{
  20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21. env.setParallelism(1);
  22. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  23. //读取数据
  24. URL resource = LoginFailWithCep.class.getResource("/LoginLog.csv");
  25. DataStream<LoginEvent> logEventStream = env.readTextFile(resource.getPath())
  26. .map(line -> {
  27. String[] fields = line.split(",");
  28. return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
  29. })
  30. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
  31. @Override
  32. public long extractTimestamp(LoginEvent loginEvent) {
  33. return loginEvent.getTimestamp() * 1000;
  34. }
  35. });
  36. //定义匹配模式
  37. //匹配第一次失败与第二次失败在2s内
  38. Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("firstFail").where(new SimpleCondition<LoginEvent>() {
  39. @Override
  40. public boolean filter(LoginEvent loginEvent) throws Exception {
  41. return "fail".equals(loginEvent.getLoginState());
  42. }
  43. }).next("secondFail").where(new SimpleCondition<LoginEvent>() {
  44. @Override
  45. public boolean filter(LoginEvent loginEvent) throws Exception {
  46. return "fail".equals(loginEvent.getLoginState());
  47. }
  48. }).within(Time.seconds(2)); //2s内
  49. //将匹配模式应用到数据流上
  50. PatternStream<LoginEvent> patternStream = CEP.pattern(logEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);
  51. //检出符合匹配条件的复杂事件 进行转换处理 得到报警信息
  52. SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());
  53. warningStream.print();
  54. env.execute("login fail detect with cep job");
  55. }
  56. //实现自定义的pattern select function
  57. public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning>{
  58. @Override
  59. public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
  60. LoginEvent firstFailEvent = map.get("firstFail").get(0);
  61. LoginEvent secondFailEvent = map.get("secondFail").get(0);
  62. return new LoginFailWarning(firstFailEvent.getUserId(),firstFailEvent.getTimestamp(),secondFailEvent.getTimestamp(),"login fail 2 times");
  63. }
  64. }
  65. }

题目二:如果是检测连续三条失败,那么我们还要继续编写组合模式的条件,这样比较麻烦,我们利用CEP循环模式进行实现

  1. package Project;
  2. import Beans.LoginEvent;
  3. import Beans.LoginFailWarning;
  4. import org.apache.flink.cep.CEP;
  5. import org.apache.flink.cep.PatternSelectFunction;
  6. import org.apache.flink.cep.PatternStream;
  7. import org.apache.flink.cep.pattern.Pattern;
  8. import org.apache.flink.cep.pattern.conditions.SimpleCondition;
  9. import org.apache.flink.streaming.api.TimeCharacteristic;
  10. import org.apache.flink.streaming.api.datastream.DataStream;
  11. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  12. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  13. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
  14. import org.apache.flink.streaming.api.windowing.time.Time;
  15. import java.net.URL;
  16. import java.util.List;
  17. import java.util.Map;
  18. public class LoginFailWithCep {
  19. public static void main(String[] args) throws Exception{
  20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  21. env.setParallelism(1);
  22. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
  23. //读取数据
  24. URL resource = LoginFailWithCep.class.getResource("/LoginLog.csv");
  25. DataStream<LoginEvent> logEventStream = env.readTextFile(resource.getPath())
  26. .map(line -> {
  27. String[] fields = line.split(",");
  28. return new LoginEvent(new Long(fields[0]), fields[1], fields[2], new Long(fields[3]));
  29. })
  30. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.seconds(3)) {
  31. @Override
  32. public long extractTimestamp(LoginEvent loginEvent) {
  33. return loginEvent.getTimestamp() * 1000;
  34. }
  35. });
  36. //定义匹配模式
  37. //匹配第一次失败与第二次失败在2s内
  38. Pattern<LoginEvent, LoginEvent> loginFailPattern = Pattern.<LoginEvent>begin("failEvents")
  39. .where(new SimpleCondition<LoginEvent>() {
  40. @Override
  41. public boolean filter(LoginEvent loginEvent) throws Exception {
  42. return "fail".equals(loginEvent.getLoginState());
  43. }})
  44. //.times(3) //连续三次 非严格近邻(只要后边有就算)
  45. .times(3).consecutive() //必须严格近邻
  46. .within(Time.seconds(5));
  47. //将匹配模式应用到数据流上
  48. PatternStream<LoginEvent> patternStream = CEP.pattern(logEventStream.keyBy(LoginEvent::getUserId), loginFailPattern);
  49. //检出符合匹配条件的复杂事件 进行转换处理 得到报警信息
  50. SingleOutputStreamOperator<LoginFailWarning> warningStream = patternStream.select(new LoginFailMatchDetectWarning());
  51. warningStream.print();
  52. env.execute("login fail detect with cep job");
  53. }
  54. //实现自定义的pattern select function
  55. public static class LoginFailMatchDetectWarning implements PatternSelectFunction<LoginEvent, LoginFailWarning>{
  56. @Override
  57. public LoginFailWarning select(Map<String, List<LoginEvent>> map) throws Exception {
  58. // LoginEvent firstFailEvent = map.get("firstFail").get(0);
  59. // LoginEvent secondFailEvent = map.get("secondFail").get(0);
  60. // return new LoginFailWarning(firstFailEvent.getUserId(),firstFailEvent.getTimestamp(),secondFailEvent.getTimestamp(),"login fail 2 times");
  61. LoginEvent firstFailEvent = map.get("failEvents").get(0);
  62. LoginEvent lastFailEvent = map.get("failEvents").get(map.size()-1);
  63. return new LoginFailWarning(firstFailEvent.getUserId(),firstFailEvent.getTimestamp(),lastFailEvent.getTimestamp(),"login fail 3 times");
  64. }
  65. }
  66. }

 

原文链接:https://blog.csdn.net/qq_40771567/article/details/117109794

标签:LoginEvent,demo,org,flink,CEP,apache,import,电商
From: https://www.cnblogs.com/sunny3158/p/17999980

相关文章

  • WMS系统与电商平台快速拉通库存数量
    什么是WMS系统WMS系统是指仓储管理系统(Warehouse Management System)。它是一种用于管理和控制仓库运营的软件系统。WMS系统通过集成信息技术,提供仓库内货物的存储、出入库、库存管理、订单处理等功能,优化仓库的运作效率和准确性,并提供实时的库存可视化和数据分析,同时还可以协调和......
  • WMS系统与电商平台快速拉通库存数量
    什么是WMS系统WMS系统是指仓储管理系统(Warehouse Management System)。它是一种用于管理和控制仓库运营的软件系统。WMS系统通过集成信息技术,提供仓库内货物的存储、出入库、库存管理、订单处理等功能,优化仓库的运作效率和准确性,并提供实时的库存可视化和数据分析,同时还可以协调......
  • tomcat启动时报错:Caused by: java.lang.IllegalArgumentException: AJP连接器配置secr
    31-Jan-202414:01:13.812信息[main]org.apache.coyote.AbstractProtocol.start开始协议处理句柄["http-nio-8080"]31-Jan-202414:01:13.818严重[main]org.apache.catalina.core.StandardService.startInternalFailedtostartconnector[Connector[AJP/1.3-8009]]......
  • Python中HTTPException(基于werkzeug.exceptions包)
    当我们在开发HTTP服务时(接口服务),由于很多从内部引发的Python异常,会触发标准HTTP非200响应的视图。为了让前端有着更好的视图体验(如果因为内部异常,会返回给前端/调用方更好的一个页面/返回)。对于我们来说,给予调用方一个固定的返回格式时非常重要的(因此通过HTTPException......
  • Simple-BEV_ What Really Matters for Multi-Sensor BEV Perception_
    title:"Simple-BEV:WhatReallyMattersforMulti-SensorBEVPerception?"tags:-paperSimple-BEV:WhatReallyMattersforMulti-SensorBEVPerception?ZoteroAbstractBuilding3Dperceptionsystemsforautonomousvehiclesthatdonotrelyo......
  • 从数据库更新模型时出现System.ArgumentException
    尝试从数据库进行更新时,遇到类型未system.argumentexception的异常 来自热心网友的提醒:初看这个问题的时候以为有相同的表、主键啊之类的冲突排除了很久后检查了一下EntitySetMapping发现存在相同的节点呢删除了就ok了检查了一下EntitySetMapping发现存在相同的节......
  • Day61 异常机制Error和Exception
    异常机制Error和Exception什么是异常?软件程序在运行过程中,出现的意外,我们叫异常,英文是:Exception,意思是例外。这些,例外情况,或者叫异常,怎么让我们写的程序做出合理的处理。而不至于程序崩溃。异常指程序运行中出现的不期而至的各种状况,如:文件找不到、网络连接失败、非法参数等。......
  • 物流平台如何与电商平台进行自动化流程管理
    为什么要实现物流与电商平台进行自动化管理实现物流平台与电商平台的自动化流程管理对企业和消费者都有着重要的意义,比如以下几点:提高效率:自动化流程管理可以减少人为操作的错误和延误,提高订单处理和物流配送的效率。通过定义清晰的流程图和自动化工具,可以快速而准确地完成订单处理......
  • 通过Demo学WPF—数据绑定(一)✨
    前言✨想学习WPF,但是看视频教程觉得太耗时间,直接看文档又觉得似懂非懂,因此想通过看Demo代码+文档的方式进行学习。准备✨微软官方其实提供了WPF的一些Demo,地址为:microsoft/WPF-Samples:RepositoryforWPFrelatedsamples(github.com)将其克隆到本地,有很多的Demo代码:新建......
  • MySql执行Sql语句时出现“MySqlException: Parameter ‘@maxNo‘ must be defined.”
    1、......