【翻译】什么时候该使用onErrorContinue()
答案是:永远别使用
我最近收到一个关于 Reactor 中 onErrorContinue()
操作符行为的问题。老实说,我从未在生产代码中使用过它。
更坦率地说,我并不完全清楚它是如何工作的。因此,我深入研究了文档和一些在线讨论。原则上,onErrorContinue()
操作符应该是忽略一个错误,然后继续运行。
所以,如果你有一个生成了数千个事件的流,在100个事件上遇到错误,你仍然可以继续处理剩下的900个事件。听起来很棒,尤其是与 onErrorResume()
相比。
后者会简单地停止流并将其替换为另一个流。技术上来说,替换流可以是刚刚失败的那个流。
这基本上就是 retry()
操作符的工作原理:当流失败时,重新订阅它
遗憾的是,onErrorResume()
和 retry()
都不会保存失败流的状态。这意味着重试可能会生成我们已经处理过的相同事件,或者错过一些事件。这取决于 初始流是如何构建的,简单来说,就是看它是hot stream还是cold stream。
从这个角度来看,onErrorContinue()
听起来是个好主意,只需吞掉出错的事件并继续前进!不幸的是,onErrorContinue()
操作符相当棘手,可能会导致一些微妙的错误。
查看这篇精彩的关于 Reactor 中 onErrorContinue
和 onErrorResume
的文章,里面有一些有趣的例子:Reactor onErrorContinue VS onErrorResume
关于onErrorContinue的设计
我偶然发现了 GitHub 上关于 onErrorContinue() 设计的讨论。困惑的开发者们与 Reactor 库贡献者之间持续了一年的对话中,有来自其中一位创始人的精彩引用:
onErrorContinue
是我犯下的一个价值十亿美元的错误 :(
没必要责怪 Simon Baslé,设计一个应用程序接口以及它将如何发展是一件非常困难的事情。
Reactor 和 RxJava 在历史上都曾删除过多个操作符。但这句话可能最能说明你应该如何处理这个操作符。
onErrorContinue()承诺跳过无效输入。让我们以此为例:
Flux.just("one.txt", "two.txt", "three.txt")
.flatMap(file -> Mono.fromCallable(() -> new FileInputStream(file)))
.doOnNext(e -> log.info("Got file {}", e))
.onErrorContinue(FileNotFoundException.class, (ex, o) -> log.warn("Not found? {}", ex.toString()))
.onErrorContinue(IOException.class, (ex, o) -> log.warn("I/O error {}", ex.toString()));
只有文件 two.txt
存在时。输出结果符合预期:
WARN - Not found? java.io.FileNotFoundException: one.txt (No such file or directory)
INFO - Got file java.io.FileInputStream@6933b6c6
WARN - Not found? java.io.FileNotFoundException: three.txt (No such file or directory)
我故意忽略异常的堆栈跟踪。除了博客文章之外,实际项目中这么做绝对是错误的
如果没有 onErrorContinue()
,数据流就会在第一个文件上失败。听起来差不多吧?
那么,如果稍作修改后的代码段不抛出 FileNotFoundException
,而是抛出更通用的 IOException
呢?
幸运的是,我们有两个 onErrorContinue()
,所以期望能在第二个onErrorContinue()
捕获到IOException
Flux
.just("one.txt", "two.txt", "three.txt")
.flatMap(file -> Mono.fromCallable(() -> new File("/dev", file).createNewFile()))
.doOnNext(e -> log.info("Got file {}", e))
.onErrorContinue(FileNotFoundException.class, (ex, o) -> log.warn("Not found? {}", ex.toString()))
.onErrorContinue(IOException.class, (ex, o) -> log.warn("I/O error {}", ex.toString()));
如上述代码,首先是不允许在 /dev 内创建文件的。那么,期望看到的是三次 IOException
?
但是实际上执行时,只执行了第一个 onErrorContinue
,而第二个 onErrorContinue
被忽略,调用链过早终止:
Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.io.IOException: Operation not permitted
Caused by: java.io.IOException: Operation not permitted
at java.base/java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.base/java.io.File.createNewFile(File.java:1026)
如果你觉得这是正常的,那么考虑类似代码逻辑,但是在代码中没有处理 FileNotFoundException
。
听起来这应该没什么关系,毕竟 createNewFile()
引发的是通用 IOException
。但结果是什么呢?
WARN - I/O error java.io.IOException: Operation not permitted
WARN - I/O error java.io.IOException: Operation not permitted
WARN - I/O error java.io.IOException: Operation not permitted
老实说,我不太明白这是怎么回事。为什么删除看似被忽略的 FileNotFoundException
处理后,IOException
处理的行为会突然发生变化?
使用onErrorResume和doOnError
我宁愿使用效率稍低但可读性更高的 onErrorResume()
,因为它是可以预测的。请注意我是如何将 doOnError()
和 onErrorResume()
分割开来的,
public static void main(String[] args) {
Flux
.just("one.txt", "two.txt", "three.txt")
.flatMap(file -> createFile(file))
.doOnNext(e -> log.info("Got file {}", e))
.subscribe();
}
private static Mono<Boolean> createFile(String file) {
return Mono
.fromCallable(() -> new File("/dev", file).createNewFile())
.doOnError(FileNotFoundException.class, ex -> log.warn("Not found? {}", ex.toString()))
.doOnError(IOException.class, ex -> log.warn("I/O error {}", ex.toString()))
.onErrorResume(IOException.class, ex -> Mono.empty());
}
我们得到了期望的响应,没有使用onErrorContinue()
的意外行为:
WARN I/O error for one.txt: java.io.IOException: Operation not permitted
WARN I/O error for two.txt: java.io.IOException: Operation not permitted
WARN I/O error for three.txt: java.io.IOException: Operation not permitted
所以,长话短说。 onErrorContinue()
的创建是为了提高错误处理的性能。
它是通过避免在 Mono.fromCallable()
中包装操作来实现的。不管怎么说,它的行为有时很难理解。此外,并非每个上游运营商都支持resuming。
如果你不太明白上面这句话,我会远离 onErrorContinue()
。实际上,我建议一般情况下避免使用 onErrorContinue()
。
你完全可以使用 onErrorResume()
或 onErrorReturn()
实现相同的效果。