取消RX.Net Observer的正在进行的OnNext方法
正如我原来的问题所述(请参阅Correlate interdependent Event Streams with RX.Net),我有一个RX.net事件流,只有在某个其他事件没有被触发时才会调用观察者的OnNext方法(基本上是'处理更改 - *只要系统已连接,事件就会中断,断开连接时暂停,并在系统重新连接后重新开始处理Change- *事件。取消RX.Net Observer的正在进行的OnNext方法
但是,虽然这项新事件顺利进行,但我将如何取消/取消信号到正在进行中 .OnNext()调用?
由于您的观察者已经接受CancellationToken
,我们可以修改您的Rx流以提供一个事件数据。我们将使用Rx CancellationDisposable
,只要流取消订阅,我们将处置该Rx CancellationDisposable
。
// Converts into a stream that supplies a `CancellationToken` that will be cancelled when the stream is unsubscribed
public static IObservable<Tuple<CancellationToken, T>> CancelOnUnsubscribe<T>(this IObservable<T> source)
{
return Observable.Using(
() => new CancellationDisposable(),
cts => source.Select(item => Tuple.Create(cts.Token, item)));
}
与其他问题的解决方案放在一起这样的:
DataSourceLoaded
.SelectMany(_ => DataSourceFieldChanged
.Throttle(x)
.CancelOnUnsubscribe()
.TakeUntil(DataSourceLoaded))
.Subscribe(c => handler(c.Item1, c.Item2));
当TakeUntil
条款被触发,它将从CancelOnUnsubscribe
观察到的取消,将在CancellationDisposable
和事业转处置令牌被取消。您的观察员可以观看此令牌并在发生此情况时停止其工作。
感谢您的解决方案!我发现你的答案是一个类似的问题(http://stackoverflow.com/questions/18477018/rx-and-tasks-cancel-running-task-when-new-task-is-spawned?rq=1)多一点优雅阅读虽然并结合它是这样的:https://gist.github.com/jbattermann/914d48f4011f0842a03b – 2014-11-05 21:05:22
更新了要点,它现在完全工作。感谢Brandon,也是关于CancellationDisposable的暗示 - 也不知道那一个。 – 2014-11-05 21:36:44
嗯,这是一个与你在这里发布的问题稍有不同的问题。您的要点(和解决方案)会取消当前的工作任务,并在新的“DataSourceFieldChanged”事件到达时启动一个新任务。听起来像这实际上是你想要的,虽然这么好交易:) – Brandon 2014-11-05 22:32:39
有一个异步超载SelectMany
,虽然承认如果Do
存在类似的过载,它会更符合语义。
var subscription =
(from _ in DataSourceLoaded
from __ in DataSourceFieldChanged
.Throttle(x)
.SelectMany(DataSourceFieldChangedAsync)
.TakeUntil(DataSourceUnloaded)
select Unit.Default);
.Subscribe(); // Subscribing for side effects only.
...
async Task<Unit> DataSourceFieldChangedAsync(Field value, CancellationToken cancel);
这是不错的,因为它关系到取消的订阅为好。
调用要么
subscription.Dispose()
或
DataSourceUnloaded.OnNext(x);
将导致CancellationToken
被取消。
我创建了一个工作项目。目前正在讨论替代方法: https:// github。com/Reactive-Extensions/Rx.NET/issues/63#issuecomment-61631595 – 2014-11-04 13:22:05
Thansk Dave。我把你的和Brandon的解决方案(更确切地说,Brandon的答案基本相同,但先前被问及SO问题)并将它们合并到此:https://gist.github.com/jbattermann/914d48f4011f0842a03b – 2014-11-05 21:03:38
这只适用于'几乎'正确:虽然DataSourceFieldChanged仍然是异步处理的,而DataSourceUnloaded同时发生,但后者不会为WorkerTask(..)方法触发cancellationToken - 出于显而易见的原因,但不完全是“那里”。 – 2014-11-05 21:14:07
澄清,处理方法是async/await-able,并已取消CancellationToken作为参数..也许有可能结合RX和这种异步方法。 – 2014-11-03 14:58:31
你能发表一些代码吗?你现在传什么'CancellationToken'? – Brandon 2014-11-03 15:01:00
@Brandon今天晚些时候会做,目前正在运行,并且项目在我家里的机器上。现在,我仅仅是在处理默认的(CancellationToken)/ CancellationToken.None,但不相信使用和外部CancellationTokenSource字段(我会在原始流中触发,然后重新创建)。 – 2014-11-03 15:04:43