The code you wrote is almost ready for parallel observation. If you write your observer as follows:
public class Subscriber : IObserver<int> { public void OnNext(int a) { Console.WriteLine("{0} on {1} at {2}", a, Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString()); } public void OnError(Exception e) { } public void OnCompleted() { } }
Then run this code:
var observable = Observable .Interval(TimeSpan.FromSeconds(1.0)) .Select(x => (int)x) .Take(5) .ObserveOn(Scheduler.ThreadPool); var sub1 = new Subscriber(); var sub2 = new Subscriber(); observable.Subscribe(sub1); observable.Subscribe(sub2); Thread.Sleep(10000);
The following will do:
0 on 28 at 2011/10/20 00:13:49 0 on 16 at 2011/10/20 00:13:49 1 on 29 at 2011/10/20 00:13:50 1 on 22 at 2011/10/20 00:13:50 2 on 27 at 2011/10/20 00:13:51 2 on 29 at 2011/10/20 00:13:51 3 on 27 at 2011/10/20 00:13:52 3 on 19 at 2011/10/20 00:13:52 4 on 27 at 2011/10/20 00:13:53 4 on 27 at 2011/10/20 00:13:53
It already performs subscriptions in parallel on different threads.
The important thing I used was the extension method .ObserveOn is what made this work.
You should keep in mind that observers usually do not have the same instance of observables. Signing to the observable effectively links a unique βchainβ of observable operators from the source of the observable to the observer. This is almost the same as calling GetEnumerator twice in an enumeration, you will not use the same instance of the enumeration, you will get two unique instances.
Now I want to describe what I mean by chain. I want to provide the extracted Reflector.NET code from Observable.Generate and Observable.Where to illustrate this point.
Take this code, for example:
var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x); var ys = xs.Where(x => x % 2 == 0); ys.Subscribe(y => { });
Under the hood, both Generate and Where each create a new instance of the internal Rx class AnonymousObservable<T> . The constructor for AnonymousObservable<T> accepts the delegate Func<IObserver<T>, IDisposable> , which it uses when it receives a Subscribe call.
Slightly cleared code for Observable.Generate<T>(...) from Reflector.NET:
public static IObservable<TResult> Generate<TState, TResult>( TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler) { return new AnonymousObservable<TResult>((IObserver<TResult> observer) => { TState state = initialState; bool first = true; return scheduler.Schedule((Action self) => { bool flag = false; TResult local = default(TResult); try { if (first) { first = false; } else { state = iterate(state); } flag = condition(state); if (flag) { local = resultSelector(state); } } catch (Exception exception) { observer.OnError(exception); return; } if (flag) { observer.OnNext(local); self(); } else { observer.OnCompleted(); } }); }); }
The Action self parameter is a recursive call that iterates the output values. You will notice that nowhere in this code is an observer stored or that values ββare glued to several observers. This code is run once for each new observer.
Slightly cleared code for Observable.Where<T>(...) from Reflector.NET:
public static IObservable<TSource> Where<TSource>( this IObservable<TSource> source, Func<TSource, bool> predicate) { return new AnonymousObservable<TSource>(observer => source.Subscribe(x => { bool flag; try { flag = predicate(x); } catch (Exception exception) { observer.OnError(exception); return; } if (flag) { observer.OnNext(x); } }, ex => observer.OnError(ex), () => observer.OnCompleted)); }
Again, this code does not track multiple observers. It invokes Subscribe , effectively passing its own code as an observer to the underlying source observable.
You should see that in my example above, subscribing to Where creates a subscription to Generate , and therefore this is a chain of observables. In fact, it chains calls to a number of AnonymousObservable objects.
If you have two subscriptions, you have two chains. If you have 1000 subscribers, you have 1000 conversations.
Now, like the side note, even if there are IObservable<T> and IObserver<T> interfaces, you should very rarely implement them in your own classes. Built-in classes and operators handle 99.99% of all cases. This is a bit like IEnumerable<T> - how often do you need to implement this interface yourself?
Let me know if this helps, and if you need further explanation.