首页 > 其他分享 >Reactor之Schedulers,publishOn 和 subscribeOn

Reactor之Schedulers,publishOn 和 subscribeOn

时间:2023-05-08 20:36:45浏览次数:48  
标签:Schedulers name Thread subscribeOn 线程 publishOn

Schedulers类似Executor,是Reactor的线程调度接口。提供以下几种线程执行环境:

  • 当前线程(Schedulers.immediate());
  • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle();
  • 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源;
  • 固定大小线程池(Schedulers.parallel()),所创建线程池的大小与CPU个数等同;
  • 自定义线程池(Schedulers.fromExecutorService(ExecutorService))基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)。

Schedulers类已经预先创建了几种常用的线程池:使用single()、elastic()和parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()、newElastic()和newParallel()方法。

Executors提供的几种线程池在Reactor中都支持:

  • Schedulers.single()Schedulers.newSingle()对应Executors.newSingleThreadExecutor()
  • Schedulers.elastic()Schedulers.newElastic()对应Executors.newCachedThreadPool()
  • Schedulers.parallel()Schedulers.newParallel()对应Executors.newFixedThreadPool()

Schedulers提供的以上三种调度器底层都是基于ScheduledExecutorService的,因此都是支持任务定时和周期性执行的;
Flux和Mono的调度操作符subscribeOnpublishOn支持work-stealing

 

最新版本中elastic被废弃了,重新提供了boundedElastic

Schedulers#boundedElastic

public static Scheduler boundedElastic() {
	return cache(CACHED_BOUNDED_ELASTIC, BOUNDED_ELASTIC, BOUNDED_ELASTIC_SUPPLIER);
}

static final Supplier<Scheduler> BOUNDED_ELASTIC_SUPPLIER =
		() -> newBoundedElastic(DEFAULT_BOUNDED_ELASTIC_SIZE, DEFAULT_BOUNDED_ELASTIC_QUEUESIZE,
				BOUNDED_ELASTIC, BoundedElasticScheduler.DEFAULT_TTL_SECONDS, true);

DEFAULT_BOUNDED_ELASTIC_SIZE表示全局bounddElastic()调度器的最大线程数,DEFAULT_BOUNDED_ELASTIC_SIZE由属性reactor.schedulers.defaultBoundedElasticSize设置,若未设置则初始化为10倍处理器数。
 
DEFAULT_BOUNDED_ELASTIC_QUEUESIZE表示全局bounddElastic()调度器的无法创建更多线程时要排队的最大任务数。DEFAULT_BOUNDED_ELASTIC_QUEUESIZE由属性reactor.schedulers.defaultBoundedElasticQueueSize设置,若未设置则初始化为100000。

publishOn 和 subscribeOn

publishOn 和 subscribeOn都是在指定的Scheduler中运行。当某些操作执行慢,阻碍运行速度时可以在指定的Scheduler中执行。

@Test
public void testPublishOn() {
    Flux.just("tom")
            .map(s -> {
                System.out.println("[map] Thread name: " + Thread.currentThread().getName());
                return s.concat("@mail.com");
            })
            .publishOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-publishOn"))
            .filter(s -> {
                System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
                return s.startsWith("t");
            })
            .doOnNext((t) -> {
                System.out.println("[ doOnNext ] Thread name:" + Thread.currentThread().getName());
            })
            .subscribeOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-subscribeOn"))
            .subscribe(s -> {
                System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
                System.out.println(s);
            });
}

执行结果如下:

[map] Thread name: thread-subscribeOn-1
[filter] Thread name: thread-publishOn-2
[ doOnNext ] Thread name:thread-publishOn-2
[subscribe] Thread name: thread-publishOn-2
tom@mail.com

可以看到map操作在subscribeOn设置的Schedulers中运行,从publishOn以后都是在publishOn设置的Schedulers中运行,即使是subscribeOn操作后面的操作。从上面可知subscribeOn从开头开始影响操作所在的线程,从publishOn操作之后所有的操作都在publishOn设置的Schedulers中运行。

@Test
public void testPublishOn1() {
    Flux.just("tom")
            .map(s -> {
                System.out.println("[map] Thread name: " + Thread.currentThread().getName());
                return s.concat("@mail.com");
            })
            .publishOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-publishOn"))
            .filter(s -> {
                System.out.println("[filter] Thread name: " + Thread.currentThread().getName());
                return s.startsWith("t");
            })
            .doOnNext((t) -> {
                System.out.println("[ doOnNext ] Thread name:" + Thread.currentThread().getName());
            })
            .subscribeOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-subscribeOn"))
            .doOnNext((t) -> {
                System.out.println("[ doOnNext1 ] Thread name:" + Thread.currentThread().getName());
            })
            .subscribeOn(Schedulers.newBoundedElastic(Runtime.getRuntime().availableProcessors(), 1000,"thread-000"))
            .doOnNext((t) -> {
                System.out.println("[ doOnNext2 ] Thread name:" + Thread.currentThread().getName());
            })
            .subscribe(s -> {
                System.out.println("[subscribe] Thread name: " + Thread.currentThread().getName());
                System.out.println(s);
            });
}

执行结果如下:

[map] Thread name: thread-subscribeOn-2
[filter] Thread name: thread-publishOn-3
[ doOnNext ] Thread name:thread-publishOn-3
[ doOnNext1 ] Thread name:thread-publishOn-3
[ doOnNext2 ] Thread name:thread-publishOn-3
[subscribe] Thread name: thread-publishOn-3
tom@mail.com

第二个subscribeOn不起作用。只有第一个subscribeOn才有作用。

 
 

参考自:https://blog.csdn.net/get_set/article/details/79480172

标签:Schedulers,name,Thread,subscribeOn,线程,publishOn
From: https://www.cnblogs.com/shigongp/p/17382971.html

相关文章