响应式编程框架 WebFlux在实际开发中的一些痛点
前言
起初这个问题是为了回到知乎上的一个提问:为什么大多数程序员认为响应式编程不易理解,甚至反人类?
但是写着写着发现内容还挺多的,于是整理成一篇博客吧。
以下是在实际开发中碰到的一些问题。
几乎所有的方法返回套上了Mono Flux
从一个请求的入口
@GetMapping("check")
public Mono<ApiResponse<String>> hlCheck(){
return Mono.just(ApiResponse.success("I am fine. version is: " + version));
}
到从数据库的查询结果(使用的r2dbc)
@Override
public Flux<AntiquarianBook> findByBookName(String bookName){
return antiquarianBookRepository.findAll();
}
甚至一些IO操作的工具类,为了异步非阻塞的方式来处理,几乎”污染“了所有方法。
public static Flux<String> fromPath(Path path) {
return Flux.using(() -> Files.lines(path),
Flux::fromStream,
BaseStream::close
);
}
而这也直接导致,没法方便快捷的做缓存!因为你拿到的方法返回是Mono,Flux,而不是完成的数据。
所以也就引发了下面这个问题
缓存框架的支持少之又少
具体可以看之前的这篇博客:
在传统项目中,使用缓存框架对一个方法的返回做缓存那是再简单不过的事,缓存框架也是有多重选择,比如EHcache,Caffeine,jetcache,Guava Cache等等等。但是当我真的把webflux应用到真实项目的时候才发现,因为响应式编程中的异步调度,几乎让所有的方法返回都套上了`Mono<T>`, `Flux<T>`,似乎之前的缓存框架没有那么简单能融合进项目中。为此,我收集了常见的缓存框架对project-reactor(webflux中的响应式编程框架)支持,发现并没有我想的这么简单。
太长不看的看结论, 现有缓存框架对响应式编程的支持情况:
框架名 | 支持情况 | 相关链接 |
---|---|---|
ehcache | 不支持 | Possibility to provide asynchronous or reactive cache in future versions |
jetcache | 不支持 | jetcache 支持 spring webflux 吗 |
reactor-extra | 最新版本已经停止更新 | reactor-addons |
caffeine | 支持 | Reactive types support for @Cacheable methods 但是要求是spring 6.1M4版本之后 |
所以当你想要一个缓存注解就有本地缓存和远程缓存?自己写一个吧
debug的困难度上升
随便写几个初学者碰到一脸懵逼的场景:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("publishOn-T", 2))
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
,2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://ip:port", HttpMethod.GET, null, String.class).getBody();
}
请问:上述代码的执行对于flatMap来说,每次会同时执行几次外部请求executeRequest()?
答案
答案是一次,而不是两次
因为Mono.just(executeRequest())
是Hot sequence, 在初始化时则即时计算出来的。
Hot sequence与Cold sequence
举个很简单的例子:
Flux.just(new Date(), new Date(), new Date())
.delayElements(Duration.ofSeconds(1))
.doOnNext(it -> System.out.println(it.getTime()))
.blockLast();
上面代码输出的都是同一个时间点,因为在Flux初始化的时候就开始计算了。project reactor文档对此的描述是:
- 英文
- 翻译
It directly captures the value at assembly time and replays it to anybody subscribing to it later.
它可在组装时直接捕获值,并在以后向任何订阅者重播。
如果你这样:
System.out.println(new Date().getTime());
Flux<Date> dateFlux = Flux.just(new Date(), new Date(), new Date())
.delayElements(Duration.ofSeconds(1))
.doOnNext(it -> System.out.println(it.getTime()));
Thread.sleep(3000);
dateFlux.subscribe();
等待三秒后订阅,你也会发现输出的时间是3秒前的。
而下面这个使用的Flux.defer
,它则会推迟到实际订阅时才会计算对应的时间,则是响应式编程中说的Cold sequence
Flux.defer(() -> {
return Mono.just(new Date());
})
.repeat(2)
.delayElements(Duration.ofSeconds(1))
.doOnNext(it -> System.out.println(it.getTime()))
.blockLast();