How do reactive streams work in JS?

I am new to jet streams and am now trying to understand them. The idea looks pretty clear and simple, but in practice I can’t understand what is really happening there.

Right now I'm playing with most.js trying to implement a simple dispatcher. The scan method seems to be exactly what I need for this.

My code is:

var dispatch;
// expose method for pushing events to stream:
var events = require("most").create(add => dispatch = add); 
// initialize stream, so callback in `create` above is actually called
events.drain();

events.observe(v => console.log("new event", v));

dispatch(1);

var scaner = events.scan(
  (state, patch) => {
    console.log("scaner", patch);
    // update state here
    return state;
  },
  { foo: 0 }
);

scaner.observe(v => console.log("scaner state", v));

dispatch(2);

As I understand it, the first observer must be called twice (once per event) and the scanner callback and the second observer once each (because they were added after the first event was triggered).

However, in practice, the console shows this:

new event 1
new event 2
scaner state { foo: 0 }

The scanner is never called, no matter how many events I insert into the stream.

dispatch ( ), , .

? , , . ?

+4
2

, API:

most.from(['a', 'b', 'c', 'd'])
    .scan(function(string, letter) {
         return string + letter;
        }, '')
    .forEach(console.log.bind(console));

:

  • ['a', 'b', 'c', 'd'] .
  • scan().
  • ... forEach().

. .

, most.js , 1340 ff.:

exports.from = from;

function from(a) {
    if(Array.isArray(a) || isArrayLike(a)) {
        return fromArray(a);
    }  
...

, from() fromArray(). fromArray() ( ) Stream:

...
function fromArray (a) {
    return new Stream(new ArraySource(a));
}
...

, Stream sink.event(0, array[i]);, 0 . setTimeout, .event = function, , . , 4692 Scheduler delay() .

: , 0 .

, , - , . . , :-)?

. , :

https://jsfiddle.net/aak18y0m/1/

dispatch() . setTimeout():

setTimeout( function() { dispatch( 1 /* or 2 */); }, 0);

, , .

( ), . , :

doc ready
(index):61 Most loaded: [object Object]
(index):82 scanner state Object {foo: 0}
(index):75 scanner! 1
(index):82 scanner state Object {foo: 0}
(index):75 scanner! 2
(index):82 scanner state Object {foo: 0}

drain(), .

( ), dispatch() , , , JavaScript . // Alternative solution, .

+1

, , . lib.

- most.js. , " , ".

- Kefir.js lib, . . , , most.js, Kefir.js.

, lib, .

0

All Articles