I had problems with the recursive chain of observables.
I work with RxJS, which is currently in version 1.0.10621, and contains most of the basic Rx functions in conjunction with Rx for jQuery.
Let me introduce an example scenario for my problem: I will poll the Twitter Search API (JSON response) for tweets / updates containing a specific keyword. The response also includes a refresh_url, which should be used to generate a subsequent request. The response to this subsequent request will again contain a new refresh_url, etc.
Rx.jQuery allows me to get the Twitter search API to observe an observable event that one of them creates and then completes. What I have tried so far is for the onNext handler to remember refresh_url and use it in the onCompleted handler to create both a new observable and the corresponding observer for the next request. Thus, one observable + observational pair follows it indefinitely.
The problem with this approach:
The observed observer / observer is already alive when their predecessors have not yet been removed.
I have to do a lot of unpleasant accounting reports in order to maintain a valid reference to the current living observer, who may actually be two. (One in onCompleted, and the other in a different life cycle). This link, of course, is necessary to refuse the subscription / order of the observer. An alternative to accounting would be to implement a side effect with βstill running?β - logical, as I did in my example.
Code example:
running = true; twitterUrl = "http://search.twitter.com/search.json"; twitterQuery = "?rpp=10&q=" + encodeURIComponent(text); twitterMaxId = 0; //actually twitter ignores its since_id parameter newTweetObserver = function () { return Rx.Observer.create( function (tweet) { if (tweet.id > twitterMaxId) { twitterMaxId = tweet.id; displayTweet(tweet); } } ); } createTwitterObserver = function() { twitterObserver = Rx.Observer.create( function (response) { if (response.textStatus == "success") { var data = response.data; if (data.error == undefined) { twitterQuery = data.refresh_url; var tweetObservable; tweetObservable = Rx.Observable.fromArray(data.results.reverse()); tweetObservable.subscribe(newTweetObserver()); } } }, function(error) { alert(error); }, function () { //create and listen to new observer that includes a delay if (running) { twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000); twitterObservable.subscribe(createTwitterObserver()); } } ); return twitterObserver; } twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery); twitterObservable.subscribe(createTwitterObserver());
Don't be fooled by a double layer of watchers / watchers from tweeting requests. My example is mainly about the first level: requesting data from Twitter. If in solving this problem the second layer (converting answers to tweets) can become one with the first, this would be fantastic; But I think this is a completely different matter. Till.
Eric Meyer pointed me to the Expand operator (see example below) and suggested joining the templates as an alternative.
var ys = Observable.Expand (new[]{0}.ToObservable()
This should be copied to LINQPad. It accepts singleton observables and produces a single end observer.
So my question is: how can I do the extension trick in RxJS?
EDIT:
The extension operator can probably be implemented as shown in this thread . But generators are needed (and I only have JS <1.6).
Unfortunately, RxJS 2.0.20304-beta does not implement the Extend method.