Suppose in C# we have a number of tasks to perform that we're currently doing sequentially, but would like to speed up by running them in parallel. As a trivial example, imagine we're downloading a bunch of web pages like this:

var urls = new [] { 
        "https://github.com/naudio/NAudio", 
        "https://twitter.com/mark_heath", 
        "https://github.com/markheath/azure-functions-links",
        "https://pluralsight.com/authors/mark-heath",
        "https://github.com/markheath/advent-of-code-js",
        "http://stackoverflow.com/users/7532/mark-heath",
        "https://mvp.microsoft.com/en-us/mvp/Mark%20%20Heath-5002551",
        "https://github.com/markheath/func-todo-backend",
        "https://github.com/markheath/typescript-tetris",
};
var client = new HttpClient();
foreach(var url in urls)
{
    var html = await client.GetStringAsync(url);
    Console.WriteLine($"retrieved {html.Length} characters from {url}");
}

To parallelize this, we could just turn every single download into a separate Task with Task.Run and wait for them all to complete, but what if we wanted to limit the number of concurrent downloads? Let's say we only want 4 downloads to happen at a time.
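Here's a sketch of that unthrottled approach, with a simulated DownloadAsync in place of client.GetStringAsync (my addition, so the sketch runs without network access and can count how many downloads are in flight at once):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int current = 0, peak = 0;

// simulated download - stands in for client.GetStringAsync so we can
// observe the concurrency level without real network access
async Task DownloadAsync(string url)
{
    var now = Interlocked.Increment(ref current);
    int seen;
    while (now > (seen = Volatile.Read(ref peak)))
        Interlocked.CompareExchange(ref peak, now, seen); // track the high-water mark
    await Task.Delay(250); // simulate network I/O
    Interlocked.Decrement(ref current);
}

var urls = Enumerable.Range(0, 8).Select(i => $"https://example.com/page{i}").ToArray();

// no throttling: every download starts immediately
await Task.WhenAll(urls.Select(DownloadAsync));
Console.WriteLine($"peak concurrent downloads: {peak}");
```

With eight URLs, all eight downloads are in flight before any completes, which is exactly the behaviour we want to constrain.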

In this trivial example, the exact number might not matter too much, but it's not hard to imagine a situation in which you would want to avoid too many concurrent calls to a downstream service.

In this post I'll look at four different ways of solving this problem.

Technique 1 - ConcurrentQueue

The first technique has been my go-to approach for many years. The basic idea is to put the work onto a queue and then have multiple threads reading off that queue. This is a nice simple approach, but it does require the queue to be accessed in a thread-safe manner, as it will be read from multiple threads. In this example I'm using ConcurrentQueue to give us that thread safety.

We fill the queue with all the urls to download, and then start one Task for each thread that simply sits in a loop trying to read from the queue, and exits when there are no more items left in the queue. We put each of these queue reader tasks in a list and then use Task.WhenAll to wait for them all to exit, which will happen once the final download has completed.

var maxThreads = 4;
var q = new ConcurrentQueue<string>(urls);
var tasks = new List<Task>();
for(int n = 0; n < maxThreads; n++)
{
    tasks.Add(Task.Run(async () => {
        while(q.TryDequeue(out string url)) 
        {
            var html = await client.GetStringAsync(url);
            Console.WriteLine($"retrieved {html.Length} characters from {url}");
        }
    }));
}
await Task.WhenAll(tasks);

I still like this approach as it's conceptually simple. But it can be a bit of a pain if we are still generating more work to do after processing has started, as the reader tasks could exit too early.
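One way round that limitation, suggested by Thomas Levesque in the comments below, is BlockingCollection. Here's a minimal sketch (with a counter in place of the real download): GetConsumingEnumerable blocks while the queue is empty and only completes once CompleteAdding has been called, so the consumers can't exit early while a producer is still generating work.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var maxThreads = 4;
var queue = new BlockingCollection<string>();
var processed = 0;

// consumers: GetConsumingEnumerable blocks until an item arrives,
// and the foreach only ends once CompleteAdding has been called
var workers = Enumerable.Range(0, maxThreads)
    .Select(_ => Task.Run(() =>
    {
        foreach (var url in queue.GetConsumingEnumerable())
        {
            // the real download would happen here
            Interlocked.Increment(ref processed);
        }
    }))
    .ToList();

// producer: work can keep arriving after the consumers have started
for (var i = 0; i < 10; i++)
{
    queue.Add($"https://example.com/page{i}");
}
queue.CompleteAdding(); // lets the consumers exit once the queue drains

await Task.WhenAll(workers);
Console.WriteLine($"processed {processed} items");
```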

Technique 2 - SemaphoreSlim

Another approach (inspired by this StackOverflow answer) is to use a SemaphoreSlim with an initialCount equal to the maximum number of threads. Then you use WaitAsync to wait until it's OK to queue up another. So immediately we kick off four tasks, but then have to wait for the first of those to finish before we get past WaitAsync to add the next.

var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: maxThreads);
foreach (var url in urls)
{
    await throttler.WaitAsync();
    allTasks.Add(
        Task.Run(async () =>
        {
            try
            {
                var html = await client.GetStringAsync(url);
                Console.WriteLine($"retrieved {html.Length} characters from {url}");
            }
            finally
            {
                throttler.Release();
            }
        }));
}
await Task.WhenAll(allTasks);

The code here is a bit more verbose than the ConcurrentQueue approach and also ends up with a potentially huge list containing mostly completed Tasks, but this approach does have an advantage if you are generating the tasks to be completed at the same time you are executing them.

For example, to upload a large file to Azure blob storage you might read 1MB chunks sequentially, but want to upload up to four in parallel. You don't want to read all the chunks in advance of uploading them, as that costs a lot of time and memory before the first upload can even start. With this approach we can generate the work to be done just in time, as upload slots become available, which is more efficient.
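That just-in-time pattern might be sketched like this. Note the assumptions: UploadChunkAsync is a hypothetical stand-in for the real blob storage call, and a MemoryStream stands in for the large file. The key point is that WaitAsync sits before the read, so we never read more than a few chunks ahead of the uploads.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

const int chunkSize = 1024 * 1024; // 1MB chunks
var maxThreads = 4;
var throttler = new SemaphoreSlim(initialCount: maxThreads);
var uploads = new List<Task>();
var uploaded = 0;

// hypothetical uploader - stands in for e.g. an Azure blob PutBlock call
async Task UploadChunkAsync(byte[] chunk, int index)
{
    await Task.Delay(50); // simulate network I/O
    Interlocked.Increment(ref uploaded);
}

// a stand-in for the large file: three full chunks plus a partial one
using var source = new MemoryStream(new byte[3 * chunkSize + 123]);
var buffer = new byte[chunkSize];
int bytesRead, chunkIndex = 0;
while ((bytesRead = source.Read(buffer, 0, buffer.Length)) > 0)
{
    // wait for an upload slot before reading the next chunk,
    // so only a few chunks are ever held in memory at once
    await throttler.WaitAsync();
    var chunk = buffer.AsSpan(0, bytesRead).ToArray(); // copy: buffer is reused
    var index = chunkIndex++;
    uploads.Add(Task.Run(async () =>
    {
        try { await UploadChunkAsync(chunk, index); }
        finally { throttler.Release(); }
    }));
}
await Task.WhenAll(uploads);
Console.WriteLine($"uploaded {uploaded} chunks");
```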

Technique 3 - Parallel.ForEach

The Parallel.ForEach method at first appears to be the perfect solution to this problem. You can simply specify the MaxDegreeOfParallelism and then provide an Action to perform on each item in your IEnumerable:

var options = new ParallelOptions() { MaxDegreeOfParallelism = maxThreads };
Parallel.ForEach(urls, options, url =>
    {
        var html = client.GetStringAsync(url).Result;
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    });

Looks nice and simple doesn't it? However, there is a nasty gotcha here. Because Parallel.ForEach takes an Action&lt;T&gt;, not a Func&lt;T, Task&gt;, it should only be used to call synchronous code. You might notice we've ended up putting a .Result after GetStringAsync, which is a dangerous sync-over-async antipattern.

So unfortunately, this method should only be used if you have a synchronous method you want to perform in parallel. There is a NuGet package that implements an asynchronous version of Parallel.ForEach so you could try that if you'd like to write something like this instead:

await urls.ParallelForEachAsync(
    async url =>
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    },
    maxDegreeOfParalellism: maxThreads);

Technique 4 - Polly Bulkhead Policy

The final technique is to use the "Bulkhead" isolation policy from Polly. A bulkhead policy restricts the number of concurrent calls that can be made, and optionally allows you to queue up calls that exceed that number.

Here we set up a bulkhead policy with a constrained number of concurrent executions and an unlimited number of queued tasks. Then we simply call ExecuteAsync repeatedly on the bulkhead policy, allowing it either to run each task immediately or to queue it up if too many are already executing.

var bulkhead = Policy.BulkheadAsync(maxThreads, Int32.MaxValue);
var tasks = new List<Task>();
foreach (var url in urls)
{
    var t = bulkhead.ExecuteAsync(async () =>
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    });
    tasks.Add(t);
}
await Task.WhenAll(tasks);

As with several of our other solutions, we put the tasks into a list and use Task.WhenAll to wait for them. It is worth pointing out, though, that this pattern is really designed for the situation where concurrent tasks are being generated from multiple threads (for example from ASP.NET controller actions). Those callers simply share a bulkhead policy, and each one just runs a task with await bulkhead.ExecuteAsync(...). So this approach is very straightforward to use in the situations it is designed for.
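That multi-threaded-producer scenario can be sketched roughly as follows (requires the Polly NuGet package; the simulated work and the concurrency counter are my additions, so we can observe the limit being enforced across producers):

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Polly;

// one policy shared by every producer: at most 4 concurrent executions,
// with an effectively unlimited queue
var bulkhead = Policy.BulkheadAsync(4, int.MaxValue);
int current = 0, peak = 0;

// three independent producers (think: concurrent ASP.NET requests)
// all routing their work through the same shared policy
var producers = Enumerable.Range(0, 3).Select(_ => Task.Run(async () =>
{
    var jobs = Enumerable.Range(0, 5).Select(i => bulkhead.ExecuteAsync(async () =>
    {
        var now = Interlocked.Increment(ref current);
        int seen;
        while (now > (seen = Volatile.Read(ref peak)))
            Interlocked.CompareExchange(ref peak, now, seen); // high-water mark
        await Task.Delay(50); // simulated work
        Interlocked.Decrement(ref current);
    }));
    await Task.WhenAll(jobs);
})).ToList();

await Task.WhenAll(producers);
Console.WriteLine($"peak concurrency: {peak}"); // never exceeds 4
```

Even though fifteen jobs arrive from three different threads, the shared bulkhead keeps the number running at once within the limit.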

Summary

Parallelism can greatly speed up the overall performance of your application, but when misused can cause more problems than it solves. These patterns allow you to use a constrained number of threads to work through a batch of jobs. The one you should pick depends on the way you're generating tasks - do you know them all up front, or are they created on the fly while you're already processing earlier tasks? And are you generating these tasks sequentially on a single thread, or are multiple threads able to produce additional work items on the fly?

Of course, I'm sure there are plenty of other clever ways of approaching this problem, so do let me know in the comments what your preferred solution is.

Comments

Comment by Mark Heath

ah yes, I've seen that demoed a few times and never got round to trying it myself

Mark Heath
Comment by Thomas Levesque

> But it can be a bit of a pain if we are still generating more work to do while we've started processing work as the reader threads could exit too early.

You could use a BlockingCollection<T> (which is backed by a ConcurrentQueue<T> by default). It's perfect for producer-consumer scenarios.
https://docs.microsoft.com/...

Thomas Levesque
Comment by Mark Heath

Awesome, hadn't come across that one before. Will have to give it a try. Thanks for letting me know about it.

Mark Heath
Comment by Petar Petrov

I'm using BlockingCollection<T>, CountdownEvent and ThreadPool.QueueUserWorkItem
var limit = 4;
using (var urlsCollection = new BlockingCollection<string>(limit))
{
    var workers = Math.Min(urls.Length, limit);
    using (var doneEvent = new CountdownEvent(workers))
    {
        // Consumers (Workers)
        for (var i = 0; i < workers; i++)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                var pms = _ as object[];
                var e = pms[0] as CountdownEvent;
                var items = pms[1] as BlockingCollection<string>;
                try
                {
                    foreach (var item in items.GetConsumingEnumerable())
                    {
                        // Work ...
                    }
                }
                finally
                {
                    e.Signal();
                }
            }, new object[] { doneEvent, urlsCollection });
        }
        // Producer
        foreach (var url in urls)
        {
            urlsCollection.Add(url);
        }
        urlsCollection.CompleteAdding();
        doneEvent.Wait();
    }
}

Petar Petrov
Comment by snielsson

I second that. TPL Dataflow is super useful and very simple to use.

snielsson
Comment by Stuart Turner

Another thing to remember is that Task.WhenAll() takes in an IEnumerable<Task<T>>, so you do not have to use an instantiated list. I like option 4, but you could reduce code even further like this:
var bulkhead = Policy.BulkheadAsync(maxThreads, Int32.MaxValue);
await Task.WhenAll(urls
    .Select(u => bulkhead.ExecuteAsync(async () =>
    {
        var html = await client.GetStringAsync(u);
        Console.WriteLine(…);
    })));

Stuart Turner
Comment by Artur Kordzik

1. Parallel.ForEach is not suitable for executing async methods. It's designed for heavy CPU-bound operations run in parallel (not concurrently). For more details you can see this discussion
https://stackoverflow.com/q...
2. In the same example (Parallel.ForEach), invoking Result on an async method is also a bad approach because you block the executing thread and as a result run the method synchronously
3. I agree with Petern Bons that ActionBlock is the best solution here.
I strongly advise you to read this article:
http://gigi.nullneuron.net/...

Artur Kordzik
Comment by Mark Heath

yes, agreed and I do say in my article that you should not use Parallel.ForEach in this situation.
Thanks for the link to the article. I have a similar article brewing about some threading antipatterns I've come across.

Mark Heath
Comment by Artur Kordzik

Sorry, missed that apparently :)

Artur Kordzik
Comment by Ravi Puri

What about using Rx, System.Reactive.Linq and IObservable<T>? e.g.
var resources =
    new List<string>
    {
        "http://www.google.com"
        ,"http://www.yahoo.com"
        ,"http://cnn.com"
        ,"http://microsoft.com"
    }
    .ToObservable()
    .Select(s => Observable.FromAsync(() => DownloadAsync(s)))
    .Concat()
    .Where(w => w.IsSuccessStatusCode)
    .Select(s => Observable.FromAsync(() => GetResources(s)))
    .Concat()
    .Where(w => w.Any())
    .SelectMany(s => s)
    .ToEnumerable()
    .OrderBy(o => o.Name);
foreach (var re in resources)
{
    Console.WriteLine(re.Name);
}
Console.ReadLine();

Ravi Puri
Comment by Nicolas Musset

WhenAll() instantiates an array, either by calling ToArray() on the enumeration or by copying elements one by one when the enumeration is already a collection (e.g. a list). In other words, it doesn't change anything.
The only case where there is a small improvement is when passing an array. So if you know the number of tasks in advance, consider creating an array instead.

Nicolas Musset
Comment by Nicolas Musset

Hi Mark. Nice article, I enjoyed reading it.
Do you plan to update it or write a follow up, with all the comments/feedbacks here?

Nicolas Musset
Comment by Nicolas Musset

I often use Stephen Cleary's Nito.Async library (https://github.com/StephenC.... It has a lot of async-ready constructs (including queue, semaphore, etc.) so I don't have to re-implement everything.
Stephen was working on a rewrite of his library but I haven't seen any progress in the last 6 months, so I don't know what the current status is.

Nicolas Musset
Comment by Stuart Turner

Technically, by passing in an IEnumerable<>, you save the allocation of a temporary array. On the other hand, depending on the source of the IEnumerable<>, you may have the allocation of lambdas, so it's probably a wash either way.
My comment is not about performance though. It's about readability. Since Task.WhenAll() takes an IEnumerable<>, you can provide the result of a .Select() directly, rather than the way Mark did it where you use a foreach() loop to add items to a List<>. I find it easier to read code when boiler-plate code like adding items to a List<> in a foreach() is removed.

Stuart Turner
Comment by Nicolas Musset

No, in fact you don't save any allocation. Look at the code for WhenAll() and ToArray(). Whether you allocate a list and pass it to WhenAll(), or pass an IEnumerable<> and let WhenAll() do the allocation, the result is exactly the same number of allocations (list + array, or buffer + array).
Sources:
https://referencesource.mic...
https://referencesource.mic...

Nicolas Musset
Comment by Stuart Turner

You are correct, my mistake.
However, you're still missing the point. My change is not about performance. Rather, it's about readability, which I said in my previous comment.

Stuart Turner
Comment by Nicolas Musset

Readability is always better, indeed. On that we agree ;)

Nicolas Musset
Comment by Stephen Cleary

I think it's important to distinguish between synchronous and asynchronous concurrency. Synchronous concurrency (parallelism) is using multiple threads, and is an appropriate choice if you have CPU-bound code. Asynchronous concurrency is a form of concurrency that does not require additional threads, and is an appropriate choice if you have I/O-bound code.
This example (downloading) is I/O-bound, and thus a more appropriate fit for asynchronous concurrency. This is why the Parallel / ConcurrentQueue / BlockingCollection solutions end up being awkward - blocking threads, etc. Those types are all firmly in the synchronous concurrency world. It's possible to solve the problem this way, but it is less efficient.
Downloading is more suited for asynchronous concurrency approaches. This includes SemaphoreSlim with Task.WhenAll (but without the unnecessary Task.Run), and TPL ActionBlock / BufferBlock (works like an asynchronous ConcurrentQueue).
E.g., the SemaphoreSlim approach would simplify to:
var throttler = new SemaphoreSlim(initialCount: maxThreads);
var tasks = urls.Select(async url =>
{
    await throttler.WaitAsync();
    try
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    }
    finally
    {
        throttler.Release();
    }
});
await Task.WhenAll(tasks);
and it can be simplified further if you define a LockAsync extension method for SemaphoreSlim.

Stephen Cleary
Comment by Mark Heath

yes, would be a good idea to followup at some time. There's been a lot of helpful suggestions in the comments.

Mark Heath
Comment by Stephen Cleary

The v5 release is blocked on a single issue that shows up when the thread pool is exhausted and client code makes blocking calls into AsyncEx types: https://github.com/StephenC...
This is a rare scenario, but I want to get a fix in before v5 has an official release. Regarding time frame, it's difficult to say.

Stephen Cleary
Comment by Mark Heath

thanks, yes agreed its important to make the distinction between whether you need concurrency because you have CPU-bound or IO-bound operations. Thanks for the suggested code simplification. And presumably you mean the LockAsync method from AsyncEx - https://github.com/StephenC...
I'll have to give that a try - looks a very helpful library.

Mark Heath
Comment by Enrique Sierra Gutierrez

For Technique 2, I'm a little bit confused.
Because the await throttler.WaitAsync() call is made before the creation of the tasks and not inside the tasks, isn't the semaphore just controlling how fast the loop adds new tasks into the list and not really controlling how many of them run concurrently?
Update:
Oh never mind, I just saw that the Release call is made right when each task finishes ^^.

Enrique Sierra Gutierrez