How to *actually* read CSV data in TensorFlow?

I am relatively new to the world of TensorFlow, and pretty perplexed by how you'd actually read CSV data into a usable example/label tensor in TensorFlow. The example from the TensorFlow tutorial on reading CSV data is pretty fragmented, and only gets you part of the way to being able to train on CSV data.

Here's the code I've pieced together, based on that CSV tutorial:

from __future__ import print_function
import tensorflow as tf

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

filename = "csv_test_data.csv"

# set up text reader
file_length = file_len(filename)
filename_queue = tf.train.string_input_producer([filename])
reader = tf.TextLineReader(skip_header_lines=1)
_, csv_row = reader.read(filename_queue)

# set up CSV decoding
record_defaults = [[0],[0],[0],[0],[0]]
col1, col2, col3, col4, col5 = tf.decode_csv(csv_row, record_defaults=record_defaults)

# turn features back into a tensor
features = tf.stack([col1, col2, col3, col4])

print("loading, " + str(file_length) + " line(s)\n")

with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    for i in range(file_length):
        # retrieve a single instance
        example, label = sess.run([features, col5])
        print(example, label)

    coord.request_stop()
    coord.join(threads)
    print("\ndone loading")

And here is a brief example from the CSV file I'm loading - pretty basic data - 4 feature columns and 1 label column:

0,0,0,0,0
0,15,0,0,0
0,30,0,0,0
0,45,0,0,0

All the code above does is print each example from the CSV file, one by one, which, while nice, is pretty useless for training.

What I'm struggling with is how you'd actually turn those individual examples, loaded one at a time, into a training dataset. For example, here's a notebook I was working on in the Udacity Deep Learning course. I basically want to take the CSV data I'm loading and plop it into something like train_dataset and train_labels:

def reformat(dataset, labels):
    dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32)
    # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...]
    labels = (np.arange(num_labels) == labels[:, None]).astype(np.float32)
    return dataset, labels

train_dataset, train_labels = reformat(train_dataset, train_labels)
valid_dataset, valid_labels = reformat(valid_dataset, valid_labels)
test_dataset, test_labels = reformat(test_dataset, test_labels)

print('Training set', train_dataset.shape, train_labels.shape)
print('Validation set', valid_dataset.shape, valid_labels.shape)
print('Test set', test_dataset.shape, test_labels.shape)

I tried using tf.train.shuffle_batch like this, but it just inexplicably hangs:

for i in range(file_length):
    # retrieve a single instance
    example, label = sess.run([features, colRelevant])
    example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000)
    print(example, label)

So, to summarize, here are my questions:

  • What am I missing in this process?
    • It feels like there is some key intuition I'm missing about how to properly build an input pipeline.
  • Is there a way to avoid having to know the length of the CSV file?
    • It feels rather clunky to have to know the number of lines you want to process (the for i in range(file_length) line above).



Edit: Once Yaroslav pointed out that I was likely mixing up the imperative and graph-construction parts here, things started to become clearer. I was able to put together the following code, which I think is closer to what would typically be done when training a model from CSV (excluding any model training code):

from __future__ import print_function
import numpy as np
import tensorflow as tf
import math as math
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('dataset')
args = parser.parse_args()

def file_len(fname):
    with open(fname) as f:
        for i, l in enumerate(f):
            pass
    return i + 1

def read_from_csv(filename_queue):
    reader = tf.TextLineReader(skip_header_lines=1)
    _, csv_row = reader.read(filename_queue)
    record_defaults = [[0],[0],[0],[0],[0]]
    colHour, colQuarter, colAction, colUser, colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults)
    features = tf.stack([colHour, colQuarter, colAction, colUser])
    label = tf.stack([colLabel])
    return features, label

def input_pipeline(batch_size, num_epochs=None):
    filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True)
    example, label = read_from_csv(filename_queue)
    min_after_dequeue = 10000
    capacity = min_after_dequeue + 3 * batch_size
    example_batch, label_batch = tf.train.shuffle_batch(
        [example, label], batch_size=batch_size, capacity=capacity,
        min_after_dequeue=min_after_dequeue)
    return example_batch, label_batch

file_length = file_len(args.dataset) - 1
examples, labels = input_pipeline(file_length, 1)

with tf.Session() as sess:
    tf.initialize_all_variables().run()

    # start populating filename queue
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)

    try:
        while not coord.should_stop():
            example_batch, label_batch = sess.run([examples, labels])
            print(example_batch)
    except tf.errors.OutOfRangeError:
        print('Done training, epoch reached')
    finally:
        coord.request_stop()
        coord.join(threads)
+76
python csv tensorflow
May 7 '16 at 17:57
4 answers

I think you are mixing up the imperative and graph-construction parts. The operation tf.train.shuffle_batch creates a new queue node, and a single node can be used to process the entire dataset. So I think you're hanging because you created a bunch of shuffle_batch queues in your for loop and didn't start the queue runners for them.

The normal input pipeline usage looks like this (a short sketch applying it to your code follows the list):

  1. Add nodes like shuffle_batch to the input pipeline
  2. (optional, to prevent inadvertent graph modification) finalize the graph

--- end of graph construction, beginning of imperative programming ---

  3. tf.start_queue_runners
  4. while(True): session.run()
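
To make that concrete against your code: the shuffle_batch node should be created once, right after decode_csv, and only the session.run call belongs inside the loop. A rough sketch, not drop-in code: the batch_size, capacity and min_after_dequeue values are made up for illustration, and features/col5 are the tensors from your first script.

# graph construction: build the batching node ONCE, fed by the decoded CSV tensors
example_batch, label_batch = tf.train.shuffle_batch(
    [features, col5],        # tensors produced by tf.decode_csv above
    batch_size=32,           # illustrative values
    capacity=2000,
    min_after_dequeue=1000)

# --- imperative part ---
with tf.Session() as sess:
    sess.run(tf.initialize_all_variables())
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(coord=coord)
    for step in range(100):  # or loop until OutOfRangeError if num_epochs is set
        batch_x, batch_y = sess.run([example_batch, label_batch])
        print(batch_x, batch_y)
    coord.request_stop()
    coord.join(threads)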

To be more scalable (and to avoid the Python GIL), you could generate all of your data using the TensorFlow pipeline. However, if performance is not critical, you can hook up a numpy array to the input pipeline by using slice_input_producer. Here's an example with some Print nodes to see what's going on (messages in Print go to stdout when the node is run):

tf.reset_default_graph()

num_examples = 5
num_features = 2
data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features))
print data

(data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False)
data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ")
data_batch = tf.batch([data_node_debug], batch_size=2)
data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ")

sess = tf.InteractiveSession()
sess.run(tf.initialize_all_variables())
tf.get_default_graph().finalize()
tf.start_queue_runners()

try:
  while True:
    print sess.run(data_batch_debug)
except tf.errors.OutOfRangeError as e:
  print "No more inputs."

You should see something like this.

[[0 1]
 [2 3]
 [4 5]
 [6 7]
 [8 9]]
[[0 1]
 [2 3]]
[[4 5]
 [6 7]]
No more inputs.

The "8, 9" numbers didn't fill up a full batch, so they didn't get produced. Also, tf.Print prints to sys.stdout, so its output shows up separately in the terminal for me.

PS: a minimal example of connecting batch to a manually initialized queue is in github issue 2193.

Also, for debugging purposes you might want to set a timeout on your session so that your IPython notebook doesn't hang on empty queue dequeues. I use this helper function for my sessions:

def create_session():
  config = tf.ConfigProto(log_device_placement=True)
  config.gpu_options.per_process_gpu_memory_fraction = 0.3  # don't hog all vRAM
  config.operation_timeout_in_ms = 60000  # terminate on long hangs
  # create interactive session to register a default session
  sess = tf.InteractiveSession("", config=config)
  return sess

Scalability Notes:

  • tf.constant inlines a copy of your data into the Graph. There is a fundamental limit of 2GB on the size of the Graph definition, so that's an upper limit on the size of your data
  • You could get around that limit by using v=tf.Variable and saving the data into it by running v.assign_op with a tf.placeholder on the right-hand side and feeding the numpy array through the placeholder (feed_dict); see the sketch after this list
  • That still creates two copies of the data, so to save memory you could make your own version of slice_input_producer which operates on numpy arrays, and upload rows one at a time using feed_dict
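
For the variable-plus-placeholder trick in the second bullet, a minimal sketch might look like the following (it uses the variable's initializer rather than an explicit assign op, and the array shape and names are made up for illustration):

import numpy as np
import tensorflow as tf

# illustrative stand-in for data you loaded from CSV with numpy/pandas
data = np.random.rand(10000, 4).astype(np.float32)

# placeholder on the "right-hand side", variable that will hold the data
data_initializer = tf.placeholder(dtype=data.dtype, shape=data.shape)
input_data = tf.Variable(data_initializer, trainable=False, collections=[])

# rows can then be sliced off the variable, as in the example above
(row,) = tf.slice_input_producer([input_data], shuffle=False)

sess = tf.Session()
# the big numpy array travels through feed_dict instead of being baked into the graph;
# collections=[] keeps the variable out of initialize_all_variables()
sess.run(input_data.initializer, feed_dict={data_initializer: data})
# ...then batch `row` and start queue runners exactly as before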
+21
May 7 '16 at 20:03

Or you could try this: the code loads the Iris dataset into TensorFlow using pandas and numpy, and a simple one-neuron output is printed in the session. Hope it helps with a basic understanding... [I have not added the one-hot encoding of the labels; a sketch of that step follows the code.]

import tensorflow as tf
import numpy
import pandas as pd

df = pd.read_csv('/home/nagarjun/Desktop/Iris.csv', usecols = [0,1,2,3,4], skiprows = [0], header=None)
d = df.values
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv', usecols = [5], header=None)
labels = l.values
data = numpy.float32(d)
labels = numpy.array(l, 'str')
#print data, labels

#tensorflow
x = tf.placeholder(tf.float32, shape=(150,5))
x = data
w = tf.random_normal([100,150], mean=0.0, stddev=1.0, dtype=tf.float32)
y = tf.nn.softmax(tf.matmul(w,x))

with tf.Session() as sess:
    print sess.run(y)
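
A rough sketch of the one-hot encoding step left out above, done purely in numpy/pandas. The column index and file path are taken from the code above; skiprows=[0] is added so the header row is skipped the same way as for the feature columns.

import numpy as np
import pandas as pd

# assumed layout: column 5 of Iris.csv holds the species name as a string
l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv', usecols=[5], skiprows=[0], header=None)
label_strings = l.values.ravel()

classes = np.unique(label_strings)                        # the distinct species names
class_index = {name: i for i, name in enumerate(classes)}
indices = np.array([class_index[name] for name in label_strings])

# one row per example, one column per class -- same trick as reformat() in the question
one_hot = (np.arange(len(classes)) == indices[:, None]).astype(np.float32)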
+13
Jan 06 '17 at 20:54

You can use the latest tf.data API:

dataset = tf.contrib.data.make_csv_dataset(filepath)
iterator = dataset.make_initializable_iterator()
columns = iterator.get_next()

with tf.Session() as sess:
    sess.run([iterator.initializer])
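
That snippet on its own doesn't pull any data yet. A sketch of how you might actually iterate over batches follows; the file name "data.csv" and the label column name "label" are placeholders, and it assumes your CSV has a header row.

import tensorflow as tf

# assumptions: a CSV with a header row and a label column literally named "label"
dataset = tf.contrib.data.make_csv_dataset(
    "data.csv", batch_size=32, label_name="label", num_epochs=1)

iterator = dataset.make_initializable_iterator()
features, labels = iterator.get_next()   # features is a dict of column tensors

with tf.Session() as sess:
    sess.run(iterator.initializer)
    try:
        while True:
            feature_batch, label_batch = sess.run([features, labels])
            print(label_batch)
    except tf.errors.OutOfRangeError:
        print("end of dataset")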
+2
Sep 03 '18 at 13:52

If anyone came here searching for a simple way to read massive and sharded CSV files with the tf.estimator API, please see my code below:

CSV_COLUMNS = ['ID','text','class']
LABEL_COLUMN = 'class'
DEFAULTS = [['x'],['no'],[0]]  # default values

def read_dataset(filename, mode, batch_size = 512):
    def _input_fn(v_test=False):
        # def decode_csv(value_column):
        #     columns = tf.decode_csv(value_column, record_defaults = DEFAULTS)
        #     features = dict(zip(CSV_COLUMNS, columns))
        #     label = features.pop(LABEL_COLUMN)
        #     return add_engineered(features), label

        # Create list of files that match pattern
        file_list = tf.gfile.Glob(filename)

        # Create dataset from file list
        #dataset = tf.data.TextLineDataset(file_list).map(decode_csv)
        dataset = tf.contrib.data.make_csv_dataset(file_list,
                                                   batch_size=batch_size,
                                                   column_names=CSV_COLUMNS,
                                                   column_defaults=DEFAULTS,
                                                   label_name=LABEL_COLUMN)

        if mode == tf.estimator.ModeKeys.TRAIN:
            num_epochs = None  # indefinitely
            dataset = dataset.shuffle(buffer_size = 10 * batch_size)
        else:
            num_epochs = 1  # end-of-input after this

        batch_features, batch_labels = dataset.make_one_shot_iterator().get_next()

        # Begins - Uncomment for testing only ----------------------------------------------------<
        if v_test == True:
            with tf.Session() as sess:
                print(sess.run(batch_features))
        # End - Uncomment for testing only -------------------------------------------------------<

        return add_engineered(batch_features), batch_labels
    return _input_fn

Example usage in tf.estimator:

train_spec = tf.estimator.TrainSpec(
    input_fn = read_dataset(
        filename = train_file,
        mode = tf.estimator.ModeKeys.TRAIN,
        batch_size = 128),
    max_steps = num_train_steps)
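
For context, here is a sketch of how that TrainSpec could be wired into a full training call; the estimator, eval_file and EvalSpec settings here are placeholders, not part of the original code:

# hypothetical estimator and eval_file, shown only to illustrate how the
# read_dataset() input_fn above plugs into tf.estimator.train_and_evaluate
eval_spec = tf.estimator.EvalSpec(
    input_fn = read_dataset(
        filename = eval_file,
        mode = tf.estimator.ModeKeys.EVAL,
        batch_size = 128),
    steps = None)  # evaluate until the eval input is exhausted

tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)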
0
Dec 05 '18 at 9:01


