Async Enumerable in C# (Part 3)
In this third part of my series on IAsyncEnumerable<T>
(part 1, part 2), let's discuss some of the differences between processing a sequence in parallel verses sequentially (sometimes referred to as "in series").
Processing Sequences in Series
When we write a regular foreach
loop to iterate through an IEnumerable<T>
sequence like in the example below, we're processing our sequence in series. Here we process a single order at a time before moving onto the next one:
foreach(var o in GetOrders())
{
await ProcessOrderAsync(o);
}
And the same is true if we have an IAsyncEnumerable<T>
and use the await foreach
syntax. In this example, we're still processing the orders one at a time, even though we're starting with an IAsyncEnumerable<Order>
.
await foreach(var o in GetOrders())
{
await ProcessOrderAsync(o);
}
Processing sequences in series isn't necessarily a problem. In many cases its the best thing to do. It protects us from a whole category of hard to diagnose bugs that come with parallelism.
But there are times when it does make sense for us to work through a sequence faster by processing items in parallel. So let's explore some of the options.
Processing Sequences in Parallel
One simple way of achieving parallelism that I often see developers reaching for is something like the example below. Basically, when we call Select
we return Task<T>
. This means that as we enumerate the sequence we're starting all the tasks one after the other without waiting for them to finish.
Then we can wait for all those tasks to finish with a call to await Tasks.WhenAll
like this:
var tasks = GetOrders().Select(async o => await ProcessOrderAsync(o));
await Tasks.WhenAll(tasks);
A similar approach can be taken if GetOrders
returns an IAsyncEnumerable<T>
. We can call ToListAsync
(from System.Linq.Async) to get a list of tasks we can pass into Task.WhenAll
. Note that we're using Select
(also from the System.Linq.Async NuGet package) not SelectAwait
here which means that we're only kicking off tasks, not waiting for them to finish before moving on to the next element in our IAsyncEnumerable
sequence.
// in this example, GetOrders() returns an IAsyncEnumerable<Order>
var tasks = await GetOrders().Select(async o => await ProcessOrder(o)).ToListAsync();
await Task.WhenAll(tasks);
Constraining the number of parallel operations
One issue with the examples I gave above is the fact that if there are say 10,000 orders, then we'll attempt to start 10,000 tasks all in one go. This not only risks flooding the thread-pool, but also potentially opens us up to overloading other resources (e.g. by making too many calls to a downstream service like a database).
It would be better if we could control the maximum degree of parallelism. For example we might only want to process 10 orders in parallel to avoid overloading the database with too many queries. I wrote an article a few years back about several ways you can constrain the number of parallel actions.
There's actually an easier option available now, which is to take advantage of the new Parallel.ForEachAsync
method that was introduced in .NET 6. Let's see that in action with a short demo.
Converting a sequential LINQ to parallel
In this example, imagine we've got a list of URLs, and we simply want to download the HTML from each of these URLs and search it for a particular phrase. Because that involves an asynchronous operation, we can use the technique we discussed earlier in this series to convert to an IAsyncEnumerable<T>
, allowing us to make a pipeline that maps the url to HTML and filters the results down to just the ones matching the search term:
var results = urls.ToAsyncEnumerable()
.SelectAwait(async url =>
new { Url = url,
Html = await httpClient.GetStringAsync(url)})
.Where(x => x.Html.Contains("Blazor"));
await foreach(var result in results)
{
Console.WriteLine($"Found a match in {result.Url}");
}
However, the example above does not download each URL in parallel, so we can speed things up by just selecting Tasks as discussed above, and then using Task.WhenAll
to wait for them to complete. We now need to move our filtering step to be after all the tasks have completed.
var tasks = urls
.Select(async url => new { Url = url,
Html = await httpClient.GetStringAsync(url) });
var results2 = await Task.WhenAll(tasks);
foreach(var result in results2.Where(x => x.Html.Contains("Blazor")))
{
Console.WriteLine($"Found a match in {result.Url}");
}
Let's see how we might rewrite this code to use Parallel.ForEachAsync
. For more readable code, I'd typically start by refactoring out the entire operation that needs to be performed in parallel into it's own method. I've called it FindMatch
.
async Task FindMatch(string url, string searchTerm)
{
var html = await httpClient.GetStringAsync(url);
if (html.Contains(searchTerm))
{
Console.WriteLine($"Found a match in {url}");
}
}
Now we can run this in parallel, by using Parallel.ForEachAsync
. (note that the ct
parameter is a cancellation token that I'm not making use of in this simple example, but would be a good idea to flow through into your async methods).
await Parallel.ForEachAsync(urls,
async (url, ct) => await FindMatch(url, "Blazor"));
By default, Parallel.ForEachAsync
will use the processor count of your computer as the default maximum degree of parallelism. This is a sensible default particularly if your tasks are CPU bound, as there would be no point going any higher. But there are situations when setting a different value makes sense.
We can control this by customizing the MaxDegreeOfParallelism
property of ParallelOptions
like this:
var parallelOptions = new ParallelOptions() { MaxDegreeOfParallelism = 2 };
await Parallel.ForEachAsync(urls, parallelOptions,
async (url, ct) => await FindMatch(url, "Blazor"));
I like this approach because it cleanly separates the concerns of deciding whether or not to run the operations in series or parallel, from the code that actually performs each operation. Not everything needs to be written as a LINQ pipeline, and it can sometimes result in harder to understand code if you do so.
Alternative approaches to parallelism
If you do find yourself trying to run multiple Tasks in parallel like this, it may be worth considering other alternative approaches. The code we've looked at spreads a large amount of work across different threads running on a single machine.
But it's often preferable to spread work out across multiple workers in a distributed system. One great way to achieve this is through messaging. If we post a message for each order to a service bus, then multiple listeners can work through them on different machines. Services like Azure Functions make this really easy to achieve and will automatically scale out to additional workers to help you manage a large backlog of messages. So that might be a better approach than focusing on parallelization within the context of a single worker.
Summary
In this post I've shown how we can process a sequence in parallel, which can be greatly reduce the time taken to get through a large sequence. However, it's important that you think carefully about what the best way to parallelize your code is, and take into account that too much parallelism can cause additional problems by hitting other bottlenecks in the system.
Comments
Great and useful article. Thanks Mark
Olex