Queued requests and thread safety

Thread safety is not something I have worried about much before, because the simple applications and libraries I have written usually run only on the main thread, or do not directly modify properties or fields in any classes in a way that would require it.

However, I have started working on a personal project in which I am using a WebClient to asynchronously download data from a remote server. There is a Queue<Uri> containing a pre-built queue of URIs to download data from.

So, consider the following snippet (this is not my real code, but I hope it illustrates my question):

    private WebClient webClient = new WebClient();
    private Queue<Uri> requestQueue = new Queue<Uri>();

    public Boolean DownloadNextAsync()
    {
        if (webClient.IsBusy)
            return false;

        if (requestQueue.Count == 0)
            return false;

        var uri = requestQueue.Dequeue();
        webClient.DownloadDataAsync(uri);
        return true;
    }

If I understand correctly, this method is not thread safe (assuming this particular instance of the object is known to multiple threads). My reasoning is that the WebClient may become busy between the IsBusy check and the call to DownloadDataAsync(). Likewise, the requestQueue may become empty between the Count check and the call to Dequeue().

My question is: what is the best way to handle this kind of situation to make it thread safe?

This is more of an abstract question, because I realize that for this particular method the timing would have to be extremely unlucky for it to actually cause a problem, and to cover that case I could simply wrap the method in a suitable try-catch, since both failure modes would throw an exception. But is there another option? Would the lock statement apply here?
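For example, would wrapping the whole check-and-dequeue sequence in a lock, something like the sketch below, be the right approach? (The syncRoot field is just my guess at how it might look.)

    private readonly object syncRoot = new object();

    public Boolean DownloadNextAsync()
    {
        lock (syncRoot)
        {
            // The busy check, the emptiness check and the dequeue all happen
            // while holding the lock, so no other thread can slip in between
            // a check and the action that follows it.
            if (webClient.IsBusy)
                return false;

            if (requestQueue.Count == 0)
                return false;

            var uri = requestQueue.Dequeue();
            webClient.DownloadDataAsync(uri);
            return true;
        }
    }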

+4
3 answers

If you are targeting .NET 4.0, you can use the Task Parallel Library to do something like the following:

    // Needs: System.Collections.Concurrent, System.Linq, System.Net,
    //        System.Threading, System.Threading.Tasks

    var queue = new BlockingCollection<Uri>();
    var maxClients = 4;

    // Optionally provide another producer/consumer collection for the data
    // var data = new BlockingCollection<Tuple<Uri, byte[]>>();

    // Optionally implement CancellationTokenSource

    var clients = from id in Enumerable.Range(0, maxClients)
                  select Task.Factory.StartNew(
                      () =>
                      {
                          var client = new WebClient();
                          while (!queue.IsCompleted)
                          {
                              Uri uri;
                              if (queue.TryTake(out uri))
                              {
                                  // Synchronous call, but effectively "async" because
                                  // it runs on its own worker task
                                  byte[] datum = client.DownloadData(uri);

                                  // Optionally pass datum along to the other collection
                                  // or work on it here
                              }
                              else
                                  Thread.SpinWait(100);
                          }
                      });

    // Add URIs to download
    // queue.Add(...);

    // Notify our clients that we've added all the URIs
    queue.CompleteAdding();

    // Wait for all of our clients to finish
    Task.WaitAll(clients.ToArray());

If you also want to report progress with this approach, you can use TaskCompletionSource<TResult> to wrap WebClient's event-based asynchronous pattern:

    public static Task<byte[]> DownloadAsync(Uri uri, Action<double> progress)
    {
        var source = new TaskCompletionSource<byte[]>();

        Task.Factory.StartNew(
            () =>
            {
                var client = new WebClient();

                client.DownloadProgressChanged += (sender, e) =>
                    progress(e.ProgressPercentage);

                client.DownloadDataCompleted += (sender, e) =>
                {
                    if (!e.Cancelled)
                    {
                        if (e.Error == null)
                        {
                            source.SetResult((byte[])e.Result);
                        }
                        else
                        {
                            source.SetException(e.Error);
                        }
                    }
                    else
                    {
                        source.SetCanceled();
                    }
                };

                // Start the asynchronous download
                client.DownloadDataAsync(uri);
            });

        return source.Task;
    }

Used like this:

    // var urls = new List<Uri>(...);
    // var progressBar = new ProgressBar();

    Task.Factory.StartNew(
        () =>
        {
            foreach (var uri in urls)
            {
                var task = DownloadAsync(
                    uri,
                    p => progressBar.Invoke(
                        new MethodInvoker(
                            // ProgressPercentage is already in the range 0-100
                            delegate { progressBar.Value = (int)p; })));

                // Will block!
                // data = task.Result;
            }
        });
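If you do not want to block on task.Result inside the loop, one alternative (just a sketch, untested against the code above) is to attach a continuation that handles the data once the download finishes:

    task.ContinueWith(t =>
    {
        if (t.Status == TaskStatus.RanToCompletion)
        {
            byte[] data = t.Result; // the task has completed, so this does not block
            // ... process the downloaded bytes here ...
        }
        else if (t.IsFaulted)
        {
            // Handle or log the failure; reading t.Exception also marks it observed
            Console.Error.WriteLine(t.Exception);
        }
    });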
+1

I highly recommend reading Joseph Albahari's Threading in C#. I read it in preparation for my first (ill-fated) adventure into threading, and it is quite comprehensive.

You can read it here: http://www.albahari.com/threading/

+1

Both of the thread-safety issues you raised are valid. In addition, both WebClient and Queue are documented as not being thread safe (noted at the bottom of their MSDN documentation pages). For instance, if two threads call Dequeue() at the same time, they could leave the queue internally inconsistent or produce nonsensical return values. Suppose the implementation of Dequeue() were something like:

    1. var valueToDequeue = this._internalList[this._startPointer];
    2. this._startPointer = (this._startPointer + 1) % this._internalList.Count;
    3. return valueToDequeue;

and two threads each executed line 1 before either of them reached line 2; then both would return the same value (and there are other potential problems here as well). This would not necessarily throw an exception, so you should use the lock statement to ensure that only one thread can be inside the method at a time:

    private readonly object _lock = new object();

    ...

    lock (this._lock)
    {
        // body of method
    }

You could also lock on the WebClient or the Queue directly, if you know that no other code will be synchronizing on them.
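As a rough sketch of that variant (assuming nothing else ever locks on the same queue instance):

    lock (this.requestQueue)
    {
        // body of method (the IsBusy check, the Count check and the Dequeue)
    }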

+1
