第一步与反应式扩展蹒跚步骤
问题描述:
我正在努力与我的第一个简单的“hello world”RX应用程序。我正在使用VS2010 RC,以及最新的RX下载。第一步与反应式扩展蹒跚步骤
以下是简单的控制台应用程序;
class Program
{
static void Main(string[] args)
{
var channel = new MessageChannel()
.Where(m => m.process)
.Subscribe((MyMessage m) => Console.WriteLine(m.subject));
//channel.GenerateMsgs();
}
}
public class MyMessage
{
public string subject;
public bool process;
}
public class MessageChannel: IObservable<MyMessage>
{
List<IObserver<MyMessage>> observers = new List<IObserver<MyMessage>>();
public IDisposable Subscribe(IObserver<MyMessage> observer)
{
observers.Add(observer);
return observer as IDisposable;
}
public void GenerateMsgs()
{
foreach (IObserver<MyMessage> observer in observers)
{
observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
}
}
}
我在Where子句中得到一个ArgumentNullException。这是堆栈;
System.ArgumentNullException was unhandled
Message=Value cannot be null.
Parameter name: disposable
Source=System.Reactive
ParamName=disposable
StackTrace:
at System.Collections.Generic.AnonymousObservable`1.Disposable.Set(IDisposable disposable)
at System.Collections.Generic.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
at System.Threading.Scheduler.NowScheduler.Schedule(Action action)
at System.Collections.Generic.AnonymousObservable`1.Subscribe(IObserver`1 observer)
at ConsoleApplication1.Program.Main(String[] args) in C:\Users\Jason\documents\visual studio 2010\Projects\ConsoleApplication1\ConsoleApplication1\Program.cs:line 18
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args)
at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()
InnerException:
答
此行似乎引起大惊小怪:
return observer as IDisposable;
你不应该承担的观察者是一次性的,你应该回到它知道“退订”的一次性对象。
该方法返回对IDSposable接口的引用。这使得 观察者能够在提供者完成之前取消订阅(即, 停止接收通知) 发送它们并且呼叫 订户的OnCompleted方法。
你可以把它用做类似工作:
public class MessageChannel: IObservable<MyMessage>
{
class Subscription : IDisposable {
MessageChannel _c;
IObservable<MyMessage> _obs;
public Subscription(MessageChannel c, IObservable<MyMessage> obs) {
_c = c; _obs = obs;
}
public void Dispose() {
_c.Unsubscribe(_obs);
}
}
public IDisposable Subscribe(IObserver<MyMessage> observer)
{
observers.Add(observer);
return new Subscription(this, observer);
}
void Unsubscribe(IObservable<MyMessage> obs) {
observers.Remove(obs);
}
}
答
!!红旗!
我强烈建议您自己不要执行IObserver<T>
或IObservable<T>
。赞成使用Observable.Create<T>
或作为最后的手段使用Subject
类型。为了正确实现这些接口,需要考虑很多事情,这些接口由正确的Rx类型和运算符为您处理。
在这个例子中,我会劝你放弃的MessageChannel类型和交换它
class Program
{
static void Main(string[] args)
{
var channel = GenerateMsgs()
.Where(m => m.process)
.Subscribe((MyMessage m) => Console.WriteLine(m.subject));
}
public IObservable<MyMessage> GenerateMsgs()
{
return Observable.Create<MyMessage>(observer=>
{
observer.OnNext(new MyMessage() {subject = "Hello!", process = true});
});
}
}
public class MyMessage
{
public string subject;
public bool process;
}
在系统设计的进一步检查,你可能有某种服务的一个公开的“通道”作为观察序列。
public interface OrderService
{
IObservable<OrderRequest> OrderRequests();
IObservable<Order> ProcessedOrders();
IObservable<OrderRejection> OrdersRejections();
}
从而否定为IObserver<T>
或IObservable<T>
这些自定义实现的需要。
非常感谢弗兰克,那打在头上。只需要改变Subcribe类来使用IObserver而不是IObservable(我不抱怨!)。我使用了IObserverable提供的MSDN示例(http://msdn.microsoft.com/en-us/library/dd990377(VS.100).aspx) – 2010-02-13 19:33:33
您也可以使用'Disposable.Create(()=> Unsubscribe(观察者)) - 'System.Reactive.Disposables'命名空间有许多有用的IDisposable实现 – AlexFoxGill 2013-03-28 09:19:12