Observable.prototype.scan() ( RxJS)
scan() , reduce(), . (. Rx scan reduce)
OP ( fiddle):
var subject = new Rx.ReplaySubject();
subject
.selectMany(x => Rx.Observable.fromArray(x.transactions))
.scan((agg, val) => agg+val.value, 0)
.subscribe(function (value) {
console.log(value)
});
setInterval(injectData, 2000);
function injectData () {
subject.onNext({id: Date.now(), transactions: [
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)},
{value: Math.round(Math.random() * 5000)}
]});
}
:
- selectMany(). , 2 , , reduce() ( fiddle)
subject
// note: use "selectMany" to flatten observable of observables
// note: using "reduce" inside here so that we only emit the aggregate
.selectMany(x =>
Rx.Observable
.fromArray(x.transactions)
.reduce((agg, val) => agg + val.value, 0)
)
// note: use "scan" to aggregate values
.scan((agg, val) => agg+val, 0)
.subscribe(function (value) {
console.log(value)
});
:
Rx ; onCompleted(), . , reduce(). fiddle .