You can do an experiment: substitute the following into your Pipe code above:
```python
import sys
import time
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000

def worker(conn):
    # Create the data but never send it, so only creation cost is measured
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=worker, args=(child_conn,))
    start = time.time()
    p.start()
    p.join()
    print("Data creation took {:.2f} s".format(time.time() - start))

if __name__ == "__main__":
    main()
```
This gives you the time it takes to create data for your test. On my system, it takes about 2.9 seconds.
Under the hood, the Queue object implements a buffer and a threaded send. The thread is still in the same process, but by using it, data creation does not have to wait for the system I/O to complete. It effectively parallelizes the operations. Try the Pipe code modified with some simple threading added (disclaimer: the code here is for testing only and is not production-ready).
```python
import sys
import time
import threading
from multiprocessing import Process, Pipe, Lock
import numpy as np
import copy

NUM = 1000

def worker(conn):
    _conn = conn
    _buf = []
    _wlock = Lock()
    _sentinel = object()  # signals the sender thread to stop

    def sender():
        # Minimal sender-thread sketch: drain the buffer and do the pipe
        # I/O in the background, so the main loop never blocks on send()
        while True:
            with _wlock:
                if not _buf:
                    continue
                data = _buf.pop(0)
            if data is _sentinel:
                return
            _conn.send(data)

    t = threading.Thread(target=sender)
    t.start()
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
        with _wlock:
            _buf.append(data)
    with _wlock:
        _buf.append(_sentinel)
    t.join()
    sys.exit(1)
```
On my machine, it takes 3.4 seconds to run, which is almost exactly the same as the Queue code above.
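You can see the Queue's buffering directly: `put()` hands the object off to a background feeder thread and returns almost immediately, while the actual pickling and pipe write happen behind the scenes. A minimal self-contained sketch (separate from the benchmark code above):

```python
import time
from multiprocessing import Queue
import numpy as np

def demo():
    q = Queue()
    data = np.random.rand(400, 400, 3)  # ~3.8 MB, far larger than a pipe buffer
    start = time.time()
    q.put(data)               # hands off to the feeder thread, returns at once
    put_elapsed = time.time() - start
    q.get()                   # the actual transfer completes here
    return put_elapsed

elapsed = demo()
print("put() returned in {:.4f} s".format(elapsed))
```

The `put()` call returns long before the multi-megabyte array has actually moved through the underlying pipe, which is exactly the overlap the threaded Pipe code above imitates.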
From https://docs.python.org/2/library/threading.html
In CPython, due to the Global Interpreter Lock, only one thread can execute Python code at once ... however, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
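The docs' point can be demonstrated with a small sketch: operations that block on I/O release the GIL, so they overlap across threads. Here `time.sleep` (which also releases the GIL) stands in for blocking I/O:

```python
import threading
import time

def io_task():
    # time.sleep releases the GIL, standing in for a blocking I/O call
    time.sleep(0.5)

start = time.time()
threads = [threading.Thread(target=io_task) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.time() - start
# Four 0.5 s "I/O" waits overlap, so the total is ~0.5 s rather than ~2.0 s
print("elapsed: {:.2f} s".format(elapsed))
```

This is why a sender thread helps the Pipe code: the `conn.send()` I/O proceeds while the main thread keeps creating data.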
The speed difference between Queue and Pipe is certainly an odd implementation detail until you dig into it a bit.
bivouac0