跳到主要内容

【翻译】什么时候该使用onErrorContinue()

答案是:永远别使用

我最近收到一个关于 Reactor 中 onErrorContinue() 操作符行为的问题。老实说,我从未在生产代码中使用过它。

更坦率地说,我并不完全清楚它是如何工作的。因此,我深入研究了文档和一些在线讨论。原则上,onErrorContinue() 操作符应该是忽略一个错误,然后继续运行。

所以,如果你有一个生成了数千个事件的流,在100个事件上遇到错误,你仍然可以继续处理剩下的900个事件。听起来很棒,尤其是与 onErrorResume() 相比。

后者会简单地停止流并将其替换为另一个流。技术上来说,替换流可以是刚刚失败的那个流。

这基本上就是 retry() 操作符的工作原理:当流失败时,重新订阅它

遗憾的是,onErrorResume()retry() 都不会保存失败流的状态。这意味着重试可能会生成我们已经处理过的相同事件,或者错过一些事件。这取决于初始流是如何构建的,简单来说,就是看它是hot stream还是cold stream。

从这个角度来看,onErrorContinue() 听起来是个好主意,只需吞掉出错的事件并继续前进!不幸的是,onErrorContinue() 操作符相当棘手,可能会导致一些微妙的错误。

查看这篇精彩的关于 Reactor 中 onErrorContinueonErrorResume 的文章,里面有一些有趣的例子:Reactor onErrorContinue VS onErrorResume

关于onErrorContinue的设计

我偶然发现了 GitHub 上关于 onErrorContinue() 设计的讨论。困惑的开发者们与 Reactor 库贡献者之间持续了一年的对话中,有来自其中一位创始人的精彩引用

onErrorContinue 是我犯下的一个价值十亿美元的错误 :(

没必要责怪 Simon Baslé,设计一个应用程序接口以及它将如何发展是一件非常困难的事情。

Reactor 和 RxJava 在历史上都曾删除过多个操作符。但这句话可能最能说明你应该如何处理这个操作符。

onErrorContinue()承诺跳过无效输入。让我们以此为例:

Flux.just("one.txt", "two.txt", "three.txt")
.flatMap(file -> Mono.fromCallable(() -> new FileInputStream(file)))
.doOnNext(e -> log.info("Got file {}", e))
.onErrorContinue(FileNotFoundException.class, (ex, o) -> log.warn("Not found? {}", ex.toString()))
.onErrorContinue(IOException.class, (ex, o) -> log.warn("I/O error {}", ex.toString()));

只有文件 two.txt 存在时。输出结果符合预期:

WARN - Not found? java.io.FileNotFoundException: one.txt (No such file or directory)
INFO - Got file java.io.FileInputStream@6933b6c6
WARN - Not found? java.io.FileNotFoundException: three.txt (No such file or directory)
注意

我故意忽略异常的堆栈跟踪。除了博客文章之外,实际项目中这么做绝对是错误的

如果没有 onErrorContinue(),数据流就会在第一个文件上失败。听起来差不多吧? 那么,如果稍作修改后的代码段不抛出 FileNotFoundException,而是抛出更通用的 IOException 呢? 幸运的是,我们有两个 onErrorContinue(),所以期望能在第二个onErrorContinue()捕获到IOException

Flux
.just("one.txt", "two.txt", "three.txt")
.flatMap(file -> Mono.fromCallable(() -> new File("/dev", file).createNewFile()))
.doOnNext(e -> log.info("Got file {}", e))
.onErrorContinue(FileNotFoundException.class, (ex, o) -> log.warn("Not found? {}", ex.toString()))
.onErrorContinue(IOException.class, (ex, o) -> log.warn("I/O error {}", ex.toString()));

如上述代码,首先是不允许在 /dev 内创建文件的。那么,期望看到的是三次 IOException

但是实际上执行时,只执行了第一个 onErrorContinue,而第二个 onErrorContinue 被忽略,调用链过早终止:

Exception in thread "main" reactor.core.Exceptions$ErrorCallbackNotImplemented: java.io.IOException: Operation not permitted
Caused by: java.io.IOException: Operation not permitted
at java.base/java.io.UnixFileSystem.createFileExclusively(Native Method)
at java.base/java.io.File.createNewFile(File.java:1026)

如果你觉得这是正常的,那么考虑类似代码逻辑,但是在代码中没有处理 FileNotFoundException。 听起来这应该没什么关系,毕竟 createNewFile() 引发的是通用 IOException。但结果是什么呢?

WARN - I/O error java.io.IOException: Operation not permitted
WARN - I/O error java.io.IOException: Operation not permitted
WARN - I/O error java.io.IOException: Operation not permitted

老实说,我不太明白这是怎么回事。为什么删除看似被忽略的 FileNotFoundException 处理后,IOException 处理的行为会突然发生变化?

使用onErrorResume和doOnError

我宁愿使用效率稍低但可读性更高的 onErrorResume(),因为它是可以预测的。请注意我是如何将 doOnError()onErrorResume() 分割开来的,

doOnError用于日志记录和度量,onErrorResume用于对指定异常的实际处理
public static void main(String[] args) {
Flux
.just("one.txt", "two.txt", "three.txt")
.flatMap(file -> createFile(file))
.doOnNext(e -> log.info("Got file {}", e))
.subscribe();
}

private static Mono<Boolean> createFile(String file) {
return Mono
.fromCallable(() -> new File("/dev", file).createNewFile())
.doOnError(FileNotFoundException.class, ex -> log.warn("Not found? {}", ex.toString()))
.doOnError(IOException.class, ex -> log.warn("I/O error {}", ex.toString()))
.onErrorResume(IOException.class, ex -> Mono.empty());
}

我们得到了期望的响应,没有使用onErrorContinue() 的意外行为:

WARN I/O error for one.txt: java.io.IOException: Operation not permitted
WARN I/O error for two.txt: java.io.IOException: Operation not permitted
WARN I/O error for three.txt: java.io.IOException: Operation not permitted

所以,长话短说。 onErrorContinue() 的创建是为了提高错误处理的性能。

它是通过避免在 Mono.fromCallable() 中包装操作来实现的。不管怎么说,它的行为有时很难理解。此外,并非每个上游运营商都支持resuming。

如果你不太明白上面这句话,我会远离 onErrorContinue()。实际上,我建议一般情况下避免使用 onErrorContinue()

你完全可以使用 onErrorResume()onErrorReturn() 实现相同的效果。