Is it possible to call subscribers' OnNext on different threads in Rx?

I am new to Rx. I want to know whether it is possible to deliver messages to different subscribers so that they run on different threads. How can an IObservable manage this? The usual implementation of an observable, as I understand it, calls subscribers one by one on the same thread.


    public class Subscriber : IObserver<int>
    {
        public void OnNext(int a)
        {
            // Do something
        }

        public void OnError(Exception e)
        {
            // Do something
        }

        public void OnCompleted()
        {
        }
    }

    public static class Program
    {
        public static void Main()
        {
            var observable = new <....SomeClass....>();
            var sub1 = new Subscriber();
            var sub2 = new Subscriber();
            observable.Subscribe(sub1);
            observable.Subscribe(sub2);
            // some waiting function
        }
    }

If I use Subject as "SomeClass", then sub2's OnNext() will not be called until sub1's OnNext() completes. If sub1 takes a long time, I don't want it to delay sub2 receiving the value. Can someone tell me how Rx allows such an implementation for SomeClass?

2 answers

The code you wrote is almost ready for parallel observation. If you write your observer as follows:

    public class Subscriber : IObserver<int>
    {
        public void OnNext(int a)
        {
            Console.WriteLine("{0} on {1} at {2}",
                a,
                Thread.CurrentThread.ManagedThreadId,
                DateTime.Now.ToString());
        }

        public void OnError(Exception e)
        {
        }

        public void OnCompleted()
        {
        }
    }

Then run this code:

    var observable = Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(x => (int)x)
        .Take(5)
        .ObserveOn(Scheduler.ThreadPool);

    var sub1 = new Subscriber();
    var sub2 = new Subscriber();
    observable.Subscribe(sub1);
    observable.Subscribe(sub2);

    Thread.Sleep(10000);

It produces the following output:

    0 on 28 at 2011/10/20 00:13:49
    0 on 16 at 2011/10/20 00:13:49
    1 on 29 at 2011/10/20 00:13:50
    1 on 22 at 2011/10/20 00:13:50
    2 on 27 at 2011/10/20 00:13:51
    2 on 29 at 2011/10/20 00:13:51
    3 on 27 at 2011/10/20 00:13:52
    3 on 19 at 2011/10/20 00:13:52
    4 on 27 at 2011/10/20 00:13:53
    4 on 27 at 2011/10/20 00:13:53

The two subscriptions are already being observed in parallel on different threads.

The important part is the .ObserveOn extension method; that is what made this work.
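To tie this back to the Subject case from the question, here is a minimal sketch in which a deliberately slow observer and a fast observer share one Subject<int>, each behind its own ObserveOn. It assumes a current System.Reactive package, where the task pool scheduler is spelled TaskPoolScheduler.Default; the class name SlowAndFastDemo and the 300 ms delay are made up for illustration.

```csharp
// Assumes the System.Reactive NuGet package is referenced.
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SlowAndFastDemo
{
    // Pushes three values through one Subject and returns what both observers logged.
    public static ConcurrentQueue<string> Run()
    {
        var log = new ConcurrentQueue<string>();
        var subject = new Subject<int>();
        using var fastDone = new CountdownEvent(3);
        using var slowDone = new CountdownEvent(3);

        // Hypothetical slow observer: each value takes 300 ms to handle.
        using var slow = subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(x =>
            {
                Thread.Sleep(300);
                log.Enqueue("slow " + x);
                slowDone.Signal();
            });

        // Fast observer: has its own ObserveOn queue, so it does not wait for the slow one.
        using var fast = subject
            .ObserveOn(TaskPoolScheduler.Default)
            .Subscribe(x =>
            {
                log.Enqueue("fast " + x);
                fastDone.Signal();
            });

        // Each OnNext only enqueues the value for both observers and returns.
        for (int i = 0; i < 3; i++)
        {
            subject.OnNext(i);
        }

        // Wait until both observers have handled all three values.
        fastDone.Wait();
        slowDone.Wait();
        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
        {
            Console.WriteLine(line);
        }
    }
}
```

Each observer still receives its own values in order; only the interleaving between the two observers depends on thread scheduling, so no exact output is shown here.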

You should keep in mind that observers do not usually share the same instance of an observable. Subscribing to an observable effectively wires up a unique "chain" of observable operators from the source of the observable to the observer. This is much the same as calling GetEnumerator twice on an enumerable: you will not share the same enumerator, you will get two unique instances.

Now I want to describe what I mean by a chain. To illustrate the point, here is the code for Observable.Generate and Observable.Where as decompiled by Reflector.NET.
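The GetEnumerator comparison can be checked with plain .NET, no Rx needed. This is only an illustrative sketch; the class and method names (EnumeratorAnalogy, Numbers) are made up.

```csharp
using System;
using System.Collections.Generic;

public static class EnumeratorAnalogy
{
    // An iterator method: each enumerator walks this body independently.
    public static IEnumerable<int> Numbers()
    {
        for (int i = 0; i < 3; i++)
        {
            yield return i;
        }
    }

    public static void Main()
    {
        IEnumerable<int> xs = Numbers();
        using IEnumerator<int> first = xs.GetEnumerator();
        using IEnumerator<int> second = xs.GetEnumerator();

        first.MoveNext();   // first is at 0
        first.MoveNext();   // first is at 1
        second.MoveNext();  // second is at 0, unaffected by first

        Console.WriteLine(ReferenceEquals(first, second));        // False
        Console.WriteLine(first.Current + " " + second.Current);  // 1 0
    }
}
```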

Take this code, for example:

    var xs = Observable.Generate(0, x => x < 10, x => x + 1, x => x);
    var ys = xs.Where(x => x % 2 == 0);
    ys.Subscribe(y => { /* produces 0, 2, 4, 6, 8 */ });

Under the hood, Generate and Where each create a new instance of the internal Rx class AnonymousObservable<T>. The constructor for AnonymousObservable<T> accepts a delegate of type Func<IObserver<T>, IDisposable>, which it invokes whenever it receives a Subscribe call.

Slightly cleaned-up code for Observable.Generate<T>(...) from Reflector.NET:

    public static IObservable<TResult> Generate<TState, TResult>(
        TState initialState,
        Func<TState, bool> condition,
        Func<TState, TState> iterate,
        Func<TState, TResult> resultSelector,
        IScheduler scheduler)
    {
        return new AnonymousObservable<TResult>((IObserver<TResult> observer) =>
        {
            TState state = initialState;
            bool first = true;
            return scheduler.Schedule((Action self) =>
            {
                bool flag = false;
                TResult local = default(TResult);
                try
                {
                    if (first)
                    {
                        first = false;
                    }
                    else
                    {
                        state = iterate(state);
                    }
                    flag = condition(state);
                    if (flag)
                    {
                        local = resultSelector(state);
                    }
                }
                catch (Exception exception)
                {
                    observer.OnError(exception);
                    return;
                }
                if (flag)
                {
                    observer.OnNext(local);
                    self();
                }
                else
                {
                    observer.OnCompleted();
                }
            });
        });
    }

The Action self parameter is a delegate that schedules the same action again, which is how the code steps through the output values. You will notice that nowhere in this code is the observer stored in a collection, nor are values fanned out to several observers. This code is run once for each new observer.

Slightly cleaned-up code for Observable.Where<T>(...) from Reflector.NET:

    public static IObservable<TSource> Where<TSource>(
        this IObservable<TSource> source,
        Func<TSource, bool> predicate)
    {
        return new AnonymousObservable<TSource>(observer =>
            source.Subscribe(
                x =>
                {
                    bool flag;
                    try
                    {
                        flag = predicate(x);
                    }
                    catch (Exception exception)
                    {
                        observer.OnError(exception);
                        return;
                    }
                    if (flag)
                    {
                        observer.OnNext(x);
                    }
                },
                ex => observer.OnError(ex),
                () => observer.OnCompleted()));
    }

Again, this code does not track multiple observers. It invokes Subscribe , effectively passing its own code as an observer to the underlying source observable.

You should see that in my example above, subscribing to Where creates a subscription to Generate , and therefore this is a chain of observables. In fact, it chains calls to a number of AnonymousObservable objects.

If you have two subscriptions, you have two chains. If you have 1000 subscribers, you have 1000 chains.

As a side note, even though the IObservable<T> and IObserver<T> interfaces exist, you should very rarely implement them in your own classes. Built-in classes and operators handle 99.99% of all cases. This is a bit like IEnumerable<T>: how often do you need to implement that interface yourself?
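The one-chain-per-subscription behaviour can be sketched without Rx at all. SketchObservable<T> below is a hypothetical, simplified stand-in for the internal AnonymousObservable<T> described above, not the real class; PrintObserver, NoopDisposable and ChainDemo are likewise made up. The counter shows the source delegate running once per Subscribe call.

```csharp
using System;

// Hypothetical stand-in for Rx's internal AnonymousObservable<T>:
// it stores the subscribe delegate and runs it once per Subscribe call.
public sealed class SketchObservable<T> : IObservable<T>
{
    private readonly Func<IObserver<T>, IDisposable> _subscribe;

    public SketchObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

// Hypothetical observer that prints whatever it receives.
public sealed class PrintObserver : IObserver<int>
{
    private readonly string _name;

    public PrintObserver(string name) { _name = name; }

    public void OnNext(int value) { Console.WriteLine(_name + " got " + value); }
    public void OnError(Exception error) { Console.WriteLine(_name + " failed: " + error.Message); }
    public void OnCompleted() { Console.WriteLine(_name + " completed"); }
}

// Nothing to clean up for a synchronous source.
public sealed class NoopDisposable : IDisposable
{
    public void Dispose() { }
}

public static class ChainDemo
{
    // Counts how many times the source delegate has run.
    public static int SourceRuns;

    public static IObservable<int> Source()
    {
        return new SketchObservable<int>(observer =>
        {
            SourceRuns++;
            for (int i = 0; i < 3; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
            return new NoopDisposable();
        });
    }

    public static void Main()
    {
        IObservable<int> xs = Source();
        xs.Subscribe(new PrintObserver("sub1"));
        xs.Subscribe(new PrintObserver("sub2"));
        Console.WriteLine("source ran " + SourceRuns + " times"); // source ran 2 times
    }
}
```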

Let me know if this helps, and if you need further explanation.


If you have an IObservable and you need the subscription to run on another thread, you can use the ObserveOn method.

If you run the code below, each subscription observes the number generator in a different thread context. You can also use the EventLoopScheduler and supply the System.Threading.Thread that you want to use, which lets you set its priority, name, and so on.

    void Main()
    {
        var numbers = Observable.Interval(TimeSpan.FromMilliseconds(100));

        var disposable = new CompositeDisposable()
        {
            numbers.ObserveOn(Scheduler.TaskPool).Subscribe(x =>
                Console.WriteLine("TaskPool: " + Thread.CurrentThread.ManagedThreadId)),
            numbers.ObserveOn(Scheduler.ThreadPool).Subscribe(x =>
                Console.WriteLine("ThreadPool: " + Thread.CurrentThread.ManagedThreadId)),
            numbers.ObserveOn(Scheduler.Immediate).Subscribe(x =>
                Console.WriteLine("Immediate: " + Thread.CurrentThread.ManagedThreadId))
        };

        Thread.Sleep(1000);
        disposable.Dispose();
    }

Output:

    Immediate: 10
    ThreadPool: 4
    TaskPool: 20
    TaskPool: 4
    ThreadPool: 24
    Immediate: 27
    Immediate: 10
    TaskPool: 24
    ThreadPool: 27
    Immediate: 24
    TaskPool: 26
    ThreadPool: 20
    Immediate: 26
    ThreadPool: 24
    TaskPool: 27
    Immediate: 28
    ThreadPool: 27
    TaskPool: 26
    Immediate: 10

Notice how I used CompositeDisposable to dispose of all the subscriptions at the end. If you do not do this, in LINQPad for example, Observable.Interval will continue to run in memory until you kill the process.
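The EventLoopScheduler option mentioned above could look roughly like the sketch below, assuming a current System.Reactive package. The thread name "rx-worker", the chosen priority and the class name EventLoopDemo are made up for illustration.

```csharp
// Assumes the System.Reactive NuGet package is referenced.
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class EventLoopDemo
{
    // Observes a short sequence on a dedicated thread and returns that thread's name.
    public static string ObservedThreadName()
    {
        // The factory receives the scheduler's loop and returns the thread to run it on.
        using var scheduler = new EventLoopScheduler(start => new Thread(start)
        {
            Name = "rx-worker",                    // hypothetical name
            IsBackground = true,
            Priority = ThreadPriority.BelowNormal
        });

        string name = "";
        using var done = new ManualResetEventSlim();

        using var subscription = Observable.Range(0, 3)
            .ObserveOn(scheduler)
            .Subscribe(
                _ => name = Thread.CurrentThread.Name,
                () => done.Set());

        // Block until OnCompleted has been delivered on the event loop thread.
        done.Wait();
        return name;
    }

    public static void Main()
    {
        Console.WriteLine(ObservedThreadName()); // rx-worker
    }
}
```

Because an EventLoopScheduler owns a single thread, every notification routed through it is handled on that one thread, and disposing the scheduler ends the thread.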


Source: https://habr.com/ru/post/924084/

