首页 > 其他分享 >背压策略

背压策略

时间:2023-05-10 20:46:13浏览次数:43  
标签:core java reactor thread -- 背压 publish 策略

背压就是流量控制。Reactor提供的背压策略由OverflowStrategy枚举指定:

  • IGNORE:完全忽略下游背压请求。
  • ERROR:当下游无法跟上时,发出IllegalStateException信号。
  • DROP:如果下游没有准备好接收传入信号,则丢弃传入。
  • LATEST:下游将仅获得来自上游的最新信号。
  • BUFFER:如果下游跟不上,缓冲所有信号。

 

@Test
public void test() {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    final int[] a = {0};
    Flux.push(t-> {
        for (int i=0;i<10;i++){
            t.next(a[0]++);
            try {
                TimeUnit.MICROSECONDS.sleep(10);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        System.out.println("generate thread:"+Thread.currentThread().getName());
        t.complete();
    }, FluxSink.OverflowStrategy.BUFFER)
            .publishOn(Schedulers.newSingle("publish-thread-"),1)
            .subscribeOn(Schedulers.newSingle("subscribe-thread-"))
            .subscribe(new Subscriber<Object>() {
                private Subscription subscription = null;

                @Override
                public void onSubscribe(Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1);
                }

                @Override
                public void onNext(Object o) {
                    System.out.println(Thread.currentThread().getName()+":消费数据:"+o);
                    try {
                        TimeUnit.MICROSECONDS.sleep(30);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }

                    this.subscription.request(1);
                }

                @Override
                public void one rror(Throwable throwable) {
                    System.out.println("出现错误");
                    throwable.printStackTrace();
                    countDownLatch.countDown();
                }

                @Override
                public void onComplete() {
                    System.out.println("Complete");
                    countDownLatch.countDown();
                }
            });
    try {
        countDownLatch.await();
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}

Flux.push是发布者,睡眠10毫秒在生产下一个数字,订阅者是睡眠30毫秒才向上游请求数据。来模拟快的发布者,慢的订阅者。同时用publishOn和subscribeOn模拟发布者和订阅者在不同线程。记得publishOn的第二个参数(预取个数设置为1)
 
Flux.push第二个参数是背压策略。这里设置为FluxSink.OverflowStrategy.BUFFER,执行结果:

publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:1
publish-thread--2:消费数据:2
publish-thread--2:消费数据:3
publish-thread--2:消费数据:4
publish-thread--2:消费数据:5
publish-thread--2:消费数据:6
publish-thread--2:消费数据:7
publish-thread--2:消费数据:8
publish-thread--2:消费数据:9
Complete

所有的数据都消费了。

 

将Flux.push第二个参数设置为FluxSink.OverflowStrategy.DROP,执行结果:

publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:5
Complete

除了0和5被消费外其他都被丢弃。

 

将Flux.push第二个参数设置为FluxSink.OverflowStrategy.LATEST,执行结果:

publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:4
publish-thread--2:消费数据:9
Complete

每次向上游请求时都是请求最新的数据。在这里是0,4,9。

 

将Flux.push第二个参数设置为FluxSink.OverflowStrategy.ERROR,执行结果:

publish-thread--2:消费数据:0
出现错误
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:224)
	at reactor.core.publisher.FluxCreate$ErrorAsyncSink.onOverflow(FluxCreate.java:708)
	at reactor.core.publisher.FluxCreate$NoOverflowBaseAsyncSink.next(FluxCreate.java:673)
	at com.example.FluxTest.lambda$test$71(FluxTest.java:609)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

抛出异常:reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)

 

将Flux.push第二个参数设置为FluxSink.OverflowStrategy.IGNORE,执行结果:

publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:1
出现错误
reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:237)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:233)
	at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:639)
	at com.example.FluxTest.lambda$test$71(FluxTest.java:609)
	at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

抛出异常:reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure

 

 

将Flux.push第二个参数去掉,Reactor还提供了背压的操作来实现背压策略:

  • onBackpressureBuffer()实现了FluxSink.OverflowStrategy.BUFFER
  • onBackpressureDrop()实现了FluxSink.OverflowStrategy.DROP
  • onBackpressureLatest()实现了FluxSink.OverflowStrategy.LATEST
  • onBackpressureError()实现了FluxSink.OverflowStrategy.ERROR

Reactor文档的示意图更加直观:

onBackpressureBuffer,对于来自其下游的request采取“缓存”策略。

onBackpressureDrop,元素就绪时,根据下游是否有未满足的request来判断是否发出当前元素。

