NodeJS, promises, streams - processing large CSV files

I need to create a function that processes large CSV files, for use in a call to bluebird.map(). Given the potential file sizes, I would like to use streaming.

This function should take a stream (a CSV file) and a function (which processes chunks from the stream), and return a promise that resolves when the file has been read to the end, or rejects on error.
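For context, here is roughly how I intend to call it from bluebird's map; the filePaths list, the concurrency value and the saveRecord processor are just placeholders, not real code from my project:

    var fs = require('fs');

    // Hypothetical outer call: map over a list of CSV file paths,
    // processing at most two files at a time via bluebird's concurrency option.
    promise.map(filePaths, function(path) {
        return api.parsers.processCsvStream(fs.createReadStream(path), saveRecord);
    }, {concurrency: 2});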

So, I start with:

    'use strict';

    var _ = require('lodash');
    var promise = require('bluebird');
    var csv = require('csv');
    var stream = require('stream');

    var pgp = require('pg-promise')({promiseLib: promise});

    api.parsers.processCsvStream = function(passedStream, processor) {

      var parser = csv.parse(passedStream, {trim: true});
      passedStream.pipe(parser);

      // use readable or data event?
      parser.on('readable', function() {
        // call processor, which may be async
        // how do I throttle the amount of promises generated
      });

      var db = pgp(api.config.mailroom.fileMakerDbConfig);

      return new Promise(function(resolve, reject) {
        parser.on('end', resolve);
        parser.on('error', reject);
      });

    };

Now I have two interconnected questions:

  • I need to throttle the actual amount of data being processed, so as not to create memory pressure.
  • The function passed in as the processor parameter will often be asynchronous, such as saving the contents of the file to a database via a promise-based library (right now: pg-promise). As such, it will create a promise in memory and move on, repeatedly (see the sketch after this list).
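To make the second point concrete, here is a minimal sketch of what such a processor might look like, assuming db is a pg-promise database object; the table and column names are made up purely for illustration:

    // Hypothetical processor: saves one parsed CSV record and returns a promise.
    // The 'people' table and its columns are assumptions for illustration only.
    function saveRecord(record) {
        return db.none('insert into people(name, email) values($1, $2)', [record[0], record[1]]);
    }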

The pg-promise library has functions to manage this, such as page(), but I can't wrap my head around how to mix stream event handlers with these promise methods. Right now, I return a promise in the handler for readable after each read(), which means I create a huge number of promised database operations and eventually fail because I run out of process memory.

Does anyone have a working example of this that I can use as a starting point?

UPDATE: There may be more than one way to skin a cat, but this works:

    'use strict';

    var _ = require('lodash');
    var promise = require('bluebird');
    var csv = require('csv');
    var stream = require('stream');

    var pgp = require('pg-promise')({promiseLib: promise});

    api.parsers.processCsvStream = function(passedStream, processor) {

      // some checks trimmed out for example

      var db = pgp(api.config.mailroom.fileMakerDbConfig);

      var parser = csv.parse(passedStream, {trim: true});
      passedStream.pipe(parser);

      var readDataFromStream = function(index, data, delay) {
        var records = [];
        var record;
        do {
          record = parser.read();
          if (record != null)
            records.push(record);
        } while (record != null && (records.length < api.config.mailroom.fileParserConcurrency));
        parser.pause();
        if (records.length)
          return records;
      };

      var processData = function(index, data, delay) {
        console.log('processData(' + index + ') > data: ', data);
        parser.resume();
      };

      parser.on('readable', function() {
        db.task(function(tsk) {
          this.page(readDataFromStream, processData);
        });
      });

      return new Promise(function(resolve, reject) {
        parser.on('end', resolve);
        parser.on('error', reject);
      });

    };

Does anyone see a potential problem with this approach?

+7
promise bluebird pg-promise
3 answers

Below is a complete application that performs the same kind of task as your code: it reads a file as a stream, parses it as CSV, and inserts each row into the database.

    const fs = require('fs');
    const promise = require('bluebird');
    const csv = require('csv-parse');
    const pgp = require('pg-promise')({promiseLib: promise});

    const cn = 'postgres://postgres:password@localhost:5432/test_db';
    const rs = fs.createReadStream('primes.csv');

    const db = pgp(cn);

    function receiver(_, data) {
        function source(index) {
            if (index < data.length) {
                // here we insert just the first column value that contains a prime number;
                return this.none('insert into primes values($1)', data[index][0]);
            }
        }
        return this.sequence(source);
    }

    db.task(t => {
        return pgp.spex.stream.read.call(t, rs.pipe(csv()), receiver);
    })
        .then(data => {
            console.log('DATA:', data);
        })
        .catch(error => {
            console.log('ERROR:', error);
        });

Please note the only thing I changed: using the csv-parse library instead of csv, as a better alternative.

I added the use of the stream.read method from spex, which properly serves a Readable stream for use with promises.

+3

You might want to look at promise-streams:

    var ps = require('promise-streams');

    passedStream
      .pipe(csv.parse({trim: true}))
      .pipe(ps.map({concurrent: 4}, row => processRowDataWhichMightBeAsyncAndReturnPromise(row)))
      .wait().then(_ => {
        console.log("All done!");
      });

It works with back pressure and everything.
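If you want the promise-returning function signature from your question, a sketch along these lines should work: it is just the snippet above wrapped into your function, assuming the promise returned by .wait() is what you want to hand back (the concurrency of 4 is arbitrary):

    // Sketch: the pipeline above wrapped into the processCsvStream signature
    // from the question. ps.map applies the processor with limited concurrency,
    // and .wait() returns a promise that settles when the stream finishes.
    api.parsers.processCsvStream = function(passedStream, processor) {
      return passedStream
        .pipe(csv.parse({trim: true}))
        .pipe(ps.map({concurrent: 4}, processor))
        .wait();
    };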

+6

So to say, you don't want streaming, but some kind of data chunks? ;-)

Do you know https://github.com/substack/stream-handbook ?

I think the simplest approach without changing your architecture would be some kind of promise pool, e.g. https://github.com/timdp/es6-promise-pool
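For example, a rough sketch of how es6-promise-pool limits concurrency; the rows array and saveRow function are placeholders for your own data and processor:

    var PromisePool = require('es6-promise-pool');

    var i = 0;
    // The producer returns the next promise to run, or null when there is no more work.
    var producer = function() {
        if (i < rows.length) {
            return saveRow(rows[i++]); // placeholder: returns a promise
        }
        return null;
    };

    // Run at most 4 saves at a time.
    var pool = new PromisePool(producer, 4);
    pool.start().then(function() {
        console.log('All rows processed');
    });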

+1
