首页 > 其他分享 >flink CEP 讲解 和实例

flink CEP 讲解 和实例

时间:2023-05-18 15:14:26浏览次数:42  
标签:flink danger Flink 实例 CEP get event

 

1,Flink介绍  

Flink 是一个分布式的基于状态计算的流处理计算引擎,或者说框架,可以处理有边界流数据和无边界流数据,在内存中执行计算,而且具有任意扩展计算能力。 最初由柏林工业大学的xxx 小组研发,后被阿里巴巴收购。初略看起来,和spark 功能类似,但是某些特征优于spark。 

Flink 提供了对于复杂事件模式识别的功能,并以提供一个专用库(flink-cep),方便用户开发CEP 应用。

 

2, CEP 定义 和场景

CEP 英文全称是“complex event processing” , 即“复杂事件处理” ,它可以探查无限流数据中的复杂模式,使得抓取其中的重要事件。

通过这种能力,我们可以对大型应用,海量用户行为,海量数据进行实时监控,并提取所需信息,尤其是重要的,危急,欺诈的信息。

适用的场景包括,银行应用的信用欺诈,大型系统的系统失效恢复,用户行为监控(直播平台用户弹幕合规),舆情监控,等等。

 

3,Flink cep 开发流程:

1,定义flink 环境

2,输入流 

3,定义 复杂模式

4,应用模式,

5,提取输出数据

 

 

4,CEP 代码实例:

4.1,功能:

输入流为socket 字符模拟 应用日志流。

自定义需要捕捉的规则为:在1分钟内,如果某个实体依次发送 warn 和 danger级别日志,则捕捉到该数据,并输出到控制板。

 

4.2 ,代码:

public class FraudDetectionJob {
    public static void main(String[] args) throws Exception {
        System.out.println("Hello World!");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// 输入流定义,从socket 读入模拟的流数据
        DataStream<Event> input = env
                .socketTextStream("localhost", 9000)
                .map(str -> new Event(str.split(",")[0], str.split(",")[1], str.split(",")[2]))
                /*.keyBy(event -> event.getId())*/;

        // 定义需要捕捉的事件规则
        Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
                .where(SimpleCondition.of(event -> event.getLevel().equals("warn")))
                .next("end")
                .where(SimpleCondition.of(event -> event.getLevel().equals("danger")))
                .within(Time.minutes(1));

    //对输入流适配规则
        PatternStream<Event> patternStream = CEP.pattern(input.keyBy(e -> e.getId()), pattern);

        //输出结果流事件
        DataStream<Alert> result = patternStream.process(
                new PatternProcessFunction<Event, Alert>() {
                    @Override
                    public void processMatch(
                            Map<String, List<Event>> match,
                            Context ctx,
                            Collector<Alert> out) throws Exception {

                        Event warn = match.get("middle").get(0);
                        Event danger = match.get("danger").get(0);
                        Alert alter = new Alert(warn, danger);
                        out.collect(alter);
                    }
                });

        result.print();

     }    
}

 

 代码难点:

1,复杂规则底层原理为非确定有限状态机。模式理解和定义过程比较复杂。 

 

 参考:

1, Flink CEP 官方文档: CEP 开发

标签:flink,danger,Flink,实例,CEP,get,event
From: https://www.cnblogs.com/gao1261828/p/17380939.html

相关文章

  • WPF单进程实例
    用互斥量Mutex实现如果已经存在Mutex,则会创建失败。注意:Mutex要声明成全局的,不能是局部变量,否则会判断失败。 重写Startup函数,加上单例判断。参考下面代码:1publicpartialclassApp:Application2{3System.Threading.Mutexmutex;45......
  • OpenFileDialog的使用实例
    'DimexcelFolderPathAsString=""'DimopenFileDialog1AsNewOpenFileDialog()'openFileDialog1.Filter="ExcelFiles|*.xlsx;*.xls"'openFileDialog1.Title="SelectExcelFile"......
  • 33、实例化对象有哪几种方式
    newclone()通过反射机制创建//用Class.forName方法获取类,在调用类的newinstance()方法Class<?>cls=Class.forName("com.dao.User");Useru=(User)cls.newInstance();序列化反序列化//将一个对象实例化后,进行序列化,再反序列化,也可以获得一个对象(远程通信的场景下使用......
  • grafana操作实例3
    dashboard版本管理     dashboard自动刷新      大屏编辑     每个dashboard都需要去设置是否开启编辑         ......
  • 西门子1200plc程序实例,TCP/IP及modbus通讯,版本V15,如有需要也可代写程序。
    西门子1200plc程序实例,TCP/IP及modbus通讯,版本V15,如有需要也可代写程序。功能如下:1,西门子1200控制4台步进电机;2,西门子1200与4台MS300变频器modbus485轮询读写参数;3,西门子1200与上位机TCP/IP通讯控制相机拍照,反馈数据;4,设备为多工位联动控制;5,威纶通人机界面多画面切换可以作为参考......
  • NVRM: Xid (PCI:0000:b1:00): 13, pid=1375637, Graphics SM Global Exception on (GP
    显卡服务器中一个显卡崩溃了:May1605:38:58dellkernel:[14244871.006970]NVRM:Xid(PCI:0000:b1:00):13,pid=1375637,GraphicsSMWarpExceptionon(GPC0,TPC0,SM0):IllegalInstructionEncodingMay1605:38:58dellkernel:[14244871.010256]NVRM:Xid(PC......
  • laravel ServiceProvider 服务提供者使用案例
    1.实例化一个类2.全局注册这个类3.在控制器中使用 publicfunctionregister(){$this->app->singleton('wxminapp',function(){return(newWxServiceProvider)->boot();});}使用注册方法,注册一个服务提供者,wxminapp为服务......
  • 记录一次全局异常告警@ExceptionHandler和HandlerExceptionResolver的问题
         最近有同事说之前写的全局异常告警,如果有@Valid的注解,在接入新写的插件告警后,返回信息不打印了。全局异常是基于@ExceptionHandler的全局异常类,主要是ServletMVC的ModelAndView返回的错误信息的捕获。代码如下:   /***@authorxxx*/@RestControlle......
  • 全注解springMVC实例20230517
     1、pom<dependencies><dependency><groupId>org.springframework</groupId><artifactId>spring-webmvc</artifactId><version>4.3.13.RELEASE</version></dependency><dependency&g......
  • @ExceptionHandler注解
    1,基本使用方法Spring的@ExceptionHandler可以用来统一处理方法抛出的异常,比如这样:@ExceptionHandler()publicStringhandleExeption2(Exceptionex){System.out.println("抛异常了:"+ex);ex.printStackTrace();StringresultStr="异常:默认";returnr......