Constraining Concurrent Threads in C#
Suppose in C# we have a number of tasks to perform that we're currently doing sequentially, but would like to speed up by running them in parallel. As a trivial example, imagine we're downloading a bunch of web pages like this:
var urls = new [] {
    "https://github.com/naudio/NAudio",
    "https://twitter.com/mark_heath",
    "https://github.com/markheath/azure-functions-links",
    "https://pluralsight.com/authors/mark-heath",
    "https://github.com/markheath/advent-of-code-js",
    "http://stackoverflow.com/users/7532/mark-heath",
    "https://mvp.microsoft.com/en-us/mvp/Mark%20%20Heath-5002551",
    "https://github.com/markheath/func-todo-backend",
    "https://github.com/markheath/typescript-tetris",
};
var client = new HttpClient();
foreach (var url in urls)
{
    var html = await client.GetStringAsync(url);
    Console.WriteLine($"retrieved {html.Length} characters from {url}");
}
To parallelize this, we could just turn every single download into a separate Task with Task.Run and wait for them all to complete, but what if we wanted to limit the number of concurrent downloads? Let's say we only want four downloads to happen at a time. In this trivial example the exact number might not matter too much, but it's not hard to imagine a situation in which you would want to avoid making too many concurrent calls to a downstream service.
In this post I'll look at four different ways of solving this problem.
Technique 1 - ConcurrentQueue
The first technique has been my go-to approach for many years. The basic idea is to put the work onto a queue and then have multiple threads reading off that queue. This is a nice simple approach, but it does require thread-safe access to the queue, since it will be read from multiple threads; in this example I'm using ConcurrentQueue to give us that thread safety.
We fill the queue with all the URLs to download, then start one Task per thread that simply sits in a loop trying to read from the queue, exiting when there are no items left. We put each of these queue-reader tasks in a list and use Task.WhenAll to wait for them all to exit, which happens once the final download has completed.
var maxThreads = 4;
var q = new ConcurrentQueue<string>(urls);
var tasks = new List<Task>();
for (int n = 0; n < maxThreads; n++)
{
    tasks.Add(Task.Run(async () =>
    {
        while (q.TryDequeue(out string url))
        {
            var html = await client.GetStringAsync(url);
            Console.WriteLine($"retrieved {html.Length} characters from {url}");
        }
    }));
}
await Task.WhenAll(tasks);
I still like this approach as it's conceptually simple. But it can be a bit of a pain if we are still generating more work after processing has already started, as the reader tasks could exit too early.
Technique 2 - SemaphoreSlim
Another approach (inspired by this StackOverflow answer) is to use a SemaphoreSlim with an initialCount equal to the maximum number of threads, and call WaitAsync before queuing up each task. So we immediately kick off four tasks, but then have to wait for the first of those to finish before WaitAsync lets us add the next.
var allTasks = new List<Task>();
var throttler = new SemaphoreSlim(initialCount: maxThreads);
foreach (var url in urls)
{
    await throttler.WaitAsync();
    allTasks.Add(
        Task.Run(async () =>
        {
            try
            {
                var html = await client.GetStringAsync(url);
                Console.WriteLine($"retrieved {html.Length} characters from {url}");
            }
            finally
            {
                throttler.Release();
            }
        }));
}
await Task.WhenAll(allTasks);
The code here is a bit more verbose than the ConcurrentQueue approach, and it ends up building a potentially huge list containing mostly completed Tasks. But this approach does have an advantage if you are generating the work to be done at the same time you are executing it.
For example, to upload a large file to Azure blob storage you might read 1MB chunks sequentially, but want to upload up to four in parallel. You don't want to read all the chunks before uploading starts, as that costs a lot of time and memory before the first upload can even begin. With this approach we can generate each piece of work just in time, as capacity becomes available for uploading, which is more efficient.
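To make that concrete, here's a rough sketch of the just-in-time pattern described above. Note that ReadChunkAsync and UploadChunkAsync are hypothetical placeholder methods invented for this illustration, not real Azure SDK calls:

```csharp
// Sketch only: ReadChunkAsync and UploadChunkAsync are hypothetical helpers.
// We read chunks one at a time, and WaitAsync stops us reading ahead
// of the uploaders - at most four chunks are in flight (and in memory) at once.
var throttler = new SemaphoreSlim(initialCount: 4);
var uploadTasks = new List<Task>();
byte[] chunk;
while ((chunk = await ReadChunkAsync(fileStream)) != null) // read the next 1MB chunk
{
    await throttler.WaitAsync(); // wait until an upload slot is free
    var current = chunk;         // capture this chunk for the lambda
    uploadTasks.Add(Task.Run(async () =>
    {
        try
        {
            await UploadChunkAsync(current);
        }
        finally
        {
            throttler.Release(); // free the slot so the next chunk can be read
        }
    }));
}
await Task.WhenAll(uploadTasks);
```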
Technique 3 - Parallel.ForEach
The Parallel.ForEach method at first appears to be the perfect solution to this problem. You can simply specify the MaxDegreeOfParallelism and then provide an Action to perform on each item in your IEnumerable:
var options = new ParallelOptions() { MaxDegreeOfParallelism = maxThreads };
Parallel.ForEach(urls, options, url =>
{
    var html = client.GetStringAsync(url).Result;
    Console.WriteLine($"retrieved {html.Length} characters from {url}");
});
Looks nice and simple, doesn't it? However, there is a nasty gotcha here. Because Parallel.ForEach takes an Action, not a Func<Task>, it should only be used to call synchronous functions. You might notice we've ended up putting .Result after GetStringAsync, which blocks the calling thread and is a dangerous antipattern.
So unfortunately, this method should only be used if you have a synchronous method you want to perform in parallel. There is a NuGet package that implements an asynchronous version of Parallel.ForEach, so you could try that if you'd like to write something like this instead:
await uris.ParallelForEachAsync(
    async url =>
    {
        var html = await httpClient.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    },
    maxDegreeOfParalellism: maxThreads);
Technique 4 - Polly Bulkhead Policy
The final technique is to use the "Bulkhead" isolation policy from Polly. A bulkhead policy restricts the number of concurrent calls that can be made, and optionally allows you to queue up calls that exceed that number.
Here we set up a bulkhead policy with a constrained number of concurrent executions and an unlimited number of queued tasks. Then we simply call ExecuteAsync repeatedly on the bulkhead policy, letting it either run each task immediately or queue it if the concurrency limit has been reached.
var bulkhead = Policy.BulkheadAsync(maxThreads, Int32.MaxValue);
var tasks = new List<Task>();
foreach (var url in urls)
{
    var t = bulkhead.ExecuteAsync(async () =>
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    });
    tasks.Add(t);
}
await Task.WhenAll(tasks);
As with several of our other solutions, we put the tasks into a list and use Task.WhenAll to wait for them. It is worth pointing out, though, that this pattern is really designed for situations where concurrent tasks are being generated from multiple threads (for example from ASP.NET controller actions). Those callers simply share a single bulkhead policy, and each runs its task with await bulkhead.ExecuteAsync(...). So this approach is very straightforward to use in the situations it was designed for.
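As an illustration of that multi-caller scenario, here's a minimal sketch of a shared bulkhead policy. The SharedPolicies class and the ProcessOrderAsync handler are invented for this example; only the Polly calls themselves come from the article:

```csharp
using Polly;

// A single bulkhead policy created once (e.g. registered as a singleton),
// so that all callers compete for the same pool of execution slots.
public static class SharedPolicies
{
    public static readonly IAsyncPolicy Bulkhead =
        Policy.BulkheadAsync(4, Int32.MaxValue);
}

public class OrdersController
{
    // Many requests can hit this concurrently from different threads;
    // the shared bulkhead ensures at most four downstream calls run at once,
    // queuing the rest.
    public Task ProcessOrderAsync(string orderId) =>
        SharedPolicies.Bulkhead.ExecuteAsync(async () =>
        {
            // ... call the downstream service here ...
            await Task.Delay(100);
        });
}
```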
Summary
Parallelism can greatly speed up the overall performance of your application, but when misused can cause more problems than it solves. These patterns allow you to use a constrained number of threads to work through a batch of jobs. The one you should pick depends on the way you're generating tasks - do you know them all up front, or are they created on the fly while you're already processing earlier tasks? And are you generating these tasks sequentially on a single thread, or are multiple threads able to produce additional work items on the fly?
Of course, I'm sure there are plenty of other clever ways of approaching this problem, so do let me know in the comments what your preferred solution is.
Comments

Peter Bons: You could use the ActionBlock of the TPL Dataflow, see https://stackoverflow.com/a...

Mark Heath: ah yes, I've seen that demoed a few times and never got round to trying it myself

Thomas Levesque: > But it can be a bit of a pain if we are still generating more work to do while we've started processing work as the reader threads could exit too early.

You could use a BlockingCollection<T> (which is backed by a ConcurrentQueue<T> by default). It's perfect for producer-consumer scenarios.
https://docs.microsoft.com/...

Mark Heath: Awesome, hadn't come across that one before. Will have to give it a try. Thanks for letting me know about it.

Petar Petrov: I'm using BlockingCollection<T>, CountdownEvent and ThreadPool.QueueUserWorkItem

var limit = 4;
using (var urlsCollection = new BlockingCollection<string>(limit))
{
    var workers = Math.Min(urls.Length, limit);
    using (var doneEvent = new CountdownEvent(workers))
    {
        // Consumers (workers)
        for (var i = 0; i < workers; i++)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                var pms = _ as object[];
                var e = pms[0] as CountdownEvent;
                var items = pms[1] as BlockingCollection<string>;
                try
                {
                    foreach (var item in items.GetConsumingEnumerable())
                    {
                        // Work ...
                    }
                }
                finally
                {
                    e.Signal();
                }
            }, new object[] { doneEvent, urlsCollection });
        }
        // Producer
        foreach (var url in urls)
        {
            urlsCollection.Add(url);
        }
        urlsCollection.CompleteAdding();
        doneEvent.Wait();
    }
}

snielsson: i second that. tpl dataflow is superuseful and very simple to use.

Stuart Turner: Another thing to remember is that Task.WhenAll() takes an IEnumerable<Task<T>>, so you do not have to use an instantiated list. I like option 4, but you could reduce the code even further like this:

var bulkhead = Policy.BulkheadAsync(maxThreads, Int32.MaxValue);
await Task.WhenAll(urls
    .Select(u => bulkhead.ExecuteAsync(async () =>
    {
        var html = await client.GetStringAsync(u);
        Console.WriteLine(…);
    })));

Artur Kordzik: 1. Parallel.ForEach is not suitable for executing async methods. It's designed for heavy CPU-bound operations run in parallel (not concurrent). For more details you can see this discussion: https://stackoverflow.com/q...
2. In the same example (Parallel.ForEach), invoking Result on an async method is also a bad approach, because you block the executing thread and as a result run the method synchronously.
3. I agree with Peter Bons that ActionBlock is the best solution here.
I strongly advise you to read this article: http://gigi.nullneuron.net/...

Mark Heath: yes, agreed, and I do say in my article that you should not use Parallel.ForEach in this situation. Thanks for the link to the article. I have a similar article brewing about some threading antipatterns I've come across.

Artur Kordzik: Sorry, missed that apparently :)

Ravi Puri: What about using Rx, System.Reactive.Linq and IObservable<T>? e.g.

var resources =
    new List<string>
    {
        "http://www.google.com"
        ,"http://www.yahoo.com"
        ,"http://cnn.com"
        ,"http://microsoft.com"
    }
    .ToObservable()
    .Select(s => Observable.FromAsync(() => DownloadAsync(s)))
    .Concat()
    .Where(w => w.IsSuccessStatusCode)
    .Select(s => Observable.FromAsync(() => GetResources(s)))
    .Concat()
    .Where(w => w.Any())
    .SelectMany(s => s)
    .ToEnumerable()
    .OrderBy(o => o.Name);
foreach (var re in resources)
{
    Console.WriteLine(re.Name);
}
Console.ReadLine();

Nicolas Musset: WhenAll() instantiates an array, either by calling ToArray() on the enumeration or by copying elements one by one when the enumeration is already a collection (e.g. a list). In other words, it doesn't change anything. The only case where there is a small improvement is when passing an array. So if you know the number of tasks in advance, consider creating an array instead.

Nicolas Musset: Hi Mark. Nice article, I enjoyed reading it. Do you plan to update it or write a follow-up, with all the comments/feedback here?

Nicolas Musset: I often use Stephen Cleary's Nito.Async library (https://github.com/StephenC...). It has a lot of async-ready constructs (including queue, semaphore, etc.) so I don't have to re-implement everything. Stephen was working on a rewrite of his library but I haven't seen any progress in the last 6 months, so I don't know what the current status is.

Stuart Turner: Technically, by passing in an IEnumerable<>, you save the allocation of a temporary array. On the other hand, depending on the source of the IEnumerable<>, you may have the allocation of lambdas, so it's probably a wash either way. My comment is not about performance though. It's about readability. Since Task.WhenAll() takes an IEnumerable<>, you can provide the result of a .Select() directly, rather than the way Mark did it where you use a foreach() loop to add items to a List<>. I find it easier to read code when boilerplate like adding items to a List<> in a foreach() is removed.

Nicolas Musset: No, in fact you don't save any allocation. Look at the code for WhenAll() and ToArray(). Whether you allocate a list and pass it to WhenAll(), or pass an IEnumerable and let WhenAll() do the allocation, the result is exactly the same number of allocations (list + array, or buffer + array). Sources:
https://referencesource.mic...
https://referencesource.mic...

Stuart Turner: You are correct, my mistake. However, you're still missing the point. My change is not about performance. Rather, it's about readability, which I said in my previous comment.

Nicolas Musset: Readability is always better, indeed. On that we agree ;)

Stephen Cleary: I think it's important to distinguish between synchronous and asynchronous concurrency. Synchronous concurrency (parallelism) is using multiple threads, and is an appropriate choice if you have CPU-bound code. Asynchronous concurrency is a form of concurrency that does not require additional threads, and is an appropriate choice if you have I/O-bound code.

This example (downloading) is I/O-bound, and thus a more appropriate fit for asynchronous concurrency. This is why the Parallel / ConcurrentQueue / BlockingCollection solutions end up being awkward - blocking threads, etc. Those types are all firmly in the synchronous concurrency world. It's possible to solve the problem this way, but it is less efficient.

Downloading is better suited to asynchronous concurrency approaches. This includes SemaphoreSlim with Task.WhenAll (but without the unnecessary Task.Run), and TPL ActionBlock / BufferBlock (which works like an asynchronous ConcurrentQueue). E.g., the SemaphoreSlim approach would simplify to:

var throttler = new SemaphoreSlim(initialCount: maxThreads);
var tasks = urls.Select(async url =>
{
    await throttler.WaitAsync();
    try
    {
        var html = await client.GetStringAsync(url);
        Console.WriteLine($"retrieved {html.Length} characters from {url}");
    }
    finally
    {
        throttler.Release();
    }
});
await Task.WhenAll(tasks);

and it can be simplified further if you define a LockAsync extension method for SemaphoreSlim.
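[Editor's note: a hand-rolled LockAsync extension along the lines Stephen describes might look something like the sketch below. This is an illustration, not the implementation from his AsyncEx library.]

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch of a LockAsync extension method for SemaphoreSlim.
// Releasing is handled by disposing the returned token, which turns
// the try/finally in the example above into a simple "using" block.
public static class SemaphoreSlimExtensions
{
    public static async Task<IDisposable> LockAsync(this SemaphoreSlim semaphore)
    {
        await semaphore.WaitAsync();
        return new Releaser(semaphore);
    }

    private sealed class Releaser : IDisposable
    {
        private readonly SemaphoreSlim _semaphore;
        public Releaser(SemaphoreSlim semaphore) => _semaphore = semaphore;
        public void Dispose() => _semaphore.Release();
    }
}

// Usage:
// using (await throttler.LockAsync())
// {
//     var html = await client.GetStringAsync(url);
// }
```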
Mark Heath: yes, it would be a good idea to follow up at some point. There have been a lot of helpful suggestions in the comments.

Stephen Cleary: The v5 release is blocked on a single issue that shows up when the thread pool is exhausted and client code makes blocking calls into AsyncEx types: https://github.com/StephenC... This is a rare scenario, but I want to get a fix in before v5 has an official release. Regarding a time frame, it's difficult to say.

Mark Heath: thanks, yes, agreed it's important to make the distinction between needing concurrency for CPU-bound versus I/O-bound operations. Thanks for the suggested code simplification. And presumably you mean the LockAsync method from AsyncEx - https://github.com/StephenC... I'll have to give that a try - looks a very helpful library.

Enrique Sierra Gutierrez: For Technique 2, I'm a little bit confused. Because the await throttler.WaitAsync() call is made before the creation of the tasks and not inside them, isn't the semaphore just controlling how fast the loop adds new tasks to the list, and not really how many of them run concurrently?

Update: Oh never mind, I just saw that the Release call is made right when each task finishes ^^.