首页 > 其他分享 >webflux延迟队列逻辑更改过程记录

webflux延迟队列逻辑更改过程记录

时间:2022-12-06 15:25:00浏览次数:39  
标签:逻辑 delayElement 队列 webflux 线程 doOnNext 延迟

title : webflux延迟队列逻辑更改过程记录
author : simonLee
date : 2022/11/22 10:26

目录

webflux延迟队列逻辑更改过程记录

一、问题背景

一个业务使用了webflux.delayElement(Duration delay)来发送延迟消息,由于是异步,发送出去无法通过别的途径拿到该延迟消息(webflux没有相关api),但新需求功能上需要对发出去的延迟消息提供终止功能,且规定时间内可重复无限次

老功能简单来说就是有一个数字,数字在选择时间内按照正态分布有规律的一定时间间隔发送延迟消息,并对消息进行后续处理。例:设定数字10000,在半小时内,按一定时间间隔平分10000发送延迟消息。
新功能就是在老功能的基础上,支持中途中断往后的所有消息。

二、当前实现代码

可以见到,在delayElement后使用了doOnNext()做后续操作以及flatMap重新封装流对象,而想要做中途终止,必须能拿到延迟队列。"理解是第一步",我们需要先了解这几行代码的底层逻辑,才能对其进行修改。

三、逻辑详解

3.1创建事件链

方法开头使用了一个Mono对象作为事件链的头,由于webflux都是使用流式编程,使用一个对象便可把所有遍历对象连接起来。

3.2delayElement

先附上delayElement源码

该入参是一个Duration类,意为多久之后消费。在内部又调用了一个重载方法,入参为Duration和Scheduler,时间以及reactor的调度器。

Schedulers.parallel()是调度器的一个线程模型。

从这里我们大概可以知道webflux封装的delayElement也是通过线程延时执行的

3.3publishOn

下一步的publishOn则是将链中的调度器调整为boundedElastic,是一个有界可复用线程池,可动态调节。该线程池可适当解决"且规定时间内可重复无限次"产生的内存问题。

该线程池容量是CPU核心数的10倍,最多提交10万条任务。如果是延时调度,那么延时开始时间是在有线程可用时才开始计算。

3.4doOnNext

再下一步的doOnNext是在不对序列造成改变的情况下,发出元素,相当于做一些额外的逻辑。

在这里doOnNext里是做调用腾讯云操作

3.5flapMap

最后一步是flapMap,即重新执行逻辑修改元素内容。这里还做了落库操作并返回落库方法返回的元素。

四、代码总结

// 由调用链可知道
/*
	代码相当于做了这样的操作,封装流,交由webflux执行
*/
eventChain.
    .delayElement()
    .publishOn()
    .doOnNext()
    .flatMap()
    
    .delayElement()
    .publishOn()
    .doOnNext()
    .flatMap()
    
    .delayElement()
    .publishOn()
    .doOnNext()
    .flatMap()
    
    ...重复

从Mono模型上来看,当链路中有一个元素终止或者抛错那么整条链路就会中断

img

五、逻辑修改总结

由于该方法是封装整个链路给到webflux,所以我们只需在封装前给定一个批次号用作是否终止判断,在发送延迟消息后加上一个终止条件判断(暂定redisKey)的元素,若是已终止则在此元素抛出流异常,则整个流会捕获异常结束被gc掉,内存也得以保障。

标签:逻辑,delayElement,队列,webflux,线程,doOnNext,延迟
From: https://www.cnblogs.com/checkcode/p/webflux_check.html

相关文章

  • 代码随想录——栈与队列
    用栈实现队列题目简单把pop()和peek()中可复用的部分提取出来classMyQueue{Stack<Integer>stackIn;Stack<Integer>stackOut;/**Initialize......
  • Java使用LinkedList模拟一个堆栈或者队列数据结构
    用Java模拟一个堆栈或者队列数据结构。首先得明白堆栈和队列的数据结构:堆栈:先进后出队列:先进先出LinkedList中刚好有addFirst()和addLast()方法。1.publicclassStac......
  • 和平河东物理机租用供应线路稳定,延迟低
    机房环境、带宽大小等等。那些价格极低的供应商也大都是些私人商家、二道贩子,并又没独立运营的能力,他们手中的资源都不知道转了多少手,价格还那么低,那他们不用挣钱了?因......
  • 和平河东高防物理机,线路稳延迟低
    服务器贵不贵?硬件配置好性能就好带宽防护真实只有选的对没有贵不贵。服务器质量呢?老铁我们是一手机房五星级别TB级别防护你说质量好不好金窝里怎么可能放银......
  • 一种有界队列(Bounded Buffer)的实现
    一、概述在有CPU和GPU参与的一种运算中,比如深度学习推理,CPU需要预处理数据,然后交给GPU处理,最后CPU对GPU的运算结果进行后处理。在整个过程中都是FIFO,即数据......
  • leetcode 1687. 从仓库到码头运输箱子 动态规划 + 单调队列
    你有一辆货运卡车,你需要用这一辆车把一些箱子从仓库运送到码头。这辆卡车每次运输有 箱子数目的限制 和总重量的限制 。给你一个箱子数组 boxes 和三个整数portsCo......
  • 延庆门头沟物理机托管,线路稳定,延迟低
    了解自己企业需要什么样类型的服务器,在你选择之前,必须要考虑以下问题:1.我的服务器的主要目的是什么?主要是用于电子邮件目的?还是认为数据传输更重要?当然,选择多功能的服务......
  • 延庆门头沟高防物理机,线路稳延迟低
    后24小时在线无惧攻击 稳定服务器 高档服务器 拥有独立的双PCI通道和内存扩展板设计,高内存带宽,大容量热插拔硬盘和热插拔电源,具有超强的数据处理能力。适用于需要......
  • 单调队列学习笔记
    用处:滑动窗口维护区间最值核心思想:双端队列,队首放最大值/最小值的下标,1.清除不是更优的(队尾弹出)。2.清除过期的(队首弹出)。例题:https://www.luogu.com.cn/problem/P3088......
  • 三阶段:第13周 分布式消息队列-Kafka 没用
                   ......