C# parallel yield return

I have several enumerators that enumerate over flat files. Initially, I had each enumerator in a Parallel.Invoke, and each Action added to a BlockingCollection<Entity>, and that collection was returned through GetConsumingEnumerable():

    public interface IFlatFileQuery
    {
        IEnumerable<Entity> Run();
    }

    public class FlatFile1 : IFlatFileQuery
    {
        public IEnumerable<Entity> Run()
        {
            // Loop over a flat file and yield each result.
            yield return new Entity();
        }
    }

    public class Main
    {
        public IEnumerable<Entity> DoLongTask(ICollection<IFlatFileQuery> flatFileQueries)
        {
            // Do some other stuff that needs to be returned first:
            yield return new Entity();

            // Then enumerate and return the flat file data.
            foreach (var entity in GetData(flatFileQueries))
            {
                yield return entity;
            }
        }

        private IEnumerable<Entity> GetData(ICollection<IFlatFileQuery> flatFileQueries)
        {
            var buffer = new BlockingCollection<Entity>(100);

            var actions = flatFileQueries.Select(fundFileQuery => (Action)(() =>
            {
                foreach (var entity in fundFileQuery.Run())
                {
                    buffer.TryAdd(entity, Timeout.Infinite);
                }
            })).ToArray();

            Task.Factory.StartNew(() =>
            {
                Parallel.Invoke(actions);
                buffer.CompleteAdding();
            });

            return buffer.GetConsumingEnumerable();
        }
    }

However, after a bit of testing, it turns out that changing to the code below is about 20-25% faster.

    private IEnumerable<Entity> GetData(ICollection<IFlatFileQuery> flatFileQueries)
    {
        return flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run());
    }

The problem with this change is that it waits until all of the flat file queries are enumerated before returning the whole batch, which could then be enumerated and yielded.

Is there any way to yield in the above bit of code somehow, to make it even faster?

I should add that, at most, the combined results of all the flat file queries may only be about 1,000 entities.

Edit: Changing it to the following does not affect the execution time. (ReSharper even suggests reverting to the way it was.)

    private IEnumerable<Entity> GetData(ICollection<IFlatFileQuery> flatFileQueries)
    {
        foreach (var entity in flatFileQueries.AsParallel().SelectMany(ffq => ffq.Run()))
        {
            yield return entity;
        }
    }
3 answers

The problem with this change is that it waits until all of the flat file queries are enumerated before returning the whole batch, which could then be enumerated and yielded.

Let's prove that this is not true with a simple example. First, we create a TestQuery class that yields a single entity after a given delay. Second, we run several test queries in parallel and measure how long it takes each one to yield its result.

    public class TestQuery : IFlatFileQuery
    {
        private readonly int _sleepTime;

        public TestQuery(int sleepTime)
        {
            _sleepTime = sleepTime;
        }

        public IEnumerable<Entity> Run()
        {
            Thread.Sleep(_sleepTime);
            return new[] { new Entity() };
        }
    }

    internal static class Program
    {
        private static void Main()
        {
            Stopwatch stopwatch = Stopwatch.StartNew();

            var queries = new IFlatFileQuery[]
            {
                new TestQuery(2000),
                new TestQuery(3000),
                new TestQuery(1000)
            };

            foreach (var entity in queries.AsParallel().SelectMany(ffq => ffq.Run()))
                Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds);

            Console.ReadKey();
        }
    }

This code prints:

Yielded after 1 seconds
Yielded after 2 seconds
Yielded after 3 seconds

You can see from this output that AsParallel() yields each result as soon as it becomes available, so everything works fine. Note that you can get different timings depending on the degree of parallelism (such as "2s, 5s, 6s" with a degree of parallelism of 1, which effectively makes the whole operation not parallel at all). This output is from a 4-core machine.
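As a quick illustration (a hypothetical tweak to the test above, reusing the same queries and stopwatch variables), you can pin the degree of parallelism explicitly and watch the timings become cumulative:

    foreach (var entity in queries.AsParallel()
                                  .WithDegreeOfParallelism(1)
                                  .SelectMany(ffq => ffq.Run()))
    {
        // With a single worker the queries run back to back,
        // so this prints roughly 2, 5 and 6 seconds.
        Console.WriteLine("Yielded after {0:N0} seconds", stopwatch.Elapsed.TotalSeconds);
    }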

Your long-running processing will probably scale with the number of cores, provided there is no common bottleneck between the threads (such as a shared locked resource). You might want to profile your algorithm to see whether there are slow parts that can be improved, using tools such as dotTrace.


I don't see any red flags in your code; there are no outrageous inefficiencies. I think it comes down to several smaller differences.

PLINQ is very good at processing streams of data. Internally, it works more efficiently than adding items to a synchronized list one by one. I suspect your TryAdd calls are the bottleneck, because each call requires at least two Interlocked operations. Those can put an enormous load on the inter-processor memory bus, because all threads will compete for the same cache line.

PLINQ is cheaper because it performs some buffering internally. I'm sure it doesn't ship items out one by one; it probably batches them and amortizes the synchronization cost that way over multiple items.
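If that internal buffering is what delays the first results, PLINQ exposes a merge option for exactly this trade-off. The following is a sketch, not code from the original question: ParallelMergeOptions.NotBuffered asks PLINQ to hand each element to the consumer as soon as it is produced, at some cost in throughput.

    private IEnumerable<Entity> GetData(ICollection<IFlatFileQuery> flatFileQueries)
    {
        // NotBuffered makes PLINQ yield each element as soon as it is
        // produced, instead of batching results before merging them.
        return flatFileQueries
            .AsParallel()
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .SelectMany(ffq => ffq.Run());
    }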

The second problem would be the bounded capacity of the BlockingCollection. 100 is not a lot. This might lead to a lot of waiting, and waiting is costly because it requires a kernel call and a context switch.
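If you stay with the BlockingCollection approach, one assumption worth testing is a larger, or unbounded, buffer so that producers rarely block. A minimal sketch:

    // Bounded to 100: producers block as soon as 100 items are queued.
    var bounded = new BlockingCollection<Entity>(100);

    // Unbounded: producers never wait on capacity, at the cost of extra
    // memory if the consumer falls behind.
    var unbounded = new BlockingCollection<Entity>();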


I use this alternative, which has worked well for me in every scenario:

  • Inside the Parallel.ForEach body, transform each item for processing and enqueue the result into a ConcurrentQueue.
  • Give the task a continuation that sets a flag when the task completes.
  • On the calling thread, while the task has not completed (or items remain in the queue), dequeue and yield each item.

This has been fast and given excellent results for me:

    Task.Factory.StartNew(() =>
    {
        Parallel.ForEach<string>(
            TextHelper.ReadLines(FileName),
            ProcessHelper.DefaultParallelOptions,
            (string currentLine) =>
            {
                // Read the line, validate it, and enqueue an instance of
                // FileLineData (a custom class) into qlines.
            });
    }).ContinueWith(ic => isCompleted = true);

    // Spin until the producer task finishes and the queue is drained.
    while (!isCompleted || qlines.Count > 0)
    {
        if (qlines.TryDequeue(out returnLine))
        {
            yield return returnLine;
        }
    }
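Note that the while loop above busy-waits whenever the queue is momentarily empty. A variant of the same idea (a sketch under the same assumed helpers, not the answer's actual code) replaces the ConcurrentQueue and the completion flag with a BlockingCollection, which blocks instead of spinning; ParseLine is a hypothetical helper standing in for the commented-out body:

    var qlines = new BlockingCollection<FileLineData>();

    Task.Factory.StartNew(() =>
    {
        Parallel.ForEach(TextHelper.ReadLines(FileName), currentLine =>
        {
            // ParseLine (hypothetical) reads and validates the line.
            qlines.Add(ParseLine(currentLine));
        });
    }).ContinueWith(_ => qlines.CompleteAdding());

    // GetConsumingEnumerable blocks until an item is available and
    // completes once CompleteAdding has been called, so no flag or
    // busy-waiting is needed.
    foreach (var line in qlines.GetConsumingEnumerable())
    {
        yield return line;
    }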
