限制在后台线程中使用信号量同步运行异步协程
问题描述:
作为Python新的asyncio模块的实验,我创建了以下片段来处理后台工作程序中的一组长时间运行的动作(作业)。限制在后台线程中使用信号量同步运行异步协程
为了控制同时运行的作业的数量,我在中引入了一个信号量(第56行)。然而,随着信号量的到位,似乎所获得的锁定从未被释放,因为完成后(执行回调),等待的作业不会开始。当我沟通与块,一切都按预期工作。
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
def __init__(self):
super().__init__()
self._keep_running = True
self._waiting_coros = Queue()
self._tasks = []
self._loop = None # Loop must be initialized in child thread.
self.limit_simultaneous_processes = asyncio.Semaphore(2)
def stop(self):
self._keep_running = False
def run(self):
self._loop = asyncio.new_event_loop() # Implicit creation of the loop only happens in the main thread.
asyncio.set_event_loop(self._loop) # Since this is a child thread, we need to do in manually.
self._loop.run_until_complete(self.process_coros())
def submit_coro(self, coro, callback=None):
self._waiting_coros.put((coro, callback))
@asyncio.coroutine
def process_coros(self):
while self._keep_running:
try:
while True:
coro, callback = self._waiting_coros.get_nowait()
task = asyncio.async(coro())
if callback:
task.add_done_callback(callback)
self._tasks.append(task)
except Empty as e:
pass
yield from asyncio.sleep(3) # sleep so the other tasks can run
background_worker = BackgroundWorker()
class Job(object):
def __init__(self, idx):
super().__init__()
self._idx = idx
def process(self):
background_worker.submit_coro(self._process, self._process_callback)
@asyncio.coroutine
def _process(self):
with (yield from background_worker.limit_simultaneous_processes):
print("received processing slot %d" % self._idx)
start = datetime.now()
yield from asyncio.sleep(2)
print("processing %d took %s" % (self._idx, str(datetime.now() - start)))
def _process_callback(self, future):
print("callback %d triggered" % self._idx)
def main():
print("starting worker...")
background_worker.start()
for idx in range(10):
download_job = Job(idx)
download_job.process()
command = None
while command != "quit":
command = input("enter 'quit' to stop the program: \n")
print("stopping...")
background_worker.stop()
background_worker.join()
if __name__ == '__main__':
main()
任何人都可以帮我解释一下这种行为吗?当与块被清除时,为什么不是信号量增加?
答
我发现了这个bug:信号量是用主线程中的隐式事件回调来初始化的,而不是在线程以run()
开始时显式设置的信号量。
修正版本:
import asyncio
from queue import Queue, Empty
from datetime import datetime
from threading import Thread
class BackgroundWorker(Thread):
def __init__(self):
super().__init__()
self._keep_running = True
self._waiting_coros = Queue()
self._tasks = []
self._loop = None # Loop must be initialized in child thread.
self.limit_simultaneous_processes = None # Semaphore must be initialized after the loop is set.
def stop(self):
self._keep_running = False
def run(self):
self._loop = asyncio.new_event_loop() # Implicit creation of the loop only happens in the main thread.
asyncio.set_event_loop(self._loop) # Since this is a child thread, we need to do in manually.
self.limit_simultaneous_processes = asyncio.Semaphore(2)
self._loop.run_until_complete(self.process_coros())
def submit_coro(self, coro, callback=None):
self._waiting_coros.put((coro, callback))
@asyncio.coroutine
def process_coros(self):
while self._keep_running:
try:
while True:
coro, callback = self._waiting_coros.get_nowait()
task = asyncio.async(coro())
if callback:
task.add_done_callback(callback)
self._tasks.append(task)
except Empty as e:
pass
yield from asyncio.sleep(3) # sleep so the other tasks can run
background_worker = BackgroundWorker()
class Job(object):
def __init__(self, idx):
super().__init__()
self._idx = idx
def process(self):
background_worker.submit_coro(self._process, self._process_callback)
@asyncio.coroutine
def _process(self):
with (yield from background_worker.limit_simultaneous_processes):
print("received processing slot %d" % self._idx)
start = datetime.now()
yield from asyncio.sleep(2)
print("processing %d took %s" % (self._idx, str(datetime.now() - start)))
def _process_callback(self, future):
print("callback %d triggered" % self._idx)
def main():
print("starting worker...")
background_worker.start()
for idx in range(10):
download_job = Job(idx)
download_job.process()
command = None
while command != "quit":
command = input("enter 'quit' to stop the program: \n")
print("stopping...")
background_worker.stop()
background_worker.join()
if __name__ == '__main__':
main()
我不明白您的问题:信号计数减少,预期在你的代码示例增加。 – 2015-02-11 16:36:12
您是否运行过代码?如果我这样做,前两个作业的回调被执行,但循环中的其他协程不会开始执行。我认为他们仍然在等待从信号量获得锁定,但我可能在这里是错误的。也许他们在等别的东西? – ImapUkua 2015-02-11 18:41:55