使用RxJava将多个项目从observable组合到新对象
问题描述:
我试图实现某种流式解析器。假设我有整数流,并将它们合并以创建新的对象,该对象聚集了流的一部分。使用RxJava将多个项目从observable组合到新对象
例如,当整数为负时,对象为“完成”。为了简单起见,生产的项目将是数字串。
下面是简单的例子:
Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11
Output: "1-2", "343-5", "6-7", "8910-11"
我无法找到现有运营商的适当组合。我的解决办法是提供运营商定制,它汇集数据,直到“块”是有效的,并发出新的对象:
rx.Observable<Integer> src = rx.Observable
.from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11));
rx.Observable<String> res = src
.lift(new rx.Observable.Operator<String, Integer>() {
@Override
public rx.Subscriber<? super Integer> call(
final rx.Subscriber<? super String> subscriber) {
return new rx.Subscriber<Integer>(subscriber) {
private final StringBuilder cur = new StringBuilder();
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
@Override
public void onNext(Integer i) {
if (subscriber.isUnsubscribed())
return;
cur.append(i).append('');
if (i < 0) {
subscriber.onNext(cur.toString());
cur.delete(0, cur.length());
}
}
};
}
});
rx.observers.TestSubscriber<String> sub =
new rx.observers.TestSubscriber<>();
res.subscribe(sub);
List<String> l1 = sub.getOnNextEvents();
它工作得很好,直到我用一些位置运营商在他们身上,像取值(N)或combineLatest。
rx.Observable<String> res2 = res.take(2);
在这种情况下,我没有得到onCompleted或的onError和项目的少量发射。
它看起来像每个输入onNext应该有输出onNext下一个调用。
任何想法?
答
嗯。写长篇文章确实有助于提出新的想法,并且有效。 (我已经2天头撞墙了)。
产生Observer<Observer<String>>
并发出Observer.empty()
直到物体准备就绪。 flatMap
之后。
这里是工作示例:
rx.Observable<rx.Observable<String>> res = src
.lift(new rx.Observable.Operator<rx.Observable<String>, Integer>() {
@Override
public rx.Subscriber<? super Integer> call(
final rx.Subscriber<? super rx.Observable<String>> subscriber) {
return new rx.Subscriber<Integer>(subscriber) {
private final StringBuilder cur = new StringBuilder();
// onError, onCompleted skipped
@Override
public void onNext(Integer i) {
rx.Observable<String> out;
cur.append(i);
if (i < 0) {
out = rx.Observable.just(cur.toString());
cur.delete(0, cur.length());
} else {
out = rx.Observable.<String>empty();
}
subscriber.onNext(out);
}
};
}
});
rx.Observable<String> res2 = res
.flatMap(stringObservable -> stringObservable)
.take(2);