跳到主要内容

一文彻底搞懂WebFlux中的publishOn 和 subscribeOn怎么使用

1. 前言

在学习project reactor的时候,对publishOnsubscribeOn 使用时总是让我觉得有点迷惑,因为经常我期望它并行执行的时候,却又是串行。这篇文章会以多种不同的例子区分两者,彻底搞清楚应该怎么使用。

2. 在开始之前

希望你已经知道了publishOnsubscribeOn 的基本概念。

publishOn用于改变后续操作符执行的线程,它会影响到其后的操作符在哪个线程上执行。

比如说如下代码:

Flux.range(0, 3)
.map(it - > {
System.out.println("当前线程名输出1:" + Thread.currentThread().getName());
return it;
})
.publishOn(Schedulers.newParallel("NEW"))
.map(it - > {
System.out.println("当前线程名输出2:" + Thread.currentThread().getName());
return it;
})
.subscribe();

在第一个map中输出的线程名是main,而在第二个map中输出的是NEW

subscribeOn的作用是在这个开始执行的时候,指定使用的线程调度器。

具体来说,subscribeOn 会影响到整个数据流的起始部分,即从订阅操作符开始,直到数据流的起始发布者(Publisher)的部分。它将决定订阅操作在哪个线程上执行,从而影响整个数据流的执行上下文。

以代码为例子:

Flux.range(0, 3)
.map(it - > {
System.out.println("当前线程名输出1:" + Thread.currentThread().getName());
return it;
})
.map(it - > {
System.out.println("当前线程名输出2:" + Thread.currentThread().getName());
return it;
})
.subscribeOn(Schedulers.newParallel("NEW"))
.subscribe();

虽然我把subscribeOn的操作放在最后,但是在执行的时候,会导致整个流从最开始就在NEW的线程上执行

这是最基本你需要知道的,接下来让我们看点不一样的。

2.1 Mock Server

我们需要一些准备工作,新建一个SpringBoot项目,搭建一个简单的Mock Server 用来模拟我们工作时,请求的第三方服务。

代码只写一个controller,并延迟一秒后进行返回

@SpringBootApplication
@RequestMapping("/")
@Controller
public class TempHttpServerApplication {

public static void main(String[] args) {
SpringApplication.run(TempHttpServerApplication.class, args);
}

@GetMapping("")
@ResponseBody
public String hello() throws InterruptedException {
Thread.sleep(1000);
return "hello SpringBoot";
}

}

3. 从最简单的开始

在下面的代码中,输出从1到10,这是再简单不过的例子了

public static void main(String[] args) {
Flux.range(1, 10)
.subscribe(it -> System.out.println(it));
}

接着我们增加一个executeRequest 方法用来请求我们刚刚的Mock Server,在流执行时我们进行调用,因为我们是在map方法中执行的,所以看输出的日期,显而易见整个流是串行执行的。

public static void main(String[] args) {
Flux.range(1, 10)
.map(it -> executeRequest())
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
2024-02-24T02:59:38.612704Z hello SpringBoot
2024-02-24T02:59:39.626442Z hello SpringBoot
2024-02-24T02:59:40.635633Z hello SpringBoot
2024-02-24T02:59:41.643354Z hello SpringBoot
2024-02-24T02:59:42.646554Z hello SpringBoot
2024-02-24T02:59:43.654952Z hello SpringBoot
2024-02-24T02:59:44.658078Z hello SpringBoot
2024-02-24T02:59:45.662179Z hello SpringBoot
2024-02-24T02:59:46.666218Z hello SpringBoot
2024-02-24T02:59:47.671938Z hello SpringBoot

3.1 .map.flatMap

在上面的那个例子中,如果我们.map改为.flatMap 时,因为flatMap中默认是并行执行的,但是实际在运行时,如果.flatMap中如果为同步操作,指:Mono.just(executeRequest()),会导致整个流变为同步。

public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.flatMap(it -> Mono.just(executeRequest()), 8)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

3.2 增加publishOn

为了让flatMap 中的执行改为并行,这时候有同学就要说了,使用publishOn

可惜的是,在下面这个例子中,虽然使用了publishOn, 让下游线程执行在新的线程组中执行,但是因为flatMap中存在同步操作,指:Mono.just(executeRequest()),所以整个流仍然为同步执行。

public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("REQ-T", 2))
.flatMap(it - > Mono.just(executeRequest()), 2)
.subscribe(it - > {
System.out.println(Instant.now() + " " + it);
});
}

