multiprocessing.Pipe is even slower than multiprocessing.Queue?

I tried to compare the speed of Pipe against Queue from the multiprocessing package. I thought Pipe would be faster, since Queue uses a Pipe internally.

Strangely, Pipe is slower than Queue when sending large numpy arrays. What am I missing here?

Pipe:

    import sys
    import time
    from multiprocessing import Process, Pipe

    import numpy as np

    NUM = 1000

    def worker(conn):
        for task_nbr in range(NUM):
            conn.send(np.random.rand(400, 400, 3))
        sys.exit(1)

    def main():
        parent_conn, child_conn = Pipe(duplex=False)
        Process(target=worker, args=(child_conn,)).start()
        for num in range(NUM):
            message = parent_conn.recv()

    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print "Duration: %s" % duration
        print "Messages Per Second: %s" % msg_per_sec

    # Took 10.86s.

Queue:

    import sys
    import time
    from multiprocessing import Process, Queue

    import numpy as np

    NUM = 1000

    def worker(q):
        for task_nbr in range(NUM):
            q.put(np.random.rand(400, 400, 3))
        sys.exit(1)

    def main():
        recv_q = Queue()
        Process(target=worker, args=(recv_q,)).start()
        for num in range(NUM):
            message = recv_q.get()

    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print "Duration: %s" % duration
        print "Messages Per Second: %s" % msg_per_sec

    # Took 6.86s.
3 answers

You can run an experiment by replacing the worker in your Pipe code above with the following:

    def worker(conn):
        for task_nbr in range(NUM):
            data = np.random.rand(400, 400, 3)
        sys.exit(1)

    def main():
        parent_conn, child_conn = Pipe(duplex=False)
        p = Process(target=worker, args=(child_conn,))
        p.start()
        p.join()

This gives you the time it takes just to create the data for your test. On my system, that takes about 2.9 seconds.

Under the hood, the Queue object implements buffering and a feeder thread. The thread still lives in the same process, but thanks to it, data creation does not have to wait for the system I/O to complete; the two operations effectively run in parallel. Try your Pipe code modified with some simple threading (disclaimer: the code here is for testing only and is not production-ready).

    import sys
    import time
    import threading
    from multiprocessing import Process, Pipe, Lock

    import numpy as np

    NUM = 1000

    def worker(conn):
        _conn = conn
        _buf = []
        _wlock = Lock()
        _sentinel = object()  # signal that we're done

        def thread_worker():
            while 1:
                if _buf:
                    _wlock.acquire()
                    obj = _buf.pop(0)
                    if obj is _sentinel:
                        _wlock.release()
                        return
                    _conn.send(obj)
                    _wlock.release()

        t = threading.Thread(target=thread_worker)
        t.start()

        for task_nbr in range(NUM):
            data = np.random.rand(400, 400, 3)
            data[0][0][0] = task_nbr  # just for an integrity check
            _wlock.acquire()
            _buf.append(data)
            _wlock.release()

        _wlock.acquire()
        _buf.append(_sentinel)
        _wlock.release()

        t.join()
        sys.exit(1)

    def main():
        parent_conn, child_conn = Pipe(duplex=False)
        Process(target=worker, args=(child_conn,)).start()
        for num in range(NUM):
            message = parent_conn.recv()
            assert num == message[0][0][0], 'Data was corrupted'

    if __name__ == "__main__":
        start_time = time.time()
        main()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print "Duration: %s" % duration
        print "Messages Per Second: %s" % msg_per_sec

On my machine, it takes 3.4 seconds to run, which is almost exactly the same as the Queue code above.

From https://docs.python.org/2/library/threading.html

In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once ... However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

The performance difference between Queue and Pipe is certainly an odd implementation detail until you dig into it a bit.
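To see the feeder thread at work directly, here is a minimal sketch of my own (not from the original post): `Queue.put` only appends the item to an internal buffer and returns almost immediately; a background feeder thread pickles and writes the bytes later, so it is `get` that actually pays for the I/O.

```python
# Sketch (not from the original post): Queue.put is asynchronous.
# It appends to an internal buffer and returns; a feeder thread pickles
# and writes the data in the background, so only get() waits for real I/O.
import time
from multiprocessing import Queue

import numpy as np

if __name__ == "__main__":
    q = Queue()
    payload = np.random.rand(400, 400, 3)  # ~3.8 MB of float64

    start = time.time()
    for _ in range(10):
        q.put(payload)  # returns without waiting for the transfer
    put_time = time.time() - start

    start = time.time()
    items = [q.get() for _ in range(10)]  # here we wait for the bytes
    get_time = time.time() - start

    print("put loop: %.4fs, get loop: %.4fs" % (put_time, get_time))
    assert all(np.array_equal(item, payload) for item in items)
    q.close()
    q.join_thread()
```

On a typical run the put loop finishes orders of magnitude faster than the get loop, because the puts only enqueue references while the gets wait for ~38 MB to be pickled and shipped through the underlying pipe.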


I assume from the print statements that you are using Python 2. However, the weird behavior cannot be reproduced under Python 3, where Pipe is actually faster than Queue.

    import sys
    import time
    from multiprocessing import Process, Pipe, Queue

    import numpy as np

    NUM = 20000

    def worker_pipe(conn):
        for task_nbr in range(NUM):
            conn.send(np.random.rand(40, 40, 3))
        sys.exit(1)

    def main_pipe():
        parent_conn, child_conn = Pipe(duplex=False)
        Process(target=worker_pipe, args=(child_conn,)).start()
        for num in range(NUM):
            message = parent_conn.recv()

    def pipe_test():
        start_time = time.time()
        main_pipe()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print("Pipe")
        print("Duration: " + str(duration))
        print("Messages Per Second: " + str(msg_per_sec))

    def worker_queue(q):
        for task_nbr in range(NUM):
            q.put(np.random.rand(40, 40, 3))
        sys.exit(1)

    def main_queue():
        recv_q = Queue()
        Process(target=worker_queue, args=(recv_q,)).start()
        for num in range(NUM):
            message = recv_q.get()

    def queue_test():
        start_time = time.time()
        main_queue()
        end_time = time.time()
        duration = end_time - start_time
        msg_per_sec = NUM / duration
        print("Queue")
        print("Duration: " + str(duration))
        print("Messages Per Second: " + str(msg_per_sec))

    if __name__ == "__main__":
        for i in range(2):
            queue_test()
            pipe_test()

Results in:

    Queue
    Duration: 3.44321894646
    Messages Per Second: 5808.51822408
    Pipe
    Duration: 2.69065594673
    Messages Per Second: 7433.13169575
    Queue
    Duration: 3.45295906067
    Messages Per Second: 5792.13354361
    Pipe
    Duration: 2.78426194191
    Messages Per Second: 7183.23218766
    ------------------
    (program exited with code: 0)

On my system, Pipe(duplex=False) is slower than Pipe(duplex=True) . I think the reasons described by the other answers are more relevant to your situation, but for those interested in raw performance, here is a side-by-side comparison:

    from time import time
    from multiprocessing import Process, Queue, Pipe

    n = 1000
    buffer = b'\0' * (1000 * 1000)  # 1 megabyte

    def print_elapsed(name, start):
        elapsed = time() - start
        spi = elapsed / n
        ips = n / elapsed
        print(f'{name}: {spi*1000:.3f} ms/item, {ips:.0f} item/sec')

    def producer(q):
        start = time()
        for i in range(n):
            q.put(buffer)
        print_elapsed('producer', start)

    def consumer(q):
        start = time()
        for i in range(n):
            out = q.get()
        print_elapsed('consumer', start)

    class PipeQueue():
        def __init__(self, **kwargs):
            self.out_pipe, self.in_pipe = Pipe(**kwargs)

        def put(self, item):
            self.in_pipe.send_bytes(item)

        def get(self):
            return self.out_pipe.recv_bytes()

        def close(self):
            self.out_pipe.close()
            self.in_pipe.close()

    print('duplex=True')
    q = PipeQueue(duplex=True)
    producer_process = Process(target=producer, args=(q,))
    consumer_process = Process(target=consumer, args=(q,))
    consumer_process.start()
    producer_process.start()
    consumer_process.join()
    producer_process.join()
    q.close()

    print('duplex=False')
    q = PipeQueue(duplex=False)
    producer_process = Process(target=producer, args=(q,))
    consumer_process = Process(target=consumer, args=(q,))
    consumer_process.start()
    producer_process.start()
    consumer_process.join()
    producer_process.join()
    q.close()

Results:

    duplex=True
    consumer: 0.301 ms/item, 3317 item/sec
    producer: 0.298 ms/item, 3358 item/sec
    duplex=False
    consumer: 0.673 ms/item, 1486 item/sec
    producer: 0.669 ms/item, 1494 item/sec

I think this comes down to CPython using os.pipe for the one-way case versus socket.socketpair for the duplex case, but I'm not sure.
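That guess is easy to check on Unix (a sketch of mine, not from the answer above): a Connection exposes its file descriptor via `fileno()`, and `os.fstat` reveals whether it is a FIFO (from `os.pipe`) or a socket (from `socket.socketpair`). Windows uses named pipes instead, so this inspection does not apply there.

```python
# Sketch (POSIX-only, not from the original answer): inspect the file
# descriptors behind Pipe() to see which kernel primitive CPython picked.
import os
import stat
from multiprocessing import Pipe

def fd_kind(conn):
    mode = os.fstat(conn.fileno()).st_mode
    if stat.S_ISFIFO(mode):
        return 'fifo'    # created with os.pipe()
    if stat.S_ISSOCK(mode):
        return 'socket'  # created with socket.socketpair()
    return 'other'

if __name__ == "__main__":
    recv_conn, send_conn = Pipe(duplex=False)
    print('duplex=False:', fd_kind(recv_conn), fd_kind(send_conn))
    recv2, send2 = Pipe(duplex=True)
    print('duplex=True: ', fd_kind(recv2), fd_kind(send2))
```

On Linux with CPython, this prints `fifo fifo` for the one-way case and `socket socket` for the duplex case, matching the guess.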

