RxJS:将Node.js套接字转换为Observable并将它们合并为一个流

RxJS:将Node.js套接字转换为Observable并将它们合并为一个流

问题描述:

我试图将Node的套接字转换为使用RxJS的流。目标是让每个套接字创建它自己的流,并将所有流合并为一个。当新的套接字连接时,将使用socketStream = Rx.Observable.fromEvent(socket, 'message')创建一个流。RxJS:将Node.js套接字转换为Observable并将它们合并为一个流

然后流合并到的东西主流像

mainStream = mainStream.merge(socketStream)

这似乎做工精细,问题是,经过200-250客户端连接,服务器抛出RangeError: Maximum call stack size exceeded

我有样的服务器和客户端代码,演示了在这里要点此行为: Sample Server and Client

我怀疑,作为客户端连接/断开,主流不正确清理。

问题是您正在递归合并您的Observable。每次你做

cmdStream = cmdStream.merge(socketStream); 

你正在创建一个新的MergeObservable/MergeObserver对。

看一下source,你可以看到你基本上对每个订阅所做的事情是按顺序订阅你以前的每一个流,所以不应该很难看到在250个连接处你的调用栈可能至少有1000个电话深度。

解决此问题的更好方法是转换使用flatMap运算符并将您的连接视为创建ObservableObservables

//Turn the connections themselves into an Observable 
var connections = Rx.Observable.fromEvent(server, 'connection', 
        socket => new JsonSocket(socket)); 

connections 
    //flatten the messages into their own Observable 
    .flatMap(socket => { 
    return Rx.Observable.fromEvent(socket.__socket, 'message') 
      //Handle the socket closing as well 
      .takeUntil(Rx.Observable.fromEvent(socket.__socket, 'close')); 
    }, (socket, msg) => { 
    //Transform each message to include the socket as well. 
    return { socket : socket.__socket, data : msg}; 
    }) 
    .subscribe(processData, handleError); 

上面我没有测试过,但应该修复你的SO错误。

我也许会质疑这个的总体设计。通过将所有Observable合并在一起,你究竟获得了什么?您仍然通过将套接字对象与消息一起传递来区分它们,因此看起来这些可能都是不同的流。

+0

哦,不仅是这个解决了我的问题,它也将连接变成流(我想这样做,但不明白flatMap),并在10行代码(从我的版本100+是我为了要点而清理的)。谢谢! ++啤酒 –

+0

哦,关于设计,正如你可以告诉我只是进入Rx的东西,所以这部分是“一切都是流”的思维练习,也意图以后能够缓冲和设置一些如果需要的话反压。如果所有传入的消息都在一个流中,缓冲将更直接。 –

+0

非常欢迎。这不是一条硬性和快速的规则,对于你的情况来说,合并更有意义。无论哪种方式欢迎到溪流之美! – paulpdaniels