Schedulers类似Executor,是Reactor的线程调度接口。提供以下几种线程执行环境:
- 当前线程(
Schedulers.immediate()
); - 可重用的单线程(
Schedulers.single()
)。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle(); - 弹性线程池(
Schedulers.elastic()
)。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源; - 固定大小线程池(
Schedulers.parallel()
),所创建线程池的大小与CPU个数等同; - 自定义线程池(
Schedulers.fromExecutorService(ExecutorService)
)基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)。
Schedulers类已经预先创建了几种常用的线程池:使用single()、elastic()和parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()、newElastic()和newParallel()方法。
Executors提供的几种线程池在Reactor中都支持:
Schedulers.single()
和Schedulers.newSingle()
对应Executors.newSingleThreadExecutor()
;Schedulers.elastic()
和Schedulers.newElastic()
对应Executors.newCachedThreadPool()
;Schedulers.parallel()
和Schedulers.newParallel()
对应Executors.newFixedThreadPool()
;
Schedulers提供的以上三种调度器底层都是基于ScheduledExecutorService
的,因此都是支持任务定时和周期性执行的;
Flux和Mono的调度操作符subscribeOn
和publishOn
支持work-stealing
。
最新版本中elastic被废弃了,重新提供了boundedElastic
。
Schedulers#boundedElastic
public static Scheduler boundedElastic() {
return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
}
static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER =
() -> newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
BOUNDED_ELASTIC, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, true);
DEFAULT_BOUNDED_ELASTIC_SIZE表示全局bounddElastic()调度器的最大线程数,DEFAULT_BOUNDED_ELASTIC_SIZE由属性reactor.schedulers.defaultBoundedElasticSize设置,若未设置则初始化为10倍处理器数。
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE表示全局bounddElastic()调度器的无法创建更多线程时要排队的最大任务数。DEFAULT_BOUNDED_ELASTIC_QUEUESIZE由属性reactor.schedulers.defaultBoundedElasticQueueSize设置,若未设置则初始化为100000。
publishOn 和 subscribeOn
publishOn 和 subscribeOn都是在指定的Scheduler
中运行。当某些操作执行慢,阻碍运行速度时可以在指定的Scheduler中执行。
@Test
public void testPublishOn() {
Flux.just("tom")
.map(s -> {
System.out.println("[map] Thread name: " + Thread.currentThread().getName());
return s.concat("@mail.com");
})
.publishOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-publishOn"))
.filter(s -> {
System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
return s.startsWith("t");
})
.doOnNext((t) -> {
System.out.println("[ doOnNext ] Thread name:" + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-subscribeOn"))
.subscribe(s -> {
System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
System.out.println(s);
});
}
执行结果如下:
[map] Thread name: thread-subscribeOn-1
[filter] Thread name: thread-publishOn-2
[ doOnNext ] Thread name:thread-publishOn-2
[subscribe] Thread name: thread-publishOn-2
tom@mail.com
可以看到map操作在subscribeOn设置的Schedulers中运行,从publishOn以后都是在publishOn设置的Schedulers中运行,即使是subscribeOn操作后面的操作。从上面可知subscribeOn从开头开始影响操作所在的线程,从publishOn操作之后所有的操作都在publishOn设置的Schedulers中运行。
@Test
public void testPublishOn1() {
Flux.just("tom")
.map(s -> {
System.out.println("[map] Thread name: " + Thread.currentThread().getName());
return s.concat("@mail.com");
})
.publishOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-publishOn"))
.filter(s -> {
System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
return s.startsWith("t");
})
.doOnNext((t) -> {
System.out.println("[ doOnNext ] Thread name:" + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-subscribeOn"))
.doOnNext((t) -> {
System.out.println("[ doOnNext1 ] Thread name:" + Thread.currentThread().getName());
})
.subscribeOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-000"))
.doOnNext((t) -> {
System.out.println("[ doOnNext2 ] Thread name:" + Thread.currentThread().getName());
})
.subscribe(s -> {
System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
System.out.println(s);
});
}
执行结果如下:
[map] Thread name: thread-subscribeOn-2
[filter] Thread name: thread-publishOn-3
[ doOnNext ] Thread name:thread-publishOn-3
[ doOnNext1 ] Thread name:thread-publishOn-3
[ doOnNext2 ] Thread name:thread-publishOn-3
[subscribe] Thread name: thread-publishOn-3
tom@mail.com
第二个subscribeOn不起作用。只有第一个subscribeOn才有作用。
参考自:https://blog.csdn.net/get_set/article/details/79480172
标签:Schedulers,name,Thread,subscribeOn,线程,publishOn From: https://www.cnblogs.com/shigongp/p/17382971.html