Rx.NET GroupByUntil组终止,等待线程完成
我有无限的对象流。而且我的要求是,来自具有相同密钥的可观察流中的每个项目应该被同步处理,并且具有不同密钥的所有其他项目可能/应该并行处理。做到这一点(在大多数地方提到),最简单的方法是使用GroupByUntil
操作:Rx.NET GroupByUntil组终止,等待线程完成
var results = observableStream
.GroupByUntil(item => item.Id, group =>
group.Throttle(TimeSpan.FromSeconds(30), scheduler))
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Select(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
代码工作很好,直到我可以保证的ProcessItem(item)
的执行时间不超过30秒。否则group.Throttle(TimeSpan.FromSeconds(30), scheduler)
将关闭组的流,并且新项目到达并开始在新线程上处理的可能性很高。
所以基本上我需要知道我的线程已经完成处理所有具有特定键的项目,并且我需要在durationSelector
之内通知GroupByUntil
关于它的运算符参数。
有关如何实现此目的的任何想法?提前致谢。
这与此问题非常相似:A way to push buffered events in even intervals。
的问答形式对这个问题,有一个运营商Drain
:
public static class ObservableDrainExtensions
{
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
{
return Observable.Defer(() =>
{
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => { },() => queue.OnNext(new Unit()))
);
});
}
}
鉴于运营商,你的问题就变得非常简单:
var results = observableStream
.GroupBy(item => item.Id)
.SelectMany(group =>
group
.ObserveOn(scheduler)
.Drain(item => ProcessItem(item)));
var disposable = results.Subscribe(result => SaveResults(result));
由于看起来像A1,A2流, B1,A3,B2,C1,B3,C2,GroupBy
通过ID分开流:
A: A1, A2, A3
B: B1, B2, B3
C: C1, C2
...和Drain
确保对于给定子流中的项目,它们串行运行,而不是并行运行。
不错的解决方案,但只使用'GroupBy',这些组不会被销毁,并且如果有大量的唯一密钥,我可能会耗尽内存。 – Azat
你怎么知道你已经收到了最后一个特定的密钥? – NetMage
@NetMage其实我不会知道。我试图实现的是,只有当处理特定组的线程完成了它的工作并且队列中再没有任何东西时,我才应该开始调节(反弹)。 – Azat
'ProcessItem'是否同步?它是“异步”吗?它是否返回'IObservable'? –
Shlomo