RxJava 2 - 如何链接异步电话

RxJava 2 - 如何链接异步电话

问题描述:

我刚刚开始学习RxJava,所以如果我要求一个新手问题,请不要将它钉死在十字架上,但我已经花了好几天的时间来尝试解决这个问题,但没有取得任何成功。我已经阅读了几乎所有我能找到的文档,并沿着大多数关于http://reactivex.io/tutorials.html的教程进行了跟踪。我搜索了StackOverflow和互联网的其他高低,但显然我似乎是这个星球上唯一有这个问题的人。这很奇怪,因为它基本上归结为每件软件必须做的事情:登录用户。RxJava 2 - 如何链接异步电话

我发现的所有教程都是在一个流上应用一些函数,创建一个新的流,这是有用的,真棒,不要误解我的意思,但对我来说并不真正有帮助。这也让我想到......也许我完全错了。但是我现在陷入了如此深刻的境地,并且也遵循了“一切都是流”的口号,为什么不可能呢?

因此,这里是我想要做的事:

  1. 表明某种形式加载
  2. 呼叫Completable一些服务器上执行登录操作
  3. 呼叫Single进行创建某些服务器上的用户操作返回用于本地参考的用户标识
  4. 使用Single调用结果在下一个操作中并隐藏加载

尽管我最终会在Android上做到这一点,但我创建了一个基本的Java 8示例来概述我想实现的目标。

这是我想出了迄今:

注:

  • 我使用rxjava2
  • getMainStream()功能有模拟一些互动

这里一个可运行版本的代码:

public static void main(final String[] args) { 
    getMainStream() 
     .doOnNext(__ -> showLoading()) 
     .flatMap(__ -> loginUser().toObservable()) 
     .flatMap(__ -> createUser().toObservable()) 
     .doOnNext(userId -> { 
      hideLoading(); 
      System.out.println("userId: " + userId); 
     }) 
     .subscribe(); 
} 

public static Completable loginUser() { 
    return Completable.create(new CompletableOnSubscribe() { 
     @Override 
     public void subscribe(final CompletableEmitter e) throws Exception { 
      Thread.sleep(500); 
      System.out.println("loginUser"); 
      e.onComplete(); 
     } 
    }); 
} 

public static Single<String> createUser() { 
    return Single.<String>create(new SingleOnSubscribe<String>() { 
     @Override 
     public void subscribe(final SingleEmitter<String> e) throws Exception { 
      Thread.sleep(1000); 
      System.out.println("createUser"); 
      e.onSuccess("some_user_id"); 
     } 
    }); 
} 

public static Completable getCompletable(final String input) { 
    return Completable.create(new CompletableOnSubscribe() { 
     @Override 
     public void subscribe(final CompletableEmitter e) throws Exception { 
      Thread.sleep(750); 
      System.out.println("completable, input=" + input); 
      e.onComplete(); 
     } 
    }); 
} 

public static Observable<Object> getMainStream() { 
    return Observable.just(new Object()); 
} 

private static void hideLoading() { 
    System.out.println("hideLoading()"); 
} 

private static void showLoading() { 
    System.out.println("showLoading()"); 
} 

这个控制台输出为:

showLoading() 
loginUser 

遗憾的是,登录用户永远不会返回?

我真的很期待这个话题的任何帮助!

谢谢!

+1

不要以为你需要巨大的介绍。如果'loginUser'完成,它不会在流链下发射任何值。但是'flatMap(__ - > createUser())。toObservable())只有在有新的发射对象时才会执行'createUser'。这意味着你的整个链条只会终止。 – masp

+0

为什么不呢?毕竟它调用完成,我认为'Completable'只是一个'Observable'的特殊情况?!我尝试了将'Completable'与“真正的”'Observable'交换的链,然后它可以工作。我不明白'Completable'和'Single'的好处,那么如果它不可链接的话。 #confused – grAPPfruit

+0

为什么'Completable'和'Single'需要存在是一个完全不同的问题。你可以用“Observable”来覆盖每个案例。但是,请考虑在您的数据库中插入某些内容并发出Api请求的两种情况。第一个通常会完成或返回错误,第二个通常会返回一个项目或错误。 – masp

loginUser()Completable,当你转换到CompletableObservable,效果可观察到将完成。因此,createUser()没有下游值可以采取行动。

您可能会考虑将表达式更改为loginUser().andThen(() -> createUser().toObservable(),这会导致发出字符串。

+0

让我试着去理解这一点:'Completable'是'Observable'的一种特殊形式,它永远不会调用'onNext()' 'onComplete()',因此,如果链接在一起,“不允许”流将继续,因为Stream只在调用onNext()时才继续。另一方面,'Single'是可链接的,因为它调用onNext() )“恰好一次,这句话是否正确?你的建议ed解决方案工程btw。我结束了'.flatMap(__ - > loginUser()。和Then(createUser())。toObservable())''。这是好的,而不是滥用'flatMap()' – grAPPfruit

+0

你的总结是正确的。而'flatMap()'是为了你的目的而设计的。 –