MultiThreading: limiting concurrent threads

I need to develop an application that uses multithreading.

Basically, I have a DataTable that contains about 200 thousand rows. From each row, I need to take a field, compare it with a web page, and then remove it from the data.

The problem is that the server hosting these pages limits simultaneous requests, so I can request at most 3 pages at the same time.

I want to do this using the ThreadPool. I even managed to create a simple application that does this (locking the data), but I could not limit the number of parallel threads (even using SetMaxThreads); it looks like the limit was simply ignored.

Does anyone have a working example of something like this? I would like to see it.

I tried using semaphores but ran into problems:

    static SemaphoreSlim _sem = new SemaphoreSlim(3); // Capacity of 3
    static List<string> records = new List<string>();

    static void Main()
    {
        records.Add("aaa");
        records.Add("bbb");
        records.Add("ccc");
        records.Add("ddd");
        records.Add("eee");
        records.Add("fff");
        records.Add("ggg");
        records.Add("iii");
        records.Add("jjj");

        for (int i = 0; i < records.Count; i++)
        {
            new Thread(ThreadJob).Start(records[i]);
        }

        Console.WriteLine(records.Count);
        Console.ReadLine();
    }

    static void ThreadJob(object id)
    {
        Console.WriteLine(id + " wants to enter");
        _sem.Wait();
        Console.WriteLine(id + " is in!");       // Only three threads
        //Thread.Sleep(1000 * (int)id);          // can be here at
        Console.WriteLine(id + " is leaving");   // a time.
        lock (records)
        {
            records.Remove((string)id);
        }
        _sem.Release();
    }

This works pretty well. The only problem is that

    Console.WriteLine(records.Count);

returns inconsistent results. I understand why this happens: not all threads have finished (I am calling records.Count before all records have been removed). However, I could not find out how to wait for all of them to finish.

+4
4 answers

To wait for multiple threads to finish, you can create one EventWaitHandle per thread and then call WaitHandle.WaitAll to block the main thread until all events are signaled:

    // we need to keep a list of synchronization events
    var finishEvents = new List<EventWaitHandle>();

    for (int i = 0; i < records.Count; i++)
    {
        // for each job, create an event and add it to the list
        var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
        finishEvents.Add(signal);

        // we need to capture the id in a separate variable
        // for the closure to work as expected
        var id = records[i];

        var thread = new Thread(() =>
        {
            // do the job
            ThreadJob(id);

            // signal the main thread
            signal.Set();
        });

        // start the thread; without this call the job never runs
        thread.Start();
    }

    // block until every event has been signaled
    WaitHandle.WaitAll(finishEvents.ToArray());

Since most of these threads will be blocked most of the time, it would be better to use the ThreadPool here, so you can replace the new Thread with:

    ThreadPool.QueueUserWorkItem(s =>
    {
        ThreadJob(id);
        signal.Set();
    });

When you are finished with the events, do not forget to dispose of them:

    foreach (var evt in finishEvents)
    {
        evt.Dispose();
    }

[Edit]

To put it all in one place, here is what your sample code looks like:

    using System;
    using System.Collections.Generic;
    using System.Threading;

    class Program
    {
        static Semaphore _sem = new Semaphore(3, 3); // Capacity of 3
        static List<string> _records = new List<string>(new string[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff", "ggg", "hhh" });

        static void Main()
        {
            var finishEvents = new List<EventWaitHandle>();

            // iterate over a snapshot, because the worker threads
            // remove items from _records while this loop may still be running
            var ids = _records.ToArray();

            for (int i = 0; i < ids.Length; i++)
            {
                var signal = new EventWaitHandle(false, EventResetMode.ManualReset);
                finishEvents.Add(signal);

                var id = ids[i];
                var t = new Thread(() =>
                {
                    ThreadJob(id);
                    signal.Set();
                });
                t.Start();
            }

            // wait until every job has signaled its event
            WaitHandle.WaitAll(finishEvents.ToArray());

            Console.WriteLine(_records.Count);
            Console.ReadLine();
        }

        static void ThreadJob(object id)
        {
            Console.WriteLine(id + " wants to enter");
            _sem.WaitOne();
            Console.WriteLine(id + " is in!");
            Thread.Sleep(1000);
            Console.WriteLine(id + " is leaving");
            lock (_records)
            {
                _records.Remove((string)id);
            }
            _sem.Release();
        }
    }

(Note that I used Semaphore instead of SemaphoreSlim because I don't have .NET 4 on this computer and I wanted to test the code before updating the answer.)
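One caveat when scaling this up to the 200 thousand rows in the question: WaitHandle.WaitAll is documented to reject more than 64 handles on some implementations (Windows among them), so one event per row may not work. A possible alternative on .NET 4, sketched here as an assumption and not as part of the answer above, is a single CountdownEvent that every job signals once. The names CountdownSketch and ProcessAll are made up for illustration, and Thread.Sleep stands in for the real web request.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class CountdownSketch
{
    // Processes every id on the thread pool, at most maxConcurrent at a time,
    // and returns how many records are left once all jobs have finished.
    public static int ProcessAll(IEnumerable<string> ids, int maxConcurrent)
    {
        var records = new List<string>(ids);
        var snapshot = records.ToArray(); // iterate a copy; workers mutate 'records'

        using (var gate = new SemaphoreSlim(maxConcurrent))
        using (var done = new CountdownEvent(snapshot.Length))
        {
            foreach (var item in snapshot)
            {
                var id = item; // separate variable for the closure
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    gate.Wait(); // wait for one of the free slots
                    try
                    {
                        Thread.Sleep(50); // placeholder for the web request
                        lock (records) { records.Remove(id); }
                    }
                    finally
                    {
                        gate.Release(); // free the slot
                        done.Signal();  // one job fewer to wait for
                    }
                });
            }

            done.Wait(); // returns once Signal() has been called for every job
        }

        return records.Count;
    }

    static void Main()
    {
        Console.WriteLine(ProcessAll(new[] { "aaa", "bbb", "ccc", "ddd", "eee" }, 3));
    }
}
```

Because the main thread only reads records.Count after done.Wait() returns, the count reflects all removals.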

+2

Why not use the Parallel Extensions? They would simplify the work.
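As a rough illustration of that suggestion (a sketch under assumptions, not code from this answer), Parallel.ForEach from .NET 4 accepts a ParallelOptions object whose MaxDegreeOfParallelism caps how many items are processed at once, and it returns only after every item is done. The class name ParallelSketch, the method Run, and the Thread.Sleep placeholder are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class ParallelSketch
{
    // Processes every id with at most maxConcurrent running at once
    // and returns the highest concurrency actually observed.
    public static int Run(IEnumerable<string> ids, int maxConcurrent)
    {
        int running = 0, peak = 0;
        var sync = new object();
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrent };

        // Blocks until all items have been processed.
        Parallel.ForEach(ids, options, id =>
        {
            lock (sync)
            {
                running++;
                if (running > peak) peak = running;
            }

            Thread.Sleep(50); // placeholder for comparing the field with the web page

            lock (sync) { running--; }
        });

        return peak;
    }

    static void Main()
    {
        var ids = new[] { "aaa", "bbb", "ccc", "ddd", "eee", "fff" };
        Console.WriteLine("peak concurrency: " + Run(ids, 3));
    }
}
```

With this approach no separate semaphore or wait handle is needed, since the loop itself enforces the limit and the wait.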

Anyway, what you probably want to look at is semaphores. I wrote a blog post on this topic a month or two ago, which you may find useful: https://colinmackay.scot/2011/03/30/using-semaphores-to-restrict-access-to-resources/

+1

You can use Semaphore if you are on .NET 3.5, or SemaphoreSlim on .NET 4.0.
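To make the difference concrete, here is a small sketch of the two classes side by side. Both are created with three slots, and in both cases a fourth attempt with a zero timeout fails while the slots are taken. The class and method names are invented for this example.

```csharp
using System;
using System.Threading;

static class SemaphoreComparison
{
    // .NET 3.5 and earlier: Semaphore takes (initialCount, maximumCount)
    // and is entered with WaitOne().
    public static bool FourthEntersSemaphore()
    {
        using (var sem = new Semaphore(3, 3))
        {
            for (int i = 0; i < 3; i++) sem.WaitOne(); // take all three slots
            bool fourth = sem.WaitOne(0);              // zero timeout: do not block
            sem.Release(3);                            // give the three slots back
            return fourth;
        }
    }

    // .NET 4.0 and later: SemaphoreSlim takes (initialCount)
    // and is entered with Wait().
    public static bool FourthEntersSemaphoreSlim()
    {
        using (var slim = new SemaphoreSlim(3))
        {
            for (int i = 0; i < 3; i++) slim.Wait();
            bool fourth = slim.Wait(0);
            slim.Release(3);
            return fourth;
        }
    }

    static void Main()
    {
        Console.WriteLine(FourthEntersSemaphore());     // False
        Console.WriteLine(FourthEntersSemaphoreSlim()); // False
    }
}
```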

0

First, shouldn't Console.WriteLine(id + " is leaving"); come a little later, after the lock block and just before the semaphore is released?

Regarding actually waiting for all threads to complete, Groo's answer looks better and more robust in the long run. As a quicker and simpler solution for this particular piece of code, though, I think you can also get away with calling .Join() in turn on every thread you want to wait for.

 static List<Thread> ThreadList = new List<Thread>(); // To keep track of them 

then when starting the threads, replace the current new Thread line as follows:

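    // Thread.Start returns void, so create the thread first,
    // remember it, and only then start it
    var t = new Thread(ThreadJob);
    ThreadList.Add(t);
    t.Start(records[i]);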

and then immediately before Console.WriteLine:

 foreach( Thread t in ThreadList ) { t.Join(); } 

This will hang if any of the threads never terminates, and if you ever need to know which threads have not finished, this method will not tell you.
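Put together, the Join approach might look like the following sketch. It assumes the same semaphore-guarded job as in the question, with Thread.Sleep as a stand-in for the real work; the names JoinSketch, Gate, and Run are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class JoinSketch
{
    static readonly SemaphoreSlim Gate = new SemaphoreSlim(3); // at most 3 jobs at once
    static readonly List<string> Records =
        new List<string> { "aaa", "bbb", "ccc", "ddd", "eee", "fff" };

    static void ThreadJob(object id)
    {
        Gate.Wait();
        try
        {
            Thread.Sleep(50); // placeholder for the web request
            lock (Records) { Records.Remove((string)id); }
        }
        finally
        {
            Gate.Release();
        }
    }

    // Starts one thread per record, waits for all of them,
    // and returns the number of records left.
    public static int Run()
    {
        var threads = new List<Thread>();

        // Iterate over a snapshot, since the jobs remove items from Records.
        foreach (var id in Records.ToArray())
        {
            var t = new Thread(ThreadJob);
            threads.Add(t);
            t.Start(id);
        }

        foreach (var t in threads)
        {
            t.Join(); // blocks until this particular thread has finished
        }

        return Records.Count;
    }

    static void Main()
    {
        Console.WriteLine(Run()); // prints 0
    }
}
```

Thread.Join also has an overload that takes a timeout in milliseconds and returns false if the thread is still running, which is one way to avoid blocking forever on a thread that never ends.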

0