
Some Pain Points of the Reactive Programming Framework WebFlux in Real-World Development

Preface

This started out as an answer to a question on Zhihu: why do most programmers find reactive programming hard to understand, even downright inhuman?

But as I kept writing, the content piled up, so I turned it into a blog post.

Below are some of the problems I've hit in real development.

Almost every method return type gets wrapped in Mono/Flux

From the request entry point:

@GetMapping("check")
public Mono<ApiResponse<String>> hlCheck() {
    return Mono.just(ApiResponse.success("I am fine. version is: " + version));
}

To the query results from the database (using R2DBC):

@Override
public Flux<AntiquarianBook> findByBookName(String bookName) {
    return antiquarianBookRepository.findByBookName(bookName);
}

Even IO utility classes, in the name of async non-blocking handling, end up "polluting" practically every method:

public static Flux<String> fromPath(Path path) {
    return Flux.using(() -> Files.lines(path), // open the resource
            Flux::fromStream,                  // turn it into a Flux
            BaseStream::close);                // close it on terminate/cancel
}

And this directly means there is no quick, convenient way to cache! Because what a method hands you is a Mono or Flux, not the complete data.

Which brings up the next problem.

Caching framework support is scarce

For details, see my earlier post:

How to Use Caching in WebFlux: Caching Mono and Flux
In a traditional project, using a caching framework to cache a method's return value is about as simple as it gets, and there are plenty of frameworks to choose from: Ehcache, Caffeine, JetCache, Guava Cache, and so on. But only when I applied WebFlux to a real project did I discover that, because of the asynchronous scheduling in reactive programming, almost every method return is wrapped in `Mono<T>` or `Flux<T>`, and those caching frameworks no longer integrate so easily. So I surveyed how the common caching frameworks support Project Reactor (the reactive library behind WebFlux), and it turned out not to be as simple as I'd hoped.

TL;DR, the state of reactive support in existing caching frameworks:

| Framework | Support | Related link |
| --- | --- | --- |
| ehcache | Not supported | Possibility to provide asynchronous or reactive cache in future versions |
| jetcache | Not supported | Does jetcache support Spring WebFlux? |
| reactor-extra | No longer maintained | reactor-addons |
| caffeine | Supported | Reactive types support for @Cacheable methods, but only on Spring Framework 6.1 M4 or later |

So you want one cache annotation that gives you both a local and a remote cache? Write it yourself.
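For what it's worth, here is a minimal sketch of one way to roll your own: caching the CompletableFuture behind a Mono with Caffeine's AsyncCache. The class and the loadFromDb lookup are hypothetical stand-ins, not anything from the frameworks above.

import java.time.Duration;

import com.github.benmanes.caffeine.cache.AsyncCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import reactor.core.publisher.Mono;

public class MonoCacheSketch {

    // cache the async result, keyed by the query argument
    private final AsyncCache<String, String> cache = Caffeine.newBuilder()
            .expireAfterWrite(Duration.ofMinutes(5))
            .buildAsync();

    public Mono<String> findCached(String key) {
        // cache the CompletableFuture rather than the Mono itself,
        // so concurrent callers share a single in-flight load
        return Mono.fromFuture(() -> cache.get(key, (k, executor) -> loadFromDb(k).toFuture()));
    }

    // hypothetical slow lookup standing in for a repository call
    private Mono<String> loadFromDb(String key) {
        return Mono.just("value-for-" + key).delayElement(Duration.ofMillis(200));
    }
}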

Debugging gets harder

Off the top of my head, a few scenarios that leave beginners dumbfounded:

public static void main(String[] args) throws InterruptedException {
    Flux.range(1, 10)
            .publishOn(Schedulers.newParallel("publishOn-T", 2))
            .flatMap(it -> Mono.just(executeRequest())
                            .subscribeOn(Schedulers.newParallel("subscribeOn-T", 2)),
                    2)
            .subscribe(it -> System.out.println(Instant.now() + " " + it));
}

private static String executeRequest() {
    RestTemplate restTemplate = new RestTemplate();
    return restTemplate.exchange("http://ip:port", HttpMethod.GET, null, String.class).getBody();
}

Question: as far as flatMap is concerned, how many concurrent executions of the external request executeRequest() does this code perform at a time?

Answer

The answer is one, not two.

Because Mono.just(executeRequest()) is a hot sequence: the value is computed eagerly, at assembly time, so each executeRequest() call has already happened synchronously before subscribeOn gets any say in where it runs.
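The usual fix, sketched against the code above: wrap the call in Mono.fromCallable so evaluation is deferred to subscription time, which lets subscribeOn choose the thread and lets flatMap's concurrency of 2 actually take effect.

.flatMap(it -> Mono.fromCallable(() -> executeRequest()) // deferred: runs at subscription time
                .subscribeOn(Schedulers.newParallel("subscribeOn-T", 2)),
        2)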

Hot sequences vs. cold sequences

A very simple example:

Flux.just(new Date(), new Date(), new Date())
        .delayElements(Duration.ofSeconds(1))
        .doOnNext(it -> System.out.println(it.getTime()))
        .blockLast();

The code above prints the same instant three times, because the values were computed when the Flux was assembled. The Project Reactor documentation describes this as:

It directly captures the value at assembly time and replays it to anybody subscribing to it later.

And if you do this:

System.out.println(new Date().getTime());
Flux<Date> dateFlux = Flux.just(new Date(), new Date(), new Date())
        .delayElements(Duration.ofSeconds(1))
        .doOnNext(it -> System.out.println(it.getTime()));
Thread.sleep(3000);
dateFlux.subscribe();

subscribing after a three-second wait, you'll find the printed times are from three seconds earlier.

The version below uses Flux.defer, which postpones computing the time until the moment of actual subscription; this is what reactive programming calls a cold sequence.

Flux.defer(() -> Mono.just(new Date()))
        .repeat(2)
        .delayElements(Duration.ofSeconds(1))
        .doOnNext(it -> System.out.println(it.getTime()))
        .blockLast();

If this example already has you lost in the fog, real development holds far more traps: code you expect to run asynchronously actually runs synchronously, and you never even notice.

IDE support is crucial, but it keeps dropping the ball

Take this scenario: while debugging, I want to see the value of deadTipsId, so naturally you reach for IDEA's Evaluate Expression feature to inspect it.

Well, then you find IDEA stuck there, frozen.

You assume it's temporary, but you take a bathroom break, fetch a glass of water, come back, and it's still stuck; and the database only has 3 rows!!

When you're urgently troubleshooting something, this is sheer torture. You're better off writing an xxx.subscribe() below and printing the values.

Stranger still, every so often the feature works fine; I have no idea what makes IDEA act up.

The deceptiveness of filterWhen

Look at this code. I want to filter Flux.just(1,2,3,4,5,6) against Flux.just(1, 2, 3), emitting an element whenever an equal element exists.

val cacheFlux = Flux.just(1, 2, 3).cache()

Flux.just(1, 2, 3, 4, 5, 6)
    .filterWhen { mainEle ->
        cacheFlux.any { mainEle == it }
    }
    .doOnNext { println(it) }
    .subscribe()

And the output does match expectations:

1
2
3

Now let's raise the difficulty. If I instead want the elements that do NOT equal anything in Flux.just(1, 2, 3), isn't your first instinct to change mainEle == it to mainEle != it?

That would be dead wrong. You'll find the output is:

1
2
3
4
5
6

Why? Analyze it carefully and you'll see:

  • The former, cacheFlux.any { mainEle == it }, means "pass if ANY element is equal".
  • The latter, cacheFlux.any { mainEle != it }, means "pass if ANY element is unequal".

So when what you want is "filter out the elements equal to something in cacheFlux", you should negate the result of the any, and the code becomes:

val cacheFlux = Flux.just(1, 2, 3).cache()

Flux.just(1, 2, 3, 4, 5, 6)
    .filterWhen { mainEle ->
        cacheFlux.any { mainEle == it }
            .map { it.not() }
    }
    .doOnNext { println(it) }
    .subscribe()

But using this, I found it genuinely counter-intuitive! The any operator invites you to think one level too shallow.

Now the clever reader asks: wouldn't plain .map be better? Something like this, comparing directly inside map, with seemingly fewer twists and turns:

val cacheFlux = Flux.just(1, 2, 3).cache()

Flux.just(1, 2, 3, 4, 5, 6)
    .filterWhen { mainEle ->
        cacheFlux.map { mainEle == it }
    }
    .doOnNext { println(it) }
    .subscribe()

Run it and you'll find only 1 is printed. That's because cacheFlux.map { mainEle == it } is itself a Flux, and filterWhen only takes the first element that Flux emits. It's as if Flux.just(1,2,3,4,5,6) were being compared against Flux.just(1).
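For this particular case there is a less roundabout option (a sketch, in Java): Flux.hasElement yields a Mono<Boolean> directly, so the negation sits in one obvious place instead of hiding inside any {}.

Flux<Integer> cacheFlux = Flux.just(1, 2, 3).cache();

Flux.just(1, 2, 3, 4, 5, 6)
        // keep only the elements NOT contained in cacheFlux
        .filterWhen(mainEle -> cacheFlux.hasElement(mainEle).map(contained -> !contained))
        .subscribe(System.out::println); // prints 4 5 6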

Losing the exception stack trace turns troubleshooting into a guessing game

Say a scenario queries three tables, merges the data, and processes one property after aggregation.

When one of the queries fails, you can't see at a glance which method triggered the failing query, because the stack trace never prints the call site in your code.

Log:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [xxxxx]
Caused by: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [xxxxxx]
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:253)
at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:156)
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:7310)
at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:7363)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:403)
at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:480)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:540)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:781)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:893)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:260)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:164)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:265)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:368)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:592)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:867)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:994)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:256)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:201)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:684)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:936)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlBadGrammarException: relation "table_name" does not exist
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:96)
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:179)
... 45 common frames omitted
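To be fair, Reactor ships some mitigations, each with a cost. Here is a runnable sketch of the two usual ones, Hooks.onOperatorDebug() and checkpoint(); with either enabled, the error trace gains a pointer back to the assembly site in your own code:

import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

public class CheckpointSketch {
    public static void main(String[] args) {
        // dev-only: record assembly-time stack traces for every operator (significant overhead)
        Hooks.onOperatorDebug();

        Flux.just("a", "b")
                .map(s -> s.substring(5)) // throws StringIndexOutOfBoundsException
                .checkpoint("after substring in demo") // cheap, targeted marker added to error traces
                .subscribe(System.out::println, Throwable::printStackTrace);
    }
}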

Assembling a Mono or Flux doesn't mean it actually ran

When learning Project Reactor, we're all taught that nothing happens until a Mono or Flux is subscribed to. Yet in practice, debugging keeps tripping us up on exactly this point.

Look at this code: it first does a findOne by name. If a matching record is found, execution enters the flatMap, which updates some properties and saves. If no record is found, the logic in switchIfEmpty inserts a new one.

public Mono<Void> saveClicks(String name) {
    JSONObject dataJson = new JSONObject().put("age", "100");

    return easterEggRepository.findOne(Example.of(new EasterEgg().setName(name)))
            .flatMap(it -> {
                JSONObject dataJsonExist = new JSONObject(it.getData().asString());
                // ... update some properties
                return easterEggRepository.save(it);
            })
            .switchIfEmpty(saveEasterEgg(name))
            .then();
}

private Mono<EasterEgg> saveEasterEgg(String name) {
    return easterEggRepository.save(new EasterEgg().setName(name))
            .doOnNext(it -> {
                System.out.println(it);
            });
}

But when you actually debug it in the IDE, suppose you put breakpoints on those two lines (the flatMap body and the save call inside saveEasterEgg): whether the request ends up updating or inserting, it always hits the breakpoint in switchIfEmpty, i.e. inside the saveEasterEgg method, first, and only then the breakpoint in flatMap. You'd naturally assume saveEasterEgg executed first, but it didn't; that was just the Flux being assembled.

How do we know? Because if the easterEggRepository.save(new EasterEgg().setName(name)) step had really executed, the doOnNext below it would have run too, and in fact it did not.
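The usual workaround, as a sketch against the code above: defer the fallback so saveEasterEgg(name) isn't even invoked (and your breakpoint isn't hit) until the upstream really completes empty.

// only this line of saveClicks changes
.switchIfEmpty(Mono.defer(() -> saveEasterEgg(name))) // assembled lazily, on empty only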

Unit test coverage gets harder

I won't get into whether unit tests are worth the bother. When you want unit tests to cover this code, you have to write them like this:

@Test
public void testAppendBoomError() {
    Flux<String> source = Flux.just("thing1", "thing2");

    StepVerifier.create(appendBoomError(source))
            .expectNext("thing1")
            .expectNext("thing2")
            .expectErrorMessage("boom")
            .verify();
}

Or like this:

StepVerifier.create(Mono.just(1).map(i -> i + 10),
                StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2")))
        .expectAccessibleContext()
        .contains("thing1", "thing2")
        .then()
        .expectNext(11)
        .verifyComplete();

Fantastic! Doesn't look one bit more cumbersome than the old way of writing unit tests...

Mono and Flux have over 200 operators; using them fluently is no small feat

For instance, a pop quiz:

  • What are the use cases for Flux.using?
  • When should you use onErrorContinue()?
  • Which operators can you reach for when you want backpressure?
  • Which operators could you use to compute the Fibonacci sequence? (one answer is sketched at the end of this section)

All of this is knowledge a developer accumulates only by going back to the documentation again and again. So even if your Java functional programming is masterly, facing this many operators in reactive programming still means making the Journey to the West all over again.
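To spoil that last quiz question, here is one possible answer, a sketch using Flux.generate with the (current, next) pair as state:

import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

public class FibonacciSketch {
    public static void main(String[] args) {
        Flux<Long> fib = Flux.generate(
                () -> Tuples.of(0L, 1L), // initial state: (F0, F1)
                (state, sink) -> {
                    sink.next(state.getT1()); // emit the current number
                    return Tuples.of(state.getT2(), state.getT1() + state.getT2()); // advance the pair
                });

        fib.take(10).subscribe(System.out::println); // 0 1 1 2 3 5 8 13 21 34
    }
}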

Want R2DBC and sharding at the same time?

Hoping to shard databases and tables while using R2DBC? Akka may be your only option.

I searched a pile of variations on "does Sharding-JDBC support R2DBC?" and "how do you shard databases and tables with R2DBC?"

The answer I got:

Under the reactive programming API, ShardingSphere JDBC cannot handle R2DBC DataSources, only JDBC DataSources. In Spring Boot microservices that use WebFlux components, you should avoid creating a ShardingSphere JDBC DataSource.

The details are in these threads:

Can I use ShardingSphere fully reactive with R2DBC without proxy and sidecar? #10837

shardingsphere support r2dbc?

What I eventually found is that database sharding can be done through Akka: Database sharding

Context problems

In a traditional Servlet environment you have ThreadLocal. But WebFlux is non-blocking and asynchronous, with an event-driven model, so request handling is no longer bound to one particular thread.

This means you can no longer rely on ThreadLocal to carry context information; you have to propagate context explicitly with the Context that Reactor provides.
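A minimal runnable sketch of that explicit propagation (the "userId" key is just an illustrative name): contextWrite populates the Context, and deferContextual, sitting upstream of it, reads the value back.

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

public class ContextSketch {
    public static void main(String[] args) {
        Mono<String> greeting = Mono.deferContextual(ctx ->
                        Mono.just("hello, user " + ctx.get("userId")))
                // context flows from subscriber towards the source,
                // so the write must sit downstream of the read
                .contextWrite(Context.of("userId", "42"));

        greeting.subscribe(System.out::println); // prints: hello, user 42
    }
}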

So when you have to debug a context-loss problem, you're in for a rough time.

  • You may find a context value suddenly "vanishing" or being unexpectedly overwritten, with no easy way to trace the source: you don't know why it was overwritten, or by what.

  • Some third-party libraries or existing code may still rely on ThreadLocal for user context (logging MDC, for example), but under WebFlux's async, non-blocking threading model, ThreadLocal data is not automatically propagated along the reactive chain. If your context depends on ThreadLocal, you may run into inconsistent or missing data.

  • Blocking operations (such as block(), blockOptional(), or toFuture().get()) lose the context, because blocking breaks the reactive chain and changes the threading model, so the context can no longer be propagated correctly.

Final thoughts

So far I haven't stepped in every pothole WebFlux has to offer, so despite all the gripes above, writing reactive code still feels oddly addictive; after all, you never hit all of these pitfalls at once.

You could also say that because I'm holding a strange hammer, everything looks like a nail worth tapping.

In some scenarios it genuinely feels like it saves me a lot of code, for example:

Fault tolerance and retry

Retrying a request takes nothing more than a .retry/.retryWhen, like the one below: retry 3 times, with a 2-second delay between attempts.

WebClient webClient = WebClient.create("https://example.com");

webClient.get()
        .uri("/resource")
        .retrieve()
        .bodyToMono(String.class)
        // retry 3 times with a 2-second delay between retries
        .retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)))
        .onErrorResume(e -> {
            // handle the error case after retries are exhausted
            System.out.println("Request failed after retries: " + e.getMessage());
            return Mono.empty();
        })
        .subscribe(response -> System.out.println("Response: " + response));

You can also use exponential backoff, Retry.backoff(3, Duration.ofSeconds(1)), where the interval between retries grows exponentially.

Backpressure support

For example:

  • The producer emits one item every 100 ms.
  • The consumer needs 500 ms to process each item.

With the backpressure mechanism, the consumer can control the rate at which it receives data according to its processing capacity, avoiding overload.

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class BackpressureExample {

    public static void main(String[] args) throws InterruptedException {
        // a producer that emits one number every 100 ms
        Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(100))
                .log() // log every signal in the stream
                .onBackpressureBuffer(100, // buffer up to 100 elements
                        value -> System.out.println("Dropping value: " + value)); // report drops once the buffer is full

        // simulate a slow consumer that needs 500 ms per element
        fastProducer
                .publishOn(Schedulers.boundedElastic()) // process on an elastic thread pool
                .subscribe(value -> {
                    try {
                        Thread.sleep(500); // simulate slow processing
                        System.out.println("Processed: " + value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });

        // keep the main thread alive long enough to observe the output
        Thread.sleep(500000);
    }
}