一文彻底搞懂WebFlux中的publishOn 和 subscribeOn怎么使用
1. 前言
在学习project reactor的时候,对publishOn
和 subscribeOn
使用时总是让我觉得有点迷惑,因为经常我期望它并行执行的时候,却又是串行。这篇文章会以多种不同的例子区分两者,彻底搞清楚应该怎么使用。
2. 在开始之前
希望你已经知道了publishOn
和subscribeOn
的基本概念。
publishOn用于改变后续操作符执行的线程,它会影响到其后的操作符在哪个线程上执行。
比如说如下代码:
Flux.range(0, 3)
.map(it - > {
System.out.println("当前线程名输出1:" + Thread.currentThread().getName());
return it;
})
.publishOn(Schedulers.newParallel("NEW"))
.map(it - > {
System.out.println("当前线程名输出2:" + Thread.currentThread().getName());
return it;
})
.subscribe();
在第一个map中输出的线程名是main,而在第二个map中输出的是NEW
subscribeOn的作用是在这个开始执行的时候,指定使用的线程调度器。
具体来说,subscribeOn 会影响到整个数据流的起始部分,即从订阅操作符开始,直到数据流的起始发布者(Publisher)的部分。它将决定订阅操作在哪个线程上执行,从而影响整个数据流的执行上下文。
以代码为例子:
Flux.range(0, 3)
.map(it - > {
System.out.println("当前线程名输出1:" + Thread.currentThread().getName());
return it;
})
.map(it - > {
System.out.println("当前线程名输出2:" + Thread.currentThread().getName());
return it;
})
.subscribeOn(Schedulers.newParallel("NEW"))
.subscribe();
虽然我把subscribeOn的操作放在最后,但是在执行的时候,会导致整个流从最开始就在NEW的线程上执行
这是最基本你需要知道的,接下来让我们看点不一样的。
2.1 Mock Server
我们需要一些准备工作,新建一个SpringBoot项目,搭建一个简单的Mock Server 用来模拟我们工作时,请求的第三方服务。
代码只写一个controller,并延迟一秒后进行返回
@SpringBootApplication
@RequestMapping("/")
@Controller
public class TempHttpServerApplication {
public static void main(String[] args) {
SpringApplication.run(TempHttpServerApplication.class, args);
}
@GetMapping("")
@ResponseBody
public String hello() throws InterruptedException {
Thread.sleep(1000);
return "hello SpringBoot";
}
}
3. 从最简单的开始
在下面的代码中,输出从1到10,这是再简单不过的例子了
public static void main(String[] args) {
Flux.range(1, 10)
.subscribe(it -> System.out.println(it));
}
接着我们增加一个executeRequest
方法用来请求我们刚刚的Mock Server,在流执行时我们进行调用,因为我们是在map方法中执行的,所以看输出的日期,显而易见整个流是串行执行的。
public static void main(String[] args) {
Flux.range(1, 10)
.map(it -> executeRequest())
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
2024-02-24T02:59:38.612704Z hello SpringBoot
2024-02-24T02:59:39.626442Z hello SpringBoot
2024-02-24T02:59:40.635633Z hello SpringBoot
2024-02-24T02:59:41.643354Z hello SpringBoot
2024-02-24T02:59:42.646554Z hello SpringBoot
2024-02-24T02:59:43.654952Z hello SpringBoot
2024-02-24T02:59:44.658078Z hello SpringBoot
2024-02-24T02:59:45.662179Z hello SpringBoot
2024-02-24T02:59:46.666218Z hello SpringBoot
2024-02-24T02:59:47.671938Z hello SpringBoot
3.1 .map
→ .flatMap
在上面的那个例子中,如果我们.map
改为.flatMap
时,因为flatMap中默认是并行执行的,但是实际在运行时,如果.flatMap
中如果为同步操作,指:Mono.just(executeRequest())
,会导致整个流变为同步。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.flatMap(it -> Mono.just(executeRequest()), 8)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.2 增加publishOn
为了让flatMap
中的执行改为并行,这时候有同学就要说了,使用publishOn
!
可惜的是,在下面这个例子中,虽然使用了publishOn
, 让下游线程执行在新的线程组中执行,但是因为flatMap
中存在同步操作,指:Mono.just(executeRequest())
,所以整个流仍然为同步执行。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("REQ-T", 2))
.flatMap(it - > Mono.just(executeRequest()), 2)
.subscribe(it - > {
System.out.println(Instant.now() + " " + it);
});
}
private static String executeRequest() {
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.3 如果使用subscribeOn
呢
订阅内部流的执行在其他线程, 且设置flatMap
的并行度为2,但是整个流还是同步执行,因为
Mono.just
为意味着热数据,每次会同步的执行去获取,因此执行的时候仍然为同步。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("REQ-T", 2))
.log()
, 2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
日志:
11:03:44.820 [main] INFO reactor.Mono.SubscribeOnValue.1 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:44.822 [main] INFO reactor.Mono.SubscribeOnValue.1 -- request(32)
11:03:44.824 [REQ-T-1] INFO reactor.Mono.SubscribeOnValue.1 -- onNext(hello SpringBoot)
2024-02-24T03:03:44.824966Z hello SpringBoot
11:03:44.827 [REQ-T-1] INFO reactor.Mono.SubscribeOnValue.1 -- onComplete()
11:03:45.829 [main] INFO reactor.Mono.SubscribeOnValue.2 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:45.829 [main] INFO reactor.Mono.SubscribeOnValue.2 -- request(32)
11:03:45.829 [REQ-T-2] INFO reactor.Mono.SubscribeOnValue.2 -- onNext(hello SpringBoot)
2024-02-24T03:03:45.830078Z hello SpringBoot
11:03:45.830 [REQ-T-2] INFO reactor.Mono.SubscribeOnValue.2 -- onComplete()
11:03:46.837 [main] INFO reactor.Mono.SubscribeOnValue.3 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:46.837 [main] INFO reactor.Mono.SubscribeOnValue.3 -- request(32)
11:03:46.838 [REQ-T-3] INFO reactor.Mono.SubscribeOnValue.3 -- onNext(hello SpringBoot)
2024-02-24T03:03:46.838210Z hello SpringBoot
11:03:46.838 [REQ-T-3] INFO reactor.Mono.SubscribeOnValue.3 -- onComplete()
11:03:47.848 [main] INFO reactor.Mono.SubscribeOnValue.4 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:47.849 [main] INFO reactor.Mono.SubscribeOnValue.4 -- request(32)
11:03:47.849 [REQ-T-4] INFO reactor.Mono.SubscribeOnValue.4 -- onNext(hello SpringBoot)
2024-02-24T03:03:47.850130Z hello SpringBoot
11:03:47.850 [REQ-T-4] INFO reactor.Mono.SubscribeOnValue.4 -- onComplete()
11:03:48.861 [main] INFO reactor.Mono.SubscribeOnValue.5 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:48.861 [main] INFO reactor.Mono.SubscribeOnValue.5 -- request(32)
11:03:48.861 [REQ-T-5] INFO reactor.Mono.SubscribeOnValue.5 -- onNext(hello SpringBoot)
2024-02-24T03:03:48.861965Z hello SpringBoot
11:03:48.862 [REQ-T-5] INFO reactor.Mono.SubscribeOnValue.5 -- onComplete()
11:03:49.871 [main] INFO reactor.Mono.SubscribeOnValue.6 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:49.871 [main] INFO reactor.Mono.SubscribeOnValue.6 -- request(32)
11:03:49.871 [REQ-T-6] INFO reactor.Mono.SubscribeOnValue.6 -- onNext(hello SpringBoot)
2024-02-24T03:03:49.871636Z hello SpringBoot
11:03:49.871 [REQ-T-6] INFO reactor.Mono.SubscribeOnValue.6 -- onComplete()
11:03:50.879 [main] INFO reactor.Mono.SubscribeOnValue.7 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:50.879 [main] INFO reactor.Mono.SubscribeOnValue.7 -- request(32)
11:03:50.880 [REQ-T-7] INFO reactor.Mono.SubscribeOnValue.7 -- onNext(hello SpringBoot)
2024-02-24T03:03:50.880186Z hello SpringBoot
11:03:50.880 [REQ-T-7] INFO reactor.Mono.SubscribeOnValue.7 -- onComplete()
11:03:51.884 [main] INFO reactor.Mono.SubscribeOnValue.8 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:51.885 [main] INFO reactor.Mono.SubscribeOnValue.8 -- request(32)
11:03:51.885 [REQ-T-8] INFO reactor.Mono.SubscribeOnValue.8 -- onNext(hello SpringBoot)
2024-02-24T03:03:51.885495Z hello SpringBoot
11:03:51.885 [REQ-T-8] INFO reactor.Mono.SubscribeOnValue.8 -- onComplete()
11:03:52.895 [main] INFO reactor.Mono.SubscribeOnValue.9 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:52.896 [main] INFO reactor.Mono.SubscribeOnValue.9 -- request(32)
11:03:52.896 [REQ-T-9] INFO reactor.Mono.SubscribeOnValue.9 -- onNext(hello SpringBoot)
2024-02-24T03:03:52.896721Z hello SpringBoot
11:03:52.896 [REQ-T-9] INFO reactor.Mono.SubscribeOnValue.9 -- onComplete()
11:03:53.903 [main] INFO reactor.Mono.SubscribeOnValue.10 -- onSubscribe([Fuseable] FluxSubscribeOnValue.ScheduledScalar)
11:03:53.904 [main] INFO reactor.Mono.SubscribeOnValue.10 -- request(32)
11:03:53.904 [REQ-T-10] INFO reactor.Mono.SubscribeOnValue.10 -- onNext(hello SpringBoot)
2024-02-24T03:03:53.904910Z hello SpringBoot
11:03:53.905 [REQ-T-10] INFO reactor.Mono.SubscribeOnValue.10 -- onComplete()
3.4 publishOn+subscribeOn
我们都知道publishOn
针对的是后面流所执行的操作,因为executeRequest为同步,且使用的是Mono.just
,所以整个流还是同步。
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("publishOn-T", 2))
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
,2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.5 关键的Mono.fromCallable
下面这个例子终于为异步请求了, 且控制并发度由flatMap的第二个参数控制,此时并发度为2
public static void main(String[] args) {
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
, 2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
此时并发度为8,由flatMap的第二个参数控制
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
, 8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.6 Schedulers.newParallel 和 Schedulers.single 的区别
使用.subscribeOn(Schedulers.single())
,为串行执行
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.single())
,8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
使用Schedulers.newParallel("subscribeOn-T", 1)
为并行执行
public static void main(String[] args) {
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 1))
,8)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
其原因在于使用 Schedulers.single()
时它会将每个任务放入一个共享的单线程池中执行。这意味着任务将按顺序一个接一个地执行,因为它们都在同一个线程上执行。
3.7 Mono.defer
、Flux.defer
、Mono.*fromSupplier
除了使用Mono.fromCallable
,可以使用Mono.defer
,或者Flux.defer
、Mono.fromSupplier
这些的本质在于对处理executeRequest 时当作冷数据进行处理,只有在真正调用的时候才会进行执行。
什么,你说你还不明白冷数据和热数据的区别?
Flux.just(new Date(), new Date(), new Date())
在产生订阅事件时,时间就已经确定了。
Flux.defer(() -> Mono.just(new Date())).repeat(2)
只有在执行到这个方法时,才会计算当前的时间
Mono.defer
例子
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.defer(() -> Mono.just(executeRequest()))
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 1))
,8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
Mono.fromSupplier
例子
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromSupplier(() -> executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 1))
,8)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
countDownLatch.await();
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
3.8 subscribeOn
的使用
subscribeOn
其核心在于当订阅的这个事件为同步操作,我们期望改为异步时,使用subscribeOn
,比如IO 操作,save DB,包括订阅内部流的执行。
Mono.fromCallable(()->{
return Mono.just(mongoTemplate.insertOne(xxx))
})
.subscribeOn(Schedulers.boundedElastic())
希望上面的例子已经让你搞懂subscribeOn
的使用
4. publishOn
publishOn
把后续的操作切换到新的调度线程池中,flatMap
并发数默认和电脑核心数相关,下面以一个新的例子出发,我们需要更改一下mock server的响应为这样:
更改了一下mock server的响应
@SpringBootApplication
@RequestMapping("/")
@Controller
public class TempHttpServerApplication {
public static void main(String[] args) {
SpringApplication.run(TempHttpServerApplication.class, args);
}
@GetMapping("")
@ResponseBody
public String hello() throws InterruptedException {
Thread.sleep(1000);
return new Date().toString();
}
}
接着这是我们的测试代码,在没有使用publishOn
的情况下,subscribe 中睡眠3秒,然后输出mock server的返回
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(()->executeRequest())
.subscribeOn(Schedulers.newSingle("REQ-T"))
,3)
// .publishOn(Schedulers.newSingle("SOUT-T"))
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
在上面这个例子中,如果没有publishOn,那么得到的日志是:
REQ-T-3 2024-02-24T03:16:40.634093Z Sat Feb 24 11:16:37 CST 2024
REQ-T-3 2024-02-24T03:16:43.645577Z Sat Feb 24 11:16:37 CST 2024
REQ-T-3 2024-02-24T03:16:46.646355Z Sat Feb 24 11:16:37 CST 2024
REQ-T-5 2024-02-24T03:16:50.673382Z Sat Feb 24 11:16:47 CST 2024
REQ-T-5 2024-02-24T03:16:53.674860Z Sat Feb 24 11:16:47 CST 2024
REQ-T-5 2024-02-24T03:16:56.680422Z Sat Feb 24 11:16:47 CST 2024
REQ-T-7 2024-02-24T03:17:00.693387Z Sat Feb 24 11:16:57 CST 2024
REQ-T-7 2024-02-24T03:17:03.695553Z Sat Feb 24 11:16:57 CST 2024
REQ-T-7 2024-02-24T03:17:06.701117Z Sat Feb 24 11:16:57 CST 2024
REQ-T-10 2024-02-24T03:17:10.716434Z Sat Feb 24 11:17:07 CST 2024
给你一分钟的时间想想,为什么10个请求的响应似乎被分成了4组,而每组的响应时间一样,相邻两组的响应差距时间是10秒
因为如果没有publishOn切换到新的线程组执行,那么意味着.subscribe(it -> {
中的代码也是由
Schedulers.newSingle("REQ-T")
中的单个线程执行,那么在flatMap
中会同时发出3个请求,在服务端会睡眠1秒,而在subscribe
中执行时,每个请求会睡眠3秒,
所以10秒=1秒(服务端响应)+3(个请求) * 3(每个请求睡眠3秒)
而由于publishOn
的特性,我们一般可以这样使用,比如切换到新的线程组进行insert
Mono<Void> fluxToBlockingRepository(Flux<User> flux,
BlockingRepository<User> repository) {
return flux
.publishOn(Schedulers.elastic())
.doOnNext(repository::save)
.then();
}
4.1 publishOn 的使用场景
再接着增加难度,模拟一个工作中的场景,经常会先请求1,2个接口的数据进行组装,然后再请求其他接口,如下:
先在第一个flatMap
中执行一次executeRequest
,
然后通过publishOn
切换到"BBB-T"
线程组执行,给定60个线程,
接着在第二个flatMap
中执行executeRequest
,这是模拟拼装好数据后再请求其他接口,此时整个流的执行是并行,还是串行?还是部分并行、部分串行?
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
, 3)
.doOnNext(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
})
.publishOn(Schedulers.newParallel("BBB-T", 60))
.flatMap(item -> {
return Mono.fromCallable(() -> executeRequest());
}, 2)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://localhost:8888", HttpMethod.GET, null, String.class).getBody();
}
执行后输出日志如下:
AAA-T-3 2024-02-21T06:16:36.428794Z Wed Feb 21 14:16:36 CST 2024
AAA-T-3 2024-02-21T06:16:36.434086Z Wed Feb 21 14:16:36 CST 2024
AAA-T-3 2024-02-21T06:16:36.434188Z Wed Feb 21 14:16:36 CST 2024
AAA-T-6 2024-02-21T06:16:37.440315Z Wed Feb 21 14:16:37 CST 2024
BBB-T-1 2024-02-21T06:16:37.440402Z Wed Feb 21 14:16:37 CST 2024
AAA-T-4 2024-02-21T06:16:37.443843Z Wed Feb 21 14:16:37 CST 2024
AAA-T-4 2024-02-21T06:16:37.444355Z Wed Feb 21 14:16:37 CST 2024
BBB-T-1 2024-02-21T06:16:38.445792Z Wed Feb 21 14:16:38 CST 2024
AAA-T-9 2024-02-21T06:16:38.448931Z Wed Feb 21 14:16:38 CST 2024
AAA-T-9 2024-02-21T06:16:38.449036Z Wed Feb 21 14:16:38 CST 2024
AAA-T-8 2024-02-21T06:16:38.453588Z Wed Feb 21 14:16:38 CST 2024
BBB-T-1 2024-02-21T06:16:39.450717Z Wed Feb 21 14:16:39 CST 2024
AAA-T-10 2024-02-21T06:16:39.455242Z Wed Feb 21 14:16:39 CST 2024
BBB-T-1 2024-02-21T06:16:40.456177Z Wed Feb 21 14:16:40 CST 2024
BBB-T-1 2024-02-21T06:16:41.465664Z Wed Feb 21 14:16:41 CST 2024
BBB-T-1 2024-02-21T06:16:42.476952Z Wed Feb 21 14:16:42 CST 2024
BBB-T-1 2024-02-21T06:16:43.485045Z Wed Feb 21 14:16:43 CST 2024
BBB-T-1 2024-02-21T06:16:44.494117Z Wed Feb 21 14:16:44 CST 2024
BBB-T-1 2024-02-21T06:16:45.502331Z Wed Feb 21 14:16:45 CST 2024
BBB-T-1 2024-02-21T06:16:46.511551Z Wed Feb 21 14:16:46 CST 2024
从日志中可以看出,第一个flatMap
中的执行是并行的,一次3个请求,但是在第二个flatMap中的执行是串行的,BBB-T-1
线程的打印日志显示,每隔一秒进行请求一次。
这时候有聪明的同学肯定就要说了:因为你的第二个flatMap没有使用subscribeOn
是的,如果加上subscribeOn
,代码如下:
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
,3)
.publishOn(Schedulers.newParallel("BBB-T", 60))
.flatMap(item -> {
return Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("CCC-T"));
}, 2)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}
那我问你,最后subscribe
中的打印,是在哪个线程组中进行的?
"AAA-T"、 "BBB-T"、 "CCC-T"
答案是:"CCC-T"
因为第二个flatMap 中的内部订阅流再一次切换了后续的执行线程组
CCC-T-8 2024-02-21T06:24:38.150541Z Wed Feb 21 14:24:38 CST 2024
CCC-T-8 2024-02-21T06:24:38.157196Z Wed Feb 21 14:24:38 CST 2024
CCC-T-13 2024-02-21T06:24:39.163621Z Wed Feb 21 14:24:39 CST 2024
CCC-T-12 2024-02-21T06:24:39.164459Z Wed Feb 21 14:24:39 CST 2024
CCC-T-16 2024-02-21T06:24:40.172837Z Wed Feb 21 14:24:40 CST 2024
CCC-T-16 2024-02-21T06:24:40.173081Z Wed Feb 21 14:24:40 CST 2024
CCC-T-18 2024-02-21T06:24:41.180035Z Wed Feb 21 14:24:41 CST 2024
CCC-T-17 2024-02-21T06:24:41.183437Z Wed Feb 21 14:24:41 CST 2024
CCC-T-19 2024-02-21T06:24:42.186140Z Wed Feb 21 14:24:42 CST 2024
CCC-T-20 2024-02-21T06:24:42.187466Z Wed Feb 21 14:24:42 CST 2024
最后我们再来看一种写法,判断第二个flatMap
的执行是串行还是并行的,
这次我们不使用subscribeOn
订阅内部流,而是使用publishOn
通知下游执行的流在"BBB-T"
线程组执行,但是executeRequest
方法是阻塞的
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
, 3)
.flatMap(item -> {
return Mono.just(item)
.publishOn(Schedulers.newSingle("BBB-T"))
.map(code -> {
return executeRequest();
});
}, 2)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}
打印的日志如下:
BBB-T-5 2024-02-21T06:30:10.067077Z Wed Feb 21 14:30:10 CST 2024
BBB-T-5 2024-02-21T06:30:10.075096Z Wed Feb 21 14:30:10 CST 2024
BBB-T-10 2024-02-21T06:30:11.084870Z Wed Feb 21 14:30:11 CST 2024
BBB-T-10 2024-02-21T06:30:11.085104Z Wed Feb 21 14:30:11 CST 2024
BBB-T-14 2024-02-21T06:30:12.096516Z Wed Feb 21 14:30:12 CST 2024
BBB-T-14 2024-02-21T06:30:12.099821Z Wed Feb 21 14:30:12 CST 2024
BBB-T-16 2024-02-21T06:30:13.104845Z Wed Feb 21 14:30:13 CST 2024
BBB-T-18 2024-02-21T06:30:13.109858Z Wed Feb 21 14:30:13 CST 2024
BBB-T-19 2024-02-21T06:30:14.111670Z Wed Feb 21 14:30:14 CST 2024
BBB-T-20 2024-02-21T06:30:14.119100Z Wed Feb 21 14:30:14 CST 2024
第二个flatMap
的执行是并行的!没想到吧
如果你把subscribeOn
当作一个特殊的publishOn
看是不是就马上明白了呢,两者本质都是让事件切换到新的线程组进行执行,subscribeOn
是从订阅开始影响整个流,而 publishOn
是只影响后续的流
4.2 最后再测试一下
还不过瘾,再来一个验证一下自己是否真的明白了subscribeOn
和 publishOn
的作用,看下面这段代码,回答:
第二个flatMap
中执行executeRequest
方法时发生在哪个线程组,是串行还是并行,如果是并行,并行度是一次几个?
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
Flux.range(1, 10)
.flatMap(it -> Mono.fromCallable(() -> executeRequest())
.subscribeOn(Schedulers.newSingle("AAA-T"))
, 3)
.publishOn(Schedulers.newSingle("BBB-T"))
.flatMap(item -> {
return Mono.just(item)
.publishOn(Schedulers.newSingle("CCC-T"))
.map(code -> {
return executeRequest();
})
.subscribeOn(Schedulers.newSingle("DDD-T"));
}, 100)
.doOnComplete(countDownLatch::countDown)
.subscribe(it -> {
System.out.println(Thread.currentThread().getName() +" " + Instant.now() + " " +it);
});
countDownLatch.await();
}
答案:base64解码后查看
Q0NDLVQtMTMgMjAyNC0wMi0yNFQwMzoyODo1My40Njk3NzlaIFNhdCBGZWIgMjQgMTE6Mjg6NTMgQ1NUIDIwMjQKQ0NDLVQtMTMgMjAyNC0wMi0yNFQwMzoyODo1My40NzY1MTZaIFNhdCBGZWIgMjQgMTE6Mjg6NTMgQ1NUIDIwMjQKQ0NDLVQtMTMgMjAyNC0wMi0yNFQwMzoyODo1My40NzY2MTlaIFNhdCBGZWIgMjQgMTE6Mjg6NTMgQ1NUIDIwMjQKQ0NDLVQtMTggMjAyNC0wMi0yNFQwMzoyODo1NC40NzcyOTFaIFNhdCBGZWIgMjQgMTE6Mjg6NTQgQ1NUIDIwMjQKQ0NDLVQtMTggMjAyNC0wMi0yNFQwMzoyODo1NC40Nzc1MzhaIFNhdCBGZWIgMjQgMTE6Mjg6NTQgQ1NUIDIwMjQKQ0NDLVQtMjIgMjAyNC0wMi0yNFQwMzoyODo1NC40ODA4NDdaIFNhdCBGZWIgMjQgMTE6Mjg6NTQgQ1NUIDIwMjQKQ0NDLVQtMjUgMjAyNC0wMi0yNFQwMzoyODo1NS40ODkyMjhaIFNhdCBGZWIgMjQgMTE6Mjg6NTUgQ1NUIDIwMjQKQ0NDLVQtMjcgMjAyNC0wMi0yNFQwMzoyODo1NS40OTAxMDNaIFNhdCBGZWIgMjQgMTE6Mjg6NTUgQ1NUIDIwMjQKQ0NDLVQtMjkgMjAyNC0wMi0yNFQwMzoyODo1NS40OTE3NDVaIFNhdCBGZWIgMjQgMTE6Mjg6NTUgQ1NUIDIwMjQKQ0NDLVQtMzEgMjAyNC0wMi0yNFQwMzoyODo1Ni41MDQzMjRaIFNhdCBGZWIgMjQgMTE6Mjg6NTYgQ1NUIDIwMjQK