How to write a thread-aware extension function for PLINQ?

Does anyone know how to write an extension function that returns a ParallelQuery in PLINQ?

In particular, I have the following problem: I want to perform a conversion in a PLINQ query that needs an engine that is expensive to create and that cannot be accessed simultaneously.

I could do the following:

    var result = source.AsParallel()
        .Select(i => { var e = new Engine(); return e.Process(i); });

Here, the engine is created once for each element, which is too expensive.

I want the engine to be created once per thread.

With Aggregate, I can get closer to what I want, with something like

    // Helper class: the engine to use plus the results obtained on this thread so far.
    class EngineAndResults
    {
        public Engine engine = null;
        public IEnumerable<ResultType> results;
    }

    var result = source.AsParallel().Aggregate(
        // Done once per block of items (= thread):
        // returns an empty list, but a new engine.
        () => new EngineAndResults
        {
            engine = new Engine(),
            results = Enumerable.Empty<ResultType>()
        },
        // Process a new item and append it to the thread-local list,
        // preserving the engine for further use.
        (engineAndResults, item) => new EngineAndResults
        {
            engine = engineAndResults.engine,
            results = Enumerable.Concat(
                engineAndResults.results,
                new ResultType[] { engineAndResults.engine.Process(item) })
        },
        // Tell LINQ how to aggregate across threads.
        (engineAndResults1, engineAndResults2) => new EngineAndResults
        {
            engine = engineAndResults1.engine,
            results = Enumerable.Concat(engineAndResults1.results, engineAndResults2.results)
        },
        // After all aggregations, how do we get to the result?
        engineAndResults => engineAndResults.results
    );

As you can see, I am misusing the accumulator to carry the engine along with the thread. The problem here is that PLINQ ends up combining the results into a single IEnumerable, which forces the threads to synchronize. This is not very nice if I want to chain another PLINQ extension after it.

I would appreciate something like

    var result = source.AsParallel()
        .SelectWithThreadwiseInitWhichIAmLookingFor(
            () => new Engine(),
            (engine, item) => engine.Process(item));

Does anyone know how to achieve this?

1 answer

You can use ThreadLocal<T> for this. Something like:

    var engine = new ThreadLocal<Engine>(() => new Engine());
    var result = source.AsParallel()
        .Select(item => engine.Value.Process(item));
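To get the call shape asked for in the question, the ThreadLocal<T> can be wrapped in an extension method that returns a ParallelQuery, so further PLINQ operators can be chained after it. The following is only a minimal sketch: the name SelectWithThreadLocalInit, the stand-in Engine class (with its instance counter) and the Demo helper are hypothetical and exist purely for illustration; they are not part of PLINQ or of the original question.

```csharp
using System;
using System.Linq;
using System.Threading;

public static class ParallelQueryExtensions
{
    // Hypothetical helper: runs 'selector' with a state object that is
    // created lazily, at most once per worker thread that touches the query.
    public static ParallelQuery<TResult> SelectWithThreadLocalInit<TState, TSource, TResult>(
        this ParallelQuery<TSource> source,
        Func<TState> init,
        Func<TState, TSource, TResult> selector)
    {
        // Each thread sees its own TState via state.Value, so the state
        // is never shared between threads.
        var state = new ThreadLocal<TState>(init);
        return source.Select(item => selector(state.Value, item));
    }
}

// Stand-in for the expensive, non-thread-safe engine from the question.
public sealed class Engine
{
    private static int created;

    // Number of Engine instances constructed so far (for illustration only).
    public static int Created => Volatile.Read(ref created);

    public Engine() { Interlocked.Increment(ref created); }

    public int Process(int item) => item * 2;
}

public static class Demo
{
    public static int[] Run() =>
        Enumerable.Range(0, 1000).AsParallel()
            .SelectWithThreadLocalInit(
                () => new Engine(),
                (engine, item) => engine.Process(item))
            .Where(x => x % 4 == 0) // the result is still a ParallelQuery
            .ToArray();
}
```

Note that this sketch never disposes the ThreadLocal<TState>, and the engines stay attached to the pool threads that created them. If the engine holds resources, one option is to construct the ThreadLocal with trackAllValues set to true and dispose everything in its Values collection once the query has finished.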