限制在后台线程中使用信号量同步运行异步协程

问题描述:

作为Python新的asyncio模块的实验,我创建了以下片段来处理后台工作程序中的一组长时间运行的动作(作业)。限制在后台线程中使用信号量同步运行异步协程

为了控制同时运行的作业的数量,我在中引入了一个信号量(第56行)。然而,随着信号量的到位,似乎所获得的锁定从未被释放,因为完成后(执行回调),等待的作业不会开始。当我沟通与块,一切都按预期工作。

import asyncio 

from queue import Queue, Empty 
from datetime import datetime 
from threading import Thread 


class BackgroundWorker(Thread): 
    def __init__(self): 
     super().__init__() 
     self._keep_running = True 
     self._waiting_coros = Queue() 
     self._tasks = [] 
     self._loop = None # Loop must be initialized in child thread. 
     self.limit_simultaneous_processes = asyncio.Semaphore(2) 

    def stop(self): 
     self._keep_running = False 

    def run(self): 
     self._loop = asyncio.new_event_loop()  # Implicit creation of the loop only happens in the main thread. 
     asyncio.set_event_loop(self._loop)   # Since this is a child thread, we need to do in manually. 
     self._loop.run_until_complete(self.process_coros()) 

    def submit_coro(self, coro, callback=None): 
     self._waiting_coros.put((coro, callback)) 

    @asyncio.coroutine 
    def process_coros(self): 
     while self._keep_running: 
      try: 
       while True: 
        coro, callback = self._waiting_coros.get_nowait() 
        task = asyncio.async(coro()) 
        if callback: 
         task.add_done_callback(callback) 
        self._tasks.append(task) 
      except Empty as e: 
       pass 
      yield from asyncio.sleep(3)  # sleep so the other tasks can run 


background_worker = BackgroundWorker() 


class Job(object): 
    def __init__(self, idx): 
     super().__init__() 
     self._idx = idx 

    def process(self): 
     background_worker.submit_coro(self._process, self._process_callback) 

    @asyncio.coroutine 
    def _process(self): 
     with (yield from background_worker.limit_simultaneous_processes): 
      print("received processing slot %d" % self._idx) 
      start = datetime.now() 
      yield from asyncio.sleep(2) 
      print("processing %d took %s" % (self._idx, str(datetime.now() - start))) 

    def _process_callback(self, future): 
     print("callback %d triggered" % self._idx) 


def main(): 
    print("starting worker...") 
    background_worker.start() 

    for idx in range(10): 
     download_job = Job(idx) 
     download_job.process() 

    command = None 
    while command != "quit": 
     command = input("enter 'quit' to stop the program: \n") 

    print("stopping...") 
    background_worker.stop() 
    background_worker.join() 


if __name__ == '__main__': 
    main() 

任何人都可以帮我解释一下这种行为吗?当与块被清除时,为什么不是信号量增加?

+0

我不明白您的问题:信号计数减少,预期在你的代码示例增加。 – 2015-02-11 16:36:12

+0

您是否运行过代码?如果我这样做,前两个作业的回调被执行,但循环中的其他协程不会开始执行。我认为他们仍然在等待从信号量获得锁定,但我可能在这里是错误的。也许他们在等别的东西? – ImapUkua 2015-02-11 18:41:55

我发现了这个bug:信号量是用主线程中的隐式事件回调来初始化的,而不是在线程以run()开始时显式设置的信号量。

修正版本:

import asyncio 

from queue import Queue, Empty 
from datetime import datetime 
from threading import Thread 


class BackgroundWorker(Thread): 
    def __init__(self): 
     super().__init__() 
     self._keep_running = True 
     self._waiting_coros = Queue() 
     self._tasks = [] 
     self._loop = None       # Loop must be initialized in child thread. 
     self.limit_simultaneous_processes = None # Semaphore must be initialized after the loop is set. 

    def stop(self): 
     self._keep_running = False 

    def run(self): 
     self._loop = asyncio.new_event_loop()  # Implicit creation of the loop only happens in the main thread. 
     asyncio.set_event_loop(self._loop)   # Since this is a child thread, we need to do in manually. 
     self.limit_simultaneous_processes = asyncio.Semaphore(2) 
     self._loop.run_until_complete(self.process_coros()) 

    def submit_coro(self, coro, callback=None): 
     self._waiting_coros.put((coro, callback)) 

    @asyncio.coroutine 
    def process_coros(self): 
     while self._keep_running: 
      try: 
       while True: 
        coro, callback = self._waiting_coros.get_nowait() 
        task = asyncio.async(coro()) 
        if callback: 
         task.add_done_callback(callback) 
        self._tasks.append(task) 
      except Empty as e: 
       pass 
      yield from asyncio.sleep(3)  # sleep so the other tasks can run 


background_worker = BackgroundWorker() 


class Job(object): 
    def __init__(self, idx): 
     super().__init__() 
     self._idx = idx 

    def process(self): 
     background_worker.submit_coro(self._process, self._process_callback) 

    @asyncio.coroutine 
    def _process(self): 
     with (yield from background_worker.limit_simultaneous_processes): 
      print("received processing slot %d" % self._idx) 
      start = datetime.now() 
      yield from asyncio.sleep(2) 
      print("processing %d took %s" % (self._idx, str(datetime.now() - start))) 

    def _process_callback(self, future): 
     print("callback %d triggered" % self._idx) 


def main(): 
    print("starting worker...") 
    background_worker.start() 

    for idx in range(10): 
     download_job = Job(idx) 
     download_job.process() 

    command = None 
    while command != "quit": 
     command = input("enter 'quit' to stop the program: \n") 

    print("stopping...") 
    background_worker.stop() 
    background_worker.join() 


if __name__ == '__main__': 
    main()