private static String executeRequest() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

3.3 如果使用subscribeOn

订阅内部流的执行在其他线程, 且设置flatMap的并行度为2,但是整个流还是同步执行,因为

Mono.just 为意味着热数据,每次会同步的执行去获取,因此执行的时候仍然为同步。

public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("REQ-T", 2))
.log()
, 2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

日志:

11:03:44.820 [main] INFO reactor.Mono.SubscribeOnValue.1 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:44.822 [main] INFO reactor.Mono.SubscribeOnValue.1 -- request(32)
11:03:44.824 [REQ-T-1] INFO reactor.Mono.SubscribeOnValue.1 -- onNext(hello SpringBoot)
2024-02-24T03:03:44.824966Z hello SpringBoot
11:03:44.827 [REQ-T-1] INFO reactor.Mono.SubscribeOnValue.1 -- onComplete()
11:03:45.829 [main] INFO reactor.Mono.SubscribeOnValue.2 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:45.829 [main] INFO reactor.Mono.SubscribeOnValue.2 -- request(32)
11:03:45.829 [REQ-T-2] INFO reactor.Mono.SubscribeOnValue.2 -- onNext(hello SpringBoot)
2024-02-24T03:03:45.830078Z hello SpringBoot
11:03:45.830 [REQ-T-2] INFO reactor.Mono.SubscribeOnValue.2 -- onComplete()
11:03:46.837 [main] INFO reactor.Mono.SubscribeOnValue.3 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:46.837 [main] INFO reactor.Mono.SubscribeOnValue.3 -- request(32)
11:03:46.838 [REQ-T-3] INFO reactor.Mono.SubscribeOnValue.3 -- onNext(hello SpringBoot)
2024-02-24T03:03:46.838210Z hello SpringBoot
11:03:46.838 [REQ-T-3] INFO reactor.Mono.SubscribeOnValue.3 -- onComplete()
11:03:47.848 [main] INFO reactor.Mono.SubscribeOnValue.4 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:47.849 [main] INFO reactor.Mono.SubscribeOnValue.4 -- request(32)
11:03:47.849 [REQ-T-4] INFO reactor.Mono.SubscribeOnValue.4 -- onNext(hello SpringBoot)
2024-02-24T03:03:47.850130Z hello SpringBoot
11:03:47.850 [REQ-T-4] INFO reactor.Mono.SubscribeOnValue.4 -- onComplete()
11:03:48.861 [main] INFO reactor.Mono.SubscribeOnValue.5 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:48.861 [main] INFO reactor.Mono.SubscribeOnValue.5 -- request(32)
11:03:48.861 [REQ-T-5] INFO reactor.Mono.SubscribeOnValue.5 -- onNext(hello SpringBoot)
2024-02-24T03:03:48.861965Z hello SpringBoot
11:03:48.862 [REQ-T-5] INFO reactor.Mono.SubscribeOnValue.5 -- onComplete()
11:03:49.871 [main] INFO reactor.Mono.SubscribeOnValue.6 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:49.871 [main] INFO reactor.Mono.SubscribeOnValue.6 -- request(32)
11:03:49.871 [REQ-T-6] INFO reactor.Mono.SubscribeOnValue.6 -- onNext(hello SpringBoot)
2024-02-24T03:03:49.871636Z hello SpringBoot
11:03:49.871 [REQ-T-6] INFO reactor.Mono.SubscribeOnValue.6 -- onComplete()
11:03:50.879 [main] INFO reactor.Mono.SubscribeOnValue.7 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:50.879 [main] INFO reactor.Mono.SubscribeOnValue.7 -- request(32)
11:03:50.880 [REQ-T-7] INFO reactor.Mono.SubscribeOnValue.7 -- onNext(hello SpringBoot)
2024-02-24T03:03:50.880186Z hello SpringBoot
11:03:50.880 [REQ-T-7] INFO reactor.Mono.SubscribeOnValue.7 -- onComplete()
11:03:51.884 [main] INFO reactor.Mono.SubscribeOnValue.8 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:51.885 [main] INFO reactor.Mono.SubscribeOnValue.8 -- request(32)
11:03:51.885 [REQ-T-8] INFO reactor.Mono.SubscribeOnValue.8 -- onNext(hello SpringBoot)
2024-02-24T03:03:51.885495Z hello SpringBoot
11:03:51.885 [REQ-T-8] INFO reactor.Mono.SubscribeOnValue.8 -- onComplete()
11:03:52.895 [main] INFO reactor.Mono.SubscribeOnValue.9 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:52.896 [main] INFO reactor.Mono.SubscribeOnValue.9 -- request(32)
11:03:52.896 [REQ-T-9] INFO reactor.Mono.SubscribeOnValue.9 -- onNext(hello SpringBoot)
2024-02-24T03:03:52.896721Z hello SpringBoot
11:03:52.896 [REQ-T-9] INFO reactor.Mono.SubscribeOnValue.9 -- onComplete()
11:03:53.903 [main] INFO reactor.Mono.SubscribeOnValue.10 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:53.904 [main] INFO reactor.Mono.SubscribeOnValue.10 -- request(32)
11:03:53.904 [REQ-T-10] INFO reactor.Mono.SubscribeOnValue.10 -- onNext(hello SpringBoot)
2024-02-24T03:03:53.904910Z hello SpringBoot
11:03:53.905 [REQ-T-10] INFO reactor.Mono.SubscribeOnValue.10 -- onComplete()

