如何在RxJs中执行操作后重试观察值?

如何在RxJs中执行操作后重试观察值?

问题描述:

我正在使用Angular 2和RxJS从我的API层检索数据。我有一个基础服务,我已经写出来处理HTTP请求。下面的代码做到这一点我的服务范围内:如何在RxJs中执行操作后重试观察值?

protected apiGet(uri: string): Observable<HttpMappedResponse<T>> { 
    return this.mapAndHandle<T>(this.http.get(uri)); 
} 

private mapAndHandle<T>(observable: Observable<Response>): Observable<HttpMappedResponse<T>> { 
    const o = observable 
     .map(res => { 
      const data = res.json() || {}; 
      const mappedResponse = new HttpMappedResponse<T>(res, data); 
      // Retrieve page information from HTTP headers 
      mappedResponse.pageInfo = new PageInfo(res.headers); 
      return mappedResponse; 
     }) 
     .share(); 

    o.subscribe(null, e => this.errorService.handleHttpResponse(e)); 
    return o; 
} 

这工作得很好,给了我一切,当我打电话apiGet()我需要的数据。什么我现在要做的是以下情况:

  • 发送一个HTTP请求
  • 如果返回401错误与特定的字符串:
    • 发送另一个请求到不同的URL
    • 更新另一个服务
    • 重试该原始请求
    • (然后可能添加逻辑以处理可能无限的重试)
  • 否则
    • 重新抛出原来的HTTP错误
  • 捕获错误并使用.catch()处理它

我试过,.retryWhen(),.MAP ()和.flatMapTo()没有成功 - 最接近我得到如下:

enum HttpVerb { 
    Put, 
    Post, 
    Get, 
    Delete 
} 

private mapAndHandle<T>(uri: string, type: HttpVerb): Observable<HttpMappedResponse<T>> { 
    const restObservable = this.getRestObservable(uri, type); 

    const o = restObservable 
     .map(res => this.mapToHttpMappedResponse<T>(res)) 
     .catch((error, sourceObservable) => { 
      const data = error.json() || {}; 

      if (error.status === 401 && data.Message === 'Token expired') { 
       console.log('Retrying'); 
       // Refresh the expired auth token 
       const refreshToken = this.authService.userData.RefreshToken; 
       console.log(refreshToken); 

       this.authService 
        .refresh(new RefreshRequestModel(refreshToken)) 
        .subscribe(x => null); 

       // Retry original 
       return this 
        .getRestObservable(uri, type) 
        .map(x => this.mapToHttpMappedResponse(x)); 
      } 
      throw error; 
     }) 
     .share(); 

    o.subscribe(null, e => { 
     this.errorService.handleHttpResponse(e); 
    }); 
    return o; 
} 

private mapToHttpMappedResponse<T>(res: Response): HttpMappedResponse<T> { 
    console.log('Attempt ' + Date.now()); 
    const data = res.json() || {}; 
    const mappedResponse = new HttpMappedResponse<T>(res, data); 
    // Retrieve page information from HTTP headers 
    mappedResponse.pageInfo = new PageInfo(res.headers); 
    return mappedResponse; 
} 

但这并没有给我正确的结果。

有没有我可以用来做这个操作符? retryWhen()似乎是最合乎逻辑的,但我只是不断让自己陷入与RxJs混为一谈!

+0

让我知道如果我的答案引导你在正确的方向。这都是伪代码,但它应该给你一个你可以做什么的总体概念(你已经知道你需要采取的步骤)。只需在重试请求时使用'flatMap',并且可以将其保留在相同的可观察流中。 – Lansana

您遇到的问题之一是您没有链接您的电话。您的刷新电话应与您的重试呼叫链接在一起,他们应该利用flatMap或类似的东西将其保持在相同的可观察的流中。


看看这个问题my answer

这是如何可以实现它:

您需要从RxJS添加catch操作。这是错误发生的地方,你可以相应地处理它。

为了让你的刷新逻辑通过拦截器,你需要返回调用,函数也应该返回Observable。例如修改上面的原始逻辑:

this.http.request(url, options) 
     .map((res: Response) => res.json()) 
     .catch((error: any) => { 
      const err = error.json(); 

      // Refresh JWT 
      if (err.status === 401) { 
       // refreshToken makes another HTTP call and returns an Observable. 
       return this.refreshToken(...); 
      } 

      return Observable.throw(err); 
     }); 

如果你希望能够重试原来的请求,你可以做的是通过原来的请求数据,这样就成功刷新令牌可以再次拨打电话。例如,这是您的refreshToken函数可能看起来像:

refreshToken(url: string, options: RequestOptionsArgs, body: any, tokenData: any): Observable<any> 
    return this.post(`${this.url}/token/refresh`, tokenData) 
     .flatMap((res: any) => { 
      // May want to use response data to set new tokens in your local storage or whatever here... 

      // This is where I retry the original request 
      return this.request(url, options, body); 
     }); 
} 
+0

谢谢!我见过flatMap()的例子,但是现在还不能真正弄清楚它在做什么。我还花了几个小时来跟踪一下我引入我的代码中的一个愚蠢的重构错误,这个代码是用POST和PUT命令替换标题中的正文。这当然没有帮助我在这里理解! – Spikeh

+0

很高兴我可以帮助:)我被困在这一段时间,但最终发现我可以利用'flatMap'来解决它。你或许可以做得更干净,但你现在就明白了。 – Lansana

+1

我的完整实现非常复杂 - 试图使事物尽可能通用的本质:) – Spikeh