为什么这个observable只发出一个值
我有以下代码根据@ a.bertucci在这里提供的示例Emit objects for drawing in the UI in a regular interval using RxJava on Android,其中我使用Timer对一个Observable进行压缩。当我通过调用processDelayedItems()来触发订阅时,压缩的Observable中的代码[A]只执行一次,一个项目发送到[B]。我希望代码[A]在触发后连续运行,并且每1500毫秒保持发射物体,但显然它只在这里运行一次。为什么这个observable只发出一个值
private static void processDelayedItems() {
Observable.zip(
Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
// [A] this code is only called once
subscriber.OnNext(o)
}
}),
Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() {
@Override public Object call(Object entity, Long aLong) {
return entity;
}
}
)
.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Object>() {
@Override public void call(Object entity) {
// ... and accordingly one item is emitted [B]
}
}, new Action1<Throwable>() {
@Override public void call(Throwable throwable) {
throwable.printStackTrace();
}
}, new Action0() {
@Override public void call() {
}
});
}
任何人都可以看到我这里有问题吗?是否需要从函数外部引用Observable以使其保持更长时间?它是由GC(Android)收集的吗?这个函数是静态的吗?
Observable的生命期规则是什么?有没有什么最佳实践应该引用更长时间运行的Observable,以及它们是否可以是静态的?在我的测试中,我注意到它并不重要,但也许它在这里,当涉及一个计时器。
-
更正代码[还没有成型]:
-
添加重复()
Observable.zip( Observable.create(new Observable.OnSubscribe<Object>() { @Override public void call(Subscriber<? super Object> subscriber) { // [A] this code is only called once subscriber.OnNext(o); subscriber.OnCompleted(); } }).repeat(Schedulers.newThread()), Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() { @Override public Object call(Object entity, Long aLong) { return entity; } } ) .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()) .subscribe(new Action1<Object>() { @Override public void call(Object entity) { // ... and accordingly one item is emitted [B] } }, new Action1<Throwable>() { @Override public void call(Throwable throwable) { throwable.printStackTrace(); } }, new Action0() { @Override public void call() { } });
您需要repeat
产生无限的可观测。例如,
Observable.create(new Observable.OnSubscribe<Object>() {
@Override public void call(Subscriber<? super Object> subscriber) {
// [A] this code is only called once
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(o);
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
}).repeat(Schedulers.newThread());
难道我需要从引用可观测的功能外,以保持它活着有更多的时间?它是由GC(Android)收集的吗?这个函数是静态的吗?
由于您使用Schedulers.newThread()
和timer
,会有其中有你而观察到的一些参考其他线程。你不需要更多的工作。
Observable的生命期规则是什么?有没有什么最佳实践应该引用更长时间运行的Observable,以及它们是否可以是静态的?在我的测试中,我注意到它并不重要,但也许它在这里,当涉及一个计时器。
你说得对。没关系。
关于你的评论,为简单起见,你可以这样做,
Observable.timer(1500, 1500, TimeUnit.MILLISECONDS)
.flatMap(new Func1<Long, Observable<Object>>() {
@Override
public Observable<Object> call(Long aLong) {
String o = "0";
return Observable.from(o);
}
})
.subscribe(new Action1<Object>() {
@Override
public void call(Object aLong) {
System.out.println(aLong);
}
});
在这里,你仍然可以得到定时器的好处,而不在上面添加的ZIP /重复。它仍然有点冗长,但有点简单。
感谢您的详细回复。这是有道理的,因为Y combinator在两侧都会查找可观察对象,如果一侧没有东西需要拉链,则不会发射任何东西。 – 2014-10-29 14:12:40
我刚刚测试过 - 不知道为什么 - Observable仍然只发射一次。我更正的代码在我原来的帖子下方。即使我不这么认为,请检查我是否已将repeat()添加到正确的Observable中。 – 2014-10-29 14:34:10
另一个问题:有没有办法实现相同的不同?像使用zip而不是使用zip,有一个定时器/时间间隔Observable发出固定的时间间隔,然后触发另一个Observable在定时器/间隔Oberservable后面串行切换? – 2014-10-29 14:36:20