正确的方式来应对“onNext”抛出热点,共享的错误,可观测量

正确的方式来应对“onNext”抛出热点,共享的错误,可观测量

问题描述:

在RxJS 5版本,在这个过程中下面的代码后的结果这两个预订的三个迭代被终止:正确的方式来应对“onNext”抛出热点,共享的错误,可观测量

var Rx = require("rxjs"); 

const published$ = Rx.Observable.interval(1000).publish(); 

published$.subscribe(index => { 
    console.log(`One: ${index}`); 

    if (index == 3) throw new Error("ded."); 
}); 

published$.forEach(index => { 
    console.log(`Two: ${index}`); 
}); 

published$.connect(); 

然而,我的理解是,在下一个处理程序中抛出的错误将简单地取消订阅特定的订阅,并且不会导致底层的observable终止。我的预期产出是“One”订阅会取消订阅,但间隔会继续产生“Two”订阅的结果。

这种行为引起了我的问题,我可能有多个订阅到一个底层的热观察 - 但是任何这些订阅抛出的单个异常导致底层的observable完全终止。

当我使用热模块重新加载进行开发时,尤其令人讨厌,因为任何订阅中的任何编程错误都会导致我不得不刷新整个页面以重新启动可观察序列。

有没有一种方法,没有在try/catch中包装我的每个订阅,在我的下一个处理程序中抛出异常,以便简单地取消订阅该一个订阅,并且不终止底层的observable?

------------ ------------编辑

我发现我要找的行为,通过syncErrorThrowable设置对“订阅”返回的订阅对象为true。看起来,在代码库中唯一一次这样设置为true的是通过“do”操作符。

我应该利用这个领域吗?我觉得用它很肮脏,但另一方面,我发现奇怪的是,“do”操作符具有与“下一个”订阅处理器不同的错误处理语义。

这里的代码受此标志的主区块: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L132

如果将它设置为false,此方法被调用: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L179

然而,如果它被设置为true,此方法来代替: https://github.com/ReactiveX/RxJS/blob/master/src%2FSubscriber.ts#L188

区别在于第一个方法将重新抛出异常备份调用堆栈,而第二个方法将错误向前传播到后续订阅。

为什么do运算符向前传播错误,而“下一个”处理程序将错误恢复起来?这对我来说似乎很奇怪。

不,不要使用该字段。如果您将其更改为true,您的订阅将开始吞咽错误。

这是我们用来了解是否同步通知订阅(与源Observable的订阅呼叫位于同一个块中)或异步通知的私人状态。如果在同步通知期间从其中一个订阅者的消息处理程序抛出错误,我们推迟重新抛出它,直到退出Observable的订阅回调。[1]

如果您的处理程序抛出您希望转发给您的订阅的onError处理程序的错误,则本指南将它们移动到订阅上方的do块中。

不,我不同意这种行为。这里有一个背景下几个环节:

[1]来源:我写了这段代码。