onBackpressureLatest,当有新的request到来的时候,将最新的元素发出。

onBackpressureError,当有多余元素就绪时,发出错误信号。

 
 

参考自:https://blog.csdn.net/get_set/article/details/79562900

标签:core,java,reactor,thread,--,背压,publish,策略
From: https://www.cnblogs.com/shigongp/p/17389177.html

相关文章

  • 神策数据携手网易云商,打造懂策略的智能外呼系统
    随着免费流量和付费流量的获取成本越来越高,企业开始寻找更高效率的获客方式;与此同时,聚焦存量用户,通过精细化运营等手段实现用户运营也成为企业发力数字化的另一重要方向。即围绕“用户”这一运营增长的核心,致力于全面实现用户洞察、用户分层、用户触达、用户交互和用户服务五大环节......
  • chatgpt训练策略之RLHF 技术
      OpenAI推出的ChatGPT对话模型掀起了新的AI热潮,它面对多种多样的问题对答如流,似乎已经打破了机器和人的边界。这一工作的背后是大型语言模型(LargeLanguageModel,LLM)生成领域的新训练范式:RLHF(ReinforcementLearningfromHumanFeedback),即以强化学习方式......
  • PMP-4.9-1 规划采购管理-合同类型、采购管理计划、采购策略、招标文件、采购工作说明
    ##############################################################前一章,我们说到了规划采购管理的一些基础内容,本章我们对采购过程中的一些具体文件内容做说明。这里强调一下,本次内容依旧是买方视角的采购管理流程,也就是甲方的视角。如需乙方视角,可留言评论,需求足够的话,我们再......
  • 可变策略的拟人式三维装箱算法实现-开源
    问题给定一个长方体容器和较多不同形态的长方体货物,需确定装箱策略,使货物尽可能多地装填到容器中。假设与约束1、货物可向上码放;2、货物必须完全包含在容器中;3、任意两个货物内的任意一点不可在空间中的同一位置;4、货物不可悬空放置,即货物下方必须有其他货物或容器底部支撑;5、......
  • 通过创新增长取得成功:避免“增长痛苦”的 5 种策略
    公司在开发和孵化新业务计划方面进行了大量投资,但很少有公司遵循严格的途径来扩大新业务规模。虽然80%的公司声称构思和孵化新企业,但只有16%的公司成功扩大了规模。典型案例是百思买在许多失败倒闭的扩大新业务取得了成功。它经历了建立新业务所需的3个阶段:先提出了一个解决客户......
  • MATLB/Simulink仿真平台直流微电网并网运行控制策略
    MATLB/Simulink仿真平台直流微电网并网运行控制策略包括风机(MPPT)、光伏(MPPT)、蓄电池、直流负载、交流负载、并网逆变器及电网并网逆变器采用电流下垂控制,锁相环、风机和光伏MPPT自建,子单元可适当修改,参数可适当修改ID:48550651550391718......
  • 了解Redis过期策略及实现原理
    大约阅读4分钟如果你使用过redis,那你一定知道过期策略这个命令吧,如果让你设计一个过期键接口,你有什么想法?我们在使用redis时,一般会设置一个过期时间,当然也有不设置过期时间的,也就是永久不过期。当我们设置了过期时间,redis是如何判断是否过期,以及根据什么策略来进行删除的。redi......
  • 分布式ID生成策略
    在分布式系统中,肯定避免不了获取全局唯一ID,用于业务主键,本节主要学习分布式ID常用的生成方法。一、UUIDUUID(UniversallyUniqueIdentifier),通用唯一识别码。UUID是基于当前时间、计数器(counter)和硬件标识(通常为无线网卡的MAC地址)等数据计算生成的。UUID是JDK提......
  • Semi-Join Subquery优化策略
    Semi-JoinSubquery优化策略Semi-JoinSubquery(半连接子查询):对应IN或EXISTS子查询,仅需要检查"外表记录"在"子查询结果集"中是否存在匹配记录,不需要计算"子查询结果集"中记录匹配次数,也不需要返回"子查询结果集"中匹配记录内容在MariaDB(MySQL)中,常用优化Semi-Join(半连接)的策......
  • 动态分库分表策略
    关键字:动态分库分表策略参考网址:http://dragonsoar.iteye.com/blog/1769101其他相关软件:matrixOceanus(不支持spring)matrix没开源所以很多人还是用mycatdiamond里面可以配置读写比读写比权重那个是atom和group的作用吧国美好牛,以前后台ora......