NewThreadScheduler.Default schedules all work on the same thread

I am currently trying to wrap my head around concurrency with Rx.NET and am getting confused about something. I want to run four relatively slow tasks in parallel, so I assumed NewThreadScheduler.Default would be the way to go, since it "Represents an object that schedules each unit of work on a separate thread."

Here is my setup code:

    static void Test()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);

        var query = Enumerable.Range(1, 4);
        var obsQuery = query.ToObservable(NewThreadScheduler.Default);
        obsQuery.Subscribe(DoWork, Done);

        Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

    static void DoWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
    }

    static void Done()
    {
        Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId);
    }

I expected "X Thread Y" to show a different thread ID each time; however, the actual output is:

    Starting. Thread 1
    Last line. Thread 1
    1 Thread 3
    2 Thread 3
    3 Thread 3
    4 Thread 3
    Done. Thread 3

All the work is being done on the same new thread, in sequential order, which is not what I expected.

I suppose I'm missing something, but I can't figure out what.

1 answer

There are two parts to an observable query: the Query itself and the Subscription. (This is also the difference between ObserveOn and SubscribeOn.)

Your Query is

    Enumerable
        .Range(1, 4)
        .ToObservable(NewThreadScheduler.Default);

This creates an Observable that produces its values on the default NewThreadScheduler for the system.

Your Subscription is

 obsQuery.Subscribe(DoWork, Done); 

This runs DoWork for each value produced by the Query, and Done when the Query finishes with an OnCompleted call. I don't think there are any guarantees about which thread the functions passed to Subscribe will be called on; in practice, if all the values of the query are produced on the same thread, that is the thread the subscription will run on. It also appears that Rx makes sure all of the subscription calls are made on the same thread, most likely to get rid of a lot of common multithreading errors.

So you have two issues. One is with your logging: if you change your Query to

    Enumerable
        .Range(1, 4)
        // Do on IEnumerable<T> comes from Interactive Extensions (System.Interactive)
        .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId))
        .ToObservable(NewThreadScheduler.Default);

you will see each value being produced on a new thread.

The other issue is one of the intent and design of Rx. It is intended that the Query is the long-running process and the Subscription is a short method that deals with the results. If you want to run a long-running function as an Rx Observable, your best option is to use Observable.ToAsync.
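As a rough sketch of that suggestion (not necessarily how you would structure it in real code), the slow function can be moved into the query with Observable.ToAsync and the four invocations merged. The names ParallelWorkSketch, SlowWork and RunAll are made up for illustration, and this assumes the System.Reactive package is referenced:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ParallelWorkSketch
{
    // Hypothetical slow function; it is now part of the query, not the subscription.
    static int SlowWork(int i)
    {
        Thread.Sleep(500);
        Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId);
        return i;
    }

    public static IList<int> RunAll()
    {
        // ToAsync wraps the function; each invocation of the wrapper schedules
        // one call to SlowWork on the given scheduler and returns an observable
        // that yields the result once the call has finished.
        Func<int, IObservable<int>> slowWorkAsync =
            Observable.ToAsync<int, int>(SlowWork, NewThreadScheduler.Default);

        return Enumerable.Range(1, 4)
            .Select(i => slowWorkAsync(i)) // one scheduled call per input value
            .Merge()                       // combine the four result streams
            .ToList()                      // collect results in completion order
            .Wait();                       // block until all calls have completed
    }

    public static void Main()
    {
        Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId);
        IList<int> results = RunAll();
        Console.WriteLine("Done. {0} results", results.Count);
    }
}
```

Because each call is scheduled separately, the "X Thread Y" lines can appear in any order, and the results list is in completion order rather than input order. The blocking Wait() is only there to keep the console sample alive; in real code you would normally Subscribe to the merged observable instead.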
