使用信号优雅地终止Autobahn ApplicationRunner()。run().SIGINT

问题描述:

我以前问过how to run several autobahn.ApplicationSession instances from within the same python process,没有阻止它们。使用信号优雅地终止Autobahn ApplicationRunner()。run().SIGINT

问题已解决,但我遇到了一个新问题。

终止这些mp.Process实例很困难。我知道ApplicationRunner.run()中的代码在KeyboardInterrupt上退出,但我无法正确触发它。

示例代码:

class PoloniexSession(ApplicationSession): 

    @coroutine 
    def onJoin(self, *args, **kwargs): 
     channel = self.config.extra['channel'] 

     def onTicker(*args, **kwargs): 
      self.config.extra['queue'].put((channel, (args, kwargs, time.time()))) 

     try: 
      yield from self.subscribe(onTicker, self.config.extra['channel']) 

     except Exception as e: 
      raise 


class PlnxEndpoint(mp.Process): 
    def __init__(self, endpoint, q, **kwargs): 
     super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' % 
               endpoint, **kwargs) 
     self.endpoint = endpoint 
     self.q = q 

    def run(self): 
     self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1', 
            extra={'channel': self.endpoint, 
              'queue': self.q}) 
     self.runner.run(PoloniexSession) 

    def join(self, *args, **kwargs): 
     def sig_handler(x, y): 
      pass 
     signal.signal(signal.SIGINT, sig_handler) 
     super(PlnxEndpoint, self).join(*args, **kwargs) 


class PoloniexWSS(WSSAPI): 
    def __init__(self, endpoints=None): 
     super(PoloniexWSS, self).__init__(None, 'Poloniex') 
     self.data_q = mp.Queue() 
     self.connections = {} 
     if endpoints: 
      self.endpoints = endpoints 
     else: 
      r = requests.get('https://poloniex.com/public?command=returnTicker') 
      self.endpoints = list(r.json().keys()) 
      self.endpoints.append('ticker') 

     for endpoint in self.endpoints: 
      self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q) 

    def start(self): 
     super(PoloniexWSS, self).start() 
     for conn in self.connections: 
      self.connections[conn].start() 

    def stop(self): 
     for conn in self.connections: 
      self.connections[conn].join() 
     super(PoloniexWSS, self).stop() 

虽然这充分填补self.q,我仍然收到当我的子进程停止的错误:这使我相信我的signal.SIGINT不会被触发,其中

RuntimeError: Event loop stopped before Future completed. 
    Traceback (most recent call last): 
    File "/home/nils/anaconda3/lib/python3.5/multiprocessing /process.py", line 254, in _bootstrap 
    self.run() 
    File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run 
    self.runner.run(PoloniexSession) 
    File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run 
    loop.run_until_complete(protocol._session.leave()) 
    File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete 
    raise RuntimeError('Event loop stopped before Future completed.') 

我想要它。

According to the source code of ApplicationRunner.run(),a SIGINT/KeyboardInterrupt应该优雅地导致结束serve_forever()方法。

手动关闭asyncio.event_loop导致上述错误,以及:

class PlnxEndpoint(mp.Process): 
#... 
    def join(self, *args, **kwargs): 
     loop = get_event_loop() 
     loop.stop() 
     super(PlnxEndpoint, self).join(*args, **kwargs) 
#... 
+0

我们应该在上一个问题中处理这个问题,我建议关闭这个问题。 – stovfl

+0

嗯,我不同意。这是一个单独的问题,毕竟。 '在单独的进程中运行2个ApplicationRunner()。run()方法'vs'在分离进程中关闭ApplicationRunner'。它们无疑是相关的,但仍然是两个不同的问题。 – nlsdfnbch

毕竟,摆弄周围的一点点取得了相当简单的解决方案:

使用multiprocessing.Event(),我才得以正常结束我的过程。

class PoloniexSession(ApplicationSession): 

    @coroutine 
    def onJoin(self, *args, **kwargs): 
     channel = self.config.extra['channel'] 

     def onTicker(*args, **kwargs): 
      self.config.extra['queue'].put((channel, (args, kwargs, time.time()))) 

     if self.config.extra['is_killed'].is_set(): 
      raise KeyboardInterrupt() 
     try: 
      yield from self.subscribe(onTicker, self.config.extra['channel']) 

     except Exception as e: 
      raise 


class PlnxEndpoint(mp.Process): 
    def __init__(self, endpoint, q, **kwargs): 
     super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' % 
               endpoint, **kwargs) 
     self.endpoint = endpoint 
     self.q = q 
     self.is_killed = mp.Event() 

    def run(self): 
     self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1', 
            extra={'channel': self.endpoint, 
              'queue': self.q, 
              'is_killed': self.is_killed}) 
     self.runner.run(PoloniexSession) 

    def join(self, *args, **kwargs): 
     self.is_killed.set() 
     super(PlnxEndpoint, self).join(*args, **kwargs) 
+0

'def join()'现在可以按照推荐使用了。但仍然没有说明你为什么打算在join()中停止'process'。 – stovfl