Turns out it was easier than I thought! After the support of Yu.F. Sebastian, here is my crack in response:
import time import ctypes import logging import Queue import multiprocessing as mp import numpy as np info = mp.get_logger().info def main(): logger = mp.log_to_stderr() logger.setLevel(logging.INFO) data_pipeline = Image_Data_Pipeline( num_data_buffers=5, buffer_shape=(60, 256, 512)) start = time.clock() data_pipeline.load_buffers(data_pipeline.num_data_buffers) end = time.clock() data_pipeline.close() print "Elapsed time:", end-start class Image_Data_Pipeline: def __init__(self, num_data_buffers, buffer_shape): """ Allocate a bunch of 16-bit buffers for image data """ self.num_data_buffers = num_data_buffers self.buffer_shape = buffer_shape pix_per_buf = np.prod(buffer_shape) self.data_buffers = [mp.Array(ctypes.c_uint16, pix_per_buf) for b in range(num_data_buffers)] self.idle_data_buffers = range(num_data_buffers) """ Launch the child processes that make up the pipeline """ self.camera = Data_Pipeline_Process( target=child_process, name='Camera', data_buffers=self.data_buffers, buffer_shape=buffer_shape) self.display_prep = Data_Pipeline_Process( target=child_process, name='Display Prep', data_buffers=self.data_buffers, buffer_shape=buffer_shape, input_queue=self.camera.output_queue) self.file_saving = Data_Pipeline_Process( target=child_process, name='File Saving', data_buffers=self.data_buffers, buffer_shape=buffer_shape, input_queue=self.display_prep.output_queue) return None def load_buffers(self, N, timeout=0): """ Feed the pipe! """ for i in range(N): self.camera.input_queue.put(self.idle_data_buffers.pop()) """ Wait for the buffers to idle. Here would be a fine place to feed them back to the pipeline, too. """ while True: try: self.idle_data_buffers.append( self.file_saving.output_queue.get_nowait()) info("Buffer %i idle"%(self.idle_data_buffers[-1])) except Queue.Empty: time.sleep(0.01) if len(self.idle_data_buffers) >= self.num_data_buffers: break return None def close(self): self.camera.input_queue.put(None) self.display_prep.input_queue.put(None) self.file_saving.input_queue.put(None) self.camera.child.join() self.display_prep.child.join() self.file_saving.child.join() class Data_Pipeline_Process: def __init__( self, target, name, data_buffers, buffer_shape, input_queue=None, output_queue=None, ): if input_queue is None: self.input_queue = mp.Queue() else: self.input_queue = input_queue if output_queue is None: self.output_queue = mp.Queue() else: self.output_queue = output_queue self.command_pipe = mp.Pipe()
Background: This is the skeleton of a data collection pipeline. I want to receive data at a very high speed, process it for display on the screen and save it to disk. I donβt want the show speed or disk speed to limit the receipt, so I believe that using separate child processes in separate processing cycles is appropriate.
Here's a typical output from a dummy program:
C:\code\instrument_control>c:\Python27\python.exe test.py [INFO/MainProcess] allocating a new mmap of length 15728640 [INFO/MainProcess] allocating a new mmap of length 15728640 [INFO/MainProcess] allocating a new mmap of length 15728640 [INFO/MainProcess] allocating a new mmap of length 15728640 [INFO/MainProcess] allocating a new mmap of length 15728640 [[INFO/Camera] child process calling self.run() INFO/Display Prep] child process calling self.run() [INFO/Camera] start buffer 4 [INFO/File Saving] child process calling self.run() [INFO/Camera] end buffer 4 [INFO/Camera] start buffer 3 [INFO/Camera] end buffer 3 [INFO/Camera] start buffer 2 [INFO/Display Prep] start buffer 4 [INFO/Camera] end buffer 2 [INFO/Camera] start buffer 1 [INFO/Camera] end buffer 1 [INFO/Camera] start buffer 0 [INFO/Camera] end buffer 0 [INFO/Display Prep] end buffer 4 [INFO/Display Prep] start buffer 3 [INFO/File Saving] start buffer 4 [INFO/Display Prep] end buffer 3 [INFO/Display Prep] start buffer 2 [INFO/File Saving] end buffer 4 [INFO/File Saving] start buffer 3 [INFO/MainProcess] Buffer 4 idle [INFO/Display Prep] end buffer 2 [INFO/Display Prep] start buffer 1 [INFO/File Saving] end buffer 3 [INFO/File Saving] start buffer 2 [INFO/MainProcess] Buffer 3 idle [INFO/Display Prep] end buffer 1 [INFO/Display Prep] start buffer 0 [INFO/File Saving] end buffer 2 [INFO/File Saving] start buffer 1 [[INFO/MainProcess] Buffer 2 idle INFO/Display Prep] end buffer 0 [INFO/File Saving] end buffer 1 [INFO/File Saving] start buffer 0 [INFO/MainProcess] Buffer 1 idle [INFO/File Saving] end buffer 0 [INFO/MainProcess] Buffer 0 idle [INFO/Camera] process shutting down [INFO/Camera] process exiting with exitcode 0 [INFO/Display Prep] process shutting down [INFO/File Saving] process shutting down [INFO/Display Prep] process exiting with exitcode 0 [INFO/File Saving] process exiting with exitcode 0 Elapsed time: 0.263240348548 [INFO/MainProcess] process shutting down C:\code\instrument_control>
It seems that I am doing what I want: the data is processed for display and saved to disk, without interfering with the reception speed.