Rx.Net:处理订阅时执行异步副作用
我有一个通过串口与计算机通信的设备。发送一个“START”命令后,设备回应一个确认,并开始监控一些外部活动。然后它根据外部活动异步传输串行端口上的一些消息。当设备收到“停止”命令时,它会回应确认,然后停止发送更多消息(表示外部活动)。Rx.Net:处理订阅时执行异步副作用
我已经实现了带有冷观察功能的启动/停止命令,这些命令执行副作用(串口发送命令),并且如果在串口上收到一个ACKckowledge,则会发出一个单一的Unit.Default
。我想构建一个IObservable
,它发出与外部活动相关的消息,并在订阅时执行“START”副作用,并且在订阅处理时执行“STOP”副作用。 “START”很简单,我只需要做一个“SelectMany”,但我不知道如何执行“STOP”。
class MonitoringDevice
{
private SerialPort _sp;
private IObservable<byte> _receivedBytes;
public IObservable<ExternalActivity> ActivityStream { get; }
public MonitoringDevice()
{
_sp = new SerialPort("COM1");
_receivedBytes = Observable
.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
h =>
{
_sp.DiscardInBuffer();
_sp.DataReceived += h;
},
h =>
{
_sp.DataReceived -= h;
})
.SelectMany(x =>
{
byte[] buffer = new byte[1024];
var ret = new List<byte>();
int bytesRead = 0;
do
{
bytesRead = _sp.Read(buffer, 0, buffer.Length);
ret.AddRange(buffer.Take(bytesRead));
} while ((bytesRead >= buffer.Length));
return ret;
})
.Publish()
.RefCount();
ActivityStream = StartMonitoringAsync()
.SelectMany(_receivedBytes.ToActivity());
// we need to execute StopMonitoringAsync
// when a subscription to ActivityStream is disposed
_sp.Open();
}
private IObservable<Unit> StartMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("START");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
private IObservable<Unit> StopMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("STOP");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
}
ExternalActivity
只是一个POCO。
ToAcknowledge
是一种扩展方法,返回IObservable
,当设备传输确认时发出Unit.Default
。 - 这是按预期工作的;
ToActivity
是一种扩展方法,返回IObservable
,它解析输入的串行数据并发出ExternalActivity
对象。 - 这是按预期工作的;
编辑:增加了对ToAcknowledge
和ToActivity
扩展方法实现。
public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source)
{
return source.Buffer(3, 1)
.Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK
.Select(x => Unit.Default);
}
public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source)
{
return source
.Publish(ps => ps.Buffer(ps.Where(x => x == 1), // SOH
bo => ps.Where(x => x == 4))) // EOT
.Select(bfr => bfr.Take(bfr.Count - 1).Skip(1))
.Where(bfr => bfr.Count() == 12)
.Select(bfr =>
{
var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0);
var id = BitConverter.ToInt32(bfr.ToArray(), 8);
return new ExternalActivity(timestamp, id);
});
}
您可以通过修改StartAsync
是这样的:
private IObservable<Unit> StartAsync(Action unsubscribe)
{
return
Observable
.Create<Unit>(o =>
{
var subscription =
Observable
.Timer(TimeSpan.FromSeconds(1))
.Select(_=> Unit.Default)
.Subscribe(o);
return new CompositeDisposable(
subscription,
Disposable.Create(unsubscribe));
});;
}
然后你就可以在你喜欢的任何Action unsubscribe
注入。与此代码
尝试测试:
var subscription =
StartAsync(() => Console.WriteLine("Done"))
.Subscribe();
Thread.Sleep(3000);
subscription.Dispose();
你会看到“完成”后3秒写入控制台。
谢谢。订阅处理所采取的行动是异步行为。它是作为一个冷观察实现的,它执行副作用(通过串行端口发送命令)并在响应到达时发出单个值。在我的代码示例中,它是用'Observable.Timer'模拟的。我需要等待主要订阅处置中的这个可观察的内容。 – francezu13k50
@ francezu13k50 - 你不能在'Action'中放置一个observable吗?但是,依靠计时器来完成异步操作并不是一个好主意。 Rx非常擅长处理这些情况。你能显示你真实的代码吗? – Enigmativity
我不知道如何在'Action'中放置一个observable。 “Observable.Timer”仅用于模拟器件的异步响应,实际上是一个“STOP”命令;我并不是依靠计时器来完成操作,它实际上是确认操作成功的设备。 – francezu13k50
请问您可以显示'ToAcknowledge'和'ToActivity'的代码吗?没有他们,我无法给你一个答案。 – Enigmativity
@Enigmativity - 看我编辑的问题。 – francezu13k50
嗯,我不认为在这种情况下,停止和处置应该被合并到相同的事情。我认为你想把Stop模型化为一个命令。并因此认为它可以成功,失败,超时等。处置就是纯粹的,断开和摧毁订阅。 –