Node.js: splitting a stream's content into n parts

I am trying to understand Node streams and their life cycle. To that end, I want to split the contents of a stream into n parts. The code below shows what I intend and what I have already tried. I have omitted some details.

I have a stream that just generates some data (just a sequence of numbers):

    class Stream extends Readable {
      constructor() {
        super({objectMode: true, highWaterMark: 1})
        this.counter = 0
      }

      _read(size) {
        if (this.counter === 30) {
          this.push(null)
        } else {
          this.push(this.counter)
        }
        this.counter += 1
      }
    }

    const stream = new Stream()
    stream.pause();

Here is a function that tries to take the next n chunks:

    function take(stream, count) {
      const result = []
      return new Promise(function(resolve) {
        stream.once('readable', function() {
          var chunk;
          do {
            chunk = stream.read()
            if (_.isNull(chunk) || result.length > count) {
              stream.pause()
              break
            }
            result.push(chunk)
          } while(true)
          resolve(result)
        })
      })
    }

I want to use it as follows:

    take(stream, 3)
      .then(res => {
        assert.deepEqual(res, [1, 2, 3])
        return take(stream, 3)
      })
      .then(res => {
        assert.deepEqual(res, [4, 5, 6])
      })

What is an idiomatic way to do this?

javascript node.js-stream
2 answers

With the WHATWG ReadableStream API (the web streams API, not Node's stream.Readable), you can use a single function to check whether the elements of the current chunk match the expected result.

Create the variables CHUNK and N, where CHUNK is the number of elements to slice from the original array and N is an offset that is increased by CHUNK each time .enqueue() is called inside pull().

    const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];
    let N = 0;

    const stream = new ReadableStream({
      pull(controller) {
        if (N < data.length)
          // slice `N, N += CHUNK` elements from `data`
          controller.enqueue(data.slice(N, N += CHUNK))
        else
          // if `N` is equal to `data.length` call `.close()` on stream
          controller.close()
      }
    });

    const reader = stream.getReader();

    const processData = ({value, done}) => {
      // if stream is closed return `result`; `reader.closed` returns a `Promise`
      if (done) return reader.closed.then(() => result);
      if (data.slice(N - CHUNK, N).every((n, index) => n === value[index])) {
        console.log(`N: ${N}, value: [${value}]`)
        result.push(...value);
        return reader.read().then(data => processData(data))
      }
    }

    const readComplete = res => console.log(`result: [${res}]`);

    reader.read()
      .then(processData)
      .then(readComplete)
      .catch(err => console.log(err));

The same check written as an explicit .then() chain:

    const [data, CHUNK, result] = [[1,2,3,4,5,6], 3, []];
    let N = 0;

    const stream = new ReadableStream({
      pull(controller) {
        if (N < data.length)
          // slice `N, N += CHUNK` elements from `data`
          controller.enqueue(data.slice(N, N += CHUNK))
        else
          // if `N` is equal to `data.length` call `.close()` on stream
          controller.close()
      }
    });

    const reader = stream.getReader();

    const readComplete = res => console.log(`result: [${res}]`);

    reader.read()
      .then(({value, done}) => {
        if ([1,2,3].every((n, index) => n === value[index])) {
          console.log(`N: ${N}, value: [${value}]`)
          result.push(...value);
          return reader.read()
        }
      })
      .then(({value, done}) => {
        if ([4,5,6].every((n, index) => n === value[index])) {
          console.log(`N: ${N}, value: [${value}]`)
          result.push(...value);
          // return `result`; `reader.closed` returns a `Promise`
          return reader.closed.then(() => result);
        }
      })
      .then(readComplete)
      .catch(err => console.log(err));

See also Chrome memory issue - File API + AngularJS


I think this may help you - https://github.com/substack/stream-handbook

It is a remarkably detailed reference with sample code for various streaming scenarios. I use it as a reference for my own project and have found it useful so far. It also has sample code in /examples.
