使用多个observables创建可观察?

使用多个observables创建可观察?

问题描述:

我有一堆观察对象,我在一个方法中运行/订阅(大约9个确切的),它本身返回一个可观察值。为了这个问题的目的,我把它缩小到2个观察值。这是我创建的方法,它返回一个包含其他可观察对象的Observable。使用多个observables创建可观察?

public static Observable<MyCustomObject> runAll(Service networkService) { 
     return Observable.create(subscriber -> { 

      networkService.getOne().subscribe(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       subscriber.onNext(case); 
      }, exception -> { 
       throw new OnErrorNotImplementedException(exception); 
      }); 


      networkService.getTwo().subscribe(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       subscriber.onNext(case); 
      }, exception -> { 
       throw new OnErrorNotImplementedException(exception); 
      }); 

      subscriber.onComplete(); 
     }); 
    } 

我然后使用可观察到的是所返回......

 runAll(networkService) 
.subscribeOn(Schedulers.io()) 
.subscribe(case -> { 
      //do stuff 

     }, new Consumer<Throwable>() { 
      @Override 
      public void accept(Throwable throwable) throws Exception { 
       //handle error 
      } 
     }); 

我不知道如果我正确地创建一个可观察的。我基本上用Observable替换了我之前在这里的一个监听器/接口,但是我知道使用Observable.create()是不容易的。我应该以另一种方式去做吗?做我在做什么有什么不对吗?

编辑:getOne()和getTwo()是网络调用,所以他们返回Observables。

编辑2:目前有这样的

public static Observable<MyCustomObject> quickRun(Service networkService) { 
    return Observable.concat(
      networkService.getOne().map(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       return case; 
      }), 
      networkService.getTwo().map(response -> { 
       Request request = response.raw().request(); 
       MyCustomObject case = new MyCustomObject(request); 
       return case; 
      }) 
    ); 
} 
+0

是否所有9个'Observable's返回相同的类型?他们互相依赖,还是可以独立运行? –

+0

可能有一个observable返回一个不同的类型,但大多数情况下我可以使它工作(我可以将两个结果都包装在另一个类型或其他类型中),并且每个都可以独立运行。 – EGHDK

是的,你应该做它用不同的方式。
每当你将Observable视为像常规的异步回调那样,这是一种你没有做出反应的气味。
正如你所说Observable.create()是不平凡的方式,创造Observable的一些陷阱,也就是说,你可能使用旧版本的RxJava 1,与更新的版本(1.3+我认为),并与RxJava2,创建基于发射器并且更安全。你可以阅读here关于Create和发射器方法的缺陷。 (作为RxJava2的附注,有另一种方式extending Observable)。

所有这些方法都是在异步回调世界与反应世界之间架起一道桥梁,并用Observable包装任何类型的异步操作。至于你的情况,因为你手头已经有Observables,所以你需要将它们组合成单一的流 - Observable。 Rx有很多运营商为此目的,根据你的例子Observable.merge似乎是合适的运营商,你所有Observable将异步运行(注意,你将需要适用于他们每个人的IO调度器),并且每个Observable将发射它在合并流上的结果,当所有的Observable将完成时 - onCompleted将在合并Observable上调用,这在您的示例中是错误的,因为它在刚启动所有任务之后才被调用。

Observable.merge(
     networkService.getOne() 
       .subscribeOn(Schedulers.io()), 
     networkService.getTwo() 
       .subscribeOn(Schedulers.io()) 
) 
     .map(response -> { 
      Request request = response.raw().request(); 
      return new MyCustomObject(request); 
     }) 
     .subscribe(customObject -> { 
        //do stuff 
       }, throwable -> { 
        //handle error 
       } 
     ); 
+0

其实,就我而言,我希望他们都能同步运行......这是怎么改变的? – EGHDK

+0

同样的方法也适用,只是用不同的运营商 - 'Observable.concat()' – yosriz

+0

所以我会做'Observable.concat( networkService.getOne() .MAP(), networkService.getTwo() .MAP () )'?不确定你通过合并定义的顺序,然后映射... – EGHDK