I am using the C# TPL and I have a problem with producer/consumer code: for some reason TPL does not reuse threads and keeps creating new ones without stopping.
I made a simple example to demonstrate this behavior:
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static BlockingCollection<int> m_Buffer = new BlockingCollection<int>(1);
    static CancellationTokenSource m_Cts = new CancellationTokenSource();

    static void Producer()
    {
        try
        {
            while (!m_Cts.IsCancellationRequested)
            {
                Console.WriteLine("Enqueuing job");
                m_Buffer.Add(0);
                Thread.Sleep(1000);   // one new work item per second
            }
        }
        finally
        {
            m_Buffer.CompleteAdding();
        }
    }

    static void Consumer()
    {
        Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), Run);
    }

    static void Run(int i)
    {
        Console.WriteLine("Job Processed\tThread: {0}\tProcess Thread Count: {1}",
            Thread.CurrentThread.ManagedThreadId,
            Process.GetCurrentProcess().Threads.Count);
    }

    static void Main(string[] args)
    {
        Task producer = new Task(Producer);
        Task consumer = new Task(Consumer);
        producer.Start();
        consumer.Start();

        Console.ReadKey();

        m_Cts.Cancel();
        Task.WaitAll(producer, consumer);
    }
}
This code creates two tasks, a producer and a consumer. The Producer adds one work item every second, and the Consumer just prints a line of information. I would expect a single consumer thread to be enough here, because items are processed much faster than they are added to the queue, but in practice the number of threads in the process grows by one every second, as if TPL were creating a new thread for each item.
While trying to understand what was going on, I also noticed another thing: although the size of the BlockingCollection is 1, after a while the consumer starts to receive items in batches. For example, this is how it starts:
Enqueuing job
Job Processed    Thread: 4    Process Thread Count: 9
Enqueuing job
Job Processed    Thread: 6    Process Thread Count: 9
Enqueuing job
Job Processed    Thread: 5    Process Thread Count: 10
Enqueuing job
Job Processed    Thread: 4    Process Thread Count: 10
Enqueuing job
Job Processed    Thread: 6    Process Thread Count: 11
and this is how it is processing items less than a minute later:

Enqueuing job
Job Processed    Thread: 25    Process Thread Count: 52
Enqueuing job
Enqueuing job
Job Processed    Thread: 5    Process Thread Count: 54
Job Processed    Thread: 5    Process Thread Count: 54
And because the threads are destroyed once the Parallel.ForEach loop completes (I don't show that in this example, but it happened in the real project), I assumed it had something to do with ForEach specifically. I found this article http://reedcopsey.com/2010/01/26/parallelism-in-net-part-5-partitioning-of-work/ and thought my problem was caused by the default partitioner, so I took a custom partitioner from the TPL samples that feeds elements to the consumer threads one at a time. Although that fixed the ordering (it got rid of the batching delay), the output looked like this:
Enqueuing job
Job Processed    Thread: 71    Process Thread Count: 140
Enqueuing job
Job Processed    Thread: 12    Process Thread Count: 141
Enqueuing job
Job Processed    Thread: 72    Process Thread Count: 142
Enqueuing job
Job Processed    Thread: 38    Process Thread Count: 143
Enqueuing job
Job Processed    Thread: 73    Process Thread Count: 143
Enqueuing job
Job Processed    Thread: 21    Process Thread Count: 144
Enqueuing job
Job Processed    Thread: 74    Process Thread Count: 145
so it did not stop the thread count from growing (a rough equivalent of that partitioner setup is sketched below).
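For reference, I can't paste the sample partitioner here, but a rough equivalent of what my Consumer looks like with one-at-a-time partitioning is below. This sketch uses the built-in NoBuffering option, which needs .NET 4.5; the partitioner I actually used came from the TPL samples:

static void Consumer()
{
    // Sketch only: hand items to worker threads one at a time instead of
    // letting the default partitioner buffer chunks of the enumerable.
    // Requires .NET 4.5; my real code uses a custom partitioner from the TPL samples.
    var oneAtATime = Partitioner.Create(
        m_Buffer.GetConsumingEnumerable(),
        EnumerablePartitionerOptions.NoBuffering);

    Parallel.ForEach(oneAtATime, Run);
}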
I know about ParallelOptions.MaxDegreeOfParallelism, but I still want to understand what is going on inside TPL and why it creates hundreds of threads for no apparent reason.
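Just to be explicit about the workaround I mean, capping the parallelism would look roughly like this (the value 4 is arbitrary, only for illustration):

static void Consumer()
{
    // Workaround I already know about: cap how many worker threads
    // Parallel.ForEach may use. The value here is arbitrary.
    var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(m_Buffer.GetConsumingEnumerable(), options, Run);
}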
In my real project I have code that has to run for several hours, reading new data from a database, putting it into BlockingCollections, and letting other code process it. There is one new item roughly every 5 seconds, and processing it takes anywhere from a few milliseconds to almost a minute; after running for about 10 minutes, the number of threads reached 1000.
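The real project is much bigger, but the shape of the code is roughly this (simplified; Record, ReadNextRecord and ProcessRecord are placeholder names, not my real ones):

class Record { }

static BlockingCollection<Record> m_Queue = new BlockingCollection<Record>();

static Record ReadNextRecord()
{
    // placeholder for the database query; roughly one new item every 5 seconds
    return new Record();
}

static void ProcessRecord(Record r)
{
    // placeholder for the processing; takes from a few milliseconds to almost a minute
}

static void DbProducer(CancellationToken token)
{
    try
    {
        while (!token.IsCancellationRequested)
        {
            m_Queue.Add(ReadNextRecord());
            Thread.Sleep(TimeSpan.FromSeconds(5));
        }
    }
    finally
    {
        m_Queue.CompleteAdding();
    }
}

static void QueueConsumer()
{
    // same consumption pattern as in the small repro above
    Parallel.ForEach(m_Queue.GetConsumingEnumerable(), ProcessRecord);
}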