RxJS 5.0 "do while"-like mechanism

I am trying to use RxJS for a simple short poll. It should make a request once every delay seconds to the location path on the server, ending once one of two conditions is met: either the isComplete(data) callback returns true, or it has tried the server more than maxTries times. Here is the basic code:

    newShortPoll(path, maxTries, delay, isComplete) {
      return Observable.interval(delay)
        .take(maxTries)
        .flatMap((tryNumber) => http.get(path))
        .doWhile((data) => !isComplete(data));
    }

However, doWhile does not exist in RxJS 5.0. The condition that it can only try the server maxTries times works, thanks to the take() call, but the isComplete condition does not. How can I make the observable next() values until isComplete returns true, at which point it should next() that value and then complete()?

I should note that takeWhile() does not work for me here. It does not emit the last value, which is actually the most important one, since it is the value for which isComplete returned true.
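To make the difference concrete, here is a minimal plain-JavaScript sketch (no RxJS involved) of the two behaviours on an ordinary iterable. The names takeWhileExclusive, takeWhileInclusive, and the sample responses are hypothetical and only illustrate the semantics I am after; they are not RxJS operators.

```javascript
// Exclusive: stops BEFORE yielding the first item that fails the predicate.
// This mirrors what takeWhile() does and why the last value gets lost.
function* takeWhileExclusive(items, predicate) {
  for (const item of items) {
    if (!predicate(item)) return;
    yield item;
  }
}

// Inclusive: yields the item first, then stops if it failed the predicate.
// This is the behaviour wanted for polling.
function* takeWhileInclusive(items, predicate) {
  for (const item of items) {
    yield item;
    if (!predicate(item)) return;
  }
}

// Hypothetical server responses; the third one is the "complete" one.
const responses = [
  { n: 1, done: false },
  { n: 2, done: false },
  { n: 3, done: true },
  { n: 4, done: false },
];
const notDone = (response) => !response.done;

const exclusive = [...takeWhileExclusive(responses, notDone)].map((r) => r.n);
const inclusive = [...takeWhileInclusive(responses, notDone)].map((r) => r.n);
// exclusive holds 1, 2 (the completed response is dropped)
// inclusive holds 1, 2, 3 (the completed response is kept, then it stops)
```

The inclusive variant is what I would like the observable to do: emit the completed response and then complete.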

Thanks!

+6
3 answers

You can achieve this using retry and first.

    // helper observable that can return incomplete/complete data or fail.
    var server = Rx.Observable.create(function (observer) {
      var x = Math.random();

      if (x < 0.1) {
        observer.next(true);
      } else if (x < 0.5) {
        observer.error("error");
      } else {
        observer.next(false);
      }
      observer.complete();

      return function () { };
    });

    function isComplete(data) {
      return data;
    }

    var delay = 1000;
    Rx.Observable.interval(delay)
      .switchMap(() => {
        return server
          .do((data) => {
            console.log('Server returned ' + data);
          }, () => {
            console.log('Server threw');
          })
          .retry(3);
      })
      .first((data) => isComplete(data))
      .subscribe(() => {
        console.log('Got completed value');
      }, () => {
        console.log('Got error');
      });
 <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.1/Rx.min.js"></script> 
+1

We can create a utility function that builds a second observable which emits every item the inner observable emits, but calls onCompleted as soon as an item satisfies our condition:

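    // Note: onNext/onError/onCompleted/dispose are the RxJS 4 names.
    // In RxJS 5 the equivalents are next/error/complete/unsubscribe.
    function takeUntilInclusive(inner$, predicate) {
      return Rx.Observable.create(observer => {
        var subscription = inner$.subscribe(
          item => {
            // Emit the item first, then complete if it meets the condition.
            observer.onNext(item);
            if (predicate(item)) {
              observer.onCompleted();
            }
          },
          // Wrapped in arrow functions so the methods keep their `this`.
          err => observer.onError(err),
          () => observer.onCompleted()
        );

        return () => {
          subscription.dispose();
        };
      });
    }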

And here's a quick snippet using our new utility method:

    const inner$ = Rx.Observable.range(0, 4);
    const data$ = takeUntilInclusive(inner$, (x) => x > 2);
    data$.subscribe(x => console.log(x));

    // >> 0
    // >> 1
    // >> 2
    // >> 3

This answer is based on: "RX Observable.TakeWhile checks the condition before each element, but I need to check after".

0

This is an old question, but I also had to poll an endpoint and came across it. Here is the doWhile operator I ended up creating:

    import { pipe, from } from 'rxjs';
    import { switchMap, takeWhile, filter, map } from 'rxjs/operators';

    export function doWhile<T>(shouldContinue: (a: T) => boolean) {
      return pipe(
        switchMap((data: T) => from([
          { data, continue: true },
          { data, continue: shouldContinue(data), exclude: true }
        ])),
        takeWhile(message => message.continue),
        filter(message => !message.exclude),
        map(message => message.data)
      );
    }

This is a little strange, but so far it works for me. You can use it with take as you tried.
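To show why the two-message trick keeps the last value, here is a rough plain-array trace of the same steps, written in plain JavaScript without RxJS. It is only a sketch: traceDoWhile and the sample polled data are hypothetical, and it assumes the inner from([...]) emits both messages synchronously, so the pair for one value is always processed before the next value arrives.

```javascript
// Plain-array trace of the operator's steps (illustration only, not RxJS).
function traceDoWhile(values, shouldContinue) {
  // switchMap step: every value becomes two messages. The first always
  // passes takeWhile; the second carries the real decision and is marked
  // `exclude` so it never reaches the subscriber.
  const messages = values.flatMap((data) => [
    { data, continue: true },
    { data, continue: shouldContinue(data), exclude: true },
  ]);

  // takeWhile step: stop at the first message whose `continue` is false.
  const taken = [];
  for (const message of messages) {
    if (!message.continue) break;
    taken.push(message);
  }

  // filter + map steps: drop the marker messages and unwrap the data.
  return taken.filter((message) => !message.exclude).map((message) => message.data);
}

// Hypothetical poll results; the third response is the finished one.
const polled = [
  { status: 'pending' },
  { status: 'pending' },
  { status: 'done' },
  { status: 'pending' },
];
const statuses = traceDoWhile(polled, (data) => data.status !== 'done')
  .map((data) => data.status);
// statuses holds: pending, pending, done
```

The finished response gets through because its first message (continue: true) is emitted before the marker message that makes takeWhile stop.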

0
