How can I apply temporary back pressure in RxJS5?

Suppose I have the following code:

let a = Rx.Observable.of(1, 2, 3) let b = Observable.zip(a, a, (a, b) => a + b) b.forEach(t => console.log(t)) 

This immediately displays the results. Now, how to set the time delay between each message as a backpressure method (note that I do not want to use a buffer, instead I want a and b become Cold observables ), for example:

 b.takeEvery(1000).forEach(t => console.log(t)) 

And get the same answer:

 <wait 1s> 2 <wait 1s> 4 <wait 1s> 6 

Alternative: If backpressure (output mechanisms for some observables) is not supported in RxJS, then how can you create an infinite generator without running out of resources?

Alternative 2: Other JS frameworks that support pull and push mechanisms

+8
javascript observable rxjs rxjs5 reactivex
source share
2 answers

In case of back pressure, RxJS 5.x is not supported, but there is, for example, a pausable operator in version 4.x, It works only with hot observables. More information on backpressure in case 4.x and here (especially take the loot below and the description related to RxJS).

This Erik Meijer tweet may be a bit controversial, but relevant: https://twitter.com/headinthebox/status/774635475071934464

For your own implementation of the backpressure mechanism, you need to have a 2-way communication channel that can be easily created by two objects - one for each end. Basically use next to send messages and .subscribe to go to the other end.

Creating a generator is also doable - again using the object to navigate between push and pull based worlds. Below is an example implementation for generating Fibonacci numbers.

 const fib = () => { const n = new Rx.Subject() const f = n .scan(c => ({ a: cb, b: cb + ca }), { a: 0, b: 1 }) .map(c => ca) return { $: f, next: () => n.next() } } const f = fib() f.$.subscribe(n => document.querySelector('#r').innerHTML = n) Rx.Observable.fromEvent(document.querySelector('#f'), 'click') .do(f.next) .subscribe() 
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script> <button id='f'>NEXT FIBONACCI</button> <div id='r'>_?_<div> 

Another js library that may interest you, https://github.com/ubolonton/js-csp - did not use it, so I'm not sure how it relates to backpressure.

+3
source share

The idea is to queue timeouts one by one when the previous one completes the fiddle

 let a = Rx.Observable.of(1, 2, 3); let b = Rx.Observable.zip(a, a, (a, b) => a + b); // getting values into array var x = []; b.forEach(t => x.push(t)); var takeEvery = function(msec,items,action,index=0){ if(typeof(action) == "function") if(index<items.length) setTimeout( function(item,ind){ action(item); takeEvery(msec,items,action,ind); },msec, items[index],++index); }; // queueing over time takeEvery(1000,x, function(item){ console.log(item); }); 
0
source share

All Articles