等待所有观察到的与rxjs
我有以下代码:等待所有观察到的与rxjs
this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => {
let hangars: IHangar[] = result.arguments[0];
for (let hangar of hangars) {
this.pieceService.getGroupedPieces(hangar.pieces).subscribe(group => hangar.groupedPieces = group);
}
this.hangars$.next(hangars);
}, (ex: any) => this.hangars$.error(ex));
所以基本上,sendScopeCommand
通过的WebSocket发送的东西,当WebSocket的接收到响应执行then
功能。在这一点上,我得到了一个对象的数组,我把它放在hangars
。
在这些对象中,我有一个玩家拥有的所有棋子的数组。可以有多个相同的片类型,所以我做了一个函数来将它们分组:getGroupedPieces
。它的代码如下:
public getGroupedPieces(pieces: IPiece[]): Observable<IGroupedPiece[]> {
return Observable
.from(pieces)
.groupBy(p => p.pieceTypeId)
.flatMap(p => p.toArray())
.map(p => { return <IGroupedPiece>{ amount: p.length, piece: p[0] }; })
.toArray();
}
此代码的工作原理,但我敢肯定,这是不正确的。事实上,我认为hangars
甚至在for
循环中的可观测值完成之前在可观测值上发射。
我想在这里等待所有这些observable完成,然后在Observable上发射hangars
。
我想你可以尝试使用RxJS的forkJoin
操作:
this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => {
let hangars: IHangar[] = result.arguments[0];
let hangars$$: Observable<IHangar>[] = hangars.map(hangar => {
return this.pieceService.getGroupedPieces(hangar.pieces)
})
Observable
.forkJoin(...hangars$$)
.subscribe(groups => {
groups.forEach((group, i) => hangars[i].groupedPieces = group)
this.hangars$.next(hangars);
})
}, (ex: any) => this.hangars$.error(ex));
太棒了,谢谢 – ssougnez
就我个人而言,我尝试尽可能少地调用订阅,特别是避免可能的订阅嵌套。
我把一个快速的角度组件放在一起给我如何处理它的样本。
对不起,如果它的原油或拼写错误(断锁骨只用一只手来使用)。
import { Component, OnInit } from '@angular/core';
import { Observable, Subject } from 'rxjs';
interface hangar {
pieces: number[];
groupedPieces: number[];
}
@Component({
selector: 'app-root',
templateUrl: './app.component.html',
styleUrls: ['./app.component.css']
})
export class AppComponent {
title = 'app works!';
//output mock
hangars$: Subject<hangar[]> = new Subject<hangar[]>();
ngOnInit() {
//proof of functionality
this.hangars$.subscribe(h => console.log(h));
}
//method to mock the then call from example
start() {
//mock some data
let hangars: hangar[] = [{ pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }];
//subject to handle observable clean up
let subManagement$: Subject<any> = new Subject<any>();
let obsArr: Observable<number[]>[] = [];
//here were going to build an array the observables but not subscribe to them yet
hangars.forEach(hangar =>
obsArr.push(
this.getGroupedPieces(hangar.pieces)
.takeUntil(subManagement$)
.do(group => hangar.groupedPieces = group)
)
);
//real magic, this waits for all of the observables responses before emitting its value
Observable.combineLatest(obsArr).subscribe(
() => this.hangars$.next(hangars),
null,
() => subManagement$.next()//cleanup
);
}
//mock out your service
private getGroupedPieces(pcs: number[]): Observable<number[]> {
return Observable.of([1, 2, 3, 4]).delay(1000);
}
}
如果您将Observable更改为承诺,则可以使用'Promise.all'来等待所有承诺响应并继续。 – Dekonunes
为什么不发射订阅被调用时的事件? –