RxJava:OnNext取消订阅不起作用

RxJava:OnNext取消订阅不起作用

问题描述:

我想在从observable接收到第一个项目后取消订阅。它似乎不起作用。我究竟做错了什么?RxJava:OnNext取消订阅不起作用

public class ObservableAndSubscriber { 

    public static void main(final String[] args) { 
     final Observable<String> strObservable = Observable.create(s -> { 
      while (true) { 
       s.onNext("Hello World!!"); 
      } 
     }); 

     final Subscriber<String> strSubscriber = new Subscriber<String>() { 

      @Override 
      public void onCompleted() { 

      } 

      @Override 
      public void onError(final Throwable e) { 
       e.printStackTrace(); 

      } 

      @Override 
      public void onNext(final String t) { 
       System.out.println(t); 
       this.unsubscribe(); 

      } 
     }; 
     strObservable.subscribe(strSubscriber); 
    } 
} 

结果似乎在无限循环中打印“Hello World”。

+0

http://stackoverflow.com/q/30046124/697313 - 相关讨论。 –

在我读Tomasz Nurkiewicz的书时,我做了这个例子。他后来有了一个类似的例子,并解释了我的代码出了什么问题。

无限循环是邪恶的!因为我有一个可观察的创建lambda表达式,它在我的主线程的上下文中执行并无限期地订阅块。为了让可观察的create lambda在不同的线程中执行,这意味着主线程永远不会被阻塞,并且subscribe()完成。我问订阅发生在一个IO线程上,并且做了这个窍门,这意味着无限循环发生在IO线程上,并且不会阻塞主线程。

strObservable.subscribeOn(Schedulers.io()).subscribe(strSubscriber); 
+0

我不认为这将解决取消订阅的问题。在'main()'末尾添加'Thread.sleep(1000)'以延迟程序终止,并看看它是如何发生的。 –

对于取消订阅工作,您需要检查循环中的订阅状态。否则,它将无限耗尽您的CPU。

处理无限序列最简单的方法是利用库提供的方法Observable.generate()Observable.fromIterable()。他们会为你做适当的检查。

此外请确保您订阅并观察不同线程。否则,您只能为单个订户提供服务。

你举的例子:

strObservable = Observable.generate(g -> g.onNext("Hello!")); 
strObservable.subscribeOn(Schedulers.newThread()).subscribe(strSubscriber);