首页 > 其他分享 >关于高并发下的数据处理

关于高并发下的数据处理

时间:2024-06-01 13:43:28浏览次数:22  
标签:触发 记录 kafka 并发 关于 数据处理 告警 数据 进行

架构总是在不断修正演变的过程中得到完善!!!

需求背景:

接到了一个判断报文中规则来触发告警的需求,本以为需求很简单,具体的告警逻辑就不赘述了,大体的流程是:

接收到报文,报文中有定义规则和对应的当前值,判断当前值是否需要触发告警,如果需要则触发告警,存储告警,通过mqtt推送告警信息给前端。

实现:

其实实现起来也是比较简单的,在接到报文后,先进行规则逻辑判断,如果需要触发告警,再查询以前是否已经有过告警,或者是是否满足告警的触发窗口时间,又或者是已经在恢复期,是否需要重置恢复窗口时间等,最后再进行告警的记录或者更新,同时推送;

主要涉及到几个时间节点:

1、第一次满足触发条件,进行直接触发告警或开始触发窗口时间记录

2、满足触发窗口时间,进行告警触发

3、满足恢复条件,直接恢复或开始恢复窗口时间记录

4、告警恢复或恢复失败进行窗口时间重置等

逻辑可能相对复杂,不过这不是今天记录的重点。

主要记录在进行不断测试的过程中遇到的各种并发问题及解决方式:

问题1:

刚实现了初版的功能后,发到开发环境进行简单的测试,很快发现了问题,同一条告警的触发时间在不停变化,这明显不正确,触发时间只会有一个怎么可能会变化呢。经过排查发现在同一时间会有大量的数据进入到判断逻辑当中,而针对同一个要判断的物理对象的同一个等级,同一时间只能有一个,所以需要加锁来进行拦截:

此时使用了redisson分布式锁,不能使用单体的synchronized或者Lock类来进行加锁,防止在多节点情况下,锁不住物理对象的问题;

但是这就引出了下一个问题:

问题2:

加了redisson锁之后,性能明显下降了,导致从kafka中拿数据速度变慢,kafka数据造成大量堆积,但此时比较尴尬的是我没办法通过增加redis节点来解决问题,所以只能自己想办法了:

所以就优化取数据的速度,一次从kafka中多拿些数据,将对数据的逻辑判断提前,尽量在进入代码时就进行正确性的判断,让其快速失败,同时对数据库的操作进行各种优化,加索引等,总之就是能提高性能的都加上,总算是能抗住大量数据并发;

本来以为应该是可以了,但是发现在记录历史库时,数据与实时库中的数据状态大量的不一致,这就引出了第三个问题:

问题3:

在产生告警时其实是会在mysql里存一份用于实时数据,同时将告警记录通过springCloud Stream发送到kafka中,下游有另一个服务来消费并将其记录到mongoDB中用作历史数据;经过一段时间的运行,发现mongoDB中的数据出现了大量跟mysql数据库中数据状态不一致的问题,经过排查发现是由于在发送到kafka时,可能很快该告警就已经恢复了,而下游消费时可能在不同的partition中拿到数据,先消费了恢复的记录,后消费了触发的记录,这就会出现状态不一致的问题:

具体的解决方式在另一篇博客中:使用spring.cloud.stream来发送kafka消息,并根据某字段将消息发送到固定partition上

 

当然,这期间还有些其他的坑,比如在对mongoDB进行更新的时候,原来使用的是删除-插入的方式,但是发现出现了大量的消息堆积,因为我为了保证删除-插入的原子性操作,后来改成了使用upsert来进行操作;还有就是对需要更新的字段使用了并发流(同事的误操作),导致出现了有些字段没更新有些字段值丢失等问题,后来又改成使用stream流来进行操作等。

总之在不断更新代码的过程中还是体会挺多的,希望能不断进步不断追求更好的代码质量

标签:触发,记录,kafka,并发,关于,数据处理,告警,数据,进行
From: https://www.cnblogs.com/Silentness/p/18225688

