首先看以下代码:
Flux.range(1, 1_0000)
.doOnNext(it -> {
System.out.println(Thread.currentThread().getName());
})
.map(String::valueOf)
.filter(s -> {
System.out.println("filter:" + Thread.currentThread().getName());
return s.length() > 1;
})
.publishOn(Schedulers.newParallel("child-thread", 10))
.map(this::calculateHash)
.subscribe();
你觉得打印child-thread的线程会有几个?
实际答案是只会有1个

这是因为.publishOn
是对于订阅端来说的,在这段代码中,只有一个subscriber,所以即使设置了Schedulers.newParallel("child-thread", 10)
但是实际只会有一个异步线程执行。
要让.publishOn(Schedulers.newParallel("child-thread", 10))
那么意味着应该创建多个subscriber,正确的代码如下:
@SneakyThrows
@Test
public void publishOn() {
final Flux<String> testFlux = Flux.range(1, 1_0000)
.doOnNext(it -> {
System.out.println(Thread.currentThread().getName());
})
.map(String::valueOf)
.filter(s -> {
System.out.println("filter:" + Thread.currentThread().getName());
return s.length() > 1;
})
.publishOn(Schedulers.newParallel("child-thread", 10)) // (5)
.map(this::calculateHash);
final Disposable subscribe1 = testFlux.subscribe(System.out::println);
final Disposable subscribe2 = testFlux.subscribe(System.out::println);
Thread.sleep(10000 * 30);
}
这里创建了两个subscriber, subscribe1和subscribe2,执行的话就可以看到两个异步线程在各自消费自己的响应式流
