如何在不丢失物品的情况下暂停Observable?

如何在不丢失物品的情况下暂停Observable?

问题描述:

我有发出Observable蜱每秒:如何在不丢失物品的情况下暂停Observable?

Observable.interval(0, 1, TimeUnit.SECONDS) 
    .take(durationInSeconds + 1)); 

我想暂停此可观察到,因此停止发光的数字,并恢复它的需求。

有一些陷阱:

  • 根据Observable的Javadoc,interval运营商并不提供背压
  • 的RxJava维基约backpressure具有部分大约调用栈阻断作为流量控制的替代背压

另一种处理生产率过高的Observable的方法是阻止调用堆栈(停止支配生产过剩的Observable的线程)。这具有违背Rx的“反应性”和非阻塞模型的缺点。然而,如果有问题的Observable位于可以安全阻止的线程上,这可能是一个可行的选项。 目前RxJava不公开任何运营商来促进这一点。

是否有办法暂停interval Observable?还是应该用一些背压支持来实现我自己的'滴答'Observable?

+0

提醒我的[这个问题](http://stackoverflow.com/questions/35419062/how-to-stop-and-resume-observable-interval-emiting-ticks/35419343#35419343) – AndroidEx

有很多方法可以做到这一点。例如,您仍然可以使用​​并保持两种附加状态:例如布尔标志“paused”和计数器。

public static final Observable<Long> pausableInterval(
    final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) { 

    final AtomicLong counter = new AtomicLong(); 
    return Observable.interval(initial, interval, unit, scheduler) 
     .filter(tick -> !paused.get()) 
     .map(tick -> counter.getAndIncrement()); 
} 

然后你只需要调用paused.set(真/假)某处暂停/恢复

EDIT 2016年6月4日

有以上解决一个小问题。 如果我们多次重用可观察实例,它将从最后一次取消订阅的值开始。例如:

Observable<Long> o = pausableInterval(...) 
List<Long> list1 = o.take(5).toList().toBlocking().single(); 
List<Long> list2 = o.take(5).toList().toBlocking().single(); 

虽然list1是预期的[0,1,2,3,4],但list2实际上是[5,6,7,8,9]。 如果上述行为不是所期望的,则可观测值必须变为无状态。这可以通过scan()运算符来实现。 修订后的版本可以是这样的:

public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay, 
     final long period, TimeUnit unit, Scheduler scheduler) { 

    return Observable.interval(initialDelay, period, unit, scheduler) 
     .filter(tick->!pause.get()) 
     .scan((acc,tick)->acc + 1); 
    } 

或者,如果你不希望依赖于Java 8和Lambda表达式,你可以使用Java 6+兼容的代码做这样的事情:

https://github.com/ybayk/rxjava-recipes/blob/v0.0.2/src/main/java/yurgis/rxjava/recipes/RxRecipes.java#L343-L361

+0

为什么你用过Blocking操作符?无法阻止不起作用?或者如果没有toBlocking操作员,如何订阅此事件? – Matrix12

+0

Toblocking仅用于演示或单元测试。它只是为了确保您的单元测试在observable完成之前不会退出。您可以像其他可观察项一样运行异步。 – yurgis