RxJS可观察的合并角
问题描述:
我有一个订阅消息流的沙箱,我想过滤该流以查找已使用另一个组件中指定的路由参数发送或接收来自特定用户的消息。RxJS可观察的合并角
messages.sandbox.ts:
messages$: Observable<Array<Message>> = this.store.select(state => state.data.messages);
fetchReceived(id: string): Observable<Array<Message>> {
return this.messages$.map((messages: any) => {
return messages.filter((message: Message) => {
return message.recipientId == id;
});
});
}
fetchSent(id: string): Observable<Array<Message>> {
return this.messages$.map((messages: any) => {
return messages.filter((message: Message) => {
return message.userId == id;
})
})
}
messages.detail.container.ts
sentMessages$ = new Observable<Array<Message>>();
receivedMessages$ = new Observable<Array<Message>>();
matchingMessages$ = new Observable<Array<Message>>();
ngOnInit() {
this.route.params.subscribe((params: Params) => {
this.sentMessages$ = this.sb.fetchReceived(params['id']);
this.receivedMessages$ = this.sb.fetchSent(params['id']);
this.matchingMessages$ = Observable.merge(this.sentMessages$, this.receivedMessages$);
});
}
this.matchingMessages $似乎只包括this.receivedMessages $但是我知道this.sentMessages $不是null,因为我可以在模板中使用它而不会出现问题。
我错过了什么与合并Observables?创建一个单独的fetchMessages方法会更好吗?它可以筛选出等于路径参数ID的userId或recipientId?如果是的话,我会怎么做呢?
谢谢!
答
您有正确的概念。只是一些缺陷。
- 千万不要使用
new Observable<T>()
。它不会做你认为它做的事。它几乎没有做任何有用的事情。总是从工厂方法或其他可观察对象构造可观察对象 - 您需要使用运算符将可观察对象变换为新的可观察对象。你的问题是你订阅可观测的参数,然后每次构建新的observables。但其他代码已经订阅了最初的可观察对象,因此他们将永远不会看到这些更改。
所以,你想要做这样的事情:
sentMessages$ : Observable<Array<Message>>;
receivedMessages$ : Observable<Array<Message>>;
matchingMessages$ : Observable<Array<Message>>;
ngOnInit() {
const params$ = this.route.params;
// use switchMap to map the new params to a new sent observable
// each time params change, unsubscribe from the old fetch and subscribe
// to the new fetch. Anyone subscribed to "sentMessages" will see the
// change transparently
this.sentMessages$ = params$.switchMap((params: Params) => this.sb.fetchReceived(params['id']));
// same for received
this.receivedMessages$ = params$.switchMap((params: Params) => this.sb.fetchSent(params['id'])));
// merge the 2 streams together
this.matchingMessages$ = Observable.merge(this.sentMessages$, this.receivedMessages$);
}
编辑:
回答您的其他问题:到底要建立一个满足发送者和接收者一个单一可观察:取决于你的用例。但这里是你怎么去了解它:
messages.sandbox.ts:
fetchEither(id: string): Observable<Array<Message>> {
return this.messages$.map((messages: any) => {
return messages.filter((message: Message) => {
return message.recipientId == id || message.userId === id;
});
});
}
容器:
matchingMessages$ : Observable<Array<Message>>;
ngOnInit() {
const params$ = this.route.params;
// use switchMap to map the new params to a new either observable
// each time params change, unsubscribe from the old and subscribe
// to the new fetch. Anyone subscribed to "matchingMessages" will see the
// change transparently
this.matchingMessages$ = params$.switchMap((params: Params) => this.sb.fetchEither(params['id']));
}
感谢布兰登!两者的答案和解释。我是Rxjs的新手。现在我将最终使用第二种解决方案,因为它可以更好地匹配沙盒其他部分的代码模式 –