There seem to be a lot of open-ended questions about using TensorFlow here, and some TensorFlow developers are active on Stack Overflow, so here is another one. I want to generate training data on the fly in other threads, using NumPy or something else that does not belong to TensorFlow, but without recompiling the entire TensorFlow source again and again. I'm simply looking for another way. `tf.py_func` seems to be a workaround.
This is related to [like-pre-fetch-data-use-a-custom-python-function-in-tensorflow][1].
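To make the goal concrete, the producer pattern I'm after looks roughly like this in plain Python (no TensorFlow involved; `make_sample` is just a made-up stand-in for the real pre-processing):

```python
import queue
import threading
import numpy as np

def make_sample(rng):
    # Hypothetical stand-in for the real on-the-fly pre-processing.
    return rng.standard_normal((9, 9)).astype(np.float32)

def producer(q, stop_event, rng):
    # Keep the queue filled until the consumer asks us to stop.
    while not stop_event.is_set():
        try:
            q.put(make_sample(rng), timeout=0.1)
        except queue.Full:
            continue  # queue full; re-check the stop flag and retry

q = queue.Queue(maxsize=100)
stop = threading.Event()
t = threading.Thread(target=producer, args=(q, stop, np.random.default_rng(0)))
t.start()

# Consumer side: pull a few samples, then shut the producer down cleanly.
batch = [q.get() for _ in range(5)]
stop.set()
t.join()
```

The question is essentially how to get this same pattern to feed a TensorFlow queue instead of a `queue.Queue`.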
Here is my MnWE (minimal non-working example):
Update (there is now a way out, but it still has a race condition):
import numpy as np
import tensorflow as tf
import threading
import os
import random
import matplotlib.pyplot as plt

IMAGE_ROOT = "/graphics/projects/data/mscoco2014/data/images/"
files = ["train/COCO_train2014_000000178763.jpg",
         "train/COCO_train2014_000000543841.jpg",
         "train/COCO_train2014_000000364433.jpg",
         "train/COCO_train2014_000000091123.jpg",
         "train/COCO_train2014_000000498916.jpg",
         "train/COCO_train2014_000000429865.jpg",
         "train/COCO_train2014_000000400199.jpg",
         "train/COCO_train2014_000000230367.jpg",
         "train/COCO_train2014_000000281214.jpg",
         "train/COCO_train2014_000000041920.jpg"]

def pre_process(data):
    """Pre-process an image with arbitrary Python functions,
    not only tf.* operations.
    """
    return data[0:81, 0, 0].flatten()

def populate_queue(sess, coord, qData_enqueue_op):
    """Put data into the queue so that TensorFlow
    always has something to process.
    """
    while not coord.should_stop():
        # pick a random file (len(files) - 1 keeps the index in range)
        idx = random.randint(0, len(files) - 1)
        data = np.array(plt.imread(os.path.join(IMAGE_ROOT, files[idx])))
        data = pre_process(data)
        sess.run(qData_enqueue_op, feed_dict={data_input: data})

qData = tf.FIFOQueue(100, [tf.float32], shapes=[[9, 9]])
data_input = tf.placeholder(tf.float32)
qData_enqueue_op = qData.enqueue([tf.reshape(data_input, [9, 9])])
qData_dequeue_op = qData.dequeue()
init_op = tf.initialize_all_variables()

with tf.Session() as sess:
    sess.run(init_op)
    coord = tf.train.Coordinator()
    t = threading.Thread(target=populate_queue,
                         args=(sess, coord, qData_enqueue_op))
    t.start()
    try:
        while not coord.should_stop():
            print("iter")
            batch = sess.run([qData_dequeue_op])
            print(batch)
    except tf.errors.OutOfRangeError:
        print('Done training -- no more data')
    finally:
        coord.request_stop()
    coord.join([t])
I basically have three questions:
- Why does this script end with an error (see the update)? The last output is " ...".
- Is there a better way to do this?
- Is converting the data to tf.Record files the way to go, or is there another option?