选择正确的RxJS构造以确保一个订阅在另一个订阅完成之前完成
问题描述:
我正面临RxJS问题。选择正确的RxJS构造以确保一个订阅在另一个订阅完成之前完成
我的应用正在设计如下:我有两个不同的客户端:ClientA
& ClientB
预订了两个不同的观测量:ObservableA
& ObservableB
。
请注意,该应用程序还会变动一个名为aVariable
的变量。
这里是流:
-
ClientA
订阅了ObservableA
。 -
ClientB
订阅ObservableB
。 -
ObservableB
订阅阅读false
从aVariable
并完成。 -
ObservableA
订阅集aVariable
至true
并完成(晚于ObservableB
)。
而什么是真正意图是为ObservableA
S“的订阅之前ObservableB
完成”,让ClientB
将从aVariable
读true
...或者换一种方式,一定程度上保证ObservableB
的订阅等待直到其他订阅已完成。
我不确定用什么RxJS结构来实现我想要的(我目前使用普通Observable)。我相信我在这里需要的不仅仅是简单的Observable ...
有人可以帮忙吗?
P.S. 注意aVariable
是在NGRX店举行,但我不认为这是有关这个问题...
P.P.S. 以上是我的真实应用程序的简化。
答
我认为你可以在你散发出的值时streamB被预订了中间主题解决您的问题:streamB被订阅了它只有在
const completeStreamA = new Rx.Subject();
const streamA = Rx.Observable.never()
.takeUntil(completeStreamA);
const streamB = Rx.Observable.of('aValueOnStreamB')
.do(() => completeStreamA.next('complete stream A'));
//clientA subscribes immediately
streamA.subscribe(
next => console.log('a->next->'+next),
err => console.log('a->error->' + err.message),
() => console.log('a->complete')
);
setTimeout(() => {
//simulate later subscription by clientB
streamB.subscribe(
next => console.log('b->next->'+next),
err => console.log('b->error->' + err.message),
() => console.log('b->complete')
);
}, 3 * 1000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
会接下来是将完成streamA的completeStreamA
主题的值。以上代码的输出:
a->complete
b->next->aValueOnStreamB
b->complete
通常,当您想要运行异步任务以便使用'concat'或'concatMap'。看到也许类似的问题http://stackoverflow.com/questions/39566268/angular-2-rxjs-how-return-stream-of-objects-fetched-with-several-subsequent/39578646#39578646和http:// stackoverflow。 com/questions/36713531/how-to-use-exhaustmap-in-reactivex-rxjs-5-in-typescript/39589408#39589408 – martin
Hi Martin。唯一的麻烦是那些是不同的客户。因此不可能使用concat。 – balteo
你可以给你一个代表什么样的代码样本吗?我不清楚:什么是“客户端A”?你是什么意思你完成订阅?你的意思是“普通”的“可观察”。 – paulpdaniels