RxJava 2 - 如何链接异步电话
我刚刚开始学习RxJava,所以如果我要求一个新手问题,请不要将它钉死在十字架上,但我已经花了好几天的时间来尝试解决这个问题,但没有取得任何成功。我已经阅读了几乎所有我能找到的文档,并沿着大多数关于http://reactivex.io/tutorials.html的教程进行了跟踪。我搜索了StackOverflow和互联网的其他高低,但显然我似乎是这个星球上唯一有这个问题的人。这很奇怪,因为它基本上归结为每件软件必须做的事情:登录用户。RxJava 2 - 如何链接异步电话
我发现的所有教程都是在一个流上应用一些函数,创建一个新的流,这是有用的,真棒,不要误解我的意思,但对我来说并不真正有帮助。这也让我想到......也许我完全错了。但是我现在陷入了如此深刻的境地,并且也遵循了“一切都是流”的口号,为什么不可能呢?
因此,这里是我想要做的事:
- 表明某种形式加载
- 呼叫
Completable
一些服务器上执行登录操作 - 呼叫
Single
进行创建某些服务器上的用户操作返回用于本地参考的用户标识 - 使用
Single
调用结果在下一个操作中并隐藏加载
尽管我最终会在Android上做到这一点,但我创建了一个基本的Java 8示例来概述我想实现的目标。
这是我想出了迄今:
注:
- 我使用rxjava2
- 的
getMainStream()
功能有模拟一些互动
这里一个可运行版本的代码:
public static void main(final String[] args) {
getMainStream()
.doOnNext(__ -> showLoading())
.flatMap(__ -> loginUser().toObservable())
.flatMap(__ -> createUser().toObservable())
.doOnNext(userId -> {
hideLoading();
System.out.println("userId: " + userId);
})
.subscribe();
}
public static Completable loginUser() {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
Thread.sleep(500);
System.out.println("loginUser");
e.onComplete();
}
});
}
public static Single<String> createUser() {
return Single.<String>create(new SingleOnSubscribe<String>() {
@Override
public void subscribe(final SingleEmitter<String> e) throws Exception {
Thread.sleep(1000);
System.out.println("createUser");
e.onSuccess("some_user_id");
}
});
}
public static Completable getCompletable(final String input) {
return Completable.create(new CompletableOnSubscribe() {
@Override
public void subscribe(final CompletableEmitter e) throws Exception {
Thread.sleep(750);
System.out.println("completable, input=" + input);
e.onComplete();
}
});
}
public static Observable<Object> getMainStream() {
return Observable.just(new Object());
}
private static void hideLoading() {
System.out.println("hideLoading()");
}
private static void showLoading() {
System.out.println("showLoading()");
}
这个控制台输出为:
showLoading()
loginUser
遗憾的是,登录用户永远不会返回?
我真的很期待这个话题的任何帮助!
谢谢!
loginUser()
是Completable
,当你转换到Completable
Observable
,效果可观察到将完成。因此,createUser()
没有下游值可以采取行动。
您可能会考虑将表达式更改为loginUser().andThen(() -> createUser().toObservable()
,这会导致发出字符串。
让我试着去理解这一点:'Completable'是'Observable'的一种特殊形式,它永远不会调用'onNext()' 'onComplete()',因此,如果链接在一起,“不允许”流将继续,因为Stream只在调用onNext()时才继续。另一方面,'Single'是可链接的,因为它调用onNext() )“恰好一次,这句话是否正确?你的建议ed解决方案工程btw。我结束了'.flatMap(__ - > loginUser()。和Then(createUser())。toObservable())''。这是好的,而不是滥用'flatMap()' – grAPPfruit
你的总结是正确的。而'flatMap()'是为了你的目的而设计的。 –
不要以为你需要巨大的介绍。如果'loginUser'完成,它不会在流链下发射任何值。但是'flatMap(__ - > createUser())。toObservable())只有在有新的发射对象时才会执行'createUser'。这意味着你的整个链条只会终止。 – masp
为什么不呢?毕竟它调用完成,我认为'Completable'只是一个'Observable'的特殊情况?!我尝试了将'Completable'与“真正的”'Observable'交换的链,然后它可以工作。我不明白'Completable'和'Single'的好处,那么如果它不可链接的话。 #confused – grAPPfruit
为什么'Completable'和'Single'需要存在是一个完全不同的问题。你可以用“Observable”来覆盖每个案例。但是,请考虑在您的数据库中插入某些内容并发出Api请求的两种情况。第一个通常会完成或返回错误,第二个通常会返回一个项目或错误。 – masp