How to ensure that asynchronous code is executed after stream processing is complete?

I have a stream that I process by listening to the data, error and end events, and I call a function to handle each data event. Naturally, the function that processes the data calls other callbacks, making it asynchronous. So how do I start executing more code once the data in the stream has been processed? Listening for the end event on the stream does NOT mean that the asynchronous data-processing functions have finished.

How can I guarantee that the stream's data-processing functions have finished before I execute my next statement?

Here is an example:

    function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
      var self = this;
      var promises = [];
      accountStream
        .on('data', function (account) {
          migrateAccount.bind(self)(account, finishMigration);
        })
        .on('error', function (err) {
          return console.log(err);
        })
        .on('end', function () {
          console.log("Finished updating account stream (but finishMigration is still running!!!)");
          callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running!
        });
    }

    var migrateAccount = function (oldAccount, callback) {
      executeSomeAction(oldAccount, function(err, newAccount) {
        if (err) return console.log("error received:", err);
        return callback(newAccount);
      });
    }

    var finishMigration = function (newAccount) {
      // some code that is executed asynchronously...
    }

How can I guarantee that callThisOnlyAfterAllAccountsAreMigrated is called only AFTER the whole stream has been processed?

Can this be done using promises? Can this be done using streams? I am working with Node.js, so pointers to other npm modules may be useful.

3 answers

As you said, listening for the end event on the stream is useless by itself. The stream does not know or care what you do with the data in your data handler, so you will need to write code that tracks the state of your own migrateAccount calls.

If it were me, I would rewrite this entire section. If you use the readable event with .read() on your stream, you can read as many items at a time as you feel like dealing with. If that is one, no problem. If it's 30, great. The reason to do this is so that you do not overrun whatever is doing the work on the data coming from the stream. As it stands now, if accountStream is fast, your application will surely crash at some point.
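A minimal sketch of the readable/.read() idea, under some assumptions: processWithLimit is a hypothetical helper (not from the question), and worker is assumed to return a promise. It only pulls another item from the stream while fewer than limit jobs are in flight, so a fast stream cannot flood the worker.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: runs `worker` on every item of an object-mode stream,
// never allowing more than `limit` jobs to be in flight at once.
function processWithLimit(stream, worker, limit) {
  return new Promise(function (resolve, reject) {
    let inFlight = 0;  // jobs started but not yet finished
    let ended = false; // set once the stream has emitted 'end'

    function finishIfDone() {
      if (ended && inFlight === 0) resolve();
    }

    function pump() {
      let item;
      // Pull another item only while there is spare capacity.
      while (inFlight < limit && (item = stream.read()) !== null) {
        inFlight++;
        worker(item).then(function () {
          inFlight--;
          pump(); // capacity was freed, so try to read more
          finishIfDone();
        }, reject);
      }
    }

    stream.on('readable', pump);
    stream.on('end', function () { ended = true; finishIfDone(); });
    stream.on('error', reject);
  });
}
```

It could be called as processWithLimit(accountStream, migrate, 30), where migrate is assumed to be a promise-returning wrapper around migrateAccount; the returned promise settles only after the stream has ended and every job has finished.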

When you read an item from the stream and start working on it, take the promise you get back (use Bluebird or similar) and put it in an array. When the promise is resolved, remove it from the array. When the stream ends, attach a .done() handler to .all() (basically making one big promise out of all the promises still in the array).
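The promise-tracking approach can be sketched as follows. This is an illustration under assumptions, not the asker's actual code: it uses native promises instead of Bluebird (so .then() rather than .done()), a Set instead of an array, and a hypothetical promise-returning migrateAccount stub.

```javascript
const { Readable } = require('stream');

// Hypothetical promise-returning stand-in for the question's migrateAccount.
function migrateAccount(account) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve({ id: account.id, migrated: true }); }, 5);
  });
}

function updateAccountStream(accountStream) {
  return new Promise(function (resolve, reject) {
    const pending = new Set(); // promises that have not settled yet
    const migrated = [];

    accountStream
      .on('data', function (account) {
        const job = migrateAccount(account)
          .then(function (newAccount) { migrated.push(newAccount); })
          .catch(reject) // the first failure rejects the overall promise
          .finally(function () { pending.delete(job); });
        pending.add(job);
      })
      .on('error', reject)
      .on('end', function () {
        // One big promise built from everything still in flight.
        Promise.all(Array.from(pending)).then(function () {
          resolve(migrated);
        });
      });
  });
}
```

The caller then chains the follow-up work, for example updateAccountStream(accountStream).then(callThisOnlyAfterAllAccountsAreMigrated). Note that this sketch does not limit how many migrations run at once.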

You can also use a simple counter of the jobs still in progress.
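A rough sketch of the counter variant, kept in the callback style of the question. The migrateAccount stub and the done callback are hypothetical names used for illustration; done is called once, either on the first error or when the stream has ended and the counter is back at zero.

```javascript
const { Readable } = require('stream');

// Hypothetical callback-style stand-in for the question's migrateAccount.
function migrateAccount(account, callback) {
  setTimeout(function () { callback(null, { id: account.id }); }, 5);
}

function updateAccountStream(accountStream, done) {
  let pending = 0;      // jobs started but not yet finished
  let ended = false;    // set once the stream emits 'end'
  let finished = false; // guards against calling `done` twice

  function finish(err) {
    if (finished) return;
    if (err || (ended && pending === 0)) {
      finished = true;
      done(err || null);
    }
  }

  accountStream
    .on('data', function (account) {
      pending++;
      migrateAccount(account, function (err) {
        pending--;
        finish(err);
      });
    })
    .on('error', finish)
    .on('end', function () {
      ended = true;
      finish();
    });
}
```

Both conditions are needed: the end event alone says nothing about outstanding jobs, and the counter alone can briefly reach zero while more data is still on its way.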


Using a through stream (the npm through2 module), I solved this problem with the following code, which controls the asynchronous behavior:

    var through = require('through2').obj;

    function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) {
      var self = this;
      var promises = [];
      accountStream.pipe(through(function(account, _, next) {
        // The next account is not processed until next() is called.
        migrateAccount.bind(self)(account, finishMigration, next);
      }))
        .on('data', function (account) {
          // Empty handler: it keeps the through stream flowing so 'end' can fire.
        })
        .on('error', function (err) {
          return console.log(err);
        })
        .on('end', function () {
          console.log("Finished updating account stream");
          callThisOnlyAfterAllAccountsAreMigrated();
        });
    }

    var migrateAccount = function (oldAccount, callback, next) {
      executeSomeAction(oldAccount, function(err, newAccount) {
        // Note: next is never called on this error path, so the stream stalls here.
        if (err) return console.log("error received:", err);
        return callback(newAccount, next);
      });
    }

    var finishMigration = function (newAccount, next) {
      // some code that is executed asynchronously, but using 'next' callback when migration is finished...
    }

This is much easier when you process streams through promises.

Copied from here, an example that uses the spex library:

    var spex = require('spex')(Promise);
    var fs = require('fs');
    var rs = fs.createReadStream('values.txt');

    function receiver(index, data, delay) {
      return new Promise(function (resolve) {
        console.log("RECEIVED:", index, data, delay);
        resolve(); // ok to read the next data;
      });
    }

    spex.stream.read(rs, receiver)
      .then(function (data) {
        // streaming successfully finished;
        console.log("DATA:", data);
      }, function (reason) {
        // streaming has failed;
        console.log("REASON:", reason);
      });
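For comparison, a similar promise-driven flow can be sketched without an extra library, assuming a Node.js version in which readable streams support async iteration. This is a different technique from spex, and the migrateAccount stub is hypothetical. Each account is awaited before the next one is read, so the function returns only after every migration has finished.

```javascript
const { Readable } = require('stream');

// Hypothetical promise-returning stand-in for the question's migrateAccount.
function migrateAccount(account) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve({ id: account.id, migrated: true }); }, 5);
  });
}

async function updateAccountStream(accountStream) {
  const migrated = [];
  // Readable streams are async iterable; the loop waits for each
  // migration before pulling the next account from the stream.
  for await (const account of accountStream) {
    migrated.push(await migrateAccount(account));
  }
  return migrated; // reached only after the stream ended and all work is done
}
```

A stream error or a rejected migration rejects the returned promise, so the caller can handle both in a single .catch().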
