I need to create a function that processes large CSV files, for use in a call to bluebird.map(). Given the potential file sizes, I would like to use streaming.
This function should take a stream (a CSV file) and a function (which processes chunks from the stream), and return a promise that resolves when the file has been read to the end, or rejects on an error.
So, I start with:
```
'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // ... this is where I get stuck (see the questions below)
};
```
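For context, the intended call site would look roughly like this; the file list and the saveRecords processor are placeholders I'm making up for illustration:

```
// Hypothetical call site: process several CSV files, a couple at a time.
// 'files' and 'saveRecords' are placeholders, not part of the code above.
var promise = require('bluebird');
var fs = require('fs');

var files = ['one.csv', 'two.csv', 'three.csv'];

promise.map(files, function(file) {
  return api.parsers.processCsvStream(fs.createReadStream(file), saveRecords);
}, {concurrency: 2})
  .then(function() {
    console.log('all files processed');
  })
  .catch(function(err) {
    console.error('a file failed:', err);
  });
```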
Now I have two interconnected questions:
- I need to throttle the actual amount of data being processed, so as not to create memory pressure.
- The function passed as the processor parameter will often be asynchronous, such as saving the contents of the file to the database through a promise-based library (right now: pg-promise). As such, it creates a promise in memory and moves on, repeatedly (a sketch of the kind of processor I mean follows below).
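To make the second point concrete, the kind of processor I have in mind looks roughly like this; the table, columns and connection string are made up for illustration:

```
// Hypothetical processor: insert a batch of parsed CSV rows into Postgres.
// Table/column names and the connection string are placeholders.
var bluebird = require('bluebird');
var pgp = require('pg-promise')({promiseLib: bluebird});
var db = pgp('postgres://user:pass@localhost:5432/mailroom'); // placeholder

function saveRecords(records) {
  // one transaction per batch; each record is an array of CSV fields
  return db.tx(function(t) {
    var inserts = records.map(function(r) {
      return t.none('INSERT INTO imported_rows(col_a, col_b) VALUES($1, $2)', r);
    });
    return t.batch(inserts);
  });
}
```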
The pg-promise library has functions to manage this kind of repeated promise creation, such as page(), but I can't wrap my head around how to mix stream event handlers with these paging methods. Right now, I am returning a promise in the handler for readable after each read(), which means I create a huge number of promised database operations and eventually fail because I hit a process memory limit.
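To show what I mean by throttling, here is a rough sketch (not what I actually have working) that only drains the parser again after the previous batch's promise settles; it assumes the parser and processor from the snippet above, and batchSize is a made-up knob:

```
// Throttling sketch in paused mode: only call parser.read() again once the
// previous batch has been processed; unread rows stay buffered in the parser
// and back-pressure propagates up through the pipe.
var batchSize = 500;   // made-up number
var busy = false;

function drainBatch() {
  if (busy) return;                        // a batch is still in flight
  var records = [];
  var record;
  while (records.length < batchSize && (record = parser.read()) !== null) {
    records.push(record);
  }
  if (!records.length) return;

  busy = true;
  promise.resolve(processor(records))      // processor may return a promise
    .then(function() {
      busy = false;
      drainBatch();                        // pick up rows buffered meanwhile
    })
    .catch(function(err) {
      parser.emit('error', err);           // surface processing failures
    });
}

parser.on('readable', drainBatch);
```

Even with something like this, the end event can fire while the last batch is still being processed, so the resolve/reject wiring would have to wait for that final promise too; that is part of what I can't quite work out.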
Does anyone have a working example of this that I could use as a starting point?
UPDATE: There may be more than one way to skin a cat, but this works:
```
'use strict';

var _ = require('lodash');
var promise = require('bluebird');
var csv = require('csv');
var stream = require('stream');
var pgp = require('pg-promise')({promiseLib: promise});

api.parsers.processCsvStream = function(passedStream, processor) {
  // some checks trimmed out for example

  var db = pgp(api.config.mailroom.fileMakerDbConfig);

  var parser = csv.parse(passedStream, {trim: true});
  passedStream.pipe(parser);

  // page() "source": pull up to fileParserConcurrency records, then pause the parser
  var readDataFromStream = function(index, data, delay) {
    var records = [];
    var record;
    do {
      record = parser.read();
      if (record != null)
        records.push(record);
    } while (record != null && (records.length < api.config.mailroom.fileParserConcurrency));
    parser.pause();
    if (records.length)
      return records;
  };

  // page() "destination": handle the batch, then let the parser produce more
  var processData = function(index, data, delay) {
    console.log('processData(' + index + ') > data: ', data);
    parser.resume();
  };

  parser.on('readable', function() {
    db.task(function(tsk) {
      this.page(readDataFromStream, processData);
    });
  });

  return new Promise(function(resolve, reject) {
    parser.on('end', resolve);
    parser.on('error', reject);
  });
};
```
Does anyone see a potential problem with this approach?
alphadogg