3.4 publishOn+subscribeOn

我们都知道publishOn 针对的是后面流所执行的操作,因为executeRequest为同步,且使用的是Mono.just,所以整个流还是同步。

public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("publishOn-T", 2))
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
,2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

3.5 关键的Mono.fromCallable

下面这个例子终于为异步请求了, 且控制并发度由flatMap的第二个参数控制,此时并发度为2

public static void main(String[] args) {
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
, 2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

此时并发度为8,由flatMap的第二个参数控制

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
, 8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

3.6 Schedulers.newParallel 和 Schedulers.single 的区别

使用.subscribeOn(Schedulers.single()) ,为串行执行

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.single())
,8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

使用Schedulers.newParallel("subscribeOn-T", 1) 为并行执行

public static void main(String[] args) {
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 1))
,8)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

其原因在于使用 Schedulers.single() 时它会将每个任务放入一个共享的单线程池中执行。这意味着任务将按顺序一个接一个地执行,因为它们都在同一个线程上执行。

3.7 Mono.deferFlux.deferMono.*fromSupplier

除了使用Mono.fromCallable ,可以使用Mono.defer ,或者Flux.deferMono.fromSupplier

这些的本质在于对处理executeRequest 时当作冷数据进行处理,只有在真正调用的时候才会进行执行。

什么,你说你还不明白冷数据和热数据的区别? Flux.just(new Date(), new Date(), new Date()) 在产生订阅事件时,时间就已经确定了。

Flux.defer(() -> Mono.just(new Date())).repeat(2) 只有在执行到这个方法时,才会计算当前的时间

Mono.defer 例子

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.defer(() -> Mono.just(executeRequest()))
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 1))
,8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

Mono.fromSupplier 例子

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromSupplier(() -> executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 1))
,8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

3.8 subscribeOn 的使用

subscribeOn其核心在于当订阅的这个事件为同步操作,我们期望改为异步时,使用subscribeOn ,比如IO 操作,save DB,包括订阅内部流的执行。

Mono.fromCallable(()->{
return Mono.just(mongoTemplate.insertOne(xxx))
})
.subscribeOn(Schedulers.boundedElastic())

希望上面的例子已经让你搞懂subscribeOn 的使用

4. publishOn

publishOn 把后续的操作切换到新的调度线程池中,flatMap 并发数默认和电脑核心数相关,下面以一个新的例子出发,我们需要更改一下mock server的响应为这样: 更改了一下mock server的响应

