为什么响应式编程中.publishOn设置了并行多线程,但是实际的异步线程却只有一个

首先看以下代码:

Flux.range(1, 1_0000)
    .doOnNext(it -> {
        System.out.println(Thread.currentThread().getName());
    })
    .map(String::valueOf)
    .filter(s -> {
        System.out.println("filter:" + Thread.currentThread().getName());
        return s.length() > 1;
    })
    .publishOn(Schedulers.newParallel("child-thread", 10))
    .map(this::calculateHash)
    .subscribe();

你觉得打印child-thread的线程会有几个?

实际答案是只会有1个

https://runnable.oss-cn-guangzhou.aliyuncs.com/blog/2023-07-23-143631.png

这是因为.publishOn 是对于订阅端来说的,在这段代码中,只有一个subscriber,所以即使设置了Schedulers.newParallel("child-thread", 10) 但是实际只会有一个异步线程执行。

要让.publishOn(Schedulers.newParallel("child-thread", 10)) 那么意味着应该创建多个subscriber,正确的代码如下:

@SneakyThrows
@Test
public void publishOn() {
    final Flux<String> testFlux = Flux.range(1, 1_0000)
            .doOnNext(it -> {
                System.out.println(Thread.currentThread().getName());
            })
            .map(String::valueOf)
            .filter(s -> {
                System.out.println("filter:" + Thread.currentThread().getName());
                return s.length() > 1;
            })
            .publishOn(Schedulers.newParallel("child-thread", 10)) // (5)
            .map(this::calculateHash);

    final Disposable subscribe1 = testFlux.subscribe(System.out::println);
    final Disposable subscribe2 = testFlux.subscribe(System.out::println);
    Thread.sleep(10000 * 30);
}

这里创建了两个subscriber, subscribe1和subscribe2,执行的话就可以看到两个异步线程在各自消费自己的响应式流

https://runnable.oss-cn-guangzhou.aliyuncs.com/blog/2023-07-23-144044.png

评论