Rx.Net:处理订阅时执行异步副作用

Rx.Net:处理订阅时执行异步副作用

问题描述:

我有一个通过串口与计算机通信的设备。发送一个“START”命令后,设备回应一个确认,并开始监控一些外部活动。然后它根据外部活动异步传输串行端口上的一些消息。当设备收到“停止”命令时,它会回应确认,然后停止发送更多消息(表示外部活动)。Rx.Net:处理订阅时执行异步副作用

我已经实现了带有冷观察功能的启动/停止命令,这些命令执行副作用(串口发送命令),并且如果在串口上收到一个ACKckowledge,则会发出一个单一的Unit.Default。我想构建一个IObservable,它发出与外部活动相关的消息,并在订阅时执行“START”副作用,并且在订阅处理时执行“STOP”副作用。 “START”很简单,我只需要做一个“SelectMany”,但我不知道如何执行“STOP”。

class MonitoringDevice 
{ 
    private SerialPort _sp; 
    private IObservable<byte> _receivedBytes; 

    public IObservable<ExternalActivity> ActivityStream { get; } 

    public MonitoringDevice() 
    { 
     _sp = new SerialPort("COM1"); 
     _receivedBytes = Observable 
         .FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
          h => 
          { 
           _sp.DiscardInBuffer(); 
           _sp.DataReceived += h; 
          }, 
          h => 
          { 
           _sp.DataReceived -= h; 
          }) 
         .SelectMany(x => 
         { 

          byte[] buffer = new byte[1024]; 
          var ret = new List<byte>(); 
          int bytesRead = 0; 
          do 
          { 
           bytesRead = _sp.Read(buffer, 0, buffer.Length); 
           ret.AddRange(buffer.Take(bytesRead)); 
          } while ((bytesRead >= buffer.Length)); 
          return ret; 

         }) 
         .Publish() 
         .RefCount(); 


     ActivityStream = StartMonitoringAsync() 
         .SelectMany(_receivedBytes.ToActivity()); 
         // we need to execute StopMonitoringAsync 
         // when a subscription to ActivityStream is disposed 

     _sp.Open(); 
    } 



    private IObservable<Unit> StartMonitoringAsync() 
    { 
     return Observable 
       .Create<Unit>(
       obs => 
       { 
        _sp.Write("START"); 
        return _receivedBytes 
          .ToAcknowledge() 
          .FirstAsync() 
          .Timeout(TimeSpan.FromMilliseconds(1000)) 
          .Subscribe(obs); 
       }); 
    } 


    private IObservable<Unit> StopMonitoringAsync() 
    { 
     return Observable 
       .Create<Unit>(
       obs => 
       { 
        _sp.Write("STOP"); 
        return _receivedBytes 
          .ToAcknowledge() 
          .FirstAsync() 
          .Timeout(TimeSpan.FromMilliseconds(1000)) 
          .Subscribe(obs); 
       }); 
    } 


} 

ExternalActivity只是一个POCO。

ToAcknowledge是一种扩展方法,返回IObservable,当设备传输确认时发出Unit.Default。 - 这是按预期工作的;

ToActivity是一种扩展方法,返回IObservable,它解析输入的串行数据并发出ExternalActivity对象。 - 这是按预期工作的;


编辑:增加了对ToAcknowledgeToActivity扩展方法实现。

public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source) 
    { 
     return source.Buffer(3, 1) 
       .Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK 
       .Select(x => Unit.Default); 

    } 

    public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source) 
    { 
     return source 
       .Publish(ps => ps.Buffer(ps.Where(x => x == 1),    // SOH 
              bo => ps.Where(x => x == 4))) // EOT 
       .Select(bfr => bfr.Take(bfr.Count - 1).Skip(1)) 
       .Where(bfr => bfr.Count() == 12) 
       .Select(bfr => 
       { 
        var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0); 
        var id = BitConverter.ToInt32(bfr.ToArray(), 8); 
        return new ExternalActivity(timestamp, id); 
       });   
    } 
+0

请问您可以显示'ToAcknowledge'和'ToActivity'的代码吗?没有他们,我无法给你一个答案。 – Enigmativity

+0

@Enigmativity - 看我编辑的问题。 – francezu13k50

+1

嗯,我不认为在这种情况下,停止和处置应该被合并到相同的事情。我认为你想把Stop模型化为一个命令。并因此认为它可以成功,失败,超时等。处置就是纯粹的,断开和摧毁订阅。 –

您可以通过修改StartAsync是这样的:

private IObservable<Unit> StartAsync(Action unsubscribe) 
{ 
    return 
     Observable 
      .Create<Unit>(o => 
      { 
       var subscription = 
        Observable 
         .Timer(TimeSpan.FromSeconds(1)) 
         .Select(_=> Unit.Default) 
         .Subscribe(o); 
       return new CompositeDisposable(
        subscription, 
        Disposable.Create(unsubscribe)); 
      });; 
} 

然后你就可以在你喜欢的任何Action unsubscribe注入。与此代码

尝试测试:

var subscription = 
    StartAsync(() => Console.WriteLine("Done")) 
    .Subscribe(); 

Thread.Sleep(3000); 

subscription.Dispose(); 

你会看到“完成”后3秒写入控制台。

+0

谢谢。订阅处理所采取的行动是异步行为。它是作为一个冷观察实现的,它执行副作用(通过串行端口发送命令)并在响应到达时发出单个值。在我的代码示例中,它是用'Observable.Timer'模拟的。我需要等待主要订阅处置中的这个可观察的内容。 – francezu13k50

+0

@ francezu13k50 - 你不能在'Action'中放置一个observable吗?但是,依靠计时器来完成异步操作并不是一个好主意。 Rx非常擅长处理这些情况。你能显示你真实的代码吗? – Enigmativity

+0

我不知道如何在'Action'中放置一个observable。 “Observable.Timer”仅用于模拟器件的异步响应,实际上是一个“STOP”命令;我并不是依靠计时器来完成操作,它实际上是确认操作成功的设备。 – francezu13k50