How the AsParallel Extension Actually Works

So, the question is in the title.

I understand that the AsParallel method returns a ParallelQuery<TSource> wrapper, so the same LINQ keywords bind to the extension methods in System.Linq.ParallelEnumerable instead of System.Linq.Enumerable.
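Here is a small illustration of that binding. The class and method names are made up for the example. The query uses ordinary LINQ query syntax on a source that has gone through AsParallel(). Because the source is a ParallelQuery<int>, the compiler's translation of `where` and the later `Sum()` call resolve to the ParallelEnumerable overloads rather than the Enumerable ones.

```csharp
using System.Linq;

static class AsParallelBindingDemo
{
    // Query syntax over AsParallel(): the compiler rewrites "where" into a .Where(...)
    // call, and overload resolution picks ParallelEnumerable.Where because the
    // source is a ParallelQuery<int>. The trivial "select x" is dropped by the compiler.
    public static ParallelQuery<int> EvensQuery()
    {
        ParallelQuery<int> evens =
            from x in Enumerable.Range(1, 100).AsParallel()
            where x % 2 == 0
            select x;
        return evens;
    }

    public static int SumOfEvens()
    {
        // Resolves to ParallelEnumerable.Sum(ParallelQuery<int>), not Enumerable.Sum.
        return EvensQuery().Sum();
    }
}
```

Calling `AsParallelBindingDemo.SumOfEvens()` returns 2550, the sum of the even numbers from 2 to 100.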

This is clear enough, but when I look at the decompiled sources, I don't understand how it works.

Let's start with the simplest extension, the Sum() method. Its code:

    [__DynamicallyInvokable]
    public static int Sum(this ParallelQuery<int> source)
    {
        if (source == null)
            throw new ArgumentNullException("source");
        else
            return new IntSumAggregationOperator((IEnumerable<int>) source).Aggregate();
    }

That part is clear. Next comes the Aggregate() method, which is a wrapper around the InternalAggregate method that handles and defers some exceptions. Now let's look at InternalAggregate:

    protected override int InternalAggregate(ref Exception singularExceptionToThrow)
    {
        using (IEnumerator<int> enumerator = this.GetEnumerator(new ParallelMergeOptions?(ParallelMergeOptions.FullyBuffered), true))
        {
            int num = 0;
            while (enumerator.MoveNext())
                checked { num += enumerator.Current; }
            return num;
        }
    }

And here's the question: how does this work? I don't see any synchronization for a variable modified by many threads; all I see is an iterator and a summation. Is this some kind of magic enumerator? GetEnumerator() returns a QueryOpeningEnumerator<TOutput>, but that code is too complicated to follow.

3 answers

Finally, on my second attempt at PLINQ, I found the answer, and it is quite clear. The catch is that the enumerator is not a simple one; it is a special multithreaded enumerator. So how does it work? The enumerator does not return the next value of the source; it returns the integer sum of the next partition. As a result, the loop body runs only 2, 4, 6, 8... times (depending on Environment.ProcessorCount), while the actual summation is performed inside enumerator.MoveNext, in the enumerator.OpenQuery method.

So the TPL obviously splits the original list, sums each partition independently, and then adds up the partial sums; see IntSumAggregationOperatorEnumerator<TKey>. There is no magic; you just have to dig deeper.
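The idea can be modelled with a deliberately simplified sketch. It is not PLINQ's implementation. PartitionedSumSketch and PartialSums are hypothetical names. The fixed contiguous chunking and the use of Environment.ProcessorCount as the partition count are assumptions, and real PLINQ uses its own partitioning and merge infrastructure. What the sketch does show is why the consuming loop needs no synchronization. Every partition is summed on its own task into a private local variable, and the enumerator yields one subtotal per partition. The while/MoveNext loop, shaped like InternalAggregate, therefore only adds a handful of partial sums on a single thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class PartitionedSumSketch
{
    // Hypothetical stand-in for the PLINQ query enumerator: each value it yields is
    // the subtotal of one partition, not one element of the source.
    public static IEnumerable<int> PartialSums(IReadOnlyList<int> source, int partitionCount)
    {
        int chunk = (source.Count + partitionCount - 1) / partitionCount;
        var tasks = new List<Task<int>>();
        for (int p = 0; p < partitionCount; p++)
        {
            int start = p * chunk;
            int end = Math.Min(start + chunk, source.Count);
            if (start >= end) break;
            // Each partition is summed on its own task with a private accumulator,
            // so no variable is written by more than one thread.
            tasks.Add(Task.Run(() =>
            {
                int local = 0;
                for (int i = start; i < end; i++)
                    checked { local += source[i]; }
                return local;
            }));
        }
        // Blocks until each partition's task finishes, then hands out its subtotal.
        foreach (Task<int> t in tasks)
            yield return t.Result;
    }

    // Same shape as InternalAggregate: a plain sequential loop over the enumerator.
    public static int Sum(IReadOnlyList<int> source)
    {
        int num = 0;
        using (IEnumerator<int> e = PartialSums(source, Environment.ProcessorCount).GetEnumerator())
        {
            while (e.MoveNext())
                checked { num += e.Current; }
        }
        return num;
    }
}
```

Because PartialSums is an iterator, its body (and therefore all the parallel work) only starts on the first MoveNext call. This matches the observation that the real summation happens inside MoveNext.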


The Sum operator combines all the values on a single thread. There is no multithreading there; the trick is that the multithreading happens somewhere else.

The PLINQ Sum method operates on PLINQ queries (ParallelQuery<T>). Those queries can be built with other operators (for example, Where) that let the collection be processed on several threads.
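A quick way to see that the operators in the chain run on pool threads is to record which thread evaluates each predicate call. This is a sketch, and WhereThreadsDemo is a made-up name. How many distinct threads show up depends on the machine and on PLINQ's scheduling, possibly only one on a single-core machine. Only the sum is deterministic.

```csharp
using System.Collections.Concurrent;
using System.Linq;

static class WhereThreadsDemo
{
    // Sums the odd numbers in [1, n] with PLINQ and records the managed thread IDs
    // that evaluated the Where predicate. The thread count is not deterministic.
    public static (int Sum, int ThreadCount) Run(int n)
    {
        var threadIds = new ConcurrentDictionary<int, bool>();
        int sum = Enumerable.Range(1, n)
            .AsParallel()
            .Where(x =>
            {
                // Thread-safe set insert; duplicates are simply ignored.
                threadIds.TryAdd(System.Environment.CurrentManagedThreadId, true);
                return x % 2 == 1;
            })
            .Sum();
        return (sum, threadIds.Count);
    }
}
```

For `Run(1000)` the sum is 250000. The thread count varies from run to run.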

The Sum operator is always the last operator in the chain. Although the sum itself could be computed across multiple threads, the TPL team probably found that doing so hurt performance. That is reasonable, since all this method has to do is add integers.

So this method takes all the results produced by the other threads, combines them on one thread, and returns the value. The real trick lies in the other PLINQ extension methods.

The InternalAggregate loop quoted in the question is not executed in parallel; it runs sequentially within its own scope.

Try this instead:

    List<int> list = new List<int>();
    int num = 0;
    Parallel.ForEach(list, (item) =>
    {
        checked { num += item; } // race: unsynchronized read-modify-write
    });

The inner action is dispatched to ThreadPool threads, and the ForEach call returns only when all elements have been processed.

Here you will need thread safety:

    List<int> list = new List<int>();
    int num = 0;
    Parallel.ForEach(list, (item) =>
    {
        Interlocked.Add(ref num, item); // atomic; wraps on overflow
    });
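Calling Interlocked.Add for every element makes all threads contend on the same variable. Parallel.ForEach also has an overload with localInit and localFinally delegates. It keeps a per-task subtotal and synchronizes only once per task, which is the same partition-then-combine idea described earlier in this thread. A sketch follows (LocalSubtotalSum is an illustrative name):

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class LocalSubtotalSum
{
    public static int Sum(IEnumerable<int> source)
    {
        int total = 0;
        Parallel.ForEach<int, int>(
            source,
            // localInit: each task starts its own subtotal at 0.
            () => 0,
            // body: only touches the task-local subtotal, so no locking is needed here.
            (item, state, subtotal) => checked(subtotal + item),
            // localFinally: one atomic add per task to publish its subtotal.
            subtotal => Interlocked.Add(ref total, subtotal));
        return total;
    }
}
```

`LocalSubtotalSum.Sum(Enumerable.Range(1, 1000))` returns 500500. Overflow inside a task's subtotal still throws because of `checked`, but the final Interlocked.Add wraps on overflow.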
