背压就是流量控制。Reactor提供的背压策略由OverflowStrategy枚举指定:
- IGNORE:完全忽略下游背压请求。
- ERROR:当下游无法跟上时,发出IllegalStateException信号。
- DROP:如果下游没有准备好接收传入信号,则丢弃传入。
- LATEST:下游将仅获得来自上游的最新信号。
- BUFFER:如果下游跟不上,缓冲所有信号。
@Test
public void test() {
CountDownLatch countDownLatch = new CountDownLatch(1);
final int[] a = {0};
Flux.push(t-> {
for (int i=0;i<10;i++){
t.next(a[0]++);
try {
TimeUnit.MICROSECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
System.out.println("generate thread:"+Thread.currentThread().getName());
t.complete();
}, FluxSink.OverflowStrategy.BUFFER)
.publishOn(Schedulers.newSingle("publish-thread-"),1)
.subscribeOn(Schedulers.newSingle("subscribe-thread-"))
.subscribe(new Subscriber<Object>() {
private Subscription subscription = null;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(Object o) {
System.out.println(Thread.currentThread().getName()+":消费数据:"+o);
try {
TimeUnit.MICROSECONDS.sleep(30);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
this.subscription.request(1);
}
@Override
public void one rror(Throwable throwable) {
System.out.println("出现错误");
throwable.printStackTrace();
countDownLatch.countDown();
}
@Override
public void onComplete() {
System.out.println("Complete");
countDownLatch.countDown();
}
});
try {
countDownLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Flux.push是发布者,睡眠10毫秒在生产下一个数字,订阅者是睡眠30毫秒才向上游请求数据。来模拟快的发布者,慢的订阅者。同时用publishOn和subscribeOn模拟发布者和订阅者在不同线程。记得publishOn的第二个参数(预取个数设置为1)
。
Flux.push第二个参数是背压策略。这里设置为FluxSink.OverflowStrategy.BUFFER,执行结果:
publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:1
publish-thread--2:消费数据:2
publish-thread--2:消费数据:3
publish-thread--2:消费数据:4
publish-thread--2:消费数据:5
publish-thread--2:消费数据:6
publish-thread--2:消费数据:7
publish-thread--2:消费数据:8
publish-thread--2:消费数据:9
Complete
所有的数据都消费了。
将Flux.push第二个参数设置为FluxSink.OverflowStrategy.DROP,执行结果:
publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:5
Complete
除了0和5被消费外其他都被丢弃。
将Flux.push第二个参数设置为FluxSink.OverflowStrategy.LATEST,执行结果:
publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:4
publish-thread--2:消费数据:9
Complete
每次向上游请求时都是请求最新的数据。在这里是0,4,9。
将Flux.push第二个参数设置为FluxSink.OverflowStrategy.ERROR,执行结果:
publish-thread--2:消费数据:0
出现错误
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:224)
at reactor.core.publisher.FluxCreate$ErrorAsyncSink.onOverflow(FluxCreate.java:708)
at reactor.core.publisher.FluxCreate$NoOverflowBaseAsyncSink.next(FluxCreate.java:673)
at com.example.FluxTest.lambda$test$71(FluxTest.java:609)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
抛出异常:reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
将Flux.push第二个参数设置为FluxSink.OverflowStrategy.IGNORE,执行结果:
publish-thread--2:消费数据:0
generate thread:subscribe-thread--1
publish-thread--2:消费数据:1
出现错误
reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:237)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:233)
at reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:639)
at com.example.FluxTest.lambda$test$71(FluxTest.java:609)
at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:95)
at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
抛出异常:reactor.core.Exceptions$OverflowException: Queue is full: Reactive Streams source doesn't respect backpressure
将Flux.push第二个参数去掉,Reactor还提供了背压的操作来实现背压策略:
- onBackpressureBuffer()实现了FluxSink.OverflowStrategy.BUFFER
- onBackpressureDrop()实现了FluxSink.OverflowStrategy.DROP
- onBackpressureLatest()实现了FluxSink.OverflowStrategy.LATEST
- onBackpressureError()实现了FluxSink.OverflowStrategy.ERROR
Reactor文档的示意图更加直观:
onBackpressureBuffer,对于来自其下游的request采取“缓存”策略。
onBackpressureDrop,元素就绪时,根据下游是否有未满足的request来判断是否发出当前元素。
onBackpressureLatest,当有新的request到来的时候,将最新的元素发出。
onBackpressureError,当有多余元素就绪时,发出错误信号。
参考自:https://blog.csdn.net/get_set/article/details/79562900
标签:core,java,reactor,thread,--,背压,publish,策略 From: https://www.cnblogs.com/shigongp/p/17389177.html