Multithreading implementation in C# (code review)

Hey.

I am trying to implement some multithreaded code in an application. The purpose of this code is to validate items that the database hands it. Validation can take some time (from several hundred ms to several seconds), so the work needs to be spun off into its own thread for each item.

The database can produce 20 or 30 items per second at first, but the rate drops off quickly; over 24 hours it ends up at roughly 65 thousand items in total, after which the application exits.

I would like someone more knowledgeable to look over my code and see if there are any obvious problems. No one I work with knows multithreading, so I'm really on my own with this.

Here is the code. It is fairly long, but should be pretty clear. Let me know if you have any feedback or tips. Thanks!

    public class ItemValidationService
    {
        /// <summary>
        /// The object to lock on in this class, for multithreading purposes.
        /// </summary>
        private static object locker = new object();

        /// <summary>Items that have been validated.</summary>
        private HashSet<int> validatedItems;

        /// <summary>Items that are currently being validated.</summary>
        private HashSet<int> validatingItems;

        /// <summary>Remove an item from the index if its links are bad.</summary>
        /// <param name="id">The ID of the item.</param>
        public void ValidateItem(int id)
        {
            lock (locker)
            {
                if (!this.validatedItems.Contains(id) && !this.validatingItems.Contains(id))
                {
                    ThreadPool.QueueUserWorkItem(sender =>
                    {
                        this.Validate(id);
                    });
                }
            }
        } // method

        private void Validate(int itemId)
        {
            lock (locker)
            {
                this.validatingItems.Add(itemId);
            }

            // *********************************************
            // Time-consuming routine to validate an item...
            // *********************************************

            lock (locker)
            {
                this.validatingItems.Remove(itemId);
                this.validatedItems.Add(itemId);
            }
        } // method
    } // class
+6
multithreading c#
8 answers

A thread pool is a convenient choice if you have sporadic, lightweight processing that is not time-sensitive. However, I remember reading on MSDN that it is not suitable for large-scale processing of this kind.

I used it for something very similar to this and regret it. In subsequent applications I took the worker-thread approach and am much happier with the level of control I have.

My favorite pattern in the worker-thread model is to create a master thread that owns a queue of work items. Then spin up a bunch of workers that pull from this queue for processing. I use a blocking queue, so when there are no items in it the workers simply block until something is pushed into the queue. In this model, the master thread produces work items from some source (db, etc.), and the worker threads consume them.
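A minimal sketch of that pattern in C#, using BlockingCollection<T> as the blocking queue (the class and member names here are illustrative, not from the answer):

    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    public class ValidationWorkers
    {
        // Bounded blocking queue: the producer blocks when it is full,
        // the workers block when it is empty.
        private readonly BlockingCollection<int> queue = new BlockingCollection<int>(1000);
        private readonly Thread[] workers;

        public ValidationWorkers(int workerCount, Action<int> validate)
        {
            workers = new Thread[workerCount];
            for (int i = 0; i < workerCount; i++)
            {
                workers[i] = new Thread(() =>
                {
                    // GetConsumingEnumerable blocks until an item is available and
                    // ends cleanly once CompleteAdding() has been called.
                    foreach (int id in queue.GetConsumingEnumerable())
                    {
                        validate(id);
                    }
                });
                workers[i].IsBackground = true;
                workers[i].Start();
            }
        }

        // Called by the master thread as the database produces items.
        public void Enqueue(int id) => queue.Add(id);

        // Signal that no more items will arrive, then wait for the workers to drain the queue.
        public void Shutdown()
        {
            queue.CompleteAdding();
            foreach (Thread w in workers)
            {
                w.Join();
            }
        }
    }

The master thread calls Enqueue for each item the database hands it, and Shutdown once the source is exhausted.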

+4

Seconding the idea of using blocking queues and worker threads. Here is an implementation of a blocking queue that I have used in the past with good results: http://www.codeproject.com/KB/recipes/boundedblockingqueue.aspx

What is your validation logic bound by? If it is mainly CPU-bound, I would create no more than 1 worker thread per processor/core per box. Environment.ProcessorCount will tell you the number of processors.

If your validation involves I/O, such as file access or database access, you can use more threads than the number of processors.
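As a rough illustration of that sizing rule (the 4x multiplier for I/O-bound work is just an example, not a figure from this answer):

    // One worker per core for CPU-bound validation; oversubscribe when the work is I/O-bound.
    bool validationIsIoBound = true;                 // assumption for illustration
    int workerCount = validationIsIoBound
        ? Environment.ProcessorCount * 4             // illustrative multiplier, tune for your workload
        : Environment.ProcessorCount;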

+2

Be careful: QueueUserWorkItem may fail to queue the work item.

+1

There is a possible logic error in the code posted with the question, depending on where the item ID in ValidateItem(int id) comes from. Why? Because although you correctly lock around validatingItems and validatedItems before queuing a work item, you do not add the item to validatingItems until the new thread actually starts. That means there may be a window during which another thread can call ValidateItem(id) with the same ID (unless this all runs from a single main thread).

I would add the item to validatingItems immediately before queuing the work item, inside the lock.

Edit: QueueUserWorkItem() returns a bool, so you should use the return value to make sure the item was queued, and THEN add it to validatingItems.
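A sketch of what that suggestion could look like applied to the ValidateItem method from the question (queue first, then record the item only if queuing succeeded):

    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if (!this.validatedItems.Contains(id) && !this.validatingItems.Contains(id))
            {
                // QueueUserWorkItem returns false if the work item could not be queued.
                if (ThreadPool.QueueUserWorkItem(state => this.Validate(id)))
                {
                    // Recorded while still holding the lock, so a second call with the
                    // same id cannot queue a duplicate work item.
                    this.validatingItems.Add(id);
                }
            }
        }
    }

With this change the Add at the top of Validate becomes redundant (though harmless for a HashSet).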

+1

The ThreadPool may not be the best thing to throw this at right away. You may want to look into its upper limits and/or roll your own pool.

In addition, there is a race condition in your code if duplicate validations are not acceptable. The call

    this.validatingItems.Add(itemId);

should happen in the main thread (ValidateItem), not in the thread pool thread (the Validate method). This call needs to come before the work item is queued to the pool.

The worst bug found: not checking the return value of QueueUserWorkItem. The queuing can fail, and why it does not throw an exception is a mystery to us all. If it returns false, you need to remove the item you just added to validatingItems and handle the error (probably by throwing an exception).
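That ordering (add first, roll back on failure) could look roughly like this; a sketch under this answer's assumptions, not the poster's code:

    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if (!this.validatedItems.Contains(id) && !this.validatingItems.Contains(id))
            {
                // Mark the item as in-flight in the calling thread, before queuing.
                this.validatingItems.Add(id);

                if (!ThreadPool.QueueUserWorkItem(state => this.Validate(id)))
                {
                    // Queuing failed: undo the bookkeeping and surface the error.
                    this.validatingItems.Remove(id);
                    throw new InvalidOperationException("Could not queue validation for item " + id);
                }
            }
        }
    }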

+1

I would worry about performance here. You indicated that the database can produce 20-30 items per second and that an item can take up to several seconds to validate. That can add up to quite a large number of threads - using your figures, the worst case is 60-90 concurrent threads! I think you need to revisit the design here. Michael mentioned a good pattern. Using a queue really helps keep things controlled and organized. You could also use a semaphore to cap the number of threads created - that is, you would have a maximum number of allowed threads, but under lighter load you would not need to create that many if a smaller number got the work done - i.e. your own pool size can be dynamic with a cap.
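One way to sketch that cap with a semaphore (the limit of 64, the type names, and the use of SemaphoreSlim are illustrative assumptions, not from the answer):

    using System;
    using System.Threading;

    public class ThrottledValidator
    {
        // At most 64 validations in flight at once (illustrative cap);
        // under light load fewer threads end up doing the work.
        private readonly SemaphoreSlim slots = new SemaphoreSlim(64);

        public void QueueValidation(int id, Action<int> validate)
        {
            slots.Wait();   // the producer blocks here once the cap is reached
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try
                {
                    validate(id);
                }
                finally
                {
                    slots.Release();   // free the slot when this validation finishes
                }
            });
        }
    }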

When using the thread pool, I also find it hard to monitor the execution of the pool's threads while the work is being done. So unless it is fire-and-forget work, I favor more controlled execution. I know you mentioned that your application exits once all 65K items are complete. How do you monitor your threads to determine whether they have finished their work, i.e. that all workers are idle? Do you track the state of every item in a HashSet? I think that by queuing your work items and having your own worker threads drain that queue, you get more control. Although this may come at a cost in terms of signaling between threads to indicate when all items have been queued, allowing the workers to exit.

0

You could also try using the CCR - Concurrency and Coordination Runtime. It is buried inside Microsoft Robotics Studio, but provides an excellent API for this kind of thing.

You just create a "Port" (essentially a queue), attach a receiver (a method that gets called when something is posted to it), and then post work items to it. The CCR handles the queuing and the worker threads to run it.

Here's a video on Channel9 about CCR.

It is very high-performance and is even used for non-robotics (Myspace.com uses it behind the scenes for its content delivery network).

0

I would recommend looking at MSDN: Task Parallel Library - Dataflow. You can find examples of Producer-Consumer implementations there; in your case the database producing items for validation is the producer, and the validation routine is the consumer.

I would also recommend using ConcurrentDictionary<TKey, TValue> as a "concurrent" hash set, where you simply fill in the keys and ignore the values :). That way you can make your code lock-free.
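A rough sketch of that combination, with an ActionBlock from the System.Threading.Tasks.Dataflow package as the consumer and a ConcurrentDictionary as the "already seen" set (the names and the degree of parallelism are illustrative, not from the answer):

    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks.Dataflow;

    public class DataflowValidationService
    {
        // ConcurrentDictionary used as a concurrent set: only the keys matter, the values are ignored.
        private readonly ConcurrentDictionary<int, byte> seen = new ConcurrentDictionary<int, byte>();
        private readonly ActionBlock<int> validator;

        public DataflowValidationService(Action<int> validate)
        {
            // The ActionBlock is the consumer: it queues posted items internally
            // and runs up to 4 validations in parallel (illustrative value).
            validator = new ActionBlock<int>(
                id => validate(id),
                new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });
        }

        // The database polling loop (the producer) calls this for every item it sees.
        public void ValidateItem(int id)
        {
            // TryAdd returns false for an id that was posted before, so each item is validated once.
            if (seen.TryAdd(id, 0))
            {
                validator.Post(id);
            }
        }

        // After the last item has been produced: stop accepting input and wait for the queue to drain.
        public void Shutdown()
        {
            validator.Complete();
            validator.Completion.Wait();
        }
    }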

0
