title : webflux延迟队列逻辑更改过程记录
author : simonLee
date : 2022/11/22 10:26
目录
webflux延迟队列逻辑更改过程记录
一、问题背景
一个业务使用了webflux.delayElement(Duration delay)来发送延迟消息,由于是异步,发送出去无法通过别的途径拿到该延迟消息(webflux没有相关api),但新需求功能上需要对发出去的延迟消息提供终止功能,且规定时间内可重复无限次
老功能简单来说就是有一个数字,数字在选择时间内按照正态分布有规律的一定时间间隔发送延迟消息,并对消息进行后续处理。例:设定数字10000,在半小时内,按一定时间间隔平分10000发送延迟消息。
新功能就是在老功能的基础上,支持中途中断往后的所有消息。
二、当前实现代码
可以见到,在delayElement后使用了doOnNext()做后续操作以及flatMap重新封装流对象,而想要做中途终止,必须能拿到延迟队列。"理解是第一步",我们需要先了解这几行代码的底层逻辑,才能对其进行修改。
三、逻辑详解
3.1创建事件链
方法开头使用了一个Mono对象作为事件链的头,由于webflux都是使用流式编程,使用一个对象便可把所有遍历对象连接起来。
3.2delayElement
先附上delayElement源码
该入参是一个Duration类,意为多久之后消费。在内部又调用了一个重载方法,入参为Duration和Scheduler,时间以及reactor的调度器。
Schedulers.parallel()是调度器的一个线程模型。
从这里我们大概可以知道webflux封装的delayElement也是通过线程延时执行的
3.3publishOn
下一步的publishOn则是将链中的调度器调整为boundedElastic,是一个有界可复用线程池,可动态调节。该线程池可适当解决"且规定时间内可重复无限次"产生的内存问题。
该线程池容量是CPU核心数的10倍,最多提交10万条任务。如果是延时调度,那么延时开始时间是在有线程可用时才开始计算。
3.4doOnNext
再下一步的doOnNext是在不对序列造成改变的情况下,发出元素,相当于做一些额外的逻辑。
在这里doOnNext里是做调用腾讯云操作
3.5flapMap
最后一步是flapMap,即重新执行逻辑修改元素内容。这里还做了落库操作并返回落库方法返回的元素。
四、代码总结
// 由调用链可知道
/*
代码相当于做了这样的操作,封装流,交由webflux执行
*/
eventChain.
.delayElement()
.publishOn()
.doOnNext()
.flatMap()
.delayElement()
.publishOn()
.doOnNext()
.flatMap()
.delayElement()
.publishOn()
.doOnNext()
.flatMap()
...重复
从Mono模型上来看,当链路中有一个元素终止或者抛错那么整条链路就会中断
五、逻辑修改总结
由于该方法是封装整个链路给到webflux,所以我们只需在封装前给定一个批次号用作是否终止判断,在发送延迟消息后加上一个终止条件判断(暂定redisKey)的元素,若是已终止则在此元素抛出流异常,则整个流会捕获异常结束被gc掉,内存也得以保障。
标签:逻辑,delayElement,队列,webflux,线程,doOnNext,延迟 From: https://www.cnblogs.com/checkcode/p/webflux_check.html