Python 3 merges data from large files that are sorted

I have several large files (> 5M data lines) that are sorted by a unique timestamp. All files contain almost all of the same timestamps, with the exception of a few randomly missing lines (<1000). I would like to efficiently combine data from all files into one data set with one row per timestamp, preferably using a generator.

With the exception of the missing lines, I could just use zip:

def get_data(list_of_iterables): for data in zip(*list_of_iterables): yield data 

However, since there are some missing lines, I need to combine the data in a timestamp instead of just zipping. I can simply ignore any lines that do not have corresponding timestamps in each file.

Is there a pythonic way to implement this functionality in multiple lines?

My approach would be to have each one repeat in turn until the timestamp is less than the maximum timestamp for a group of iterations. Whenever all timestamps match, give a line and advance all repeated ones. But the logic seems messy when I try to implement this approach.

Edit: performance.

The implementation should start returning rows without first reading all the data into memory. It takes some time to read all the data, and many times you only need to check the first part of the lines.

+7
python
source share
4 answers

In the end, I wrote the following code to solve my problem, which turned out to be easier than I expected:

 def advance_values(iters): for it in iters: yield next(it) def align_values(iters, values, key): for it, value in zip(iters, values): while (value[0],value[1]) < key: value = next(it) yield value def merge_join(*iters): values = list(advance_values(iters)) while True: if len(values) != len(iters): return tms = [(v[0],v[1]) for v in values] max_tm = max(tms) if all((v[0],v[1]) == max_tm for v in values): yield values values = list(advance_values(iters)) else: values = list(align_values(iters, values, max_tm)) 
+1
source share

If each iterable in list_of_iterables sorted by timestamp , you can use heapq.merge() to combine them taking into account possible spaces in the data and itertools.groupby() to group records with the same timestamp:

 from heapq import merge from itertools import groupby from operator import attrgetter for timestamp, group in groupby(merge(*list_of_iterables), key=attrgetter('timestamp')): print(timestamp, list(group)) # same timestamp 

An implementation gives groups without first reading all the data in memory.

+1
source share

My first assumption would be to use a dictionary with timestamps as keys, and the rest of the data in rows as values, then for each line in each file add it to the dictionary only if the element with the same time stamp (key) is still not present.

However, if you are really dealing with gigantic data sets (which you seem to be in this case), then your approach to your original question would be your best option.

0
source share

ok, I was interested in the problem (a similar problem recently occurred) and worked a little on it. you can try something like this:

 import io import datetime from csv import DictReader file0 = io.StringIO('''timestamp,data 2015-06-01 10:00, data00 2015-06-01 11:00, data01 2015-06-01 12:00, data02 2015-06-01 12:30, data03 2015-06-01 13:00, data04 ''') file1 = io.StringIO('''timestamp,data 2015-06-01 09:00, data10 2015-06-01 10:30, data11 2015-06-01 11:00, data12 2015-06-01 12:30, data13 ''') class Data(object): def __init__(self): self.timestamp = None self.data = None @staticmethod def new_from_dict(dct=None): if dct is None: return None ret = Data() ret.data = dct['data'].strip() ret.timestamp = datetime.datetime.strptime(dct['timestamp'], '%Y-%m-%d %H:%M') return ret def __lt__(self, other): if other is None: return False return self.timestamp < other.timestamp def __gt__(self, other): if other is None: return False return self.timestamp > other.timestamp def __str__(self): ret = '{0.__class__.__name__}'.format(self) +\ '(timestamp={0.timestamp}, data={0.data})'.format(self) return ret def next_or_none(reader): try: return Data.new_from_dict(next(reader)) except StopIteration: return None def yield_in_order(reader0, reader1): data0 = next_or_none(reader0) data1 = next_or_none(reader1) while not data0 == data1 == None: if data0 is None: yield None, data1 data1 = next_or_none(reader1) continue if data1 is None: yield data0, None data0 = next_or_none(reader0) continue while data0 < data1: yield data0, None data0 = next_or_none(reader0) while data0 > data1: yield None, data1 data1 = next_or_none(reader1) if data0 is not None and data1 is not None: if data0.timestamp == data1.timestamp: yield data0, data1 data0 = next_or_none(reader0) data1 = next_or_none(reader1) csv0 = DictReader(file0) csv1 = DictReader(file1) FMT = '{!s:50s} | {!s:50s}' print(FMT.format('file0', 'file1')) print(101*'-') for dta0, dta1 in yield_in_order(csv0, csv1): print(FMT.format(dta0, dta1)) 

it is only for 2 files.

0
source share

All Articles