@SpringBootApplication
@RequestMapping("/")
@Controller
public class TempHttpServerApplication {

public static void main(String[] args) {
SpringApplication.run(TempHttpServerApplication.class, args);
}

@GetMapping("")
@ResponseBody
public String hello() throws InterruptedException {
Thread.sleep(1000);
return new Date().toString();
}

}

接着这是我们的测试代码,在没有使用publishOn 的情况下,subscribe 中睡眠3秒,然后输出mock server的返回

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)

.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newSingle("REQ-T"))
,3)
// .publishOn(Schedulers.newSingle("SOUT-T"))
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

在上面这个例子中,如果没有publishOn,那么得到的日志是:

REQ-T-3 2024-02-24T03:16:40.634093Z Sat Feb 24 11:16:37 CST 2024
REQ-T-3 2024-02-24T03:16:43.645577Z Sat Feb 24 11:16:37 CST 2024
REQ-T-3 2024-02-24T03:16:46.646355Z Sat Feb 24 11:16:37 CST 2024
REQ-T-5 2024-02-24T03:16:50.673382Z Sat Feb 24 11:16:47 CST 2024
REQ-T-5 2024-02-24T03:16:53.674860Z Sat Feb 24 11:16:47 CST 2024
REQ-T-5 2024-02-24T03:16:56.680422Z Sat Feb 24 11:16:47 CST 2024
REQ-T-7 2024-02-24T03:17:00.693387Z Sat Feb 24 11:16:57 CST 2024
REQ-T-7 2024-02-24T03:17:03.695553Z Sat Feb 24 11:16:57 CST 2024
REQ-T-7 2024-02-24T03:17:06.701117Z Sat Feb 24 11:16:57 CST 2024
REQ-T-10 2024-02-24T03:17:10.716434Z Sat Feb 24 11:17:07 CST 2024

给你一分钟的时间想想,为什么10个请求的响应似乎被分成了4组,而每组的响应时间一样,相邻两组的响应差距时间是10秒

因为如果没有publishOn切换到新的线程组执行,那么意味着.subscribe(it -> { 中的代码也是由

Schedulers.newSingle("REQ-T") 中的单个线程执行,那么在flatMap 中会同时发出3个请求,在服务端会睡眠1秒,而在subscribe 中执行时,每个请求会睡眠3秒,

所以10秒=1秒(服务端响应)+3(个请求) * 3(每个请求睡眠3秒)

而由于publishOn 的特性,我们一般可以这样使用,比如切换到新的线程组进行insert

Mono<Void> fluxToBlockingRepository(Flux<User> flux,
BlockingRepository<User> repository) {
return flux
.publishOn(Schedulers.elastic())
.doOnNext(repository::save)
.then();
}

4.1 publishOn 的使用场景

再接着增加难度,模拟一个工作中的场景,经常会先请求1,2个接口的数据进行组装,然后再请求其他接口,如下:

先在第一个flatMap中执行一次executeRequest

然后通过publishOn切换到"BBB-T" 线程组执行,给定60个线程,

接着在第二个flatMap 中执行executeRequest ,这是模拟拼装好数据后再请求其他接口,此时整个流的执行是并行,还是串行?还是部分并行、部分串行?

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
, 3)
.doOnNext(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
})
.publishOn(Schedulers.newParallel("BBB-T", 60))
.flatMap(item -> {
return Mono.fromCallable(() -> executeRequest());
}, 2)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}

private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}

执行后输出日志如下:

