如何等待可变数组中的所有可观察元素在RxSwift中完成

问题描述:

我的期望是即时添加可观察对象(例如:图片上传),让它们开始,并且当我完成动态排队时,等待所有可观察到的完成。如何等待可变数组中的所有可观察元素在RxSwift中完成

这里是我的类:

open class InstantObservables<T> { 
    lazy var disposeBag = DisposeBag() 

    public init() { } 

    lazy var observables: [Observable<T>] = [] 
    lazy var disposables: [Disposable] = [] 

    open func enqueue(observable: Observable<T>) { 
     observables.append(observable) 

     let disposable = observable 
      .subscribe() 

     disposables.append(disposable) 

     disposable 
      .addDisposableTo(disposeBag) 
    } 

    open func removeAndStop(atIndex index: Int) { 
     guard observables.indices.contains(index) 
      && disposables.indices.contains(index) else { 
      return 
     } 
     let disposable = disposables.remove(at: index) 
     disposable.dispose() 

     _ = observables.remove(at: index) 
    } 

    open func waitForAllObservablesToBeFinished() -> Observable<[T]> { 
     let multipleObservable = Observable.zip(observables) 
     observables.removeAll() 
     disposables.removeAll() 
     return multipleObservable 
    } 

    open func cancelObservables() { 
     disposeBag = DisposeBag() 
    } 
} 

但当我订阅观察到的由waitForAllObservablesToBeFinished()发送,所有的人都重新执行(这是逻辑,有关的Rx是如何工作的)。

我该如何保证每一次执行一次,无论订阅的数量是多少?

在写这个问题时,我得到了答案! 通过shareReplay(1)改变可见性,并排队和订阅这个改变的observable ..它的作品!

下面是更新后的代码:

open class InstantObservables<T> { 
    lazy var disposeBag = DisposeBag() 

    public init() { } 

    lazy var observables: [Observable<T>] = [] 
    lazy var disposables: [Disposable] = [] 

    open func enqueue(observable: Observable<T>) { 
     let shared = observable.shareReplay(1) 
     observables.append(shared) 

     let disposable = shared 
      .subscribe() 

     disposables.append(disposable) 

     disposable 
      .addDisposableTo(disposeBag) 
    } 

    open func removeAndStop(atIndex index: Int) { 
     guard observables.indices.contains(index) 
      && disposables.indices.contains(index) else { 
      return 
     } 
     let disposable = disposables.remove(at: index) 
     disposable.dispose() 

     _ = observables.remove(at: index) 
    } 

    open func waitForAllObservablesToBeFinished() -> Observable<[T]> { 
     let multipleObservable = Observable.zip(observables) 
     observables.removeAll() 
     disposables.removeAll() 
     return multipleObservable 
    } 

    open func cancelObservables() { 
     disposeBag = DisposeBag() 
    } 
}