How to generate an observable from an initial observable when I can only generate other observables?

I want to create an observable in which each value depends on the one before it, starting from a single value. If I have a simple transformation between values of type Func<int, int>, this is easy to do with Observable.Generate, as follows:

    Func<int, IObservable<int>> mkInts = init =>
        Observable.Generate(
            init,         // start value
            _ => true,    // continue?
            i => i + 1,   // transformation function
            i => i);      // result selector

    using (mkInts(1).Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }

This happily prints numbers to my screen until I press Enter. However, my transformation function does some network I/O, so its type is Func<int, IObservable<int>>, and I cannot use this approach. Instead, I tried this:

    // simulate my transformation function
    Func<int, IObservable<int>> mkInt = ts =>
        Observable.Return(ts)
                  .Delay(TimeSpan.FromMilliseconds(10));

    // pre-assign my generator function, since the function calls itself recursively
    Func<int, IObservable<int>> mkInts = null;

    // my generator function
    mkInts = init =>
    {
        var ints = mkInt(init);

        // here is where I depend on the previous value.
        var nextInts = ints.SelectMany(i => mkInts(i + 1));
        return ints.Concat(nextInts);
    };

    using (mkInts(1).Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }

But this throws a StackOverflowException after printing about 5,000 numbers. How can I solve this?

5 answers

I think I have a good clean solution for you.

First, go back to using Func<int, int> - you can easily turn it into a Func<int, IObservable<int>> using Observable.FromAsyncPattern.

I used this for testing:

    Func<int, int> mkInt = ts =>
    {
        Thread.Sleep(100);
        return ts + 1;
    };

Now here is the money maker:

    Func<int, Func<int, int>, IObservable<int>> mkInts =
        (i0, fn) =>
            Observable.Create<int>(o =>
            {
                var ofn = Observable
                    .FromAsyncPattern<int, int>(
                        fn.BeginInvoke,
                        fn.EndInvoke);
                var s = new Subject<int>();
                var q = s.Select(x => ofn(x)).Switch();
                var r = new CompositeDisposable(new IDisposable[]
                {
                    q.Subscribe(s),
                    s.Subscribe(o),
                });
                s.OnNext(i0);
                return r;
            });

The iterating function is turned into an asynchronous observable.

The variable q feeds the values from the subject into the observable iterating function and selects the observable it returns. The Switch method flattens the result and ensures that each call to the observable iterating function is properly cleaned up.

In addition, using a CompositeDisposable allows both subscriptions to be disposed of as one. Very neat!

It is easily used as follows:

    using (mkInts(7, mkInt).Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }

You now have a fully parameterized version of your generator function. Nice, huh?
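
To illustrate what Switch contributes in the code above, here is a minimal sketch that is not part of the original answer. It assumes the System.Reactive package is referenced; the names outer, first, second and seen are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class SwitchSketch
{
    static void Main()
    {
        // Hypothetical names, used only for this illustration.
        var outer = new Subject<IObservable<int>>();
        var first = new Subject<int>();
        var second = new Subject<int>();
        var seen = new List<int>();

        // Switch listens only to the most recent inner observable.
        using (outer.Switch().Subscribe(seen.Add))
        {
            outer.OnNext(first);
            first.OnNext(1);       // forwarded: first is the current inner source

            outer.OnNext(second);  // Switch unsubscribes from first here
            first.OnNext(2);       // dropped: first is no longer observed
            second.OnNext(3);      // forwarded: second is now the current source
        }

        Console.WriteLine(string.Join(",", seen)); // prints "1,3"
    }
}
```

In mkInts, each value pushed into the subject produces a new inner observable, so Switch releases the subscription to the previous call as soon as the next one starts.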


I find the following answers correct but a bit complicated. The only change I suggest is the mkInts method:

    Func<int, Func<int, int>, IObservable<int>> mkInts = (i0, fn) =>
    {
        var s = new Subject<int>();
        s.ObserveOn(Scheduler.NewThread).Select(fn).Subscribe(s);
        s.OnNext(i0);
        return s;
    };


I was not entirely sure whether you want to feed the result of the function back into the function, or whether you have a separate function that produces the next input, so I did both. The trick here is to let the IScheduler do the hard work of the callbacks.

    public Func<T, IObservable<T>> Feedback<T>(
        Func<T, IObservable<T>> generator,
        IScheduler scheduler)
    {
        return seed =>
            Observable.Create((IObserver<T> observer) =>
                scheduler.Schedule(seed, (current, self) =>
                    generator(current).Subscribe(value =>
                    {
                        observer.OnNext(value);
                        self(value);
                    })));
    }

    public Func<T, IObservable<T>> GenerateAsync<T>(
        Func<T, IObservable<T>> generator,
        Func<T, T> seedTransform,
        IScheduler scheduler)
    {
        return seed =>
            Observable.Create((IObserver<T> observer) =>
                scheduler.Schedule(seed, (current, self) =>
                    generator(current).Subscribe(value =>
                    {
                        observer.OnNext(value);
                        self(seedTransform(current));
                    })));
    }

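
The recursive Schedule overload that both methods rely on can be seen in isolation in the sketch below. It is not part of the original answer and assumes the System.Reactive package is referenced. The choice of CurrentThreadScheduler and the loop bound of 100,000 are assumptions made for the illustration.

```csharp
using System;
using System.Reactive.Concurrency;

class RecursiveScheduleSketch
{
    static void Main()
    {
        // Assumption for the sketch: the current-thread (trampolining) scheduler.
        IScheduler scheduler = CurrentThreadScheduler.Instance;
        long sum = 0;

        // "self" does not call the lambda directly. It asks the scheduler to
        // run the lambda again with the new state, so each step starts from
        // the scheduler's queue rather than from inside the previous step.
        scheduler.Schedule(1, (current, self) =>
        {
            sum += current;
            if (current < 100000)
            {
                self(current + 1);
            }
        });

        // On the current-thread scheduler the call above returns only after
        // all queued steps have run.
        Console.WriteLine(sum);
    }
}
```

Because every step is queued instead of nested, the depth of the call stack does not depend on the number of iterations, which is the property the question's recursive SelectMany/Concat version lacks.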

I believe the code is not tail recursive and therefore throws a StackOverflowException. Below is code that works without that exception.

    public static IObservable<int> GetObs(int i)
    {
        return Observable.Return(i).Delay(TimeSpan.FromMilliseconds(10));
    }

    public static IObservable<int> MakeInts(int start)
    {
        return Observable.Generate(start, _ => true, i => i + 1, i => GetObs(i))
                         .SelectMany(obs => obs);
    }

    using (MakeInts(1).Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }

Or by changing your code as follows:

    Action<int, IObserver<int>> mkInt = (i, obs) =>
        Observable.Return(i)
                  .Delay(TimeSpan.FromMilliseconds(10))
                  .Subscribe<int>(ii => obs.OnNext(ii));

    // pre-assign my generator function, since the function calls itself recursively
    Func<int, IObservable<int>> mkInts = null;

    // my generator function
    mkInts = init =>
    {
        var s = new Subject<int>();
        var ret = s.Do(i =>
        {
            mkInt(i + 1, s);
        });
        mkInt(init, s);
        return ret;
    };

    using (mkInts(1).Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }


I found a solution that, although it may not be the prettiest, does what I want. If anyone has a better solution, I will mark that as the answer.

    Func<int, IObservable<int>> mkInt = ts =>
        Observable.Return(ts)
                  .Delay(TimeSpan.FromMilliseconds(10));

    Func<int, IObservable<int>> mkInts = init =>
    {
        Subject<int> subject = new Subject<int>();
        IDisposable sub = null;
        Action<int> onNext = null;
        onNext = i =>
        {
            subject.OnNext(i);
            sub.Dispose();
            sub = mkInt(i + 1).Subscribe(onNext);
        };
        sub = mkInt(init).Subscribe(onNext);
        return subject;
    };

    using (mkInts(1).Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }

