Buffer resizing during multiprocessing.

Thus, I have a system with a manufacturer, and the consumer connects to the queue of unlimited size, but if the consumer repeatedly calls get until the empty exception is thrown, it will not clear the queue.

I believe this is due to the fact that the flow in the queue on the consumer side, which serializes objects to the socket, is blocked after the socket buffer is full, and therefore it waits until the buffer takes place, however, this is possible so that the consumer can call "too fast", and therefore he thinks the queue is empty when in fact the stream on the other hand has much more data to send, but simply cannot serialize it fast enough to prevent the socket from appearing to the user.

I believe this problem would be alleviated if I could change the buffer size on the underlying socket (I am windows based). As far as I can see, what I need to do is something like:

import multiprocessing.connections as conns conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows import multiprocessing.Queue as q 

If I do the above, does this mean that when multirprocssing initializes the queue, it will use the new buffer size that I set in the version of multiprocessor connections that I already imported? It is right?

I also believe that this will only affect windows, since BUFSIZE is not used on Linux machines, because by default all their sockets are set to 60 kilobytes?

Has anyone tried this before? Will it have side effects on the windows? And what are the main limitations on socket buffer sizes on windows?

==================== Sample code for demonstration =====================

 # import multiprocessing.connection as conn # conn.BUFSIZE = 2 ** 19 import sys import multiprocessing as mp from Queue import Empty from time import sleep total_length = 10**8 def supplier(q): print "Starting feeder" for i in range(total_length) : q.put(i) if __name__=="__main__": queue = mp.Queue() p = mp.Process(target=supplier, args=(queue,)) p.start() sleep(120) returned = [] while True : try : returned.append(queue.get(block=False)) except Empty : break print len(returned) print len(returned) == total_length p.terminate() sys.exit() 

This example, when it runs in windows, will usually pull only about 160,000 items from the queue, because the main thread can free the buffer faster than it refills the supplier, and ultimately it tries to pull out of the queue when the buffer is empty and reports that it is empty.

You can theoretically improve this problem by having a larger buffer size. The two lines at the top, I suppose, on a Windows system, increase the default buffer size for the channel.

If you comment on them, then this script will pull out more data before it exits, as it is much higher. My main questions: 1) It really works. 2) Is there a way to force this code to use the same base buffer size in windows and linux 3) Are there any unexpected side effects from setting up large buffer sizes for pipes.

I know that in general there is no way to find out if you pulled all the data out of the queue (- given that the provider works constantly and produces data very unevenly), but I'm looking for ways to improve that on the best basis.

+8
python python-multiprocessing
source share
1 answer

Update:

A useful Windows Pipe link for people who need it in the future (link provided by OP, phil_20686 ): https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85). aspx

Origianl:

BUFSIZE only works when the platform is win32.

multiprocessing.Queue is built on top of the Pipe, if you change BUFSIZE, the Queue you created will use the updated value. see below:

 class Queue(object): def __init__(self, maxsize=0): if maxsize <= 0: maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX self._maxsize = maxsize self._reader, self._writer = Pipe(duplex=False) 

When the platform is win32, the pipe code will call the following code:

 def Pipe(duplex=True): ''' Returns pair of connection objects at either end of a pipe ''' address = arbitrary_address('AF_PIPE') if duplex: openmode = win32.PIPE_ACCESS_DUPLEX access = win32.GENERIC_READ | win32.GENERIC_WRITE obsize, ibsize = BUFSIZE, BUFSIZE else: openmode = win32.PIPE_ACCESS_INBOUND access = win32.GENERIC_WRITE obsize, ibsize = 0, BUFSIZE h1 = win32.CreateNamedPipe( address, openmode, win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE | win32.PIPE_WAIT, 1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL ) 

You can see that when duplex is False, the size of the external buffer is 0, and the size of the buffer is BUFSIZE.

inbuffer is the number of bytes reserved for the input buffer. 2 ** 16 = 65536, the maximum amount of bytes can be written in one operation without blocking, but the buffer size capacity varies in different systems, it changes even in the same system, so it's hard to say which side the effect is when you set the maximum pipe size.

+6
source share

All Articles