Combining historical and live stock price data with Rx

Question:

I'm trying out Rx because it seems like a good fit for our domain, but the learning curve has taken me by surprise.

I need to knit together historical price data with live price data.

I'm trying to adapt the usual approach to this into the language of Rx:

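  1. Subscribe to live prices immediately and start buffering the values I get back
  2. Initiate a request for historical price data (this needs to happen after the subscription to live prices, so that we don't have any gaps in our data)
  3. Publish historical prices as they come back
  4. Once we've received all the historical data, publish the buffered live data, removing any values at the beginning that overlap with our historical data
  5. Continue replaying data from the live price feed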
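The five steps can be sketched as a small synchronous state machine, independent of Rx. This is a minimal sketch, not production code: `Tick`, `LiveHistoryMerger` and its `OnLive`/`OnHistorical`/`OnHistoryCompleted` methods are hypothetical names, timestamps are assumed to be unique `DateTime` values, thread safety is ignored, and step 2 is left to the caller (start the history request only after live ticks are already being fed to `OnLive`).

```csharp
using System;
using System.Collections.Generic;

// Hypothetical tick type: only the timestamp matters for the merge.
public record Tick(DateTime Timestamp, decimal Price);

// Hypothetical sketch of steps 1 and 3-5 (not an Rx operator, not thread safe).
public class LiveHistoryMerger
{
    private readonly Action<Tick> publish;
    private readonly Queue<Tick> liveBuffer = new Queue<Tick>();
    private bool historyDone;
    private DateTime? lastHistorical;

    public LiveHistoryMerger(Action<Tick> publish) => this.publish = publish;

    // Step 1: live ticks are buffered until the history has finished.
    // Step 5: after that they are passed straight through.
    public void OnLive(Tick tick)
    {
        if (historyDone) publish(tick);
        else liveBuffer.Enqueue(tick);
    }

    // Step 3: historical ticks are published as they arrive.
    public void OnHistorical(Tick tick)
    {
        lastHistorical = tick.Timestamp;
        publish(tick);
    }

    // Step 4: flush the buffer, dropping live ticks already covered by the history.
    public void OnHistoryCompleted()
    {
        while (liveBuffer.Count > 0)
        {
            var tick = liveBuffer.Dequeue();
            if (lastHistorical is null || tick.Timestamp > lastHistorical.Value)
                publish(tick);
        }
        historyDone = true;
    }
}
```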

I have this nasty and incorrect straw man code, which seems to work for the naive test cases I've written:

IConnectableObservable<Tick> live = liveService 
    .For(symbol) 
    .Replay(/* Some appropriate buffer size */); 
live.Connect(); 

IObservable<Tick> historical = historyService.For(since, symbol); 

return new[] {historical, live} 
    .Concat() 
    .Where(TicksAreInChronologicalOrder()); 

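private static Func<Tick, bool> TicksAreInChronologicalOrder() 
{ 
    // Stateful predicate: passes a tick only if it is newer than the last tick passed 
    // (assumes Tick has a DateTime Timestamp, as in point 3 below) 
    DateTime? lastSeen = null; 
    return tick => 
    { 
        if (lastSeen.HasValue && tick.Timestamp <= lastSeen.Value) return false; 
        lastSeen = tick.Timestamp; 
        return true; 
    }; 
} 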

This has a few drawbacks:

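  1. The appropriate replay buffer size is not known. Setting an unbounded buffer isn't possible, because this is a long-running sequence. Really we want some kind of one-off buffer that is flushed on the first call to Subscribe. If this exists in Rx, I can't find it.
  2. The replay buffer will continue to exist even once we've switched to publishing live prices. We don't need the buffer at that point.
  3. Similarly, the predicate that filters out overlapping ticks isn't needed once we've skipped the initial overlap between historical and live prices. I really want to do something like: live.SkipWhile(tick => tick.Timestamp < /* lazily get last timestamp in historical data */). Is Wait(this IObservable<TSource>) useful here?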
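Point 3 works because `SkipWhile` stops calling its predicate as soon as the predicate first returns false, so the overlap check only costs anything at the start of the stream. The sketch below shows this with LINQ to Objects rather than Rx (Rx's `SkipWhile` has the same semantics); timestamps are assumed to be plain `long` seconds, and `OverlapDemo.SkipOverlap` is a hypothetical helper that also counts predicate calls.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OverlapDemo
{
    // Drops live timestamps up to and including the last historical one.
    // getLastHistorical is read lazily, i.e. only while the predicate is still running.
    public static (List<long> Kept, int PredicateCalls) SkipOverlap(
        IEnumerable<long> liveTimestamps, Func<long> getLastHistorical)
    {
        var calls = 0;
        var kept = liveTimestamps
            .SkipWhile(ts => { calls++; return ts <= getLastHistorical(); })
            .ToList();
        return (kept, calls);
    }
}
```

With live timestamps 8 to 13 and a last historical timestamp of 10, the predicate runs for 8, 9, 10 and 11 only; 12 and 13 are passed through without any check.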

There must be a better way to do this, but I'm still waiting for my brain to grok FP the way Rx needs it to.

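Another option I've considered for solving drawback 1 is to write my own Rx extension: an ISubject that queues messages until it gets its first subscriber (and refuses subscribers after that?). Maybe that's the way to go?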
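A rough sketch of that option, a subject that queues notifications until its first subscriber and rejects any later ones, follows. To stay self-contained it implements the base class library's `IObservable<T>`/`IObserver<T>` directly instead of Rx's `ISubject<T>`; `OneShotBuffer` and `ActionObserver` are hypothetical names, and the code is not thread safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical one-shot buffer: queues notifications until the first Subscribe,
// replays them to that subscriber, then passes later ones straight through.
// A second subscriber is rejected. Not thread safe.
public sealed class OneShotBuffer<T> : IObserver<T>, IObservable<T>
{
    private readonly Queue<T> queue = new Queue<T>();
    private IObserver<T> observer;
    private bool subscribedOnce;
    private Exception error;
    private bool completed;

    public void OnNext(T value)
    {
        if (observer != null) observer.OnNext(value);
        else if (!subscribedOnce) queue.Enqueue(value); // still buffering
        // once the only subscriber has left, values are dropped
    }

    public void OnError(Exception e)
    {
        if (observer != null) observer.OnError(e);
        else error = e;
    }

    public void OnCompleted()
    {
        if (observer != null) observer.OnCompleted();
        else completed = true;
    }

    public IDisposable Subscribe(IObserver<T> newObserver)
    {
        if (subscribedOnce)
            throw new InvalidOperationException("OneShotBuffer supports only one subscriber");
        subscribedOnce = true;

        // Flush the one-off buffer; it is never refilled afterwards.
        while (queue.Count > 0) newObserver.OnNext(queue.Dequeue());
        if (error != null) newObserver.OnError(error);
        else if (completed) newObserver.OnCompleted();
        else observer = newObserver;

        return new Detach(this);
    }

    private sealed class Detach : IDisposable
    {
        private readonly OneShotBuffer<T> owner;
        public Detach(OneShotBuffer<T> owner) => this.owner = owner;
        public void Dispose() => owner.observer = null;
    }
}

// Minimal delegate-backed observer, standing in for Rx's Observer.Create.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public bool Completed { get; private set; }
    public ActionObserver(Action<T> onNext) => this.onNext = onNext;
    public void OnNext(T value) => onNext(value);
    public void OnError(Exception error) { } // errors ignored in this demo
    public void OnCompleted() => Completed = true;
}
```

Unlike a `ReplaySubject`, the queue is emptied on the first `Subscribe` and never refilled, so there is no buffer size to choose and no buffer left over once live prices are flowing.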

Would 'Switch()' work here? For example: 'historical.Switch(live)' – AlexFoxGill 2013-02-12 10:18:33

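For the record, here's what I ended up with. I'm still very much an Rx learner, and I'm coming back to .NET after last seeing it at version 2.0. All feedback is very gratefully received.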

The Ticks object used below may contain one or more tick values. The historical data service returns its data as multiple Ticks.

public class HistoricalAndLivePriceFeed : IPriceFeed 
{ 
    private readonly IPriceFeed history; 
    private readonly IPriceFeed live; 
    private readonly IClock clock; 

    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live) 
        : this(history, live, new RealClock()) 
    { 
    } 
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock) 
    { 
     this.history = history; 
     this.live = live; 
     this.clock = clock; 
    } 

    public IObservable<Ticks> For(DateTime since, ISymbol symbol) 
    { 
     return Observable.Create<Ticks>(observer => 
     { 
      var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol)); 

      var definitelyInHistoricalTicks = clock.Now; 
      // Sleep to make sure that historical data overlaps our live data 
      // If we ever use a data provider with less fresh historical data, we may need to rethink this 
      clock.Wait(TimeSpan.FromSeconds(1)); 

      var liveStreamAfterEndOfHistoricalTicks = liveStream 
       .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks) 
       .Select(ticks => ticks.RemoveBefore(definitelyInHistoricalTicks + 1)); 

      var subscription = history.For(since, symbol) 
       .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(definitelyInHistoricalTicks + 1)) 
       .Concat(liveStreamAfterEndOfHistoricalTicks) 
       .Subscribe(observer); 

      return liveStream.And(subscription); 
     }); 
    } 
} 
public static class CompositeDisposableExtensions 
{ 
    public static CompositeDisposable And(this IDisposable disposable, Action action) 
    { 
     return And(disposable, Disposable.Create(action)); 
    } 

    public static CompositeDisposable And(this IDisposable disposable, IDisposable other) 
    { 
     return new CompositeDisposable(disposable, other); 
    } 
} 
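The `Ticks` type and its `LastTimestamp`, `RemoveBefore` and `RemoveAtOrAfter` members are not shown above. The sketch below is a hypothetical stand-in that assumes timestamps are whole seconds stored as `long` (which would make the `+ 1` on the cutoff meaningful), and applies the same trimming as `For()` to in-memory sequences to show that the two feeds are spliced at the cutoff with no gap and no duplicates. Only the member names come from the answer; their bodies and the `Splice` helper are assumptions.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for Ticks: a chunk of one or more timestamps,
// with timestamps as whole seconds (an assumption).
public sealed class Ticks
{
    public IReadOnlyList<long> Timestamps { get; }
    public Ticks(params long[] timestamps) => Timestamps = timestamps;

    public long LastTimestamp => Timestamps[Timestamps.Count - 1];

    // Keeps only timestamps >= t.
    public Ticks RemoveBefore(long t) => new Ticks(Timestamps.Where(x => x >= t).ToArray());

    // Keeps only timestamps < t.
    public Ticks RemoveAtOrAfter(long t) => new Ticks(Timestamps.Where(x => x < t).ToArray());
}

public static class Splice
{
    // Same trimming as in For(), applied to enumerables instead of observables.
    public static List<long> AtCutoff(IEnumerable<Ticks> history, IEnumerable<Ticks> live, long cutoff)
    {
        // History keeps everything at or before the cutoff.
        var historical = history.Select(t => t.RemoveAtOrAfter(cutoff + 1));
        // Live skips whole chunks that end at or before the cutoff, then trims the first partial chunk.
        var liveAfter = live
            .SkipWhile(t => t.LastTimestamp <= cutoff)
            .Select(t => t.RemoveBefore(cutoff + 1));
        return historical.Concat(liveAfter).SelectMany(t => t.Timestamps).ToList();
    }
}
```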

This uses the following Rx code, which I'm still not entirely comfortable with:

using System; 
using System.Collections.Generic; 
using System.Reactive.Disposables; 
using System.Reactive.Subjects; 

namespace My.Rx 
{ 
    /// <summary> 
    /// Buffers values from an underlying observable when no observers are subscribed. 
    /// 
    /// On Subscription, any buffered values will be replayed. 
    /// 
    /// Only supports one observer for now. 
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods 
    /// are hidden. It is not intended that Buffer should be used as an IObserver, 
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed. 
    /// </summary> 
    /// <typeparam name="TSource"></typeparam> 
    public class Buffer<TSource> : ISubject<TSource>, IDisposable 
    { 
     private readonly object gate = new object(); 
     private readonly Queue<TSource> queue = new Queue<TSource>(); 

     private bool isDisposed; 
     private Exception error; 
     private bool stopped; 
     private IObserver<TSource> observer = null; 
     private IDisposable subscription; 

     public static Buffer<TSource> StartBuffering(IObservable<TSource> observable) 
     { 
      return new Buffer<TSource>(observable); 
     } 

     private Buffer(IObservable<TSource> observable) 
     { 
      subscription = observable.Subscribe(this); 
     } 

     void IObserver<TSource>.OnNext(TSource value) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        queue.Enqueue(value); 
       else 
        observer.OnNext(value); 
      } 
     } 

     void IObserver<TSource>.OnError(Exception error) 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       if (IsBuffering) 
        this.error = error; 
       else 
        observer.OnError(error); 
       stopped = true; 
      } 
     } 

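     void IObserver<TSource>.OnCompleted() 
     { 
      lock (gate) 
      { 
       if (stopped) return; 
       // Forward completion to a live observer; otherwise it is replayed on Subscribe 
       if (!IsBuffering) 
        observer.OnCompleted(); 
       stopped = true; 
      } 
     } 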

     public IDisposable Subscribe(IObserver<TSource> observer) 
     { 
      lock (gate) 
      { 
       if (isDisposed) 
        throw new ObjectDisposedException(string.Empty); 

       if (this.observer != null) 
        throw new NotImplementedException("A Buffer can currently only support one observer at a time"); 

       while (queue.Count > 0) 
       { 
        observer.OnNext(queue.Dequeue()); 
       } 

       if (error != null) 
        observer.OnError(error); 
       else if (stopped) 
        observer.OnCompleted(); 

       this.observer = observer; 
       return Disposable.Create(() => 
              { 
               lock (gate) 
               { 
                      // Go back to buffering 
                this.observer = null; 
               } 
              }); 
      } 
     } 

     private bool IsBuffering 
     { 
      get { return observer == null; } 
     } 


     public void Dispose() 
     { 
      lock (gate) 
      { 
       // Make repeated Dispose calls harmless 
       if (isDisposed) return; 
       subscription.Dispose(); 

       isDisposed = true; 
       subscription = null; 
       observer = null; 
      } 
     } 
    } 
} 

which passes these tests (I haven't bothered checking thread safety yet):

private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world"); 

[Test] 
public void ReplaysBufferedValuesToFirstSubscriber() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    Assert.That(observed, Is.EquivalentTo(new []{1,2})); 
} 

[Test] 
public void PassesNewValuesToObserver() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 
    buffer.Subscribe(Observer.Create<int>(observed.Add)); 

    underlying.OnNext(1); 
    underlying.OnNext(2); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1, 2 })); 
} 


[Test] 
public void DisposesOfSubscriptions() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    var observed = new List<int>(); 

    buffer.Subscribe(Observer.Create<int>(observed.Add)) 
     .Dispose(); 

    underlying.OnNext(1); 

    Assert.That(observed, Is.Empty); 
} 

[Test] 
public void StartsBufferingAgainWhenSubscriptionIsDisposed() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    // These should be buffered 
    underlying.OnNext(1); 
    underlying.OnNext(2); 

    var firstSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add))) 
    { 
     // Should be passed through to first subscription 
     underlying.OnNext(3); 
    } 
    Assert.That(firstSubscriptionObserved, Is.EquivalentTo(new[] { 1, 2, 3 })); 

    // First subscription has been disposed- 
    // we should be back to buffering again 
    underlying.OnNext(4); 
    underlying.OnNext(5); 

    var secondSubscriptionObserved = new List<int>(); 
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add))) 
    { 
     // Should be passed through to second subscription 
     underlying.OnNext(6); 
    } 
    Assert.That(secondSubscriptionObserved, Is.EquivalentTo(new[] { 4, 5 ,6})); 
} 

[Test] 
public void DoesNotSupportTwoConcurrentObservers() 
{ 
    // Use .Publish() if you need to do this 

    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    buffer.Subscribe(Observer.Create<int>(i => { })); 

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void CannotBeUsedAfterDisposal() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(Observer.Create<int>(i => { }))); 
} 

[Test] 
public void ReplaysBufferedError() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    var observed = new List<int>(); 
    Exception foundException = null; 
    buffer.Subscribe(
     observed.Add, 
     e => foundException = e); 

    Assert.That(observed, Is.EquivalentTo(new []{1})); 
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletion() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    var observed = new List<int>(); 
    var completed = false; 
    buffer.Subscribe(
     observed.Add, 
     () => completed=true); 

    Assert.That(observed, Is.EquivalentTo(new[] { 1 })); 
    Assert.True(completed); 
} 

[Test] 
public void ReplaysBufferedErrorToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnError(exceptionThrownFromUnderlying); 

    // Drain value queue 
    buffer.Subscribe(Observer.Create<int>(i => { }, e => { })).Dispose(); 

    var observed = new List<int>(); 
    Exception exceptionEncountered = null; 
    buffer.Subscribe(Observer.Create<int>(observed.Add, e => exceptionEncountered = e)).Dispose(); 

    Assert.That(observed, Is.Empty); 
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying)); 
} 

[Test] 
public void ReplaysBufferedCompletionToSubsequentObservers() 
{ 
    var underlying = new Subject<int>(); 
    var buffer = Buffer<int>.StartBuffering(underlying); 

    underlying.OnNext(1); 
    underlying.OnCompleted(); 

    // Drain value queue 
    buffer.Subscribe(Observer.Create<int>(i => { })).Dispose(); 

    var observed = new List<int>(); 
    var completed = false; 
    buffer.Subscribe(Observer.Create<int>(observed.Add, () => completed = true)).Dispose(); 

    Assert.That(observed, Is.Empty); 
    Assert.True(completed); 
} 



[Test] 
public void DisposingOfBufferDisposesUnderlyingSubscription() 
{ 
    var underlyingSubscriptionWasDisposed = false; 
    var underlying = Observable.Create<int>(observer => Disposable.Create(() => underlyingSubscriptionWasDisposed= true )); 

    var buffer = Buffer<int>.StartBuffering(underlying); 
    buffer.Dispose(); 

    Assert.True(underlyingSubscriptionWasDisposed); 
} 

If your historical and live data are both time- or scheduler-based, that is, the event streams look like this over time:

|----------------------------------------------------> time 
    h h h h h h         historical 
       l l l l l l      live 

you can use a simple TakeUntil construct:

var historicalStream = <fetch historical data>; 
var liveStream = <fetch live data>; 

var mergedWithoutOverlap = 
    // pull from historical 
    historicalStream 
     // until we start overlapping with live 
     .TakeUntil(liveStream) 
     // then continue with live data 
     .Concat(liveStream); 

If you get all of the historical data at once, as an IEnumerable<T>, you can use StartWith combined with your other logic:

var historicalData = <get IEnumerable of tick data>; 
var liveData = <get IObservable of tick data>; 

var mergedWithOverlap = 
    // the observable is the "long running" feed 
    liveData 
    // But we'll inject the historical data in front of it 
    .StartWith(historicalData) 
    // Perform filtering based on your needs 
    .Where(....); 

How about something like:

public static IObservable<T> CombineWithHistory<T, TSelectorResult>(this IObservable<T> live, IObservable<T> history, Func<T, TSelectorResult> selector) 
{ 
    var replaySubject = new ReplaySubject<T>(); 
    live.Subscribe(replaySubject); 
    return history.Concat(replaySubject).Distinct(selector); 
} 

It uses the sequence ID and Distinct to filter out duplicates.

And the corresponding test:

var testScheduler = new TestScheduler(); 

var history = testScheduler.CreateColdObservable(
    OnNext(1L, new PriceTick { PriceId = 1 }), 
    OnNext(2L, new PriceTick { PriceId = 2 }), 
    OnNext(3L, new PriceTick { PriceId = 3 }), 
    OnNext(4L, new PriceTick { PriceId = 4 }), 
    OnCompleted(5L, new PriceTick())); 

var live = testScheduler.CreateHotObservable(
    OnNext(1L, new PriceTick { PriceId = 3 }), 
    OnNext(2L, new PriceTick { PriceId = 4 }), 
    OnNext(3L, new PriceTick { PriceId = 5 }), 
    OnNext(4L, new PriceTick { PriceId = 6 }), 
    OnNext(5L, new PriceTick { PriceId = 7 }), 
    OnNext(6L, new PriceTick { PriceId = 8 }), 
    OnNext(7L, new PriceTick { PriceId = 9 }) 
    ); 


live.Subscribe(pt => Console.WriteLine("Live {0}", pt.PriceId)); 
history.Subscribe(pt => Console.WriteLine("Hist {0}", pt.PriceId),() => Console.WriteLine("C")); 

var combined = live.CombineWithHistory(history, t => t.PriceId); 

combined.Subscribe(pt => Console.WriteLine("Combined {0}", pt.PriceId)); 

testScheduler.AdvanceTo(6L); 

If you run this test, combined emits the price ticks with IDs 1 to 8.

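Thanks Dave, this taught me some nice new tricks with the scheduler. The problems with using Distinct in our case are: 1) because the data comes back in chunks of one or more ticks rather than as single values, we would have to SelectMany before calling the selector, which isn't feasible given our data volumes and performance requirements; 2) to save memory, our ticks have timestamps with 1-second precision. We may get several ticks within the same second, so it's impossible to write a semantically correct selector function without state. – 2013-04-24 18:18:36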

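I actually improved on this answer when I did it myself, but forgot to post the code. In the end I basically queued the ticks and then flushed them when the history completed. You need some kind of sequence number to make sure you don't lose any data. – 2013-04-24 21:52:27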

Also, the code I gave doesn't meet your requirements, because the replay subject will hold on to the entire history. – 2013-04-24 21:53:43
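Following these comments: if every tick carries a sequence number that increases strictly across both the historical and the live feed, duplicates can be dropped by remembering only the highest number seen so far, rather than the ever-growing key set that `Distinct` keeps. This is a minimal sketch under that assumption; `SequencedTick` and `Dedup.DropSeen` are hypothetical names, and ticks are assumed to arrive in sequence order (an out-of-order tick would be dropped).

```csharp
using System.Collections.Generic;

// Hypothetical tick with a feed-wide, strictly increasing sequence number.
public record SequencedTick(long Seq, decimal Price);

public static class Dedup
{
    // Yields each tick whose Seq is above everything seen so far.
    // The only state is one long, unlike Distinct, which keeps every key it has seen.
    public static IEnumerable<SequencedTick> DropSeen(IEnumerable<SequencedTick> ticks)
    {
        long? highest = null;
        foreach (var tick in ticks)
        {
            if (highest is null || tick.Seq > highest.Value)
            {
                highest = tick.Seq;
                yield return tick;
            }
        }
    }
}
```

Fed the same overlap as the test above (history 1 to 4, then live 3 to 8), it yields 1 to 8 while holding a single number of state.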

A convenient way to handle both memory use and trade overlap (correctness).
I'd welcome your feedback:

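// IDs of trades already delivered by the history, used to skip duplicates 
var tradeIds = new HashSet<string>(); 
// Start recording live trades immediately so nothing is lost while history loads 
var replayQuotationTrades = new ReplaySubject<IntradayTrade>(); 
var replaySubscription = _quotationTrades.Subscribe(replayQuotationTrades); 
return _historyTrades 
       // Request history 500 ms later, once live recording is already running 
       .DelaySubscription(TimeSpan.FromMilliseconds(500), _backgroundScheduler) 
       .Do(t => tradeIds.Add(t.TradeId)) 
       // When history ends, stop recording and complete the replay (helper not shown) 
       .Finally(() => DisposeAndCompleteReplayStream(replaySubscription, replayQuotationTrades)) 
       // Replay recorded live trades, skipping any already seen in the history 
       .Concat(replayQuotationTrades.Where(t => !tradeIds.Contains(t.TradeId))) 
       // The ID set is no longer needed once the replayed section has finished 
       .Finally(tradeIds.Clear) 
       // Then continue directly with the live feed 
       .Concat(_quotationTrades) 
       .Subscribe(observer);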