Growing number of threads when using the Task Parallel Library

I am using the C# TPL and I have a problem with producer/consumer code: for some reason TPL does not reuse threads and keeps creating new ones without stopping.

I made a simple example to demonstrate this behavior:

using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);
            }
        }
        finally
        {
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
            Thread.CurrentThread.ManagedThreadId,
            Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();
        Console.ReadKey();
        m_Cts.Cancel();
        Task.WaitAll(producer, consumer);
    }
}

This code creates 2 tasks, a producer and a consumer. The producer adds 1 work item every second, and the consumer only prints a line of information. I would expect a single consumer thread to be enough here, because items are processed much faster than they are added to the queue, but in practice the number of threads in the process grows by 1 every second, as if TPL were creating a new thread for each item.

While trying to understand what was going on, I also noticed another thing: although the size of the BlockingCollection is 1, after a while the consumer starts receiving items in batches. For example, this is how it starts:

Enqueuing job
Job Processed    Thread: 4    Process Thread Count: 9
Enqueuing job
Job Processed    Thread: 6    Process Thread Count: 9
Enqueuing job
Job Processed    Thread: 5    Process Thread Count: 10
Enqueuing job
Job Processed    Thread: 4    Process Thread Count: 10
Enqueuing job
Job Processed    Thread: 6    Process Thread Count: 11

and this is how it is processing items less than a minute later:

Enqueuing job
Job Processed    Thread: 25    Process Thread Count: 52
Enqueuing job
Enqueuing job
Job Processed    Thread: 5    Process Thread Count: 54
Job Processed    Thread: 5    Process Thread Count: 54

Because the threads get destroyed after the Parallel.ForEach loop completes (I don't show that in this example, but it happened in my real project), I assumed the problem had something to do with ForEach specifically. I found this article http://reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/ and thought the problem was caused by the default partitioner, so I took a custom partitioner from the TPL samples that feeds items to the consumer threads one at a time (a sketch of the idea is shown after the output below). Although it fixed the ordering (got rid of the batching), the output looked like this:

Enqueuing job
Job Processed    Thread: 71    Process Thread Count: 140
Enqueuing job
Job Processed    Thread: 12    Process Thread Count: 141
Enqueuing job
Job Processed    Thread: 72    Process Thread Count: 142
Enqueuing job
Job Processed    Thread: 38    Process Thread Count: 143
Enqueuing job
Job Processed    Thread: 73    Process Thread Count: 143
Enqueuing job
Job Processed    Thread: 21    Process Thread Count: 144
Enqueuing job
Job Processed    Thread: 74    Process Thread Count: 145

so the thread count kept growing at the same pace.
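For reference, here is a minimal sketch of the one-item-at-a-time idea. It uses the NoBuffering option built into .NET 4.5 rather than the exact sample partitioner I used, so treat it as an illustration of the approach, applied to the Consumer() method of the example above:

static void Consumer()
{
    // NoBuffering hands items to worker threads one at a time instead of
    // letting the default partitioner buffer them into chunks.
    var partitioner = Partitioner.Create(
        m_Buffer.GetConsumingEnumerable(),
        EnumerablePartitionerOptions.NoBuffering);

    Parallel.ForEach(partitioner, Run);
}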

I know about ParallelOptions.MaxDegreeOfParallelism, but I still want to understand what TPL is doing and why it creates hundreds of threads for no apparent reason.

In my real project I have code that has to run for several hours, reading new data from a database, putting it into BlockingCollections, and having it processed by other code. A new item arrives roughly every 5 seconds and takes anywhere from a few milliseconds to almost a minute to process, yet after about 10 minutes of running the thread count had reached 1000.

multithreading c# parallel-processing task-parallel-library producer-consumer
1 answer

There are two things that together cause this behavior:

  • The ThreadPool tries to use the optimal number of threads for your workload. But when one of the pool threads blocks, the pool sees that thread as doing no useful work, and so it soon injects another thread. This means that with a lot of blocking the ThreadPool can never settle on the optimal number of threads; it keeps creating new ones until it hits its limit.

  • Parallel.ForEach() trusts the ThreadPool to guess the correct number of threads to use, unless you set the maximum degree of parallelism explicitly. Parallel.ForEach() is also primarily meant for bounded collections, not for streams of data.

When you combine these two facts with GetConsumingEnumerable(), what you get is Parallel.ForEach() creating threads that are almost always blocked waiting for the next item. The ThreadPool sees that and, trying to keep the CPU busy, keeps injecting more and more threads.
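You can see this injection behavior in isolation with a minimal sketch (this is just an illustration, not your code): queue work items that do nothing but block, and watch the process thread count climb roughly once per second.

using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class ThreadInjectionDemo
{
    static void Main()
    {
        // Start many tasks that immediately block. The ThreadPool sees
        // blocked threads as making no progress and injects roughly one
        // new thread per second, so the count keeps climbing.
        for (int i = 0; i < 100; i++)
        {
            Task.Run(() => Thread.Sleep(Timeout.Infinite));
        }

        for (int i = 0; i < 30; i++)
        {
            Console.WriteLine("Process thread count: {0}",
                Process.GetCurrentProcess().Threads.Count);
            Thread.Sleep(1000);
        }
    }
}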

The correct solution here is to set MaxDegreeOfParallelism. If your work is CPU-bound, the best value is most likely Environment.ProcessorCount. If it is IO-bound, you will have to find the best value experimentally.
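Applied to the Consumer() method from your example, that could look like this (a minimal sketch; the value used is just the CPU-bound default suggested above):

static void Consumer()
{
    var options = new ParallelOptions
    {
        // Cap the number of worker threads instead of letting the
        // ThreadPool keep injecting new ones while workers block.
        MaxDegreeOfParallelism = Environment.ProcessorCount
    };

    Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), options, Run);
}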

Another option, if you can use .NET 4.5, is TPL Dataflow. That library was designed specifically for processing streams of data like yours, so it doesn't have this problem. It is actually even better than that: it doesn't use any threads at all while there is nothing to process.
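A minimal sketch of what that could look like with TPL Dataflow (assuming the System.Threading.Tasks.Dataflow NuGet package; the class and variable names here are illustrative, not taken from your code):

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class DataflowSketch
{
    static void Main()
    {
        RunAsync().GetAwaiter().GetResult();
    }

    static async Task RunAsync()
    {
        // An ActionBlock processes posted items with a bounded number of
        // worker tasks and uses no threads while its input queue is empty.
        var worker = new ActionBlock<int>(
            i => Console.WriteLine("Job Processed\tThread: {0}",
                Thread.CurrentThread.ManagedThreadId),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                BoundedCapacity = 1   // mirrors the BlockingCollection(1) in the question
            });

        // Producer side: one item per second, as in the original example.
        for (int i = 0; i < 10; i++)
        {
            await worker.SendAsync(i);  // waits (asynchronously) if the block is full
            await Task.Delay(1000);
        }

        worker.Complete();
        await worker.Completion;
    }
}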

Note: there is also a good reason why a new thread is created for each new item, but explaining it would require going into how Parallel.ForEach() works in more detail, and I don't think that's necessary here.

