RxJS: recursive list of observables and a single observer

I have a problem with a recursive chain of observables.

I am working with RxJS, currently at version 1.0.10621, which contains most of the basic Rx functionality, together with Rx for jQuery.

Let me introduce an example scenario for my problem: I am polling the Twitter Search API (JSON response) for tweets / updates containing a specific keyword. The response also includes a refresh_url, which should be used to generate the follow-up request. The response to that follow-up request will again contain a new refresh_url, and so on.

Rx.jQuery lets me turn the Twitter Search API call into an observable that produces one onNext and then completes. What I have tried so far is to have the onNext handler remember the refresh_url and use it in the onCompleted handler to create both a new observable and the corresponding observer for the next request. This way, one observable + observer pair follows another indefinitely.

The problems with this approach:

  • The follow-up observable / observer pair is already alive while its predecessor has not yet been disposed.

  • I have to do a lot of unpleasant bookkeeping to maintain a valid reference to the currently living observer, of which there may actually be two (one in its onCompleted handler, and the other somewhere else in its life cycle). This reference is, of course, needed to unsubscribe / dispose the observer. An alternative to the bookkeeping would be to implement a side effect by means of a "still running?" boolean, as I did in my example.

Code example:

    running = true;
    twitterUrl = "http://search.twitter.com/search.json";
    twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
    twitterMaxId = 0; //actually twitter ignores its since_id parameter

    newTweetObserver = function () {
        return Rx.Observer.create(
            function (tweet) {
                if (tweet.id > twitterMaxId) {
                    twitterMaxId = tweet.id;
                    displayTweet(tweet);
                }
            }
        );
    }

    createTwitterObserver = function() {
        twitterObserver = Rx.Observer.create(
            function (response) {
                if (response.textStatus == "success") {
                    var data = response.data;
                    if (data.error == undefined) {
                        twitterQuery = data.refresh_url;
                        var tweetObservable;
                        tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                        tweetObservable.subscribe(newTweetObserver());
                    }
                }
            },
            function(error) { alert(error); },
            function () {
                //create and listen to new observer that includes a delay
                if (running) {
                    twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                    twitterObservable.subscribe(createTwitterObserver());
                }
            }
        );
        return twitterObserver;
    }

    twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
    twitterObservable.subscribe(createTwitterObserver());

Don't be distracted by the double layer of observables / observers going from requests to tweets. My example is mainly about the first layer: requesting data from Twitter. If solving this problem also lets the second layer (converting responses into tweets) become one with the first, that would be fantastic, but I think that is a completely different matter. For now.

Erik Meijer pointed me to the Expand operator (see example below) and suggested Join patterns as an alternative.

    var ys = Observable.Expand
        (new[]{0}.ToObservable() // initial sequence
        , i => ( i == 10 ? Observable.Empty<int>() // terminate
               : new[]{i+1}.ToObservable() // recurse
               )
        );

    ys.ToArray().Select(a => string.Join(",", a)).DumpLive();

This should be copy-pastable into LINQPad. It assumes singleton observables and produces a single final observable.
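As a rough illustration of what Expand computes in this example, here is a synchronous, array-based stand-in in plain JavaScript. The name expandSync is hypothetical and is not part of RxJS; it only mimics the shape of the operator for finite sequences and needs no generators.

```javascript
// Hypothetical helper, NOT an RxJS API: an array-based stand-in that
// mimics what Expand computes for finite, synchronous sequences.
function expandSync(seed, selector) {
  var output = [];
  var queue = seed.slice();          // values still waiting to be expanded
  while (queue.length > 0) {
    var value = queue.shift();
    output.push(value);              // every value is emitted exactly once
    var children = selector(value);  // an empty array ends that branch
    for (var i = 0; i < children.length; i++) {
      queue.push(children[i]);
    }
  }
  return output;
}

// Same shape as the C# example: start at 0 and stop recursing at 10.
var ys = expandSync([0], function (i) {
  return i === 10 ? [] : [i + 1];
});
console.log(ys.join(","));
```

In the Twitter scenario, the selector would map a response to the request for its refresh_url instead of mapping i to i + 1.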

So my question is: how can I do the Expand trick in RxJS?

EDIT:
The Expand operator can probably be implemented as shown in this thread. But that would require generators (and I only have JS < 1.6).
Unfortunately, RxJS 2.0.20304-beta does not implement the Expand method.

1 answer

I am going to approach your problem a little differently than you did and take some liberties that should make it easier to solve.

One thing I can't tell is whether you are trying to follow these steps:

  • Get the first list of tweets, which contains the next URL
  • After receiving a list of tweets, onNext the current observer and get the next set of tweets
    • Do this indefinitely

Or is there a user action (get more / scroll to the bottom)? Either way, it is really the same problem. I may be reading your problem incorrectly, though. Here is the answer for that reading.

    function getMyTweets(headUrl) {
        return Rx.Observable.create(function(observer) {

            innerRequest(headUrl);

            function innerRequest(url) {
                var next = '';

                // Some magic get ajax function
                Rx.get(url).subscribe(function(res) {
                    observer.onNext(res);
                    next = res.refresh_url;
                },
                function() {
                    // Some sweet handling code
                    // Perhaps get head?
                },
                function() {
                    innerRequest(next);
                });
            }
        });
    }
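To show the same idea without depending on any particular RxJS version, here is a minimal sketch in plain JavaScript. Everything in it is an assumption made for illustration: pollChain, fakeGet and the pending queue are hypothetical names, and fakeGet only stands in for the "magic" ajax call by queueing its responses so they can be delivered by hand. The point is that a single observer sees every response, only one request is in flight at a time, and a single dispose function replaces the bookkeeping.

```javascript
// Hypothetical stand-in for the "magic" ajax call. Responses are queued
// in `pending` so that they can be delivered manually, one at a time.
var pending = [];
function fakeGet(url, callback) {
  var page = parseInt(url.split("=")[1], 10);
  pending.push(function () {
    callback({ results: ["tweet " + page], refresh_url: "?page=" + (page + 1) });
  });
}

// Chains requests through refresh_url and reports each response to one
// observer. Returns a dispose function that stops the whole chain.
function pollChain(headUrl, get, observer) {
  var disposed = false;

  function innerRequest(url) {
    if (disposed) { return; }
    get(url, function (res) {
      if (disposed) { return; }       // ignore responses that arrive late
      observer.onNext(res);
      innerRequest(res.refresh_url);  // the next request starts only now
    });
  }

  innerRequest(headUrl);
  return function dispose() { disposed = true; };
}

// Usage: deliver two responses, dispose, then let a late response arrive.
var seen = [];
var dispose = pollChain("?page=1", fakeGet, {
  onNext: function (res) { seen.push(res.results[0]); }
});
pending.shift()();  // response 1 arrives and request 2 is issued
pending.shift()();  // response 2 arrives and request 3 is issued
dispose();
pending.shift()();  // response 3 arrives after dispose and is ignored
console.log(seen);
```

With a real ajax call, the 3 second delay from the question could be added by wrapping the recursive innerRequest call in setTimeout.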

This may not be the answer you requested. If not, sorry!


Edit: Looking at your code, it seems you may also want to take the results as an array and observe it.

    // From the results perform a select then a merge (if ordering does not matter).
    getMyTweets('url')
        .selectMany(function(data) {
            return Rx.Observable.fromArray(data.results.reverse());
        });

    // Ensures ordering
    getMyTweets('url')
        .select(function(data) {
            return Rx.Observable.fromArray(data.results.reverse());
        })
        .concat();