AAA-T-3 2024-02-21T06:16:36.428794Z Wed Feb 21 14:16:36 CST 2024
AAA-T-3 2024-02-21T06:16:36.434086Z Wed Feb 21 14:16:36 CST 2024
AAA-T-3 2024-02-21T06:16:36.434188Z Wed Feb 21 14:16:36 CST 2024
AAA-T-6 2024-02-21T06:16:37.440315Z Wed Feb 21 14:16:37 CST 2024
BBB-T-1 2024-02-21T06:16:37.440402Z Wed Feb 21 14:16:37 CST 2024
AAA-T-4 2024-02-21T06:16:37.443843Z Wed Feb 21 14:16:37 CST 2024
AAA-T-4 2024-02-21T06:16:37.444355Z Wed Feb 21 14:16:37 CST 2024
BBB-T-1 2024-02-21T06:16:38.445792Z Wed Feb 21 14:16:38 CST 2024
AAA-T-9 2024-02-21T06:16:38.448931Z Wed Feb 21 14:16:38 CST 2024
AAA-T-9 2024-02-21T06:16:38.449036Z Wed Feb 21 14:16:38 CST 2024
AAA-T-8 2024-02-21T06:16:38.453588Z Wed Feb 21 14:16:38 CST 2024
BBB-T-1 2024-02-21T06:16:39.450717Z Wed Feb 21 14:16:39 CST 2024
AAA-T-10 2024-02-21T06:16:39.455242Z Wed Feb 21 14:16:39 CST 2024
BBB-T-1 2024-02-21T06:16:40.456177Z Wed Feb 21 14:16:40 CST 2024
BBB-T-1 2024-02-21T06:16:41.465664Z Wed Feb 21 14:16:41 CST 2024
BBB-T-1 2024-02-21T06:16:42.476952Z Wed Feb 21 14:16:42 CST 2024
BBB-T-1 2024-02-21T06:16:43.485045Z Wed Feb 21 14:16:43 CST 2024
BBB-T-1 2024-02-21T06:16:44.494117Z Wed Feb 21 14:16:44 CST 2024
BBB-T-1 2024-02-21T06:16:45.502331Z Wed Feb 21 14:16:45 CST 2024
BBB-T-1 2024-02-21T06:16:46.511551Z Wed Feb 21 14:16:46 CST 2024

从日志中可以看出,第一个flatMap中的执行是并行的,一次3个请求,但是在第二个flatMap中的执行是串行的,BBB-T-1 线程的打印日志显示,每隔一秒进行请求一次。

这时候有聪明的同学肯定就要说了:因为你的第二个flatMap没有使用subscribeOn

是的,如果加上subscribeOn ,代码如下:

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
,3)
.publishOn(Schedulers.newParallel("BBB-T", 60))
.flatMap(item -> {
return Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("CCC-T"));
}, 2)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}

那我问你,最后subscribe 中的打印,是在哪个线程组中进行的?

"AAA-T"、 "BBB-T"、 "CCC-T"

答案是:"CCC-T"

因为第二个flatMap 中的内部订阅流再一次切换了后续的执行线程组

CCC-T-8 2024-02-21T06:24:38.150541Z Wed Feb 21 14:24:38 CST 2024
CCC-T-8 2024-02-21T06:24:38.157196Z Wed Feb 21 14:24:38 CST 2024
CCC-T-13 2024-02-21T06:24:39.163621Z Wed Feb 21 14:24:39 CST 2024
CCC-T-12 2024-02-21T06:24:39.164459Z Wed Feb 21 14:24:39 CST 2024
CCC-T-16 2024-02-21T06:24:40.172837Z Wed Feb 21 14:24:40 CST 2024
CCC-T-16 2024-02-21T06:24:40.173081Z Wed Feb 21 14:24:40 CST 2024
CCC-T-18 2024-02-21T06:24:41.180035Z Wed Feb 21 14:24:41 CST 2024
CCC-T-17 2024-02-21T06:24:41.183437Z Wed Feb 21 14:24:41 CST 2024
CCC-T-19 2024-02-21T06:24:42.186140Z Wed Feb 21 14:24:42 CST 2024
CCC-T-20 2024-02-21T06:24:42.187466Z Wed Feb 21 14:24:42 CST 2024

最后我们再来看一种写法,判断第二个flatMap 的执行是串行还是并行的,

这次我们不使用subscribeOn 订阅内部流,而是使用publishOn 通知下游执行的流在"BBB-T" 线程组执行,但是executeRequest 方法是阻塞的

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
, 3)
.flatMap(item -> {
return Mono.just(item)
.publishOn(Schedulers.newSingle("BBB-T"))
.map(code -> {
return executeRequest();
});
}, 2)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}

打印的日志如下:

