取消RX.Net Observer的正在进行的OnNext方法

取消RX.Net Observer的正在进行的OnNext方法

问题描述:

正如我原来的问题所述(请参阅Correlate interdependent Event Streams with RX.Net),我有一个RX.net事件流,只有在某个其他事件没有被触发时才会调用观察者的OnNext方法(基本上是'处理更改 - *只要系统已连接,事件就会中断,断开连接时暂停,并在系统重新连接后重新开始处理Change- *事件。取消RX.Net Observer的正在进行的OnNext方法

但是,虽然这项新事件顺利进行,但我将如何取消/取消信号到正在进行中 .OnNext()调用?

+0

澄清,处理方法是async/await-able,并已取消CancellationToken作为参数..也许有可能结合RX和这种异步方法。 – 2014-11-03 14:58:31

+0

你能发表一些代码吗?你现在传什么'CancellationToken'? – Brandon 2014-11-03 15:01:00

+0

@Brandon今天晚些时候会做,目前正在运行,并且项目在我家里的机器上。现在,我仅仅是在处理默认的(CancellationToken)/ CancellationToken.None,但不相信使用和外部CancellationTokenSource字段(我会在原始流中触发,然后重新创建)。 – 2014-11-03 15:04:43

由于您的观察者已经接受CancellationToken,我们可以修改您的Rx流以提供一个事件数据。我们将使用Rx CancellationDisposable,只要流取消订阅,我们将处置该Rx CancellationDisposable

// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed 
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source) 
{ 
    return Observable.Using(
     () => new CancellationDisposable(), 
     cts => source.Select(item => Tuple.Create(cts.Token, item))); 
} 

与其他问题的解决方案放在一起这样的:

DataSourceLoaded 
    .SelectMany(_ => DataSourceFieldChanged 
     .Throttle(x) 
     .CancelOnUnsubscribe() 
     .TakeUntil(DataSourceLoaded)) 
    .Subscribe(c => handler(c.Item1, c.Item2)); 

TakeUntil条款被触发,它将从CancelOnUnsubscribe观察到的取消,将在CancellationDisposable和事业转处置令牌被取消。您的观察员可以观看此令牌并在发生此情况时停止其工作。

+0

感谢您的解决方案!我发现你的答案是一个类似的问题(http://stackoverflow.com/questions/18477018/rx-and-tasks-cancel-running-task-when-new-task-is-spawned?rq=1)多一点优雅阅读虽然并结合它是这样的:https://gist.github.com/jbattermann/914d48f4011f0842a03b – 2014-11-05 21:05:22

+0

更新了要点,它现在完全工作。感谢Brandon,也是关于CancellationDisposable的暗示 - 也不知道那一个。 – 2014-11-05 21:36:44

+0

嗯,这是一个与你在这里发布的问题稍有不同的问题。您的要点(和解决方案)会取消当前的工作任务,并在新的“DataSourceFieldChanged”事件到达时启动一个新任务。听起来像这实际上是你想要的,虽然这么好交易:) – Brandon 2014-11-05 22:32:39

有一个异步超载SelectMany,虽然承认如果Do存在类似的过载,它会更符合语义。

var subscription = 
    (from _ in DataSourceLoaded 
    from __ in DataSourceFieldChanged 
    .Throttle(x) 
    .SelectMany(DataSourceFieldChangedAsync) 
    .TakeUntil(DataSourceUnloaded) 
    select Unit.Default); 
    .Subscribe(); // Subscribing for side effects only. 

... 

async Task<Unit> DataSourceFieldChangedAsync(Field value, CancellationToken cancel); 

这是不错的,因为它关系到取消的订阅为好。

调用要么

subscription.Dispose() 

DataSourceUnloaded.OnNext(x); 

将导致CancellationToken被取消。

+0

我创建了一个工作项目。目前正在讨论替代方法: https:// github。com/Reactive-Extensions/Rx.NET/issues/63#issuecomment-61631595 – 2014-11-04 13:22:05

+0

Thansk Dave。我把你的和Brandon的解决方案(更确切地说,Brandon的答案基本相同,但先前被问及SO问题)并将它们合并到此:https://gist.github.com/jbattermann/914d48f4011f0842a03b – 2014-11-05 21:03:38

+0

这只适用于'几乎'正确:虽然DataSourceFieldChanged仍然是异步处理的,而DataSourceUnloaded同时发生,但后者不会为WorkerTask(..)方法触发cancellationToken - 出于显而易见的原因,但不完全是“那里”。 – 2014-11-05 21:14:07