更改多处理缓冲区大小.Queue

问题描述:

所以我有一个生产者和消费者通过无限大小的队列连接的系统,但如果消费者重复调用get直到抛出Empty异常,它不会清除队列。更改多处理缓冲区大小.Queue

我相信这是因为消费者端的队列中的线程会将对象串行化到套接字中,一旦套接字缓冲区已满就会被阻塞,因此它会等待,直到缓冲区有空间为止,但是,它是对于消费者来说可能会得到“太快”,所以它认为队列是空的,事实上另一方的线程有更多的数据要发送,但是不能足够快地将其串行化,以防止套接字对消费者显示为空。

我相信如果我可以改变底层套接字缓冲区大小(我是基于Windows的),这个问题会得到缓解。至于我可以看到我需要做的是一样的东西:

import multiprocessing.connections as conns 
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows 
import multiprocessing.Queue as q 

如果我做以上,这是否意味着multirprocssing初始化队列时,它会用我在设置了新的缓冲区大小我已经导入的multiprocessing.connections的版本?那是对的吗?

另外我相信,这只会影响Windows,因为BUFSIZE不在Linux机器上使用,因为它们的所有套接字默认设置为60千字节?

有没有人试过这个?这会对windows有副作用吗?那么Windows上套接字缓冲区大小的基本限制是什么?

===================要演示的代码示例===================

# import multiprocessing.connection as conn 
# conn.BUFSIZE = 2 ** 19 
import sys 
import multiprocessing as mp 
from Queue import Empty 
from time import sleep 

total_length = 10**8 

def supplier(q): 
    print "Starting feeder" 
    for i in range(total_length) : 
     q.put(i) 


if __name__=="__main__": 

    queue = mp.Queue() 

    p = mp.Process(target=supplier, args=(queue,)) 

    p.start() 

    sleep(120) 

    returned = [] 
    while True : 
     try : 
      returned.append(queue.get(block=False)) 
     except Empty : 
      break 

    print len(returned) 
    print len(returned) == total_length 

    p.terminate() 
    sys.exit() 

此示例在Windows上运行时,通常只会从队列中拉出大约160,000个项目,因为主线程可以更快地清空缓冲区,而不是由供应商重新填充,并最终尝试从队列中取出缓冲区为空并报告它为空。

你可以理论上通过拥有更大的缓冲区大小来改善这个问题。我相信,在Windows系统上,两条线将增加管道的默认缓冲区大小。

如果你评论他们,那么这个脚本将在退出之前提取更多的数据,因为它有更高的数据。我的主要问题是: 1)这是否确实有效。 2)有没有办法让这段代码在windows和linux中使用相同大小的底层缓冲区?3)是否有任何意想不到的副作用,即为管道设置较大的缓冲区大小。

我知道一般情况下,没有办法知道你是否从队列中取出所有数据( - 假设供应商永久运行并且数据非常不均匀),但我正在寻找在尽力而为的基础上改进。

+1

我想一个演示这个问题的小例子会很有用。 – pradyunsg

更新:

的Windows管谁需要在将来人们

有用的链接(该链接由OP提供,phil_20686): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

原著:

只有当平台是win32时,BUFSIZE才能工作。

multiprocessing.Queue构建在Pipe的顶部,如果更改了BUFSIZE,则生成的队列将使用更新后的值。见下图:

class Queue(object): 

    def __init__(self, maxsize=0): 
     if maxsize <= 0: 
      maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX 
     self._maxsize = maxsize 
     self._reader, self._writer = Pipe(duplex=False) 

当平台是Win32,管代码将调用下面的代码:

def Pipe(duplex=True): 
    ''' 
    Returns pair of connection objects at either end of a pipe 
    ''' 
    address = arbitrary_address('AF_PIPE') 
    if duplex: 
     openmode = win32.PIPE_ACCESS_DUPLEX 
     access = win32.GENERIC_READ | win32.GENERIC_WRITE 
     obsize, ibsize = BUFSIZE, BUFSIZE 
    else: 
     openmode = win32.PIPE_ACCESS_INBOUND 
     access = win32.GENERIC_WRITE 
     obsize, ibsize = 0, BUFSIZE 

    h1 = win32.CreateNamedPipe(
     address, openmode, 
     win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | 
     win32.PIPE_WAIT, 
     1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL 
     ) 

你可以看到,当duplex为False,outbuffer大小为0,inbuffer大小BUFSIZE。

inbuffer是要为输入缓冲区保留的字节数。 2 ** 16 = 65536,这是可以在一次操作中写入的最大字节数量,没有阻塞,但缓冲区大小的容量因系统而异,即使它在同一个系统上也不相同,因此很难说侧面当您将Pipe设置为最大数量时会产生影响。

+0

有关Windows pips行为的一些重要/良好信息,请参阅以下部分的注释部分:https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx如果你添加一个链接/总结,我会接受这个答案。 –

+0

@ phil_20686我更新了我的答案,并在那里发布您提供的链接。 – haifzhan

+0

已经接受了这个答案,但FYI堆栈溢出礼仪总是总结链接内容,以避免网站在几年内移动等链接腐烂。谢谢 –