Filtering a Stream object in node.js

It seems to me that the elegant way to process certain data types in Node.js is through a chain of processing objects such as UNIX pipes.

For example, grep:

function Grep(pattern) {
    ...
}
util.inherits(Grep, stream.Stream);

Grep.prototype.???? = ???????  // What goes here?

grep = new Grep(/foo/);

process.stdin.pipe(grep);
myStream.pipe(process.stdout);

However, it’s not entirely clear to me how to redefine the various Stream methods in order for this to work.

How to create a Stream object that simply copies its input to its output? Presumably, with this in mind, more complex filtering flows become trivial.

Update: the following seems to work (expressed in CoffeeScript, so I am not populating this field with JS syntax!):

class Forwarder extends stream.Stream
    write: (chunk, encoding) ->
        @emit 'data', chunk
    end: (chunk, encoding) =>
        if chunk?
            @emit 'data', chunk
        @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

However, binding something to this script does not output anything. Calling 'fwd.write ()' explicitly in a script calls the output to stdout.

+5
2

.

, , , . , . .

class Forwarder extends stream.Stream
  constructor: ->
    @writable = true
  write: (chunk, encoding) ->
    @emit 'data', chunk
  end: ->
    @emit 'end'

fwd = new Forwarder()
fwd.pipe(process.stdout);
process.stdin.pipe(fwd);
process.stdin.resume();

Update

V1 Node <= 0,8. > 0,8, Node , , :

class Forwarder extends stream.Transform
    _transform: (chunk, encoding, callback) ->
      this.push(chunk);
      callback();

chunk , .

+8

, , , - , .

, OP, API Node 0.10.

var stream = require('stream')
var util = require('util')

function Grep(pattern) {
  stream.Transform.call(this)

  this.pattern = pattern
}

util.inherits(Grep, stream.Transform)

Grep.prototype._transform = function(chunk, encoding, callback) {
  var string = chunk.toString()

  if (string.match(this.pattern)) {
    this.push(chunk)
  }

  callback()
}

var grep = new Grep(/foo/)
process.stdin.pipe(grep)
grep.pipe(process.stdout)
+1

All Articles