Feeding tf.RandomShuffleQueue from multiple processes using multiprocessing

I would like to use several processes (not threads) to do some preprocessing and enqueue the results into a tf.RandomShuffleQueue, which can then be consumed by my main graph for training.

Is there any way to do this?

My actual problem

I converted my dataset to TFRecords, split into 256 shards. I want to start 20 processes using multiprocessing and assign each process a range of shards. Each process should read the images, augment them, and push them into a tf.RandomShuffleQueue, from which the input can be fed to the graph for training.

Some people advised me to go through the inception example in TensorFlow. However, that is a very different situation, because there only the reading of the data shards is done by multiple threads (not processes), while the preprocessing (for example, augmentation) takes place in the main thread.

3 answers

(This aims to solve your actual problem.)

In another topic, someone told you that Python has a global interpreter lock (GIL), and that there would therefore be no speed benefit from multiple cores unless you used multiple processes.

This is probably what prompted your desire to use multiprocessing.

However, with TF, Python is normally used only to construct the graph. The actual execution happens in native code (or on the GPU), where the GIL plays no role.

In light of this, I recommend just letting TF use multithreading. This can be controlled using the intra_op_parallelism_threads argument, for example:

    with tf.Session(graph=graph,
                    config=tf.ConfigProto(allow_soft_placement=True,
                                          intra_op_parallelism_threads=20)) as sess:
        # ...

(Note: if you have, say, a 2-CPU, 32-core system, intra_op_parallelism_threads=16 may well be the better value, depending on many factors.)


Comment: The pickling of TFRecords is not that important. I can pass a list of lists containing the names of ranges of sharded TFRecord files.

In that case I have to restart my decision process!

Comment: I can pass it to Pool.map() as an argument.
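A minimal sketch of building such a list of lists, assuming 256 shards split across 20 workers as in the question. The shard names and the helper `split_into_ranges` are hypothetical and only serve the illustration:

```python
def split_into_ranges(items, num_ranges):
    # Split `items` into `num_ranges` contiguous chunks whose sizes
    # differ by at most one.
    base, extra = divmod(len(items), num_ranges)
    ranges, start = [], 0
    for i in range(num_ranges):
        size = base + (1 if i < extra else 0)
        ranges.append(items[start:start + size])
        start += size
    return ranges


# Hypothetical shard names; substitute the real TFRecord paths.
shard_names = ["train-%05d-of-00256" % i for i in range(256)]

# One inner list per worker process.
shard_ranges = split_into_ranges(shard_names, 20)
```

Each inner list holds plain strings, so it pickles without trouble and can be handed to one worker, for example through Pool.map() with a user-defined function that processes a whole range.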

Check whether multiprocessing.Queue() can handle this.
The results of the tensor functions are Tensor objects.
Try the following:

    tensor_object = func(TFRecord)
    q = multiprocessing.Manager().Queue()
    q.put(tensor_object)
    data = q.get()
    print(data)
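Since a multiprocessing queue pickles whatever is put on it, the same question can be answered without starting any process, by trying to pickle the object directly. This is only a sketch: `is_picklable` is a hypothetical helper, and the lock merely stands in for an object that holds process-local state.

```python
import pickle
import threading


def is_picklable(obj):
    # Return True if `obj` can be serialized with pickle, which is what
    # a multiprocessing queue needs in order to transport it.
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        return False


# Plain Python data (lists, dicts, bytes, numbers) pickles fine.
plain_ok = is_picklable({"shard": "file1", "pixels": [0.1, 0.2, 0.3]})

# Objects wrapping process-local state, such as a lock, do not.
lock_ok = is_picklable(threading.Lock())
```

Running the same check on the object returned by the preprocessing function shows whether it can cross a process boundary as is, or whether it first has to be converted to plain data.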

Comment: How can I make sure that all processes enqueue to the same queue?

It is simpler to enqueue the results from Pool.map(...) after the processes have finished.
Alternatively, we can enqueue in parallel, queueing data from all processes.

But doing so depends on the data being picklable, as described above.


For instance:

    import multiprocessing as mp

    # Note: read() and tf.func() are placeholders for the real
    # reading and preprocessing code.
    def func(filename):
        TFRecord = read(filename)
        tensor_obj = tf.func(TFRecord)
        return tensor_obj

    def main_Tensor(tensor_objs):
        tf = ...  # instantiate the TensorFlow session
        rsq = tf.RandomShuffleQueue(...)
        for t in tensor_objs:
            rsq.enqueue(t)

    if __name__ == '__main__':
        sharded_TFRecords = ['file1', 'file2']
        with mp.Pool(20) as pool:
            # map() blocks until all workers have returned, so no join() is
            # needed (join() before close() would raise ValueError).
            tensor_objs = pool.map(func, sharded_TFRecords)

        main_Tensor(tensor_objs)
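The parallel alternative, where every process puts its data on one shared queue, might look roughly like the following sketch. It assumes the workers return plain, picklable Python data; `preprocess`, `worker`, and `drain` are hypothetical names, and the Manager queue stands in for whatever finally feeds tf.RandomShuffleQueue in the parent process.

```python
import multiprocessing as mp
import queue


def preprocess(filename):
    # Hypothetical stand-in for reading and augmenting one shard;
    # it returns plain Python data, which pickles cleanly.
    return {"shard": filename, "pixels": [len(filename)] * 4}


def worker(filename, out_queue):
    # Runs in a child process: push one picklable result per shard.
    out_queue.put(preprocess(filename))


def drain(out_queue, expected):
    # Runs in the parent: collect exactly `expected` results.
    return [out_queue.get(timeout=30) for _ in range(expected)]


if __name__ == "__main__":
    shards = ["file1", "file2", "file3"]
    with mp.Manager() as manager:
        # A Manager queue is a proxy object, so it can be passed to
        # Pool workers as an ordinary argument.
        shared = manager.Queue()
        with mp.Pool(2) as pool:
            pool.starmap(worker, [(s, shared) for s in shards])
        results = drain(shared, len(shards))
    print(sorted(r["shard"] for r in results))
```

All workers receive the same proxy, which is what guarantees that they enqueue to one and the same queue. The parent would then pass each drained item to the TensorFlow enqueue operation.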

It seems that the recommended way to run TF with multiprocessing is to create a separate tf.Session for each child process, since sharing one between processes is not feasible.

You can take a look at this example; I hope it helps.

[EDIT: old answer]

You can use multiprocessing.Pool and rely on its callback mechanism to put the results into tf.RandomShuffleQueue as soon as they are ready.

Here is a very simple example of how to do this.

    from multiprocessing import Pool

    class Processor(object):
        def __init__(self, random_shuffle_queue):
            self.queue = random_shuffle_queue
            self.pool = Pool()

        def schedule_task(self, task):
            self.pool.apply_async(processing_function, args=[task], callback=self.task_done)

        def task_done(self, results):
            self.queue.enqueue(results)

This assumes Python 2. For Python 3, I recommend using concurrent.futures.ProcessPoolExecutor.
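For Python 3, the same callback idea could be sketched with concurrent.futures as below. This is an illustration under assumptions, not a tested TensorFlow pipeline: `processing_function` is a trivial stand-in, `schedule_all` is a hypothetical helper, and a standard `queue.Queue` takes the place of tf.RandomShuffleQueue.

```python
import concurrent.futures
import queue


def processing_function(task):
    # Hypothetical stand-in for the real preprocessing of one task.
    return task * task


def schedule_all(executor, tasks, sink):
    # Submit every task. Each done-callback runs in the parent process
    # and hands the finished result to `sink`.
    futures = []
    for task in tasks:
        future = executor.submit(processing_function, task)
        future.add_done_callback(lambda f: sink.put(f.result()))
        futures.append(future)
    return futures


if __name__ == "__main__":
    results = queue.Queue()  # stand-in for feeding tf.RandomShuffleQueue
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        schedule_all(executor, range(5), results)
    # Leaving the `with` block waits until all tasks have finished.
    print(sorted(results.get() for _ in range(5)))
```

Because the callbacks run in the parent, only the results have to be picklable; the queue itself never crosses a process boundary. If a task raises, `f.result()` re-raises inside the callback, where the executor logs the error and moves on, so real code would want explicit error handling there.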
