TFRecordReader is very slow, and multi-threaded reading does not help

My project uses the TFRecord format for the train & eval datasets.

In my reader benchmark I only get about 8,000 records/second, and the I/O rate (per the iotop command) is only 400–500 KB/s.

I am using the protobuf C++ implementation, following:

https://github.com/tensorflow/tensorflow/blob/master/tensorflow/g3doc/get_started/os_setup.md#protobuf-library-related-issues

Here is a minimal reproducible example:

    def read_and_decode(filename_queue):
        reader = tf.TFRecordReader()
        _, serialized_example = reader.read(filename_queue)
        return serialized_example

    serialized_example = read_and_decode(filename_queue)
    batch_serialized_example = tf.train.shuffle_batch(
        [serialized_example],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    features = tf.parse_example(
        batch_serialized_example,
        features={
            "label": tf.FixedLenFeature([], tf.float32),
            "ids": tf.VarLenFeature(tf.int64),
            "values": tf.VarLenFeature(tf.float32),
        })
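
For reference, this is roughly how I measure the read rate (a minimal sketch; the filename and step count are placeholders, not my exact benchmark):

    import time
    import tensorflow as tf

    # Placeholder input file; the real benchmark uses its own TFRecord files.
    filename_queue = tf.train.string_input_producer(["train.tfrecords"])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        n = 8000
        start = time.time()
        for _ in range(n):
            sess.run(serialized_example)  # one record per session.run call
        print("%d records/sec" % (n / (time.time() - start)))
        coord.request_stop()
        coord.join(threads)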

What I have tried:

I tried setting num_threads in tf.train.shuffle_batch, but it does not help.

With 2 threads it reads about 8,000 records/s; as the number of threads increases, it actually gets slower. (I removed all ops that cost CPU; the graph just reads the data.)

My machine has 24 cores.

3 answers

The problem here is that there is a fixed overhead cost for each session.run, so filling the queue with many tiny examples one at a time will be slow.

In particular, each session.run call costs about 100–200 μs, so you can only make about 5k–10k session.run calls per second.

This problem is obvious if you profile the Python side (python -m cProfile), but hard to see if you start from a timeline profile or a CPU profile.
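
To see the fixed overhead directly, you can time session.run on a trivial op (a minimal sketch, assuming the TF 1.x session API; the iteration count is arbitrary):

    import time
    import tensorflow as tf

    x = tf.constant(1)  # trivial op: no I/O, no queues
    with tf.Session() as sess:
        n = 10000
        start = time.time()
        for _ in range(n):
            sess.run(x)
        print("%.1f usec per session.run call"
              % ((time.time() - start) / n * 1e6))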

The solution is to use enqueue_many to add things to the queue in batches. I took your benchmark from https://gist.github.com/ericyue/7705407a88e643f7ab380c6658f641e8 and modified it to enqueue many elements per .run call, and this gives a significant speedup.

The modification is to change the tf.train.shuffle_batch call as follows:

    if enqueue_many:
        reader = tf.TFRecordReader(options=tf.python_io.TFRecordOptions(
            tf.python_io.TFRecordCompressionType.ZLIB))
        queue_batch = []
        for i in range(enqueue_many_size):
            _, serialized_example = reader.read(filename_queue)
            queue_batch.append(serialized_example)
        batch_serialized_example = tf.train.shuffle_batch(
            [queue_batch],
            batch_size=batch_size,
            num_threads=thread_number,
            capacity=capacity,
            min_after_dequeue=min_after_dequeue,
            enqueue_many=True)

For the full source, check here: https://github.com/yaroslavvb/stuff/blob/master/ericyue-slowreader/benchmark.py

It is hard to optimize this to go much faster, since now most of the time is spent in queue operations. A stripped-down version that simply adds integers to the queue gets similar speed, and looking at the timeline, the time goes to dequeue ops.

[Timeline screenshot: dequeue ops running in parallel]

Each dequeue op takes about 60 μs, but on average 5 run in parallel, so the effective cost is about 12 μs per dequeue. That means, at best, you will get fewer than 200k examples per second.
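
A minimal sketch of such a stripped-down benchmark (the queue sizes and iteration counts here are assumptions, not the exact benchmark code):

    import time
    import tensorflow as tf

    # Push plain integers through a FIFOQueue to isolate queue-op overhead.
    queue = tf.FIFOQueue(capacity=10000, dtypes=[tf.int64])
    enqueue_op = queue.enqueue_many([tf.zeros([1000], dtype=tf.int64)])
    dequeue_op = queue.dequeue()

    with tf.Session() as sess:
        for _ in range(10):
            sess.run(enqueue_op)  # bulk-fill the queue
        n = 5000
        start = time.time()
        for _ in range(n):
            sess.run(dequeue_op)  # one dequeue per session.run
        print("%.1f usec per dequeue" % ((time.time() - start) / n * 1e6))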


Here is a simple speedup building on Yaroslav's answer:

TensorFlow has a built-in function, tf.TFRecordReader.read_up_to, that reads multiple records in each session.run() call, thereby removing the excess overhead of multiple calls.

    enqueue_many_size = SOME_ENQUEUE_MANY_SIZE
    reader = tf.TFRecordReader(options=tf.python_io.TFRecordOptions(
        tf.python_io.TFRecordCompressionType.ZLIB))
    _, queue_batch = reader.read_up_to(filename_queue, enqueue_many_size)
    batch_serialized_example = tf.train.shuffle_batch(
        [queue_batch],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        enqueue_many=True)

As with Yaroslav's answer, you need to set enqueue_many=True so that the batch function knows it is accepting multiple records.

This was very fast in my use case.


Adding to Yaroslav's answer: you can use tf.python_io.tf_record_iterator to iterate through the examples and append them to a list, which you can then pass to tf.train.shuffle_batch with enqueue_many=True:

    queue_batch = []
    for serialized_example in tf.python_io.tf_record_iterator(
            filename,
            options=tf.python_io.TFRecordOptions(
                tf.python_io.TFRecordCompressionType.ZLIB)):
        queue_batch.append(serialized_example)

    batch_serialized_example = tf.train.shuffle_batch(
        [queue_batch],
        batch_size=batch_size,
        num_threads=thread_number,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        enqueue_many=True)

It seems that trying to iterate through examples with reader.read() results in just one read per batch; i.e., the nth batch will contain batch_num copies of the nth record rather than batch_num unique records.
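
One way to reproduce that duplicate-record symptom (a hypothetical sketch under my reading of the pitfall, not code from the answers above): if the list reuses a single reader.read() tensor, every slot refers to the same op, so each batch becomes copies of one record:

    import tensorflow as tf

    # Hypothetical illustration of the pitfall: reusing ONE read tensor.
    filename_queue = tf.train.string_input_producer(["train.tfrecords"])
    reader = tf.TFRecordReader()
    _, serialized_example = reader.read(filename_queue)

    batch_num = 32
    # Every slot is the same op, so one read is duplicated 32 times per batch.
    queue_batch = [serialized_example] * batch_num
    # Contrast with calling reader.read() once per slot (distinct ops),
    # as in the enqueue_many answer above, which yields unique records.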

