架构总是在不断修正演变的过程中得到完善!!!
需求背景:
接到了一个判断报文中规则来触发告警的需求,本以为需求很简单,具体的告警逻辑就不赘述了,大体的流程是:
接收到报文,报文中有定义规则和对应的当前值,判断当前值是否需要触发告警,如果需要则触发告警,存储告警,通过mqtt推送告警信息给前端。
实现:
其实实现起来也是比较简单的,在接到报文后,先进行规则逻辑判断,如果需要触发告警,再查询以前是否已经有过告警,或者是是否满足告警的触发窗口时间,又或者是已经在恢复期,是否需要重置恢复窗口时间等,最后再进行告警的记录或者更新,同时推送;
主要涉及到几个时间节点:
1、第一次满足触发条件,进行直接触发告警或开始触发窗口时间记录
2、满足触发窗口时间,进行告警触发
3、满足恢复条件,直接恢复或开始恢复窗口时间记录
4、告警恢复或恢复失败进行窗口时间重置等
逻辑可能相对复杂,不过这不是今天记录的重点。
主要记录在进行不断测试的过程中遇到的各种并发问题及解决方式:
问题1:
刚实现了初版的功能后,发到开发环境进行简单的测试,很快发现了问题,同一条告警的触发时间在不停变化,这明显不正确,触发时间只会有一个怎么可能会变化呢。经过排查发现在同一时间会有大量的数据进入到判断逻辑当中,而针对同一个要判断的物理对象的同一个等级,同一时间只能有一个,所以需要加锁来进行拦截:
此时使用了redisson分布式锁,不能使用单体的synchronized或者Lock类来进行加锁,防止在多节点情况下,锁不住物理对象的问题;
但是这就引出了下一个问题:
问题2:
加了redisson锁之后,性能明显下降了,导致从kafka中拿数据速度变慢,kafka数据造成大量堆积,但此时比较尴尬的是我没办法通过增加redis节点来解决问题,所以只能自己想办法了:
所以就优化取数据的速度,一次从kafka中多拿些数据,将对数据的逻辑判断提前,尽量在进入代码时就进行正确性的判断,让其快速失败,同时对数据库的操作进行各种优化,加索引等,总之就是能提高性能的都加上,总算是能抗住大量数据并发;
本来以为应该是可以了,但是发现在记录历史库时,数据与实时库中的数据状态大量的不一致,这就引出了第三个问题:
问题3:
在产生告警时其实是会在mysql里存一份用于实时数据,同时将告警记录通过springCloud Stream发送到kafka中,下游有另一个服务来消费并将其记录到mongoDB中用作历史数据;经过一段时间的运行,发现mongoDB中的数据出现了大量跟mysql数据库中数据状态不一致的问题,经过排查发现是由于在发送到kafka时,可能很快该告警就已经恢复了,而下游消费时可能在不同的partition中拿到数据,先消费了恢复的记录,后消费了触发的记录,这就会出现状态不一致的问题:
具体的解决方式在另一篇博客中:使用spring.cloud.stream来发送kafka消息,并根据某字段将消息发送到固定partition上
当然,这期间还有些其他的坑,比如在对mongoDB进行更新的时候,原来使用的是删除-插入的方式,但是发现出现了大量的消息堆积,因为我为了保证删除-插入的原子性操作,后来改成了使用upsert来进行操作;还有就是对需要更新的字段使用了并发流(同事的误操作),导致出现了有些字段没更新有些字段值丢失等问题,后来又改成使用stream流来进行操作等。
总之在不断更新代码的过程中还是体会挺多的,希望能不断进步不断追求更好的代码质量
标签:触发,记录,kafka,并发,关于,数据处理,告警,数据,进行 From: https://www.cnblogs.com/Silentness/p/18225688