响应式编程框架 WebFlux在实际开发中的一些痛点
前言
起初这个问题是为了回到知乎上的一个提问:为什么大多数程序员认为响应式编程不易理解,甚至反人类?
但是写着写着发现内容还挺多的,于是整理成一篇博客吧。
以下是在实际开发中碰到的一些问题。
几乎所有的方法返回套上了Mono Flux
从一个请求的入口
@GetMapping("check")
public Mono<ApiResponse<String>> hlCheck(){
return Mono.just(ApiResponse.success("I am fine. version is: " + version));
}
到从数据库的查询结果(使用的r2dbc)
@Override
public Flux<AntiquarianBook> findByBookName(String bookName){
return antiquarianBookRepository.findAll();
}
甚至一些IO操作的工具类,为了异步非阻塞的方式来处理,几乎”污染“了所有方法。
public static Flux<String> fromPath(Path path) {
return Flux.using(() -> Files.lines(path),
Flux::fromStream,
BaseStream::close
);
}
而这也直接导致,没法方便快捷的做缓存!因为你拿到的方法返回是Mono,Flux,而不是完成的数据。
所以也就引发了下面这个问题
缓存框架的支持少之又少
具体可以看之前的这篇博客:
太长不看的看结论, 现有缓存框架对响应式编程的支持情况:
框架名 | 支持情况 | 相关链接 |
---|---|---|
ehcache | 不支持 | Possibility to provide asynchronous or reactive cache in future versions |
jetcache | 不支持 | jetcache 支持 spring webflux 吗 |
reactor-extra | 最新版本已经停止更新 | reactor-addons |
caffeine | 支持 | Reactive types support for @Cacheable methods 但是要求是spring 6.1M4版本之后 |
所以当你想要一个缓存注解就有本地缓存和远程缓存?自己写一个吧
debug的困难度上升
随便写几个初学者碰到一脸懵逼的场景:
public static void main(String[] args) throws InterruptedException {
Flux.range(1, 10)
.publishOn(Schedulers.newParallel("publishOn-T", 2))
.flatMap(it -> Mono.just(executeRequest())
.subscribeOn(Schedulers.newParallel("subscribeOn-T", 2))
,2)
.subscribe(it -> {
System.out.println(Instant.now() + " " +it);
});
}
private static String executeRequest(){
RestTemplate restTemplate = new RestTemplate();
return restTemplate.exchange("http://ip:port", HttpMethod.GET, null, String.class).getBody();
}
请问:上述代码的执行对于flatMap来说,每次会同时执行几次外部请求executeRequest()?
答案
答案是一次,而不是两次
因为Mono.just(executeRequest())
是Hot sequence, 在初始化时则即时计算出来的。
Hot sequence与Cold sequence
举个很简单的例子:
Flux.just(new Date(), new Date(), new Date())
.delayElements(Duration.ofSeconds(1))
.doOnNext(it -> System.out.println(it.getTime()))
.blockLast();
上面代码输出的都是同一个时间点,因为在Flux初始化的时候就开始计算了。project reactor文档对此的描述是:
- 英文
- 翻译
It directly captures the value at assembly time and replays it to anybody subscribing to it later.
它可在组装时直接捕获值,并在以后向任何订阅者重播。
如果你这样:
System.out.println(new Date().getTime());
Flux<Date> dateFlux = Flux.just(new Date(), new Date(), new Date())
.delayElements(Duration.ofSeconds(1))
.doOnNext(it -> System.out.println(it.getTime()));
Thread.sleep(3000);
dateFlux.subscribe();
等待三秒后订阅,你也会发现输出的时间是3秒前的。
而下面这个使用的Flux.defer
,它则会推迟到实际订阅时才会计算对应的时间,则是响应式编程中说的Cold sequence
Flux.defer(() -> {
return Mono.just(new Date());
})
.repeat(2)
.delayElements(Duration.ofSeconds(1))
.doOnNext(it -> System.out.println(it.getTime()))
.blockLast();
这个例子如果你就已经看的云里雾里,那么实际开发中则会有更多的坑,当你期望异步执行的时候实际同步执行的,而你却没察觉到。
IDE的支持很关键,但是经常掉链子
比如这个场景:我期望在debug的时候,拿到deadTipsId
的值,所以你肯定会使用IDEA的Evaluate 功能,看看这个值是啥内容
好嘛,然后你就发现IDEA卡在这不动了。
你以为是暂时的,但是当你上个厕所接杯水回来发现还是卡在这,但是数据库其实就3条数据!!
这在紧急排查一个任务的时候真的是非常折磨的。还不如在下面写个xxx.subscribe()
打印一下。
但是更奇怪的是,时不时这个功能又是正常的,不理解IDEA抽风是什么原因导致的。
filterWhen的迷惑性
看这段代码, 我期望的是Flux.just(1,2,3,4,5,6)
根据Flux.just(1, 2, 3)
过滤,当存在相等元素的时候进行输出。
val cacheFlux = Flux.just(1, 2, 3).cache()
Flux.just(1,2,3,4,5,6)
.filterWhen {mainELe ->
cacheFlux.any {
mainELe == it
}
}
.doOnNext {
println(it)
}
.subscribe()
它的输出也确实符合预期
1
2
3
开始上强度了,如果现在我想过滤出和Flux.just(1, 2, 3)
不相等的元素,你的下意识是不是把mainELe == it
改为mainELe != it
?
那可就太错了,你会发现输出的是
1
2
3
4
5
6
为什么?仔细分析你就会发现
- 前者
cacheFlux.any { mainELe == it }
说的是任意元素存在相等时则通过 - 后者
cacheFlux.any { mainELe != it }
说的是任意元素不相等则通过
所以当你期望“过滤掉和cacheFlux相等的数据”时,应该是对这个结果取反, 代码变成了这样:
val cacheFlux = Flux.just(1, 2, 3).cache()
Flux.just(1,2,3,4,5,6)
.filterWhen {mainELe ->
cacheFlux.any {
mainELe == it
}
.map {
it.not()
}
}
.doOnNext {
println(it)
}
.subscribe()
但我使用的时候发现这真的挺反直觉的!因为any的操作符会让人少思考一层。
那聪明的同学就要问了,那我直接使用.map岂不是更好?类似这样,在map中直接比较,看起来没有那么多弯弯绕绕。
val cacheFlux = Flux.just(1, 2, 3).cache()
Flux.just(1,2,3,4,5,6)
.filterWhen {mainELe ->
cacheFlux.map {
mainELe == it
}
}
.doOnNext {
println(it)
}
.subscribe()
当你运行一下就会发现只输出了1, 这是因为cacheFlux.map { mainELe == it }
实际是一个flux, 而filterWhen只拿了flux中的第一个元素。就相当于Flux.just(1,2,3,4,5,6)
和Flux.just(1)
比较。
失去了异常栈的打印,让排查猜谜
比如某个场景下,查询了3个表的数据合并,聚合后处理一个属性。
当某个查询异常时,你无法一眼看出是哪个方法引发的查询错误,因为调用栈压根没打印你的代码调用位置。
log
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [xxxxx]
Caused by: org.springframework.r2dbc.BadSqlGrammarException: executeMany; bad SQL grammar [xxxxxx]
at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:253)
at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:156)
at reactor.core.publisher.Flux.lambda$onErrorMap$29(Flux.java:7310)
at reactor.core.publisher.Flux.lambda$onErrorResume$30(Flux.java:7363)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:403)
at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:480)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210)
at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:540)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:781)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:893)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.Operators.complete(Operators.java:137)
at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:241)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:260)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:164)
at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:265)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:134)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:157)
at reactor.core.publisher.FluxFilter$FilterConditionalSubscriber.onError(FluxFilter.java:291)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onError(FluxMap.java:265)
at reactor.core.publisher.Operators.error(Operators.java:198)
at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:76)
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53)
at reactor.core.publisher.Mono.subscribe(Mono.java:4576)
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:368)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:612)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:592)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerError(FluxFlatMap.java:867)
at reactor.core.publisher.FluxFlatMap$FlatMapInner.onError(FluxFlatMap.java:994)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:213)
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:256)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:201)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onNext(FluxDiscardOnCancel.java:91)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:880)
at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:805)
at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:163)
at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:684)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:936)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:425)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlBadGrammarException: relation "table_name" does not exist
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:96)
at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:65)
at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:132)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:179)
... 45 common frames omitted
初始化Mono或者Flux的时候并不代表真的执行了
在学习project reactor的时候,我们都知道,如果一个Mono or Flux没有被subscribe,那么什么也不会发生。但是实际我们在debug的时候总是会被这个迷惑
看这段代码:
这段代码中,先是根据name findOne
一条记录,如果有找到对于记录则进入flatMap
中,更新属性,然后保存。
如果没有找到记录,则执行switchIfEmpty
中的逻辑新增一条记录
public Mono<Void> saveClicks(String name) {
JSONObject dataJson = new JSONObject().put("age", "100");
return easterEggRepository.findOne(Example.of(new EasterEgg().setName(name)))
.flatMap(it -> {
JSONObject dataJsonExist = new JSONObject(it.getData().asString());
//... 更新一些属性
return easterEggRepository.save(it);
})
.switchIfEmpty(saveEasterEgg())
.then()
;
}
private Mono<EasterEgg> saveEasterEgg(String name) {
return easterEggRepository.save(new EasterEgg().setName(name))
.doOnNext(it -> {
System.out.println(it);
})
;
}
但是当你实际通过IDE debug的时候发现,假设你在高亮的这两行代码打了断点。
不管是更新操作还是新增操作,请求进来时都会先进入switchIfEmpty
中,也就是saveEasterEgg
方法里面,然后再到flatMap
中的断点,你会想当然的以为
saveEasterEgg
被先执行了,但是其实没有,这只是Flux的初始化过程。
如何得知的?因为如果执行了easterEggRepository.save(new EasterEgg().setName(name))
这步,那么下面的doOnNext
也会执行,而实际上是没有执行的。
unit test覆盖率难度上升
这里不讨论unit test是否是鸡肋这个话题。当你想使 用unit test覆盖你的代码时,你得这样写:
@Test
public void testAppendBoomError() {
Flux<String> source = Flux.just("thing1", "thing2");
StepVerifier.create(
appendBoomError(source))
.expectNext("thing1")
.expectNext("thing2")
.expectErrorMessage("boom")
.verify();
}
或者这样:
StepVerifier.create(Mono.just(1).map(i -> i + 10),
StepVerifierOptions.create().withInitialContext(Context.of("thing1", "thing2")))
.expectAccessibleContext()
.contains("thing1", "thing2")
.then()
.expectNext(11)
.verifyComplete();
太棒啦!比以前的unit test写法看起来一点都不麻烦呢。。。
Mono or Flux操作符超过200个,要熟练使用不是件容易事
比如现在对你随堂测试:
Flux.using
的使用场景?- 什么时候该使用
onErrorContinue()
- 当你想使用
Backpressure
时,可以使用哪些操作符来完成? - 你能使用哪些操作符完成斐波那契数列的计算?
这些都是需要开发者不停看文档才能逐渐累积起来对应的知识。 所以也就导致即便你的Java 函数式编程用的炉火纯青,到响应式编程看到这么多个操作符还是得重走一遍西游路。
既要R2DBC,又要分库分表?
期望使用r2dbc的同时实现分库分表? 可能你只有akka这个选择。
搜了一堆关于:“sharding jdbc 支持r2dbc吗?” , “r2dbc怎么实现分库分表”
得到的结果是:
在反应式编程 API 下 ShardingSphere JDBC 无法处理 R2DBC DataSource,仅可处理 JDBC DataSource。 在使用 WebFlux 组件的 Spring Boot 微服务中应避免创建 ShardingSphere JDBC DataSource。
具体可以看这个文档上说的:
Can I use ShardingSphere fully reactive with R2DBC without proxy and sidecar? #10837
最后找到的结果是可以通过akka来实现数据库分片:Database sharding
上下文问题
在传统的 Servlet 环境中的使用有ThreadLocal,而因为WebFlux 是非阻塞的、异步的,使用事件驱动模型,因此请求处理不再绑定到特定的线程。
这意味着不能再依赖 ThreadLocal 传递上下文信息,必须使用 Reactor 提供的 Context 进行显式的上下文传递。
所以当你想调试上下文丢失的问题时绝对是够你吃一壶的。
-
你可能会遇到上下文值突然“消失”或意外覆盖的情况,却难以追踪其源头。不知道为啥被覆盖,被谁覆盖的。
-
某些第三方库或现有代码可能仍依赖于 ThreadLocal 来存储用户上下文(例如日志的 MDC),但在 WebFlux 中,由于异步非阻塞的线程模型,ThreadLocal 的数据不会在反应式链中自动传递。 如果上下文依赖于 ThreadLocal,可能会遇到数据不一致或丢失的问题。
-
使用阻塞操作(如
block()
、blockOptional()
、toFuture().get()
等)会导致上下文的丢失,因为阻塞操作会中断反应式链条,线程模型会发生改变,导致上下文无法被正确传递
写在最后
就目前而言,我还没有把WebFlux中的坑都踩一遍,所以即便是有以上种种不妥之处,但是在写响应式编程还是有种上瘾的感觉,毕竟也不会同时踩中上面说的所有坑。
你也可以说我是因为手里有把奇怪的锤子,所以看啥都像钉子想敲敲。
在一些场景下,会让我觉得省略了很多的代码,比如:
容错与重试机制
对请求的重试只需要.retry
即可,比如下面这个:重试3次,每次重试间隔2秒。
WebClient webClient = WebClient.create("https://example.com");
webClient.get()
.uri("/resource")
.retrieve()
.bodyToMono(String.class)
// retry 3 times with a 2 seconds delay between retries
.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)))
.onErrorResume(e -> {
// Handle the error case after retries fail
System.out.println("Request failed after retries: " + e.getMessage());
return Mono.empty();
})
.subscribe(response -> System.out.println("Response: " + response));
还能使用指数退避:Retry.backoff(3, Duration.ofSeconds(1))
,每次重试的间隔时间呈指数级增长。
背压支持
比如:
- 生产者每隔 100 毫秒生成一个数据项。
- 消费者处理数据需要 500 毫秒。
通过背压机制,消费者可以根据其处理能力控制数据的获取速度,避免过载。
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;
public class BackpressureExample {
public static void main(String[] args) throws InterruptedException {
// 创建一个生产者,每 100 毫秒生成一个数字
Flux<Long> fastProducer = Flux.interval(Duration.ofMillis(100))
.log() // 用于输出流的每个步骤
.onBackpressureBuffer(100, // 缓冲区大小为 10
value -> System.out.println("Dropping value: " + value)
); // 当缓冲区满时丢弃数据
// 模拟一个处理较慢的消费者,每 500 毫秒处理一个数据
fastProducer
.publishOn(Schedulers.boundedElastic()) // 在一个弹性线程池上处理
.subscribe(value -> {
try {
// 模拟慢处理
Thread.sleep(500);
System.out.println("Processed: " + value);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 保持主线程活跃一段时间以查看输出
Thread.sleep(500000);
}
}