Run the asynchronous method 8 times in parallel

How can I run the following inside Parallel.ForEach?

    public async void getThreadContents(String[] threads)
    {
        HttpClient client = new HttpClient();
        List<String> usernames = new List<String>();
        int i = 0;

        foreach (String url in threads)
        {
            i++;
            progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
            HttpResponseMessage response = await client.GetAsync(url);
            String content = await response.Content.ReadAsStringAsync();
            String user;
            Predicate<String> userPredicate;

            foreach (Match match in regex.Matches(content))
            {
                user = match.Groups[1].ToString();
                userPredicate = (String x) => x == user;
                if (usernames.Find(userPredicate) != user)
                {
                    usernames.Add(match.Groups[1].ToString());
                }
            }

            progressBar1.PerformStep();
        }
    }

I coded it under the assumption that asynchronous and parallel processing were the same thing, and I have just realized that they are not. I looked at all the questions I could find on this, and I really can't find an example that does this for me. Most of them lack readable variable names. Using single-letter variable names that do not explain what they contain is a terrible way to provide an example.

I usually have 300 to 2000 entries in an array called threads (it contains the URLs of forum threads), and it seems that parallel processing would speed up execution because of the many HTTP requests.

Do I have to remove all of the asynchrony (nothing outside the foreach is asynchronous, only variable definitions) before I can use Parallel.ForEach? How should I go about doing this? Can I do it without blocking the main thread?

I am using .NET 4.5, by the way.

+6
4 answers

I coded it under the assumption that asynchronous and parallel processing were the same thing

Asynchronous processing and parallel processing are quite different. If you don't understand the difference, I think you should first read more about it (for example, "What is the relation between Asynchronous and parallel programming in C#?").

Now, what you want to do is actually not that simple, because you want to process a large collection asynchronously with a specific degree of parallelism (8). With synchronous processing, you could use Parallel.ForEach() (along with ParallelOptions to configure the degree of parallelism), but there is no simple alternative that works with async.
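To make the synchronous case concrete, here is a minimal sketch of Parallel.ForEach() with ParallelOptions. The class and method names (SyncParallelSketch, ComputeLengths) and the string-length work are made up for illustration. Note that passing an async lambda to Parallel.ForEach() does not help: the lambda becomes async void, so the loop returns as soon as each body reaches its first await rather than when the work finishes.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SyncParallelSketch
{
    // Hypothetical synchronous work item: measure the length of each string.
    public static int[] ComputeLengths(IEnumerable<string> items)
    {
        // Cap the loop at 8 concurrent bodies.
        var options = new ParallelOptions { MaxDegreeOfParallelism = 8 };

        // The body runs on several threads, so collect results in a thread-safe bag.
        var lengths = new ConcurrentBag<int>();

        Parallel.ForEach(items, options, item =>
        {
            lengths.Add(item.Length);
        });

        // Parallel.ForEach blocks until every body has returned.
        return lengths.ToArray();
    }
}
```

The order of the returned lengths is not guaranteed, and the calling thread is blocked for the whole loop, which is one more reason this shape does not fit UI code doing HTTP requests.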

In your code, this is complicated by the fact that you expect everything to execute on the UI thread. (Ideally, though, you shouldn't access the UI directly from your computation. Instead, you should use IProgress, which would mean the code no longer has to execute on the UI thread.)
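As a rough illustration of the IProgress idea, the sketch below keeps the worker free of any UI references and only reports a running count. ProgressSketch, ScanAsync, processUrl and SynchronousProgress are hypothetical names introduced here; only IProgress<T> and Progress<T> come from the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper for console or test use: invokes the callback inline.
// The framework's Progress<T> instead posts the callback to the
// SynchronizationContext captured when it was constructed.
public sealed class SynchronousProgress<T> : IProgress<T>
{
    private readonly Action<T> handler;

    public SynchronousProgress(Action<T> handler)
    {
        this.handler = handler;
    }

    public void Report(T value)
    {
        handler(value);
    }
}

public static class ProgressSketch
{
    // The worker knows nothing about labels or progress bars;
    // it only reports how many URLs it has finished so far.
    public static async Task ScanAsync(
        IEnumerable<string> urls,
        Func<string, Task> processUrl,
        IProgress<int> progress)
    {
        int completed = 0;
        foreach (string url in urls)
        {
            await processUrl(url);
            completed++;
            if (progress != null)
            {
                progress.Report(completed);
            }
        }
    }
}
```

In a form, you would presumably create a Progress<int> on the UI thread with a callback that updates progressLabel and progressBar1, and pass it to the worker; the callback then runs on the UI thread regardless of where the worker runs.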

Probably the best way to do this in .NET 4.5 is to use TPL Dataflow. Its ActionBlock does exactly what you want, but it can be quite verbose (because it is more flexible than what you need). So it makes sense to create a helper method:

    public static Task AsyncParallelForEach<T>(
        IEnumerable<T> source,
        Func<T, Task> body,
        int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
        TaskScheduler scheduler = null)
    {
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        };
        if (scheduler != null)
            options.TaskScheduler = scheduler;

        var block = new ActionBlock<T>(body, options);

        foreach (var item in source)
            block.Post(item);

        block.Complete();
        return block.Completion;
    }

In your case, you would use it like this:

    await AsyncParallelForEach(
        threads,
        async url => await DownloadUrl(url),
        8,
        TaskScheduler.FromCurrentSynchronizationContext());

Here, DownloadUrl() is an async Task method that processes a single URL (the body of your loop), 8 is the degree of parallelism (it probably shouldn't be a literal constant in real code), and FromCurrentSynchronizationContext() makes sure the code executes on the UI thread.
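The answer does not show DownloadUrl() itself. One possible shape, extracted from the loop body in the question, is sketched below. The ThreadScanner class, the AddUsernames helper and the regex pattern are placeholders (the question never shows its regex). The plain HashSet is only safe under the assumption stated above, namely that every call runs on the UI thread; without that scheduler, a concurrent collection would be needed.

```csharp
using System.Collections.Generic;
using System.Net.Http;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

public class ThreadScanner
{
    private readonly HttpClient client = new HttpClient();

    // Placeholder pattern; substitute the regex your forum markup needs.
    private readonly Regex regex = new Regex(@"data-user=""([^""]+)""");

    // Not thread-safe: assumes all access happens on one thread (the UI thread).
    private readonly HashSet<string> usernames = new HashSet<string>();

    public ICollection<string> Usernames
    {
        get { return usernames; }
    }

    // Processes a single URL: download the page, then collect the usernames in it.
    public async Task DownloadUrl(string url)
    {
        string content = await client.GetStringAsync(url);
        AddUsernames(content);
    }

    // Kept separate so the parsing can be exercised without a network call.
    public void AddUsernames(string content)
    {
        foreach (Match match in regex.Matches(content))
        {
            // HashSet.Add ignores duplicates, so no Find predicate is needed.
            usernames.Add(match.Groups[1].Value);
        }
    }
}
```

Progress reporting is left out on purpose; it could be layered on top with IProgress as suggested earlier in this answer.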

+6

Stephen Toub has a good blog post on implementing ForEachAsync. svick's answer is a good one for platforms on which Dataflow is available.

Here's an alternative, using the partitioner from the TPL:

    public static Task ForEachAsync<T>(this IEnumerable<T> source,
        int degreeOfParallelism, Func<T, Task> body)
    {
        var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
        var tasks = partitions.Select(async partition =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        });
        return Task.WhenAll(tasks);
    }

You can then use it like this:

    public async Task getThreadContentsAsync(String[] threads)
    {
        HttpClient client = new HttpClient();
        ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

        await threads.ForEachAsync(8, async url =>
        {
            HttpResponseMessage response = await client.GetAsync(url);
            String content = await response.Content.ReadAsStringAsync();
            String user;

            foreach (Match match in regex.Matches(content))
            {
                user = match.Groups[1].ToString();
                usernames.TryAdd(user, null);
            }

            progressBar1.PerformStep();
        });
    }

+6

Another alternative is to use SemaphoreSlim or AsyncSemaphore (which is included in my AsyncEx library and supports many more platforms than SemaphoreSlim):

    public async Task getThreadContentsAsync(String[] threads)
    {
        SemaphoreSlim semaphore = new SemaphoreSlim(8);
        HttpClient client = new HttpClient();
        ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

        await Task.WhenAll(threads.Select(async url =>
        {
            await semaphore.WaitAsync();
            try
            {
                HttpResponseMessage response = await client.GetAsync(url);
                String content = await response.Content.ReadAsStringAsync();
                String user;

                foreach (Match match in regex.Matches(content))
                {
                    user = match.Groups[1].ToString();
                    usernames.TryAdd(user, null);
                }

                progressBar1.PerformStep();
            }
            finally
            {
                semaphore.Release();
            }
        }));
    }
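
To see the throttling in isolation, the following sketch applies the same SemaphoreSlim pattern to simulated work and records the highest number of bodies that were ever running at once. ThrottleSketch and MeasurePeakConcurrencyAsync are hypothetical names, and Task.Delay stands in for the HTTP request.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleSketch
{
    // Runs itemCount simulated work items, at most `limit` at a time,
    // and returns the highest concurrency observed.
    public static async Task<int> MeasurePeakConcurrencyAsync(int itemCount, int limit)
    {
        var semaphore = new SemaphoreSlim(limit);
        var gate = new object();
        int running = 0;
        int peak = 0;

        IEnumerable<Task> tasks = Enumerable.Range(0, itemCount).Select(async index =>
        {
            // Every item is started immediately, but only `limit` get past this line.
            await semaphore.WaitAsync();
            try
            {
                lock (gate)
                {
                    running++;
                    if (running > peak) peak = running;
                }

                // Stand-in for the real asynchronous work (the HTTP request).
                await Task.Delay(20);
            }
            finally
            {
                lock (gate)
                {
                    running--;
                }
                semaphore.Release();
            }
        });

        await Task.WhenAll(tasks);
        return peak;
    }
}
```

With a limit of 8, the returned peak should never exceed 8, however many items are queued. One design consequence worth knowing: this approach creates one task per URL up front, which is fine for a few thousand entries but differs from the partitioner approach, which only creates one task per partition.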
+2

You can try the ParallelForEachAsync extension method from the AsyncEnumerator NuGet package:

    using System.Collections.Async;

    public async void getThreadContents(String[] threads)
    {
        HttpClient client = new HttpClient();
        List<String> usernames = new List<String>();
        int i = 0;

        await threads.ParallelForEachAsync(async url =>
        {
            i++;
            progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
            HttpResponseMessage response = await client.GetAsync(url);
            String content = await response.Content.ReadAsStringAsync();
            String user;
            Predicate<String> userPredicate;

            foreach (Match match in regex.Matches(content))
            {
                user = match.Groups[1].ToString();
                userPredicate = (String x) => x == user;
                if (usernames.Find(userPredicate) != user)
                {
                    usernames.Add(match.Groups[1].ToString());
                }
            }

            // THIS CALL MUST BE THREAD-SAFE!
            progressBar1.PerformStep();
        },
        maxDegreeOfParallelism: 8);
    }
0