如何在不丢失物品的情况下暂停Observable?
问题描述:
我有发出Observable
蜱每秒:如何在不丢失物品的情况下暂停Observable?
Observable.interval(0, 1, TimeUnit.SECONDS)
.take(durationInSeconds + 1));
我想暂停此可观察到,因此停止发光的数字,并恢复它的需求。
有一些陷阱:
- 根据
Observable
的Javadoc,interval
运营商并不提供背压 - 的RxJava维基约backpressure具有部分大约调用栈阻断作为流量控制的替代背压:
另一种处理生产率过高的Observable的方法是阻止调用堆栈(停止支配生产过剩的Observable的线程)。这具有违背Rx的“反应性”和非阻塞模型的缺点。然而,如果有问题的Observable位于可以安全阻止的线程上,这可能是一个可行的选项。 目前RxJava不公开任何运营商来促进这一点。
是否有办法暂停interval
Observable?还是应该用一些背压支持来实现我自己的'滴答'Observable?
答
有很多方法可以做到这一点。例如,您仍然可以使用并保持两种附加状态:例如布尔标志“paused”和计数器。
public static final Observable<Long> pausableInterval(
final AtomicBoolean paused, long initial, long interval, TimeUnit unit, Scheduler scheduler) {
final AtomicLong counter = new AtomicLong();
return Observable.interval(initial, interval, unit, scheduler)
.filter(tick -> !paused.get())
.map(tick -> counter.getAndIncrement());
}
然后你只需要调用paused.set(真/假)某处暂停/恢复
EDIT 2016年6月4日
有以上解决一个小问题。 如果我们多次重用可观察实例,它将从最后一次取消订阅的值开始。例如:
Observable<Long> o = pausableInterval(...)
List<Long> list1 = o.take(5).toList().toBlocking().single();
List<Long> list2 = o.take(5).toList().toBlocking().single();
虽然list1是预期的[0,1,2,3,4],但list2实际上是[5,6,7,8,9]。 如果上述行为不是所期望的,则可观测值必须变为无状态。这可以通过scan()运算符来实现。 修订后的版本可以是这样的:
public static final Observable<Long> pausableInterval(final AtomicBoolean pause, final long initialDelay,
final long period, TimeUnit unit, Scheduler scheduler) {
return Observable.interval(initialDelay, period, unit, scheduler)
.filter(tick->!pause.get())
.scan((acc,tick)->acc + 1);
}
或者,如果你不希望依赖于Java 8和Lambda表达式,你可以使用Java 6+兼容的代码做这样的事情:
提醒我的[这个问题](http://stackoverflow.com/questions/35419062/how-to-stop-and-resume-observable-interval-emiting-ticks/35419343#35419343) – AndroidEx