相关文章

  • 【Redis】保证redis的高并发高可用的几种策略
    保证Redis的高并发高可用性需要从多方面入手,包括架构设计、配置优化、监控和维护。以下是一些具体的策略和方法:1.Redis集群模式Redis集群模式通过将数据分片分布到多个节点上来实现高并发和高可用性。分片:将数据分布到多个节点,减少单个节点的负载。主从复制:每个主节......
  • 【多进程并发笔记】Python-Multiprocess
    目录调用函数后,函数内的变量如何释放?python2.7怎么使用多线程加速forloop多进程进程池,函数序列化错误的处理Time模块计算程序运行时间使用多进程,Start()后,如何获得返回值?使用多进程并行,每个进程都将结果写入sqlite3数据库,可以么python创建进程池进程池的最大进程数怎么确......
  • 淘宝API接口大全,实时数据接口且支持高并发请求不限制
    前言在开发测试阶段,或者是在写Demo的时候,难免会用到一些测试数据,有时苦于没有可用的接口,需要自己动手去写,但是这样大大降低了效率,前期我也找了一些封装好的接口,这篇文章整理一下,以下接口完全免费测试,返回格式全是JSON,所有接口均可无限制使用,有需要的小伙伴可以进来看看。 ......
  • 关于12306技术相关说明以及暂定计划
    12306项目中包含了缓存、消息队列、分库分表、设计模式等代码,通过这些代码可以全面了解分布式系统的核心知识点。在系统设计中,采用最新JDK17+SpringBoot3&SpringCloud微服务架构,构建高并发、大数据量下仍然能提供高效可靠的12306购票服务。下方的架构图全面描述了项......
  • 微信公众号【原子与分子模拟】: 熔化温度 + 超导电性 + 电子化合物 + 分子动力学模拟 +
    往期内容主要涵盖: 熔化温度 + 超导电性 + 电子化合物 + 分子动力学模拟 + 第一性原理计算 + 数据处理程序【1】熔化温度 +分子动力学+LAMMPS相关内容【文献分享】分子动力学模拟+LAMMPS+熔化温度+晶体缺陷+熔化方法LAMMPS文献:金属熔化行为的局域......
  • JUC并发编程第六章——volatile与JMM
    1被volatile修饰的变量有两大特点特点:可见性有序性:有排序要求,有时需要禁重排内存语义:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值立即刷新回主内存中当读一个volatile变量时,JMM会把该线程对应的本地内存设置为无效,重新回到主内存中读取最新共享......
  • python数据处理
    Python在数据处理方面非常强大,主要得益于其丰富的库,如Pandas、NumPy和Matplotlib等。以下是一些基本的Python代码示例,用于数据加载、处理和可视化。1.导入库importpandasaspdimportnumpyasnpimportmatplotlib.pyplotasplt2.加载数据#从CSV文件加载......
  • Linux下的并发与竞争
    文章目录前言一、原子操作二、自旋锁三、读写自旋锁四、信号量五、互斥体总结前言Linux系统是个多任务操作系统,并发访问带来的问题就是竞争,所谓的临界区就是共享数据段,要保证临界区是原子访问的。主要方法有四种:原子操作,自旋锁,信号量,互斥体。本文主要介绍内核下各方......
  • 关于一道高斯函数题目
    令\(\lfloorx\rfloor\)为不大于\(x\)的最大整数。\(\{x\}=x-\lfloorx\rfloor\)。题目1题面求值:\[\sum\limits_{i=1}^{2015}\left\{\frac{2015i}{2016}\right\}\]标准答案\[\begin{aligned}&\left\{\frac{2015i}{2016}\right\}\\......
  • 机器学习——关于SVM的些许问题的个人思考
    最近在利用python对机器学习进行实践,因为之前我是先完整的刷了一遍周志华老师的《西瓜书》才开始的实践活动,因此,时间跨度很久,以至于对于SVM的相关理论有些生疏了,甚至关于SVM的一些之前没注意到的问题,现在暴露了出来,所以这篇文章主要是想跟大家分享一下个人关于SVM的一些令人纠......