Why is the localInit Func called multiple times per thread in Parallel.ForEach?

I wrote code to process a lot of data, and I thought it would be helpful if Parallel.ForEach created a file for each thread it uses, so that I would not need to synchronize the output myself.

It looks something like this:

    Parallel.ForEach(vals,
        new ParallelOptions { MaxDegreeOfParallelism = 8 },
        () => GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
        (item, state, writer) =>
        {
            if (something)
            {
                state.Break();
                return writer;
            }

            List<Result> results = new List<Result>();
            foreach (var subItem in item.SubItems)
                results.Add(ProcessItem(subItem));

            if (results.Count > 0)
            {
                foreach (var result in results)
                    result.Write(writer);
            }

            return writer;
        },
        (writer) => writer.Dispose());

I was expecting up to 8 files to be created and to persist throughout the run, with each of them Disposed only when the entire ForEach call ends. What really happens is that localInit seems to be called roughly once per element, so I get hundreds of files. In addition, the writers are being Disposed at the end of each item's processing.

This shows the same thing:

    var vals = Enumerable.Range(0, 10000000).ToArray();
    long sum = 0;

    Parallel.ForEach(vals,
        new ParallelOptions { MaxDegreeOfParallelism = 8 },
        () =>
        {
            Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId);
            return 0L;
        },
        (i, state, common) =>
        {
            Thread.Sleep(10);
            return common + i;
        },
        (common) => Interlocked.Add(ref sum, common));

I see:

    init 10
    init 14
    init 11
    init 13
    init 12
    init 14
    init 11
    init 12
    init 13
    init 11
    ... // hundreds of lines over < 30 seconds
    init 14
    init 11
    init 18
    init 17
    init 10
    init 11
    init 14
    init 11
    init 14
    init 11
    init 18

Note: if I remove the Thread.Sleep call, it sometimes seems "correct": localInit is called only once for each of the 4 threads it decides to use on my machine. Not every time, however.

Is this the intended behavior of the method? What is happening behind the scenes that makes it do this? And finally, what is a good way to get the functionality I want, ThreadLocal?

This is, by the way, on .NET 4.5.

+7
4 answers

Parallel.ForEach does not work the way you think it does. It is important to note that the method is built on top of the Task classes and that the relationship between Task and Thread is not 1:1. You can have, for example, 10 tasks that run on 2 managed threads.

Try using this line in your test method instead of the current one:

 Console.WriteLine("ThreadId {0} -- TaskId {1} ", Thread.CurrentThread.ManagedThreadId, Task.CurrentId); 

You should see that the ThreadId is reused across many different tasks, each showing its own unique task id. You will see this even more if you leave in or increase your call to Thread.Sleep.
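
For illustration, here is a minimal self-contained variant of the asker's test with that logging line moved into localInit (the smaller range and the class/Main wrapper are my additions). Run something like this and you should typically see the same ThreadId appear under many different TaskIds:

    using System;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class TaskVsThreadDemo
    {
        static void Main()
        {
            var vals = Enumerable.Range(0, 10000).ToArray();
            long sum = 0;

            Parallel.ForEach(vals,
                new ParallelOptions { MaxDegreeOfParallelism = 8 },
                () =>
                {
                    // Logged once per task, not once per thread.
                    Console.WriteLine("ThreadId {0} -- TaskId {1}",
                        Thread.CurrentThread.ManagedThreadId, Task.CurrentId);
                    return 0L;
                },
                (i, state, common) =>
                {
                    // Blocking here encourages the scheduler to spawn extra tasks.
                    Thread.Sleep(10);
                    return common + i;
                },
                common => Interlocked.Add(ref sum, common));

            Console.WriteLine("sum = " + sum);
        }
    }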

The basic idea of how the Parallel.ForEach method works is that it takes your enumerable and creates a series of tasks that will process sections of the enumeration; how this is done depends a lot on the input. There is also special logic that checks for the case where a task runs longer than a certain number of milliseconds without completing. If that happens, a new task may be spawned to help share the work.

If you look at the documentation for the localInit function in Parallel.ForEach, you will notice that it says it returns the initial state of the local data for each task, not each thread.

You might ask why more than 8 tasks are being created. That answer, like the previous one, is found in the documentation for ParallelOptions.MaxDegreeOfParallelism.

Changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

This limit applies only to the number of concurrent tasks; it is not a hard limit on the total number of tasks created over the lifetime of the processing. And, as I mentioned above, there are times when an additional task is spawned, with the result that your localInit function gets called multiple times and hundreds of files are written to disk.

Writing to disk is an operation with some latency, particularly if you are using synchronous I/O. When the disk operation happens, it blocks the entire thread; the same thing happens with Thread.Sleep. If a Task does this, it blocks the thread it is currently running on, and no other tasks can run on it. Usually, in these cases, the scheduler spins up a new Task to help pick up the slack.

And finally, what is a good way to get the functionality I want, ThreadLocal?

The bottom line is that thread-local data does not really make sense with Parallel.ForEach, because you are not dealing with threads; you are dealing with tasks. A thread local could be shared between tasks, because many tasks can end up using the same thread. Also, a task's thread-local data could change mid-execution, because the scheduler can suspend the task and then resume it on a different thread, which would have different thread-local data.

I'm not sure this is the best way to do it, but you can rely on the localInit function to hand out whatever resource you need, as long as only one task uses the resource at a time. You can then use localFinally to mark it as no longer in use, and therefore available to another task. This is what these delegates were designed for: each is called exactly once per task (see the remarks section of the Parallel.ForEach MSDN documentation).
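
As a rough sketch of that idea, not the asker's actual code: the Rent helper and the ConcurrentBag-based bookkeeping below are my own assumptions. localInit borrows a writer from a shared pool (creating one only if none is free), and localFinally returns it so a later task can reuse it; nothing is disposed until the whole loop has finished.

    using System;
    using System.Collections.Concurrent;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;

    class WriterPoolDemo
    {
        // Writers not currently in use by a task.
        static readonly ConcurrentBag<BinaryWriter> pool = new ConcurrentBag<BinaryWriter>();
        // Every writer ever created, so they can all be disposed at the end.
        static readonly ConcurrentBag<BinaryWriter> all = new ConcurrentBag<BinaryWriter>();

        static BinaryWriter Rent()
        {
            BinaryWriter writer;
            if (pool.TryTake(out writer))
                return writer;                       // reuse a writer a finished task gave back

            writer = new BinaryWriter(
                File.Create(Guid.NewGuid().ToString("N") + ".bin"));
            all.Add(writer);
            return writer;
        }

        static void Main()
        {
            var vals = Enumerable.Range(0, 1000);

            Parallel.ForEach(vals,
                new ParallelOptions { MaxDegreeOfParallelism = 8 },
                () => Rent(),                        // localInit: one writer per task, reused across tasks
                (item, state, writer) =>
                {
                    writer.Write(item);              // stand-in for the real per-item work
                    return writer;
                },
                writer => pool.Add(writer));         // localFinally: return it instead of disposing

            foreach (var writer in all)              // dispose only once the whole loop is done
                writer.Dispose();

            Console.WriteLine("files created: " + all.Count);
        }
    }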

You could also partition the work yourself, create your own set of threads, and run the work on them. However, in my opinion this is less desirable, since the Parallel class already does that heavy lifting for you.

+7

What you are seeing is the implementation trying to get your work done as quickly as possible.

To do this, it tries varying numbers of tasks to maximize throughput. It grabs a certain number of threads from the thread pool and runs some of your work. Then it tries adding and removing threads to see what happens. It keeps doing this until all your work is done.

The algorithm is fairly dumb, in that it doesn't know whether your work is CPU-heavy or I/O-heavy, or whether there is a lot of synchronization causing the threads to block each other. All it can do is add and remove threads and measure how quickly each unit of work completes.

This means that it continually calls your localInit and localFinally functions as it injects and retires threads, which is what you observed.

Unfortunately, there is no easy way to control this algorithm. Parallel.ForEach is a high-level construct that intentionally hides most of the thread-management code.


Using ThreadLocal might help a little, but it relies on the thread pool reusing the same threads when Parallel.ForEach asks for new ones. This is not guaranteed; in fact, it is unlikely that the thread pool will use exactly 8 threads for the whole call. That means you will again create more files than strictly necessary.
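
If you still want to try the ThreadLocal route despite that caveat, a sketch of the usual shape follows; the GetWriter stand-in and file naming are assumptions on my part. The trackAllValues flag and the Values property (both available in .NET 4.5) let you dispose whatever writers actually got created:

    using System;
    using System.IO;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    class ThreadLocalWriterDemo
    {
        // Hypothetical stand-in for the asker's GetWriter(): a file with a GUID name.
        static BinaryWriter GetWriter()
        {
            return new BinaryWriter(File.Create(Guid.NewGuid().ToString("N") + ".bin"));
        }

        static void Main()
        {
            // trackAllValues: true lets us enumerate every lazily created writer afterwards.
            using (var localWriter = new ThreadLocal<BinaryWriter>(GetWriter, trackAllValues: true))
            {
                Parallel.ForEach(Enumerable.Range(0, 1000),
                    new ParallelOptions { MaxDegreeOfParallelism = 8 },
                    item => localWriter.Value.Write(item)); // one writer per pool thread, not per task

                // The pool may have used more than 8 distinct threads over the run,
                // so this count is not guaranteed to be 8.
                Console.WriteLine("writers created: " + localWriter.Values.Count);

                foreach (var writer in localWriter.Values)
                    writer.Dispose();
            }
        }
    }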


One thing that is guaranteed is that Parallel.ForEach will never use more than MaxDegreeOfParallelism threads at any given time.

You can take advantage of this by creating a fixed-size “pool” of files that can be reused by whichever threads are running at a given moment. You know that at most MaxDegreeOfParallelism threads can run at once, so you can create that many files before calling ForEach. Then grab one in your localInit and release it in your localFinally.

Of course, you will have to write this pool yourself, and it must be thread-safe, because it will be accessed concurrently. A simple locking strategy should be good enough, though, because threads are not injected and retired very often compared to the cost of taking a lock.
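
A rough sketch of such a pool under a simple lock follows; the FilePool class and its member names are my own invention, and it relies on the guarantee above that no more than MaxDegreeOfParallelism tasks are ever between their localInit and localFinally at the same time.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Threading.Tasks;

    class FilePool
    {
        private readonly object gate = new object();
        private readonly Stack<BinaryWriter> free;
        private readonly List<BinaryWriter> all;

        public FilePool(int size)
        {
            // Create exactly 'size' files up front: one per concurrently running task.
            all = Enumerable.Range(0, size)
                .Select(_ => new BinaryWriter(File.Create(Guid.NewGuid().ToString("N") + ".bin")))
                .ToList();
            free = new Stack<BinaryWriter>(all);
        }

        public BinaryWriter Take()
        {
            lock (gate) { return free.Pop(); }   // never empty if at most 'size' tasks run at once
        }

        public void Return(BinaryWriter writer)
        {
            lock (gate) { free.Push(writer); }
        }

        public void DisposeAll()
        {
            foreach (var writer in all) writer.Dispose();
        }
    }

    class PoolDemo
    {
        static void Main()
        {
            const int degree = 8;
            var pool = new FilePool(degree);

            Parallel.ForEach(Enumerable.Range(0, 1000),
                new ParallelOptions { MaxDegreeOfParallelism = degree },
                () => pool.Take(),                 // localInit: borrow a pre-created file
                (item, state, writer) =>
                {
                    writer.Write(item);            // stand-in for the real work
                    return writer;
                },
                writer => pool.Return(writer));    // localFinally: hand it back for the next task

            pool.DisposeAll();                     // exactly 'degree' files exist, disposed once
        }
    }

Because the files are created before ForEach starts and disposed only after it returns, exactly MaxDegreeOfParallelism files exist regardless of how many tasks the scheduler decides to create.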

+2

According to MSDN, the localInit delegate is called once for each task, not once for each thread:

The localInit delegate is invoked once for each task that participates in the loop's execution and returns the initial local state for each of those tasks.

+1

localInit is called whenever a thread is created. If the body takes too long, it has to create another thread and suspend the current one, and whenever it creates another thread, it calls localInit again.

Also, when Parallel.ForEach is first called, it creates as many threads as the value of MaxDegreeOfParallelism. For example:

    var k = Enumerable.Range(0, 1);
    Parallel.ForEach(k, new ParallelOptions { MaxDegreeOfParallelism = 4 }, ...

it creates 4 threads when it is first called.

-1
