How to pull X amount of previous data into a row in CSV

I have a very large CSV file, and I need to add previous data to each row: for the name in column 2, the score/parameter data from rows whose dates precede the current row's date. I think the easiest way to present this problem is with a detailed example similar to my real data, but greatly reduced:

Datatitle,Date,Name,Score,Parameter
data,01/09/13,george,219,dataa,text
data,01/09/13,fred,219,datab,text
data,01/09/13,tom,219,datac,text
data,02/09/13,george,229,datad,text
data,02/09/13,fred,239,datae,text
data,02/09/13,tom,219,dataf,text
data,03/09/13,george,209,datag,text
data,03/09/13,fred,217,datah,text
data,03/09/13,tom,213,datai,text
data,04/09/13,george,219,dataj,text
data,04/09/13,fred,212,datak,text
data,04/09/13,tom,222,datal,text
data,05/09/13,george,319,datam,text
data,05/09/13,fred,225,datan,text
data,05/09/13,tom,220,datao,text
data,06/09/13,george,202,datap,text
data,06/09/13,fred,226,dataq,text
data,06/09/13,tom,223,datar,text
data,06/09/13,george,219,dataae,text

So, for the first three lines of this CSV there is no previous data. Therefore, if we want to display columns 3 and 4 from the last 3 events for george (row 1) on dates preceding the current one, the row would look like:

data,01/09/13,george,219,dataa,text,x,y,x,y,x,y

However, once previous data starts to become available, we hope to create a CSV like this, for example:

Datatitle,Date,Name,Score,Parameter,LTscore,LTParameter,LTscore+1,LTParameter+1,LTscore+2,LTParameter+3,
data,01/09/13,george,219,dataa,text,x,y,x,y,x,y
data,01/09/13,fred,219,datab,text,x,y,x,y,x,y
data,01/09/13,tom,219,datac,text,x,y,x,y,x,y
data,02/09/13,george,229,datad,text,219,dataa,x,y,x,y
data,02/09/13,fred,239,datae,text,219,datab,x,y,x,y
data,02/09/13,tom,219,dataf,text,219,datac,x,y,x,y
data,03/09/13,george,209,datag,text,229,datad,219,dataa,x,y
data,03/09/13,fred,217,datah,text,239,datae,219,datab,x,y
data,03/09/13,tom,213,datai,text,219,dataf,219,datac,x,y
data,04/09/13,george,219,dataj,text,209,datag,229,datad,219,dataa
data,04/09/13,fred,212,datak,text,217,datah,239,datae,219,datab
data,04/09/13,tom,222,datal,text,213,datai,219,dataf,219,datac
data,05/09/13,george,319,datam,text,219,dataj,209,datag,229,datad
data,05/09/13,fred,225,datan,text,212,datak,217,datah,239,datae
data,05/09/13,tom,220,datao,text,222,datal,213,datai,219,dataf
data,06/09/13,george,202,datap,text,319,datam,219,dataj,209,datag
data,06/09/13,fred,226,dataq,text,225,datan,212,datak,217,datah
data,06/09/13,tom,223,datar,text,220,datao,222,datal,213,datai
data,06/09/13,george,219,datas,text,319,datam,219,dataj,209,datag

As you can see from the second george row on 06/09/13, its previous data is still 319,datam,219,dataj,209,datag, the same as the first george row on that date. Rows that share the current date are not counted as previous data; we always want the last 3 scores from dates strictly before the current one.

There will not always be 3 previous scores available; where they are missing we pad with x,y as shown above. The names will not always appear in a tidy fred, tom, george order (names come and go between dates), but the rows are always in ascending date order. The real file is very large, so an efficient approach would be much appreciated. Thanks, SMNALLY


Assuming the rows are in ascending date order, you can process one date group at a time and keep a small deque of previous data for each name, for example:

import csv
from collections import deque, defaultdict
from itertools import chain, islice, groupby
from operator import itemgetter

# defaultdict whose first access of a key will create a deque of size 3,
# defaulting to [['x', 'y'], ['x', 'y'], ['x', 'y']].
# Deques are efficient at head/tail manipulation, so inserting at the
# start is cheap, and because the size is fixed, extra elements simply
# "fall off" the end...
names_previous = defaultdict(lambda: deque([['x', 'y']] * 3, 3))
with open('sample.csv', 'rb') as fin, open('sample_new.csv', 'wb') as fout:
    csvin = csv.reader(fin)
    csvout = csv.writer(fout)
    # Use groupby to detect changes in the date column. Since the data is always
    # ascending, rows with the same date are contiguous in the file. We use
    # this to identify the rows within the *same* date.
    # date=the date we're looking at, rows=an iterable of the rows in that date...
    for date, rows in groupby(islice(csvin, 1, None), itemgetter(1)):
        # After we've processed entries in this date, we need to know what items of data should
        # be considered for the names we've seen inside this date. Currently the data
        # is taken from the last occurring row for the name.
        to_add = {}
        for row in rows:
            # Output the row present in the file with a *flattened* version of the extra data
            # (previous items) that we wish to apply. eg:
            # [['x', 'y'], ['x', 'y'], ['x', 'y']] becomes ['x', 'y', 'x', 'y', 'x', 'y']
            # So we're easily able to store 3 pairs of data, but flatten it into one long
            # list of 6 items...
            # If the name (row[2]) doesn't exist yet, then by trying to do this, defaultdict
            # will automatically create the default key as above.
            csvout.writerow(row + list(chain.from_iterable(names_previous[row[2]])))
            # Here, we store for the name any additional data that should be included for the name
            # on the next date group. In this instance we store the information seen for the last
            # occurrence of that name in this date. eg: If we've seen it more than once, then
            # we only include data from the last occurrence. 
            # NB: If you wanted to include more than one item of data for the name, then you could
            # utilise a deque again by building it within this date group
            to_add[row[2]] = row[3:5]            
        for key, val in to_add.iteritems():
            # We've finished the date, so before processing the next one, update the previous
            # data for the names. In this case, we push a single item of data to the front of
            # the deque. If we were storing multiple items in the date loop, we could use
            # .extendleft() instead to insert > 1 set of data from above.
            names_previous[key].appendleft(val)
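
If you wanted to keep every occurrence of a name within a date (rather than just the last one, as the comments above mention), a rough sketch of that .extendleft() variant could look like this; only the inner date loop changes, it relies on the names already defined above, and it is my own illustration rather than part of the code above:

        to_add = defaultdict(list)
        for row in rows:
            csvout.writerow(row + list(chain.from_iterable(names_previous[row[2]])))
            # collect *every* (score, parameter) pair seen for this name in this date
            to_add[row[2]].append(row[3:5])
        for key, vals in to_add.iteritems():
            # extendleft() appends one item at a time, so the last (newest) pair
            # within the date ends up at the front of the deque
            names_previous[key].extendleft(vals)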

This only keeps the last 3 entries per name in memory, so it should cope with a very large file.

If anything is unclear, or you want more detail on any part, just ask.


Here is a straightforward version that works through the file one row at a time, reading "input.csv" and writing "output.csv". It keeps two plain dictionaries: previous_dict holds the scores from earlier dates for each name, and new_dict is a buffer for the current date that gets merged into previous_dict whenever the date changes. The [:6] slice limits the appended history to 6 items (3 score/parameter pairs).

import csv

myInput = open('input.csv','rb')
myOutput = open('output.csv','wb')
myFields = ['Datatitle','Date','Name','Score','Parameter','Text',
            'LTscore','LTParameter','LTscore+1','LTParameter+1',
            'LTscore+2','LTParameter+2']
inCsv = csv.DictReader(myInput,myFields)
outCsv = csv.writer(myOutput)
outCsv.writerow(myFields) # Write header row

previous_dict = dict() # store scores from previous dates
new_dict = dict() # buffer for records on current-date only

def add_new():
    # merge new_dict into previous_dict
    global new_dict, previous_dict
    for k in new_dict:
        if not previous_dict.has_key(k):
            previous_dict[k] = list()
        # put new items first
        previous_dict[k] = new_dict[k] + previous_dict[k]
    new_dict = dict() # reset buffer

old_date = '00/00/00' # start with bogus *oldest* date string
inCsv.next() # skip header row
for row in inCsv:
    myTitle = row['Datatitle']
    myDate = row['Date']
    myName = row['Name']
    myScore = row['Score']
    myParameter = row['Parameter']
    myText = row['Text']
    if old_date != myDate:
        add_new() # store new_dict buffer with previous data
        old_date = myDate
    if not new_dict.has_key(myName):
        new_dict[myName] = []
    # put new scores first
    new_dict[myName] = [myScore,myParameter] + new_dict[myName]
    if not previous_dict.has_key(myName):
        previous_dict[myName] = []
    outCsv.writerow([myTitle,myDate,myName,myScore,myParameter,myText] \
                     + previous_dict[myName][:6])
# end loop for each row

myInput.close()
myOutput.close()

Note that this keeps every previous score in memory. If that becomes a problem, you could swap the dict for an sqlite-backed store. With, say, an 8G machine and a 2G file, an in-memory Python dictionary should still be fine, provided you run 64-bit Python on a 64-bit OS. You could also cap the history kept for each name at the last N entries (e.g. 100 or 1000) to bound memory use.
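
For illustration, a minimal sketch of what an sqlite-backed history might look like in place of previous_dict; the "history" table and the helper names are my own invention, not part of the answer above:

import sqlite3

conn = sqlite3.connect('scores.db')
conn.execute('CREATE TABLE IF NOT EXISTS history '
             '(name TEXT, seq INTEGER, score TEXT, parameter TEXT)')

def push_score(name, seq, score, parameter):
    # store one (score, parameter) pair for a name; seq is a running row counter
    conn.execute('INSERT INTO history VALUES (?, ?, ?, ?)',
                 (name, seq, score, parameter))

def last_pairs(name, n=3):
    # newest first, padded with ('x', 'y') when fewer than n pairs exist
    rows = conn.execute('SELECT score, parameter FROM history '
                        'WHERE name = ? ORDER BY seq DESC LIMIT ?',
                        (name, n)).fetchall()
    rows += [('x', 'y')] * (n - len(rows))
    flat = []
    for score, parameter in rows:
        flat.extend([score, parameter])
    return flat

The scores would still need to be pushed only when the date changes (just as new_dict is merged into previous_dict above), so that same-date rows do not leak into the history.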


A few notes:
- Written for Python 2.7.5.
- A defaultdict of fixed-length deques holds the previous rows for each name.
- Appending to a full deque pushes older items off the other end, so each deque never holds more than the last three rows.
- operator.itemgetter() is used to pick out columns and slices, mostly for readability.

from collections import deque, defaultdict
import csv
from functools import partial
from operator import itemgetter

# use a 3 item deque to hold the 
# previous three rows for each name
deck3 = partial(deque, maxlen = 3)
data = defaultdict(deck3)


name = itemgetter(2)
date = itemgetter(1)
sixplus = itemgetter(slice(6,None))

fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
          'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
          'LTscore+2', 'LTParameter+3']
with open('data.txt') as infile, open('processed.txt', 'wb') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerow(fields)
    # comment out the next line if the data file does not have a header row
    reader.next()
    for row in reader:
        default = deque(['x', 'y', 'x', 'y', 'x', 'y'], maxlen = 6)
        try:
            previous_row = data[name(row)][-1]
            previous_date = date(previous_row)
        except IndexError:
            previous_date = None
        if previous_date == date(row):
            # use the extra columns from last time
            row.extend(sixplus(previous_row))
            # discard the previous row because
            # there is a new row with the same date
            data[name(row)].pop()
        else:
            # add columns 3 and 4 from each previous row
            for deck in data[name(row)]:
                # adding new items to a full deque causes
                # items to drop off the other end
                default.appendleft(deck[4])
                default.appendleft(deck[3])
            row.extend(default)
        writer.writerow(row)
        data[name(row)].append(row)

I like the deque version, but looking at it again I realised that, because each processed row already carries its own previous data, you only ever need to keep the most recent processed row for each name.

So here is an alternative that stores just that last processed row per name in a plain dict. It turns out simpler. The itemgetters are only there to make the column slicing easier to read.

import csv
from operator import itemgetter

fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
          'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
          'LTscore+2', 'LTParameter+3']

name = itemgetter(2)
date = itemgetter(1)
cols_sixplus = itemgetter(slice(6,None))
cols34 = itemgetter(slice(3, 5))
cols6_9 = itemgetter(slice(6, 10))
data_alt = {}

with open('data.txt') as infile, open('processed_alt.txt', 'wb') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile)
    writer.writerow(fields)
    # comment out the next line if the data file does not have a header row
    reader.next()
    for row in reader:
        try:
            previous_row = data_alt[name(row)]
        except KeyError:
            # first time this name encountered
            row.extend(['x', 'y', 'x', 'y', 'x', 'y'])
            data_alt[name(row)] = row
            writer.writerow(row)
            continue
        if date(previous_row) == date(row):
            # use the extra columns from last time
            row.extend(cols_sixplus(previous_row))
        else:
            row.extend(cols34(previous_row))
            row.extend(cols6_9(previous_row))
        data_alt[name(row)] = row
        writer.writerow(row)

On the sample data this produces the same output as the deque version, with less bookkeeping and only one stored row per name.


Here is another way to approach it, described in outline rather than code.

Read the whole CSV into memory first.

  • Build a dictionary keyed by name whose values are lists of (date, values) entries, e.g. {'Tom' : [(date1, values), (date2, values)], 'George' : [(date1, values), (date2, values)]}. If you would rather look entries up by date directly, nested dictionaries work too, e.g. {'Tom' : {date1: values, date2: values}, 'George' : {date1: values, date2: values}}. Either structure is fine.

  • Keep each name's entries in ascending date order, the same order they appear in the file.

Then make a second pass over the rows. For each row, look up its name, find the position i of the current date in that name's list, and take the three preceding entries, e.g. dataDict['Tom'][i-3:i], padding with x,y when fewer than three exist (a rough sketch follows this list). Two refinements:

  • Group all of the values for one date into a single entry, e.g. {'Tom' : [(date1, [val1, val2, val3]), (date2, values)], 'George' : [(date1, values), (date2, values)]}, so repeated names on the same date do not add extra entries.

  • Guard the lookups with try/except: a name with no history raises KeyError, and you can then fall back to the x,y placeholders.
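
A minimal sketch of that two-pass idea, assuming the rows are already in ascending date order and using illustrative file names (input.csv / output.csv); the helper names and layout are mine, not from the outline above:

import csv
from collections import defaultdict

# pass 1: collect the (score, parameter) history for each name, in file (date) order
history = defaultdict(list)                 # {'Tom': [(score, parameter), ...], ...}
with open('input.csv', 'rb') as fin:
    rows = list(csv.reader(fin))[1:]        # skip the header row
for row in rows:
    history[row[2]].append((row[3], row[4]))

# pass 2: for each row, append the three history entries that precede it for that name
seen = defaultdict(int)                     # how many entries per name we have passed
with open('output.csv', 'wb') as fout:
    out = csv.writer(fout)
    out.writerow(['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
                  'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
                  'LTscore+2', 'LTParameter+3'])
    for row in rows:
        i = seen[row[2]]
        # newest first; note this simple index slice treats an earlier row on the
        # *same* date as previous data, so filtering strictly by date would need
        # the (date, values) entries described in the bullets above
        previous = history[row[2]][max(i - 3, 0):i][::-1]
        previous += [('x', 'y')] * (3 - len(previous))
        out.writerow(row + [value for pair in previous for value in pair])
        seen[row[2]] += 1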


Just for interest, here is the row-by-row approach reworked with coroutines, profiled on a much bigger file (details in the docstring below). The reader dominates the run time; the processing itself is comparatively cheap.

"""uses coroutines.

2 gig file, 1M lines, 2K characters/line:
- read and send one line at a time
- process and send one line
- accumulate 720 lines before write
Wed Nov 13 08:04:34 2013    fooprof
    10947682 function calls (9946973 primitive calls) in 82.147 seconds

   Ordered by: standard name

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
        1    0.000    0.000   82.147   82.147 <string>:1(<module>)
        1   59.896   59.896   82.147   82.147 optimizations.py:45(reader)
  1000710    8.864    0.000   21.703    0.000 optimizations.py:57(processor)
  1000710    1.506    0.000    6.137    0.000 optimizations.py:94(writer)
  1002098    0.185    0.000    0.185    0.000 {len}
  1000708    0.209    0.000    0.209    0.000 {method 'append' of 'list' objects}
      2/1    0.073    0.036    0.078    0.078 {method 'close' of 'generator' objects}
        1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
  1937129    0.295    0.000    0.295    0.000 {method 'extend' of 'list' objects}
  1002097    3.115    0.000    3.115    0.000 {method 'join' of 'str' objects}
2001416/1000708    0.839    0.000   22.172    0.000 {method 'send' of 'generator' objects}
  1000708    4.305    0.000    4.305    0.000 {method 'split' of 'str' objects}
  1000708    0.823    0.000    0.823    0.000 {method 'strip' of 'str' objects}
     1390    2.033    0.001    2.033    0.001 {method 'write' of 'file' objects}
        1    0.004    0.004    0.004    0.004 {method 'writelines' of 'file' objects}
        2    0.001    0.001    0.001    0.001 {open}

Running a few in a row helps:
Fri Nov 15 22:12:02 2013    fooprof
    10947671 function calls (9946963 primitive calls) in 69.237 seconds
Fri Nov 15 22:13:44 2013    fooprof
    10947671 function calls (9946963 primitive calls) in 64.330 seconds

using a dummy reader that sends the same line 1M times
Wed Nov 13 13:36:57 2013    fooprof
    10004374 function calls (9004373 primitive calls) in 23.013 seconds

using dummy reader AND writer --> processor time
Wed Nov 13 13:45:08 2013    fooprof
    10001730 function calls (9001729 primitive calls) in 10.523 seconds

using a dummy processor and writer --> mostly reader time
Wed Nov 13 22:45:24 2013    fooprof
        6005839 function calls (5005131 primitive calls) in 24.502 seconds

using a dummy reader and processor --> writer time
Wed Nov 13 22:52:12 2013    fooprof
    6004374 function calls (5004373 primitive calls) in 24.326 seconds

"""

import csv
from operator import itemgetter

# data,01/09/13,george,219,dataa,text
# data,01/09/13,george,219,dataa,text,x,y,x,y,x,y
# just keep the previous row

fields = ['Datatitle', 'Date', 'Name', 'Score', 'Parameter',
          'LTscore', 'LTParameter', 'LTscore+1', 'LTParameter+1',
          'LTscore+2', 'LTParameter+3']

def reader(processor, filename = 'data.txt'):
    processor.next()
    with open(filename) as f:
        #skip the header
        f.next()
        for line in f:
            processor.send(line)
    processor.close()
    return 'done'

def processor(writer):
    """Process line and send to writer.

    line --> str, a complete row of data
    sends str
    """
    date = itemgetter(1)
    name = itemgetter(2)
    cols_sixplus = itemgetter(slice(6,None))
    cols34 = itemgetter(slice(3, 5))
    cols6_9 = itemgetter(slice(6, 10))
    data = {}
    writer.next()
    try:
        while True:
            line = yield
            row = line.strip().split(',')
            try:
                previous_row = data[name(row)]
            except KeyError as e:
                # first time this name encountered
                row.extend(['x', 'y', 'x', 'y', 'x', 'y'])
                data[name(row)] = row
                writer.send(','.join(row) + '\n' )
                continue
            if date(previous_row) == date(row):
                # use the extra columns from last time
                row.extend(cols_sixplus(previous_row))
            else:
                row.extend(cols34(previous_row))
                row.extend(cols6_9(previous_row))
            data[name(row)] = row
            writer.send(','.join(row) + '\n')
    except GeneratorExit:
        writer.close()

def writer(filename = 'processed.txt', accum = 1000):
    with open(filename, 'wb') as f:
        f.write('Datatitle,Date,Name,Score,Parameter,LTscore,LTParameter,LTscore+1,LTParameter+1,LTscore+2,LTParameter+3\n')
        try:
            while True:
                dataout = list()
                while len(dataout) < accum:
                    dataout.append((yield))
                f.write(''.join(dataout))
        except GeneratorExit:
            f.writelines(dataout)


if __name__ == '__main__':
    import cProfile, pstats

    cProfile.run("reader(processor(writer(accum = 720)), filename = 'biggerdata.txt')", 'fooprof')
    p = pstats.Stats('fooprof')
    p.strip_dirs().sort_stats(-1).print_stats()

If you look at the profiler times for the runs with dummy functions (mocks?), they do not add up to the time of the run with all three real functions. I don't understand that either.

I tried using linecache in the reader, but it was slower. I also tried an mmap-based reader that read 200M chunks, but that was slower too, probably because I used re.finditer() to pick out the lines. I will probably come back to the mmap reader for my own purposes.
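
For reference, a rough sketch of the sort of mmap reader mentioned above; the chunk size and the line-splitting regex are guesses on my part, not what was actually benchmarked, and it assumes the file ends with a newline:

import mmap
import re

def mmap_reader(processor, filename='biggerdata.txt', chunk=200 * 1024 * 1024):
    # feed lines to the processor coroutine from an mmap'd file,
    # scanning one large window at a time with re.finditer()
    line_re = re.compile(r'[^\n]*\n')
    processor.next()
    with open(filename, 'rb') as f:
        mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
        try:
            pos, first = 0, True
            while pos < mm.size():
                window = mm[pos:pos + chunk]
                # stop at the last newline so a line is never split across windows
                end = window.rfind('\n') + 1 or len(window)
                for match in line_re.finditer(window[:end]):
                    if first:               # skip the header line
                        first = False
                        continue
                    processor.send(match.group())
                pos += end
        finally:
            mm.close()
    processor.close()

It would drop straight into the pipeline in place of reader(), e.g. mmap_reader(processor(writer(accum = 720))).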
