rxjs5:延迟取消订阅共享可观察者

rxjs5:延迟取消订阅共享可观察者

问题描述:

我有一个可观察的创建成本很高,所以我的shared它。然而,在某些情况下,所有订户都会取消订阅,然后立即(或在短暂延迟后)新订阅者订阅。rxjs5:延迟取消订阅共享可观察者

实际观察到的太复杂,这里复制的,但对于参数的缘故:

const heavyObservable = Rx.Observable.create((observer) => { 
    console.log('I am expensive, avoid hitting this code'); 

    return Rx.Observable 
      .interval(500) // these updates are cheap though! 
      .subscribe(observer) 
       .add(() => { 
        console.log('Cache has been destroyed, will have to be rebuild on next call'); 
       }); 
}); 

我不想打了创建这个观察到昂贵的代码。我想延迟断开,直到n ms。有没有办法做到这一点?

const sharedObservable = heavyObservable 
    .publish() 
    // ideally I'm looking for a way to get refCount to wait for new 
    // subscribers for n ms before unsubscribing when refcount === 0 
    .refCount(); 

// calling subscribe here invokes heavyObservable which can take a bit of time 
const subscription1 = sharedObservable.subscribe(); 
// log: I am expensive, avoid hitting this code 

// second call is "free" - the underlying observable is reused 
const subscription2 = sharedObservable.subscribe(); 

subscription1.unsubscribe(); 
subscription2.unsubscribe(); 

// calling subscribe again here invokes heavyObservable over again 
const subscription3 = sharedObservable.subscribe(); 
// log: I am expensive, avoid hitting this code 

当没有完成退订,再没有新的数据将被发射(除非是在流,这不是明显的在你的问题的开始触发)。 - 您的案例中的subscription1subscription2应该收到相同的值。 如果这是设计,那么你可以不使用refCount(),但只是发布,然后做sharedObservable.connect(),在这种情况下,它总是“热”。 另一种选择可能是publishReplay(1)而不是publish()

以任何方式,您的情况听起来有点奇怪,最有可能通过改变数据流的一般体系结构来解决 - 但是,如果不知道真实用例,很难说出哪些rxjs操作这里是最好的。

+0

有关调查可观察到的数据,并将其转换,并保持它是最新的。只要变换后的数据发生变化,它就会发射。这个转换最初很昂贵,但维护起来很便宜。我可以使用连接,这将保持服务器连接永久打开。如果可能的话,我想断开连接。大多数情况下,这工作得很好,但我有一些边界情况下快速退订和重新结束。我可以在调用者中通过添加一个明确的,长期存在的sub来处理这个问题,但是这是一个需要记住要做的和清理的额外事情。我宁愿有可观察的处理订阅“平滑” – studds

+0

您可以发布该流吗? - 如果没有完整的图片,几乎不可能提供适当的帮助(当然,你可以改变网址等。) – olsn

+0

我很喜欢,但它太大而无法在这里复制。显然,如果我可以让代码更便宜运行或避免运行代码,那最好,但这超出了堆栈溢出问题的范围,我想。我添加了一个micky-mouse示例来尝试并更好地说明目标。 – studds

试图解决这个问题。下面的函数包装提供的ConnectableObservable source并维护用户的refCount。当第一用户订阅时,它呼叫connect(),然后当最后一个用户退订delayms后,从source呼叫setTimeoutunsubscribes

理想情况下,我宁愿修改现有的refCount observable,但我不明白代码是诚实的。

不知道这是否涵盖所有可能的边缘情况或是否会产生意想不到的副作用。

Plunker:https://jsbin.com/wafahusitu/edit?js,console

function refCountWithUnsubscriptionDelay<T>(source: Rx.ConnectableObservable<T>, delay: number): Rx.Observable<T> { 

    const refCount = 0; 
    const sub; 
    let timeoutRef; 

    return Rx.Observable.create((observer: Rx.Observer<T>) => { 
     refCount++; 
     if (timeoutRef) { 
      clearTimeout(timeoutRef); 
     } 
     console.log('refCount = ' + refCount); 
     if (!sub) { 
      // connect on first call 
      sub = source.connect(); 
     } 

     return source.subscribe(observer) 
       .add(function() { 
        refCount --; 
        if (refCount <= 0) { 
         // trigger delayed unsubscription if there are no listeners 
         timeoutRef = setTimeout(() => { 
          // don't unsubscribe if new listeners have subscribed 
          if (refCount <= 0) { 
           console.log('unsub'); 
           sub.unsubscribe(); 
           sub = undefined; 
           timeoutRef = undefined; 
          } 
         }, delay); 
        } 
       }); 
    }) 
} 
+0

完全诚实:这看起来不像是一种解决方案,应该与其他人共享 - 它可能对你“有点”起作用 - 但它肯定不是rxjs打算工作的方式 - 而且我绝对相信这一点可以通过为“沉重”的流使用不同的设置来避免 - 但是如果没有看到那个流,就不可能帮助你 - 为了“拯救”可能偶然遇到这个问题的其他用户:请删除你的答案因为它对你的情况来说太具体了,并且使用非rxjs实践。 – olsn

+0

谢谢olsn。您能否更具体地了解非rxjs的做法?或者是否有一些引用可以指向我?你似乎强烈地感觉这是一个不好的解决方案。如果你能够提供更多的指导,为什么这么糟糕,这将有助于我的学习。 – studds

+0

改变订阅机制就像购买扳手并修改它成为一把刀 - 如果你不能发布你的流,也许你可以在几个步骤中解释你的流应该做什么,什么时候进行订阅和预期什么数据在订阅内(请随时在gmail的pure.onh上给我留言) – olsn