BBB-T-5 2024-02-21T06:30:10.067077Z Wed Feb 21 14:30:10 CST 2024
BBB-T-5 2024-02-21T06:30:10.075096Z Wed Feb 21 14:30:10 CST 2024
BBB-T-10 2024-02-21T06:30:11.084870Z Wed Feb 21 14:30:11 CST 2024
BBB-T-10 2024-02-21T06:30:11.085104Z Wed Feb 21 14:30:11 CST 2024
BBB-T-14 2024-02-21T06:30:12.096516Z Wed Feb 21 14:30:12 CST 2024
BBB-T-14 2024-02-21T06:30:12.099821Z Wed Feb 21 14:30:12 CST 2024
BBB-T-16 2024-02-21T06:30:13.104845Z Wed Feb 21 14:30:13 CST 2024
BBB-T-18 2024-02-21T06:30:13.109858Z Wed Feb 21 14:30:13 CST 2024
BBB-T-19 2024-02-21T06:30:14.111670Z Wed Feb 21 14:30:14 CST 2024
BBB-T-20 2024-02-21T06:30:14.119100Z Wed Feb 21 14:30:14 CST 2024

第二个flatMap 的执行是并行的!没想到吧

如果你把subscribeOn 当作一个特殊的publishOn 看是不是就马上明白了呢,两者本质都是让事件切换到新的线程组进行执行,subscribeOn 是从订阅开始影响整个流,而 publishOn 是只影响后续的流

4.2 最后再测试一下

还不过瘾,再来一个验证一下自己是否真的明白了subscribeOnpublishOn 的作用,看下面这段代码,回答:

第二个flatMap 中执行executeRequest 方法时发生在哪个线程组,是串行还是并行,如果是并行,并行度是一次几个?

public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
, 3)
.publishOn(Schedulers.newSingle("BBB-T"))
.flatMap(item -> {
return Mono.just(item)
.publishOn(Schedulers.newSingle("CCC-T"))
.map(code -> {
return executeRequest();
})
.subscribeOn(Schedulers.newSingle("DDD-T"));
}, 100)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}

答案:base64解码后查看

Q0NDLVQtMTMgMjAyNC0wMi0yNFQwMzoyODo1My40Njk3NzlaIFNhdCBGZWIgMjQgMTE6Mjg6NTMgQ1NUIDIwMjQKQ0NDLVQtMTMgMjAyNC0wMi0yNFQwMzoyODo1My40NzY1MTZaIFNhdCBGZWIgMjQgMTE6Mjg6NTMgQ1NUIDIwMjQKQ0NDLVQtMTMgMjAyNC0wMi0yNFQwMzoyODo1My40NzY2MTlaIFNhdCBGZWIgMjQgMTE6Mjg6NTMgQ1NUIDIwMjQKQ0NDLVQtMTggMjAyNC0wMi0yNFQwMzoyODo1NC40NzcyOTFaIFNhdCBGZWIgMjQgMTE6Mjg6NTQgQ1NUIDIwMjQKQ0NDLVQtMTggMjAyNC0wMi0yNFQwMzoyODo1NC40Nzc1MzhaIFNhdCBGZWIgMjQgMTE6Mjg6NTQgQ1NUIDIwMjQKQ0NDLVQtMjIgMjAyNC0wMi0yNFQwMzoyODo1NC40ODA4NDdaIFNhdCBGZWIgMjQgMTE6Mjg6NTQgQ1NUIDIwMjQKQ0NDLVQtMjUgMjAyNC0wMi0yNFQwMzoyODo1NS40ODkyMjhaIFNhdCBGZWIgMjQgMTE6Mjg6NTUgQ1NUIDIwMjQKQ0NDLVQtMjcgMjAyNC0wMi0yNFQwMzoyODo1NS40OTAxMDNaIFNhdCBGZWIgMjQgMTE6Mjg6NTUgQ1NUIDIwMjQKQ0NDLVQtMjkgMjAyNC0wMi0yNFQwMzoyODo1NS40OTE3NDVaIFNhdCBGZWIgMjQgMTE6Mjg6NTUgQ1NUIDIwMjQKQ0NDLVQtMzEgMjAyNC0wMi0yNFQwMzoyODo1Ni41MDQzMjRaIFNhdCBGZWIgMjQgMTE6Mjg6NTYgQ1NUIDIwMjQK

5. 相关参考

Project Reactor 之 publishOn 与 subscribeOn

practical-reactor

Project Reactor源码解析publishOn使用示例