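Merging historical and live stock price data with Rx
I'm trying out Rx because it seems like a good fit for our domain, but the learning curve has taken me by surprise.
I need to knit together historical price data with live price data.
I'm trying to adapt the usual approach to this problem into the language of Rx:
- Subscribe to live prices immediately and start buffering the values I get back
- Initiate a request for historical price data (this needs to happen after subscribing to live prices, so that we don't have any gaps in our data)
- Publish historical prices as they come back
- Once we've received all the historical data, publish the buffered live data, removing any values at the beginning that overlap with our historical data
- Continue replaying data from the live price feed
I have this disgusting and incorrect straw-man code, which seems to work for the naive test cases I've written: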
    IConnectableObservable<Tick> live = liveService
        .For(symbol)
        .Replay(/* Some appropriate buffer size */);
    live.Connect();

    IObservable<Tick> historical = historyService.For(since, symbol);

    return new[] {historical, live}
        .Concat()
        .Where(TicksAreInChronologicalOrder());

    private static Func<Tick, bool> TicksAreInChronologicalOrder()
    {
        // Some stateful predicate comparing the timestamp of this tick
        // to the timestamp of the last tick we saw
    }
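The stateful predicate left as a comment above could look something like the following. This is only a minimal sketch: the Tick class here is a hypothetical stand-in with a single Timestamp property, and it assumes that "strictly newer than the last accepted tick" is the right test, which would wrongly drop distinct ticks that share a timestamp.

```csharp
using System;

// Hypothetical stand-in for the question's Tick type.
public sealed class Tick
{
    public DateTime Timestamp { get; set; }
}

public static class TickFilters
{
    // Returns a predicate that remembers the newest timestamp it has accepted
    // and rejects any tick that is not strictly newer than it.
    // The closure holds mutable state and is not thread-safe, so create one
    // predicate per subscription rather than sharing a single instance.
    public static Func<Tick, bool> TicksAreInChronologicalOrder()
    {
        DateTime? lastSeen = null;
        return tick =>
        {
            if (lastSeen.HasValue && tick.Timestamp <= lastSeen.Value)
                return false; // overlapping or out-of-order tick: drop it

            lastSeen = tick.Timestamp;
            return true;
        };
    }
}
```

Because the state lives in the closure, passing the same predicate instance to two subscriptions would let them interfere with each other.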
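This has a few drawbacks:
- The appropriate replay buffer size is not known. Setting an unlimited buffer isn't possible, because this is a long-running sequence. Really we want some kind of one-time buffer that is flushed on the first call to Subscribe. If this exists in Rx, I can't find it.
- The replay buffer will continue to exist even once we've switched to publishing live prices. We don't need the buffer at that point.
- Similarly, the predicate that filters out overlapping ticks isn't necessary once we've skipped the initial overlap between historical and live prices. I really want to do something like:
    live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */)
  Is Wait(this IObservable<TSource>) of any use here?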
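One way the lazy SkipWhile wished for above might be expressed is to record the last historical timestamp as a side effect and rely on Concat subscribing to the live sequence only after the historical one completes. The sketch below assumes the System.Reactive package, a hypothetical Tick class, historical ticks that arrive in chronological order, and a bufferedLive sequence that is already subscribed to the live feed and replays what it has buffered. It uses <= rather than < so that a live tick equal to the last historical timestamp is not emitted twice. It only addresses the third drawback, not the buffer-size ones.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical stand-in for the question's Tick type.
public sealed class Tick
{
    public DateTime Timestamp { get; set; }
}

public static class PriceFeedSketch
{
    public static IObservable<Tick> HistoryThenLive(
        IObservable<Tick> historical,
        IObservable<Tick> bufferedLive)
    {
        // Defer gives every subscriber its own copy of the captured state.
        return Observable.Defer(() =>
        {
            // Newest historical timestamp seen so far for this subscription.
            var lastHistorical = DateTime.MinValue;

            return historical
                .Do(tick => lastHistorical = tick.Timestamp)
                // Concat subscribes to its second argument only once
                // 'historical' has completed, so the predicate below first
                // runs when lastHistorical already holds its final value.
                .Concat(bufferedLive.SkipWhile(
                    tick => tick.Timestamp <= lastHistorical));
        });
    }
}
```

SkipWhile stops evaluating its predicate after the first tick that fails it, so there is no per-tick filtering cost once the overlap has been skipped.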
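There must be a better way to do this, but I'm still waiting for my brain to grok Rx the way it does FP.
Another option I've considered for solving the first drawback is writing my own Rx extension: an ISubject that queues messages until it gets its first subscriber (and refuses subscribers after that?). Maybe that's the way to go?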
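For the record, here's what I did in the end. I'm still very much an Rx learner, and I'm returning to .NET having last seen it at version 2.0. All feedback is gratefully received.
The Ticks object used below may contain one or more tick values. The historical data service returns its data in several Ticks.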
    public class HistoricalAndLivePriceFeed : IPriceFeed
    {
        private readonly IPriceFeed history;
        private readonly IPriceFeed live;
        private readonly IClock clock;

        public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
            : this(history, live, new RealClock())
        {
        }

        public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
        {
            this.history = history;
            this.live = live;
            this.clock = clock;
        }

        public IObservable<Ticks> For(DateTime since, ISymbol symbol)
        {
            return Observable.Create<Ticks>(observer =>
            {
                var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));

                var definitelyInHistoricalTicks = clock.Now;
                // Sleep to make sure that historical data overlaps our live data
                // If we ever use a data provider with less fresh historical data, we may need to rethink this
                clock.Wait(TimeSpan.FromSeconds(1));

                var liveStreamAfterEndOfHistoricalTicks = liveStream
                    .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
                    .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1));

                var subscription = history.For(since, symbol)
                    .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1))
                    .Concat(liveStreamAfterEndOfHistoricalTicks)
                    .Subscribe(observer);

                return liveStream.And(subscription);
            });
        }
    }

    public static class CompositeDisposableExtensions
    {
        public static CompositeDisposable And(this IDisposable disposable, Action action)
        {
            return And(disposable, Disposable.Create(action));
        }

        public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
        {
            return new CompositeDisposable(disposable, other);
        }
    }
It uses this bit of Rx code, which I still don't quite trust:
    using System;
    using System.Collections.Generic;
    using System.Reactive.Disposables;
    using System.Reactive.Subjects;

    namespace My.Rx
    {
        /// <summary>
        /// Buffers values from an underlying observable when no observers are subscribed.
        ///
        /// On Subscription, any buffered values will be replayed.
        ///
        /// Only supports one observer for now.
        ///
        /// Buffer is an ISubject for convenience of implementation but IObserver methods
        /// are hidden. It is not intended that Buffer should be used as an IObserver,
        /// except through StartBuffering() and it is dangerous to do so because none of
        /// the IObserver methods check whether Buffer has been disposed.
        /// </summary>
        /// <typeparam name="TSource"></typeparam>
        public class Buffer<TSource> : ISubject<TSource>, IDisposable
        {
            private readonly object gate = new object();
            private readonly Queue<TSource> queue = new Queue<TSource>();

            private bool isDisposed;
            private Exception error;
            private bool stopped;
            private IObserver<TSource> observer = null;
            private IDisposable subscription;

            public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
            {
                return new Buffer<TSource>(observable);
            }

            private Buffer(IObservable<TSource> observable)
            {
                subscription = observable.Subscribe(this);
            }

            void IObserver<TSource>.OnNext(TSource value)
            {
                lock (gate)
                {
                    if (stopped) return;
                    if (IsBuffering)
                        queue.Enqueue(value);
                    else
                        observer.OnNext(value);
                }
            }

            void IObserver<TSource>.OnError(Exception error)
            {
                lock (gate)
                {
                    if (stopped) return;
                    if (IsBuffering)
                        this.error = error;
                    else
                        observer.OnError(error);
                    stopped = true;
                }
            }

            void IObserver<TSource>.OnCompleted()
            {
                lock (gate)
                {
                    if (stopped) return;
                    // Forward completion to a live observer; otherwise it is
                    // replayed to the next subscriber via the stopped flag
                    if (!IsBuffering)
                        observer.OnCompleted();
                    stopped = true;
                }
            }

            public IDisposable Subscribe(IObserver<TSource> observer)
            {
                lock (gate)
                {
                    if (isDisposed)
                        throw new ObjectDisposedException(string.Empty);

                    if (this.observer != null)
                        throw new NotImplementedException("A Buffer can currently only support one observer at a time");

                    // Queue<T> has no IsEmpty() method, so check Count instead
                    while (queue.Count > 0)
                    {
                        observer.OnNext(queue.Dequeue());
                    }

                    if (error != null)
                        observer.OnError(error);
                    else if (stopped)
                        observer.OnCompleted();

                    this.observer = observer;
                    return Disposable.Create(() =>
                    {
                        lock (gate)
                        {
                            // Go back to buffering
                            this.observer = null;
                        }
                    });
                }
            }

            private bool IsBuffering
            {
                get { return observer == null; }
            }

            public void Dispose()
            {
                lock (gate)
                {
                    // Make a second call to Dispose() a no-op
                    if (isDisposed) return;
                    subscription.Dispose();
                    isDisposed = true;
                    subscription = null;
                    observer = null;
                }
            }
        }
    }
Which passes these tests (I haven't bothered checking thread safety yet):
    private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");

    [Test]
    public void ReplaysBufferedValuesToFirstSubscriber()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
        underlying.OnNext(1);
        underlying.OnNext(2);

        var observed = new List<int>();
        buffer.Subscribe(Observer.Create<int>(observed.Add));

        Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 }));
    }

    [Test]
    public void PassesNewValuesToObserver()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        var observed = new List<int>();
        buffer.Subscribe(Observer.Create<int>(observed.Add));

        underlying.OnNext(1);
        underlying.OnNext(2);

        Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 }));
    }

    [Test]
    public void DisposesOfSubscriptions()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        var observed = new List<int>();
        buffer.Subscribe(Observer.Create<int>(observed.Add))
            .Dispose();

        underlying.OnNext(1);

        Assert.That(observed, Is.Empty);
    }

    [Test]
    public void StartsBufferingAgainWhenSubscriptionIsDisposed()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        // These should be buffered
        underlying.OnNext(1);
        underlying.OnNext(2);

        var firstSubscriptionObserved = new List<int>();
        using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
        {
            // Should be passed through to first subscription
            underlying.OnNext(3);
        }
        Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 }));

        // First subscription has been disposed-
        // we should be back to buffering again
        underlying.OnNext(4);
        underlying.OnNext(5);

        var secondSubscriptionObserved = new List<int>();
        using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
        {
            // Should be passed through to second subscription
            underlying.OnNext(6);
        }
        Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5, 6 }));
    }

    [Test]
    public void DoesNotSupportTwoConcurrentObservers()
    {
        // Use .Publish() if you need to do this
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        buffer.Subscribe(Observer.Create<int>(i => { }));
        Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
    }

    [Test]
    public void CannotBeUsedAfterDisposal()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);
        buffer.Dispose();

        Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { })));
    }

    [Test]
    public void ReplaysBufferedError()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        underlying.OnNext(1);
        underlying.OnError(exceptionThrownFromUnderlying);

        var observed = new List<int>();
        Exception foundException = null;
        buffer.Subscribe(
            observed.Add,
            e => foundException = e);

        Assert.That(observed, Is.EquivalentTo(new[] { 1 }));
        Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying));
    }

    [Test]
    public void ReplaysBufferedCompletion()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        underlying.OnNext(1);
        underlying.OnCompleted();

        var observed = new List<int>();
        var completed = false;
        buffer.Subscribe(
            observed.Add,
            () => completed = true);

        Assert.That(observed, Is.EquivalentTo(new[] { 1 }));
        Assert.True(completed);
    }

    [Test]
    public void ReplaysBufferedErrorToSubsequentObservers()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        underlying.OnNext(1);
        underlying.OnError(exceptionThrownFromUnderlying);

        // Drain value queue
        buffer.Subscribe(Observer.Create<int>(i => { }, e => { })).Dispose();

        var observed = new List<int>();
        Exception exceptionEncountered = null;
        buffer.Subscribe(Observer.Create<int>(observed.Add, e => exceptionEncountered = e)).Dispose();

        Assert.That(observed, Is.Empty);
        Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
    }

    [Test]
    public void ReplaysBufferedCompletionToSubsequentObservers()
    {
        var underlying = new Subject<int>();
        var buffer = Buffer<int>.StartBuffering(underlying);

        underlying.OnNext(1);
        underlying.OnCompleted();

        // Drain value queue
        buffer.Subscribe(Observer.Create<int>(i => { })).Dispose();

        var observed = new List<int>();
        var completed = false;
        buffer.Subscribe(Observer.Create<int>(observed.Add, () => completed = true)).Dispose();

        Assert.That(observed, Is.Empty);
        Assert.True(completed);
    }

    [Test]
    public void DisposingOfBufferDisposesUnderlyingSubscription()
    {
        var underlyingSubscriptionWasDisposed = false;
        var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed = true));

        var buffer = Buffer<int>.StartBuffering(underlying);
        buffer.Dispose();

        Assert.True(underlyingSubscriptionWasDisposed);
    }
If your historical and live data are both time-based or scheduler-based, that is, the event stream looks like this over time:
    |----------------------------------------------------> time
        h   h   h   h   h   h                               historical
                              l   l   l   l   l   l         live
You can use a simple TakeUntil construct:
    var historicalStream = <fetch historical data>;
    var liveStream = <fetch live data>;

    var mergedWithoutOverlap =
        // pull from historical
        historicalStream
            // until we start overlapping with live
            .TakeUntil(liveStream)
            // then continue with live data
            .Concat(liveStream);
If you get all of your historical data at once, say as an IEnumerable<T>, you can use StartWith combined with your other logic:
    var historicalData = <get IEnumerable of tick data>;
    var liveData = <get IObservable of tick data>;

    var mergedWithOverlap =
        // the observable is the "long running" feed
        liveData
            // But we'll inject the historical data in front of it
            .StartWith(historicalData)
            // Perform filtering based on your needs
            .Where(....);
How about something like:
    public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector)
    {
        var replaySubject = new ReplaySubject<T>();
        live.Subscribe(replaySubject);
        return history.Concat(replaySubject).Distinct(selector);
    }
This uses a sequence ID and Distinct to filter out the duplicates.
And the corresponding test:
    var testScheduler = new TestScheduler();

    var history = testScheduler.CreateColdObservable(
        OnNext(1L, new PriceTick { PriceId = 1 }),
        OnNext(2L, new PriceTick { PriceId = 2 }),
        OnNext(3L, new PriceTick { PriceId = 3 }),
        OnNext(4L, new PriceTick { PriceId = 4 }),
        // ReactiveTest.OnCompleted takes the tick count first
        OnCompleted<PriceTick>(5L));

    var live = testScheduler.CreateHotObservable(
        OnNext(1L, new PriceTick { PriceId = 3 }),
        OnNext(2L, new PriceTick { PriceId = 4 }),
        OnNext(3L, new PriceTick { PriceId = 5 }),
        OnNext(4L, new PriceTick { PriceId = 6 }),
        OnNext(5L, new PriceTick { PriceId = 7 }),
        OnNext(6L, new PriceTick { PriceId = 8 }),
        OnNext(7L, new PriceTick { PriceId = 9 })
    );

    live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId));
    history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId), () => Console.WriteLine("C"));

    var combined = live.CombineWithHistory(history, t => t.PriceId);
    combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId));

    testScheduler.AdvanceTo(6L);
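If you execute this test, combined emits price ticks with IDs 1 to 8.
Thanks Dave, this introduced me to some nice new tricks with schedulers. The problems with using Distinct in our case are: 1) Because data comes back in chunks of one or more ticks rather than as single values, we would have to SelectMany before calling the selector. Given our volume of data and our performance requirements, this isn't feasible. 2) To save memory, our ticks have timestamps with a precision of 1 second. We may have several ticks within the same second, so it isn't possible to write a semantically correct selector function without state. – 2013-04-24 18:18:36
I actually improved on this answer when I did it myself, but forgot to post the code. In the end I basically queued up the ticks and then flushed them when the history completed. You need to have some kind of sequence number to make sure you don't lose any data. – 2013-04-24 21:52:27
Also, the code I've given doesn't meet your requirements, since the replay subject will hold on to the entire history. – 2013-04-24 21:53:43
Here is an approach that is convenient with respect to both memory and trade overlap (correctness).
Feedback welcome: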
    var tradeIds = new HashSet<string>();
    var replayQuotationTrades = new ReplaySubject<IntradayTrade>();
    var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades);

    return _historyTrades
        .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler)
        .Do(t => tradeIds.Add(t.TradeId))
        .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades))
        .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId)))
        .Finally(tradeIds.Clear)
        .Concat(_quotationTrades)
        .Subscribe(observer);
Would 'Switch()' work here? For example: 'historical.Switch(live)' – AlexFoxGill 2013-02-12 10:18:33