concatMap/flatMap应该立即在同一个调度程序上运行

问题描述:

给定一个Service对象,我想确保每个对该服务的函数调用都不会产生副作用。在我的情况下,无论函数A在做什么,除非调度器可用,否则什么都不会在函数B中执行。concatMap/flatMap应该立即在同一个调度程序上运行

这里是这个样子:

class Service { 

    func handleJobA(input: String) -> Observable<String> { 
     return Observable.just(input) 
      .do(onNext: { (str) in 
       print ("Job A: \(str)") 
      }) 
      .concatMap { input -> Observable<String> in 
       return Observable.just("Job AA: \(input)") 
        .delay(2, scheduler: self.scheduler) 
        .do(onNext: { (str) in 
         print (str) 
        }) 
      } 

      .subscribeOn(scheduler) 
    } 

    func handleJobB(input: String) -> Observable<String> { 
     return Observable.just(input) 
      .do(onNext: { (str) in 
       print ("Job B: \(str)") 
      }) 
      .delay(1, scheduler: scheduler) 
      .concatMap { input -> Observable<String> in 
       return Observable.just("Job BB: \(input)") 
        .do(onNext: { (str) in 
         print (str) 
        }) 
      } 

      .subscribeOn(scheduler) 
    } 


    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service") 
} 


let service = Service() 

_ = Observable.from(["1","2","3"]) 
    .concatMap { service.handleJobA(input: $0) } 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

_ = Observable.from(["1","2","3"]) 
    .concatMap { service.handleJobB(input: $0) } 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

import PlaygroundSupport 

PlaygroundPage.current.needsIndefiniteExecution = true 

目前,输出为:

Job A: 1 
Job B: 1 
Job BB: 1 
Job BB: 1 √ 
Job B: 2 
Job AA: 1 
Job AA: 1 √ 
Job A: 2 
Job BB: 2 
Job BB: 2 √ 
Job B: 3 
Job BB: 3 
Job BB: 3 √ 
Job AA: 2 
Job AA: 2 √ 
Job A: 3 
Job AA: 3 
Job AA: 3 √ 

然而,这说明根本问题。内部延迟(可能发生在任何事情上,实际上......网络,处理)都会导致可观察的处理失去“顺序”。

我想是这样的:

Job A: 1 
Job AA: 1 
Job AA: 1 √ 
Job B: 1 
Job BB: 1 
Job BB: 1 √ 
Job B: 2 
Job BB: 2 
Job BB: 2 √ 
Job B: 3 
Job BB: 3 
Job BB: 3 √ 
Job A: 2 
Job AA: 2 
Job AA: 2 √ 
Job A: 3 
Job AA: 3 
Job AA: 3 √ 

这意味着,一旦功能已开始处理任务,没有其他人获得的访问,除非它完成。

我以前收到很好的answer。它并不完全适用,因为flatMap/concatMap(?)似乎都不喜欢调度程序。

我的理论是,concatMap调用确实做了正确的工作,但是然后将子序列省略调度到调度器队列的末尾,而我希望它在前面,接下来要处理。

我无法解释调度行为,但我可以做一个小建议

...一旦功能已开始处理任务,除非是做旁若无人得到的 访问。 ..

您可以通过concatMap通过所有handleJob电话来获得您所需要的行为:

Observable 
    .from([1,2,3,4,5,6]) 
    .flatMap({ (value) -> Observable<String> in 
     switch value % 2 == 0 { 
     case true: 
      return service.handleJobA(input: "\(value)") 
     case false: 
      return service.handleJobB(input: "\(value)") 
     } 
    }) 
    .subscribe(onNext:{ 
     print($0 + " √") 
    }) 

服务等级示例:

private class Service { 

    private lazy var result = PublishSubject<(index: Int, result: String)>() 
    private lazy var publish = PublishSubject<(index: Int, input: String, transformation: (String) -> String)>() 
    private lazy var index: Int = 0 
    private lazy var disposeBag = DisposeBag() 

    init() { 
     publish 
      .asObservable() 
      .concatMap({ (index, input, transformation) -> Observable<(index: Int, result: String)> in 
       let dueTime = RxTimeInterval(arc4random_uniform(3) + 1) 
       return Observable 
        .just((index: index, result: transformation(input))) 
        .delay(dueTime, scheduler: self.scheduler) 
      }) 
      .bind(to: result) 
      .disposed(by: disposeBag) 
    } 

    func handleJobA(input: String) -> Observable<String> { 
     let transformation: (String) -> String = { string in 
      return "Job A: \(string)" 
     } 
     return handleJob(input: input, transformation: transformation) 
    } 

    func handleJobB(input: String) -> Observable<String> { 
     let transformation: (String) -> String = { string in 
      return "Job B: \(string)" 
     } 
     return handleJob(input: input, transformation: transformation) 
    } 

    func handleJob(input: String, transformation: @escaping (String) -> String) -> Observable<String> { 
     index += 1 
     defer { 
      publish.onNext((index, input, transformation)) 
     } 
     return result 
      .filter({ [expected = index] (index, result) -> Bool in 
       return expected == index 
      }) 
      .map({ $0.result }) 
      .take(1) 
      .shareReplayLatestWhileConnected() 
    } 

    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service") 
}