生产者/消费者多生产者和单个消费者书面文件的Python
我的要求是类似Multiple producers, single consumer 除非我需要它在Python生产者/消费者多生产者和单个消费者书面文件的Python
我已经创建了一个派生5个并发进程对应用程序(我的利用多重库)这5个过程独立地以字典格式生成输出。
早些时候我打印输出到控制台,但现在想输出到一个文件。
我正在寻找一种模式,我的所有5个生产者都写入支持并发写入的共享队列。
而单个消费者进程也可以访问此队列并从中消耗数据,并且可以在生产者完成任务时等待没有数据写入和终止。
感谢Anuj
,因为你已经在使用多进程,所有你需要的是Queue类
和样品(从队列文档修改)
from multiprocessing import Process, Queue
def child(q, url):
result = my_process(url)
q.put(result)
if __name__ == '__main__':
q = Queue()
urls = [...]
children = []
for url in urls:
p = Process(target=child, args=(q,url))
p.start()
children.append(p)
for p in children:
p.join()
print q.get() #or write to file (might not be the answer from this child)
编辑: 对于每个孩子可多选将最后一个for循环替换为:
while 0 != multiprocessing.active_children():
print q.get()
我实现了在Python这种模式在一个主管进程生成一堆的进程,然后消耗从所有这些日志消息,并写入这些日志信息到一个日志文件。
基本上,我用execve来描述每个进程的stderr被连接到PTY的进程。然后我的主管打开了所有的主人PTY,并用select
从循环中读取他们。 PTY的行是由tty行规定的行缓冲的,你可以在它们上使用readline来进行non = blocking读取。我相信我也使用了PTY的fcntl来设置os.O_NONBLOCK。
工程很好。唯一的麻烦是,当你从选择轮询返回时,你需要每pty读多条行,否则你可能会失去输出(假设你有一些收获子进程并重新启动)。通过阅读每个PTY上可用的所有行,您还可以避免跟踪与其他消息交错。
如果您确实需要发送对象而不是文本行,那么您最好使用真正的pub-sub消息系统,如AMQP或ZeroMQ。 AMQP是一个比你需要的更大的锤子,所以如果你希望构建许多类似的应用程序,只检查一下。否则,请尝试更简单的0MQ http://www.zeromq.org/intro:read-the-manual,它只是一个使套接字更易于使用的消息传递库。
您也可以在合并过程中执行一些行缓冲,insertin根据需要将g横幅放入日志输出中,这样可以保持输出顺畅。 –
感谢Micheal,在我的情况下,我没有真正的日志,我的每个进程都会被赋予一个URL并使用selenium webdriver API来浏览URL,一旦完成,我将以dict格式从DOM中获取某些数据。每个进程都会从输入url生成一个字典,并将其放入队列中,插入顺序并不重要。在写入时,单个使用者将尝试从队列和块中获取5个对象(如果不可用)并将写入文件。如果你也可以包含一些代码,你的回答很有帮助。 –
不应该这个childs.append(child)是childs.append(p) –
请让我知道当我从队列中获取对象时的行为让我们考虑当队列很大时的情况,我想连续轮询结果队列,当它为空时等待它。 –
@anuj singh - 编辑,谢谢 –