在一段时间内事件没有跟随不同事件的反应式订阅

问题描述:

我想弄清楚如何为特定时间窗口内不跟随不同事件的事件创建反应式订阅。在一段时间内事件没有跟随不同事件的反应式订阅

为了说明,这里有一个用例:

由两个事件触发的忙闲指示。忙和NotBusy。它们可能会一起靠近,但指示灯不应该频繁闪烁。

当NotBusy启动时,应该没有指示符。当遇忙并且NotBusy在5秒钟内没有开火时,它应该显示。

有没有一种方法可以完全在Reactive内完成而无需添加外部状态?

+0

你的问题[有书呆子sniped](http://xkcd.com/356/)Rx的创造者回应他的回答:https://gist.github.com/mattpodwysocki/8d9a17df9fb94a3fb631(编辑。从[Paul Betts](http://stackoverflow.com/users/5728/paul-betts)删除的答案复制) – Benjol 2013-02-13 05:51:24

啊,我无法抗拒 - 多久你会反驳原始资料的作者? (见问题Benjol的评论) :)

这里是我的刺它(LINQPad就绪):

输出看起来是这样的:

At 1/1/0001 12:00:00 AM +00:00, Busy Signal Indicator is now:OFF 
Sending BUSY at 1/1/0001 12:00:00 AM +00:00 
Sending NOTBUSY at 1/1/0001 12:00:02 AM +00:00 

Sending BUSY at 1/1/0001 12:00:03 AM +00:00 
Sending BUSY at 1/1/0001 12:00:06 AM +00:00 
At 1/1/0001 12:00:08 AM +00:00, Busy Signal Indicator is now:ON 
Sending NOTBUSY at 1/1/0001 12:00:09 AM +00:00 
At 1/1/0001 12:00:09 AM +00:00, Busy Signal Indicator is now:OFF 

这里我们的活动的一个基本定义:

enum EventType 
{ 
    Busy, 
    NotBusy 
} 
class StreamEvent 
{ 
    public EventType Type {get; set;} 
    public StreamEvent(EventType type) { Type = type;} 
} 

而这里的查询+测试代码:

void Main() 
{ 
    // our simulated event stream 
    var fakeSource = new System.Reactive.Subjects.Subject<StreamEvent>(); 

    // Let's use a scheduler we actually don't have to wait for 
    var theTardis = new System.Reactive.Concurrency.HistoricalScheduler(); 

    var busySignal = fakeSource 
     // Batch up events: 
     .Window(
      // Starting batching on a busy signal 
      fakeSource.Where(e => e.Type == EventType.Busy), 
      // Stop batching on a not busy signal 
      (open) => fakeSource.Where(e => e.Type == EventType.NotBusy) 
       // but throw a timeout if we exceed 5 seconds per window 
       .Timeout(TimeSpan.FromSeconds(5), theTardis))  
     // Unpack the windows 
     .Switch() 
     // Catch any timeout exception and inject a NULL into the stream   
     .Catch(fakeSource.StartWith((StreamEvent)null)) 
     // Bool-ify on "did a timeout happen?" 
     .Select(e => e == null) 
     // Start in an "unbusy" state 
     .StartWith(false) 
     // And only tell us about transitions 
     .DistinctUntilChanged();  

    using(busySignal.Subscribe(signal => 
     Console.WriteLine("At {0}, Busy Signal Indicator is now:{1}", 
      theTardis.Now, 
      signal ? "ON" : "OFF"))) 
    { 
     // should not generate a busy signal 
     Console.WriteLine("Sending BUSY at {0}", theTardis.Now); 
     fakeSource.OnNext(new StreamEvent(EventType.Busy)); 
     theTardis.AdvanceBy(TimeSpan.FromSeconds(2)); 
     Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now); 
     fakeSource.OnNext(new StreamEvent(EventType.NotBusy)); 
     theTardis.AdvanceBy(TimeSpan.FromSeconds(1)); 
     Console.WriteLine(); 

     // should generate a busy signal 
     Console.WriteLine("Sending BUSY at {0}", theTardis.Now); 
     fakeSource.OnNext(new StreamEvent(EventType.Busy)); 
     theTardis.AdvanceBy(TimeSpan.FromSeconds(3)); 
     Console.WriteLine("Sending BUSY at {0}", theTardis.Now); 
     fakeSource.OnNext(new StreamEvent(EventType.Busy)); 
     theTardis.AdvanceBy(TimeSpan.FromSeconds(3)); 

     // and this should clear it 
     Console.WriteLine("Sending NOTBUSY at {0}", theTardis.Now); 
     fakeSource.OnNext(new StreamEvent(EventType.NotBusy));  
     theTardis.AdvanceBy(TimeSpan.FromSeconds(1)); 
     Console.WriteLine();  
    } 
} 
+0

@JayWalker您需要添加Rx程序集(您可以使用最新的Nuget LINQPad),并添加这些使用:System.Reactive.Subjects System.Reactive.Concurrency – JerKimball 2013-02-10 01:29:29