How to emit/pipe array values as a readable stream in Node.js?

What is the best way to create a readable stream from an array and pipe its values to a writable stream? I have seen substack's example using setInterval, and I can implement that successfully using 0 for the interval value, but I am iterating over a lot of data, and triggering the GC every time slows things down.

    // Works, using a setInterval wrapper
    var Stream = require('stream');
    var arr = [1, 5, 3, 6, 8, 9];

    function createStream () {
        var t = new Stream();
        t.readable = true;
        var times = 0;
        var iv = setInterval(function () {
            t.emit('data', arr[times]);
            if (++times === arr.length) {
                t.emit('end');
                clearInterval(iv);
            }
        }, 0);
        return t; // return the stream so the caller can pipe it
    }

    // Create the writable stream s
    // ....

    createStream().pipe(s);

What I would like to do is emit the values without setInterval, perhaps using the async module like this:

    async.forEachSeries(arr, function (item, cb) {
        t.emit('data', item);
        cb();
    }, function (err) {
        if (err) {
            console.log(err);
        }
        t.emit('end');
    });

In this case I iterate over the array and emit the data, but no values are ever piped. I have already seen shinout's ArrayStream, but I think it was created before v0.10, and it has a bit more overhead than I'm looking for.

+8
4 answers

I ended up using ArrayStream for this. It solved the problem of the GC being triggered too often. Node warned about recursive process.nextTick calls, so I changed the nextTick callbacks in ArrayStream to setImmediate. That fixed the warnings, and it seems to work fine.
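
The idea of handing out one item per setImmediate callback can be sketched as follows. This is not ArrayStream's actual source: createArrayStream is a hypothetical helper written for illustration, built on the core Readable class, and it ignores the return value of push(), so it applies no backpressure.

```javascript
const { Readable, Writable } = require('stream')

// Hypothetical helper (not ArrayStream's real API): pushes one array item per
// setImmediate callback, so the event loop gets a turn between items.
function createArrayStream (arr) {
  // read() is a no-op because the data is pushed from the timer callbacks
  const t = new Readable({ objectMode: true, read () {} })
  let i = 0
  function next () {
    if (i === arr.length) {
      t.push(null) // no more data
      return
    }
    t.push(arr[i++])
    setImmediate(next)
  }
  setImmediate(next)
  return t
}

// Simple object-mode destination that records what it receives
const received = []
const sink = new Writable({
  objectMode: true,
  write (item, encoding, done) {
    received.push(item)
    done()
  }
})

createArrayStream([1, 5, 3, 6, 8, 9]).pipe(sink)
```

Unlike a recursive process.nextTick, each setImmediate callback runs on a later turn of the event loop, so pending I/O is not starved while the array is being emitted.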

+2

You can solve this problem by creating a readable stream and pushing values into it.

Streams are a pain, but it is often easier to work with them directly than using libraries.

Array of strings or buffers to stream

If you are working with an array of strings or buffers, this will work:

    'use strict'

    const Stream = require('stream')
    const readable = new Stream.Readable()

    readable.pipe(process.stdout)

    const items = ['a', 'b', 'c']
    items.forEach(item => readable.push(item))

    // no more data
    readable.push(null)

Notes:

  • readable.pipe(process.stdout) does two things: it puts the stream into "flowing" mode, and it sets up process.stdout to receive the data from readable.
  • The Readable#push method is intended for the creator of the readable stream, not for the user of the stream.
  • You have to call Readable#push(null) to signal that there is no more data.
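
The notes above can be observed directly. The following is a minimal sketch, assuming a Node.js version that exposes the readableFlowing property (9.4 or later, as far as I know). The sink writable and the no-op read() are additions for illustration and are not part of the example above.

```javascript
const { Readable, Writable } = require('stream')

// The no-op read() is an addition for this sketch: data is pushed manually
const readable = new Readable({ read () {} })

// Stand-in for process.stdout that records what it receives
const chunks = []
const sink = new Writable({
  write (chunk, encoding, done) {
    chunks.push(chunk.toString())
    done()
  }
})

const flowingBefore = readable.readableFlowing // null: no consumer attached yet
readable.pipe(sink)
const flowingAfter = readable.readableFlowing // true: pipe() switched to flowing mode

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// Signals the end of the data; 'finish' then fires on the sink
readable.push(null)

sink.on('finish', () => {
  console.log(flowingBefore, flowingAfter, chunks.join(''))
})
```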

Array of non-strings to stream

To create a stream from an array of things that are neither strings nor buffers, you need both the readable stream and the writable stream to be in "object mode". In the example below, I made the following changes:

  • Initialize a readable stream using {objectMode: true}
  • Instead of piping to process.stdout, pipe to a simple writable stream that is also in object mode.

    'use strict'

    const Stream = require('stream')

    const readable = new Stream.Readable({objectMode: true})

    const writable = new Stream.Writable({objectMode: true})
    writable._write = (object, encoding, done) => {
      console.log(object)

      // ready to process the next chunk
      done()
    }

    readable.pipe(writable)

    const items = [1, 2, 3]
    items.forEach(item => readable.push(item))

    // end the stream
    readable.push(null)

Performance note

Where does the data come from? If it comes from a streaming data source, it is better to manipulate the stream with a transform stream than to convert it to and from an array.
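
A minimal sketch of that approach, assuming object mode throughout. The source stream here is only a stand-in for a real streaming data source, and the doubling step is a made-up transformation.

```javascript
const { Readable, Transform, Writable } = require('stream')

// Stand-in for a real streaming data source (hypothetical data)
const source = new Readable({ objectMode: true, read () {} })

// Doubles each value as it passes through; nothing is gathered into an array
const double = new Transform({
  objectMode: true,
  transform (value, encoding, done) {
    done(null, value * 2)
  }
})

const results = []
const sink = new Writable({
  objectMode: true,
  write (value, encoding, done) {
    results.push(value) // collected only so the sketch can show its output
    done()
  }
})

source.pipe(double).pipe(sink)

const values = [1, 2, 3]
values.forEach(value => source.push(value))
source.push(null)

sink.on('finish', () => console.log(results)) // logs [ 2, 4, 6 ]
```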

+20

TL;DR

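    const { Readable } = require('stream')

    const items = [1, 2, 3]
    let index = 0

    const stream = new Readable({
      objectMode: true,
      read() {
        // Push one item per read() call, in array order. Using an index
        // (instead of pop() and a truthiness check) keeps falsy items such
        // as 0 and avoids pushing again after the end has been signalled.
        this.push(index < items.length ? items[index++] : null)
      },
    })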
+6

This is an old question, but in case anyone comes across it: node-stream-array is a much simpler and more elegant implementation for Node.js >= v0.10.

    var streamify = require('stream-array'),
        os = require('os');

    streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);
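
For comparison, newer Node.js versions appear to cover the same use case in core. The sketch below assumes Node.js 12.3 or later, where stream.Readable.from() is available. It is not part of the stream-array module.

```javascript
const { Readable } = require('stream')
const os = require('os')

// Readable.from() wraps an iterable (here an array) in a readable stream;
// each array element becomes one chunk
const readable = Readable.from(['1', '2', '3', os.EOL])

readable.pipe(process.stdout)
```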
+2