使用信号优雅地终止Autobahn ApplicationRunner()。run().SIGINT
我以前问过how to run several autobahn.ApplicationSession
instances from within the same python process,没有阻止它们。使用信号优雅地终止Autobahn ApplicationRunner()。run().SIGINT
问题已解决,但我遇到了一个新问题。
终止这些mp.Process
实例很困难。我知道ApplicationRunner.run()
中的代码在KeyboardInterrupt
上退出,但我无法正确触发它。
示例代码:
class PoloniexSession(ApplicationSession):
@coroutine
def onJoin(self, *args, **kwargs):
channel = self.config.extra['channel']
def onTicker(*args, **kwargs):
self.config.extra['queue'].put((channel, (args, kwargs, time.time())))
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
raise
class PlnxEndpoint(mp.Process):
def __init__(self, endpoint, q, **kwargs):
super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
endpoint, **kwargs)
self.endpoint = endpoint
self.q = q
def run(self):
self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
extra={'channel': self.endpoint,
'queue': self.q})
self.runner.run(PoloniexSession)
def join(self, *args, **kwargs):
def sig_handler(x, y):
pass
signal.signal(signal.SIGINT, sig_handler)
super(PlnxEndpoint, self).join(*args, **kwargs)
class PoloniexWSS(WSSAPI):
def __init__(self, endpoints=None):
super(PoloniexWSS, self).__init__(None, 'Poloniex')
self.data_q = mp.Queue()
self.connections = {}
if endpoints:
self.endpoints = endpoints
else:
r = requests.get('https://poloniex.com/public?command=returnTicker')
self.endpoints = list(r.json().keys())
self.endpoints.append('ticker')
for endpoint in self.endpoints:
self.connections[endpoint] = PlnxEndpoint(endpoint, self.data_q)
def start(self):
super(PoloniexWSS, self).start()
for conn in self.connections:
self.connections[conn].start()
def stop(self):
for conn in self.connections:
self.connections[conn].join()
super(PoloniexWSS, self).stop()
虽然这充分填补self.q
,我仍然收到当我的子进程停止的错误:这使我相信我的signal.SIGINT
不会被触发,其中
RuntimeError: Event loop stopped before Future completed.
Traceback (most recent call last):
File "/home/nils/anaconda3/lib/python3.5/multiprocessing /process.py", line 254, in _bootstrap
self.run()
File "/home/nils/git/tools/bitexwss/bitexws//api/poloniex.py", line 46, in run
self.runner.run(PoloniexSession)
File "/home/nils/anaconda3/lib/python3.5/site-packages/autobahn-0.14.1-py3.5.egg/autobahn/asyncio/wamp.py", line 172, in run
loop.run_until_complete(protocol._session.leave())
File "/home/nils/anaconda3/lib/python3.5/asyncio/base_events.py", line 335, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
我想要它。
According to the source code of ApplicationRunner.run()
,a SIGINT
/KeyboardInterrupt
应该优雅地导致结束serve_forever()
方法。
手动关闭asyncio.event_loop
导致上述错误,以及:
class PlnxEndpoint(mp.Process):
#...
def join(self, *args, **kwargs):
loop = get_event_loop()
loop.stop()
super(PlnxEndpoint, self).join(*args, **kwargs)
#...
毕竟,摆弄周围的一点点取得了相当简单的解决方案:
使用multiprocessing.Event()
,我才得以正常结束我的过程。
class PoloniexSession(ApplicationSession):
@coroutine
def onJoin(self, *args, **kwargs):
channel = self.config.extra['channel']
def onTicker(*args, **kwargs):
self.config.extra['queue'].put((channel, (args, kwargs, time.time())))
if self.config.extra['is_killed'].is_set():
raise KeyboardInterrupt()
try:
yield from self.subscribe(onTicker, self.config.extra['channel'])
except Exception as e:
raise
class PlnxEndpoint(mp.Process):
def __init__(self, endpoint, q, **kwargs):
super(PlnxEndpoint, self).__init__(name='%s Endpoint Process' %
endpoint, **kwargs)
self.endpoint = endpoint
self.q = q
self.is_killed = mp.Event()
def run(self):
self.runner = ApplicationRunner("wss://api.poloniex.com:443", 'realm1',
extra={'channel': self.endpoint,
'queue': self.q,
'is_killed': self.is_killed})
self.runner.run(PoloniexSession)
def join(self, *args, **kwargs):
self.is_killed.set()
super(PlnxEndpoint, self).join(*args, **kwargs)
'def join()'现在可以按照推荐使用了。但仍然没有说明你为什么打算在join()中停止'process'。 – stovfl
我们应该在上一个问题中处理这个问题,我建议关闭这个问题。 – stovfl
嗯,我不同意。这是一个单独的问题,毕竟。 '在单独的进程中运行2个ApplicationRunner()。run()方法'vs'在分离进程中关闭ApplicationRunner'。它们无疑是相关的,但仍然是两个不同的问题。 – nlsdfnbch