A while back I wrote a post about constraining the number of threads when you are working through a bunch of tasks in parallel. For example, if I have 1000 items in a list that I want to process, but I only want to work on 5 at a time, then any of the techniques I discussed there can be used (and you can also use the newer Parallel.ForEachAsync method).
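
For reference, here's a minimal sketch of the Parallel.ForEachAsync approach (ProcessItemAsync is a hypothetical stand-in for whatever work each item needs):

var items = Enumerable.Range(1, 1000);
var options = new ParallelOptions { MaxDegreeOfParallelism = 5 };
await Parallel.ForEachAsync(items, options, async (item, token) =>
{
    await ProcessItemAsync(item, token); // hypothetical: process a single item
});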

However, sometimes it's not as straightforward as working our way through a simple list of items. Sometimes the work we're doing can be thought of as a "producer" that asynchronously produces items to work on, and a "consumer" that processes each of those items.

Note: Often in this scenario you would reach for a message broker (such as Azure Service Bus), and then the consumers can be completely decoupled from the producers, and running on different machines. But there are situations where you do want the producers and consumers to be running in the same process.

The "producer consumer pattern" is ideal for this scenario. And the "Task Parallel Library" is able to help us implement this pattern elegantly.

What can make this pattern complex to implement yourself is that producers and consumers may work at different speeds. If the producer is creating work faster than the consumer can process it, then a backlog will build up. But you might not want that backlog to get too large as it might result in excessive memory usage (or it might be better for another process to start helping out).

Another complicating factor is how many items can be produced or consumed in parallel. Sometimes you can only produce in series, and sometimes you can only consume in series, but in other scenarios you might want to consume or produce several items in parallel.

Fortunately, TPL Dataflow can help us with these situations. In this post, I'll show a very simple producer-consumer implementation, and then expand it to help us control the number of items we process in parallel.

Getting started with TPL Dataflow

To use TPL Dataflow, we need to install the System.Threading.Tasks.Dataflow NuGet package. Note that this now appears to be rolled into .NET 7 (I'm not sure when this happened, as I couldn't find any documentation mentioning it), so I didn't actually need to install the NuGet package.
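
If you do need the package (for example, on older target frameworks), it can be added in the usual way:

dotnet add package System.Threading.Tasks.Dataflow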

Basic producer-consumer implementation with TPL Dataflow

The basic example I'll show is similar to the one in the Microsoft Learn tutorial, with the exception that we'll simulate a producer that takes some time, and so is still producing items while we begin to consume them.

Here's a method called ProduceAsync which produces 20 strings (at a rate of 2 per second), and puts them into an ITargetBlock using the SendAsync method (which will allow us to implement back-pressure later). We can call Complete to indicate that we've finished producing items and there will be no more, which allows the consumer to know when it's finished.

var produceSpeed = TimeSpan.FromSeconds(0.5);
var produceCount = 20;

async Task ProduceAsync(ITargetBlock<string> target)
{
    for (int i = 0; i < produceCount; i++)
    {
        var item = $"Item {i + 1}";
        Console.WriteLine($"Producing {item}");
        await Task.Delay(produceSpeed); // simulate the time taken to produce an item
        Console.WriteLine($"Produced {item}");
        await target.SendAsync(item); // waits if the target block is full (back-pressure)
    }

    target.Complete(); // signal that no more items will be produced
}

Let's also have a method that processes one of the items that the producer produces. It's called ConsumeOneAsync and simulates that we need 2 seconds to consume an item.

var consumeSpeed = TimeSpan.FromSeconds(2);
async Task ConsumeOneAsync(string message)
{
    Console.WriteLine($"Consuming {message}");
    await Task.Delay(consumeSpeed);
    Console.WriteLine($"Consumed {message}");
}

And we'll have a method called ConsumeAllAsync that takes an ISourceBlock and uses it to receive each item that the producer produces and calls ConsumeOneAsync on that item. The OutputAvailableAsync check allows us to exit when we know that we've processed all items (because the producer called Complete).

async Task<int> ConsumeAllAsync(ISourceBlock<string> source)
{
    int itemsProcessed = 0;

    // OutputAvailableAsync returns false once the source has completed and been drained
    while (await source.OutputAvailableAsync())
    {
        var data = await source.ReceiveAsync();
        await ConsumeOneAsync(data);
        itemsProcessed++;
    }

    return itemsProcessed;
}

Now that we have these pieces in place, we can use the BufferBlock from TPL Dataflow as the link between the producer and consumer. The producer will insert items into it (as an ITargetBlock) and the consumer will read from it (as an ISourceBlock). We can then start both the producer and consumer tasks and wait for them both to finish:

var buffer = new BufferBlock<string>();
var consumerTask = ConsumeAllAsync(buffer);
var producerTask = ProduceAsync(buffer);
await Task.WhenAll(consumerTask, producerTask);
var itemsProcessed = await consumerTask; // already complete, so this just fetches the count

When we run this, we see that the producer produces items one at a time, and the consumer consumes them one at a time. Because producing is faster than consuming, all 20 items have been produced while the consumer is still working on item 5. What if we wanted to change this? Suppose we want to consume more than one item in parallel, or to avoid too large a backlog building up. Let's see how to do that next.

Introducing back-pressure and consuming in parallel

Let's imagine that I want to consume up to four items in parallel. And so if the buffer has four items in it, I'd like the producer to pause for a while and let the consumers catch up.

The BufferBlock supports setting a BoundedCapacity, which allows us to constrain the number of items in it. What this means is that when we call SendAsync in the producer method, it will wait until there is space in that BufferBlock before returning.

var maxParallelConsume = 4;
var buffer = new BufferBlock<string>(
    new DataflowBlockOptions() { BoundedCapacity = maxParallelConsume });

The next change we need to make is to use an ActionBlock that we connect to the BufferBlock. This allows us to set up an action for every item that is added to the buffer, but we can also control the degree of parallelism.

The ActionBlock will call ConsumeOneAsync for each item that is produced, and has a MaxDegreeOfParallelism configured to control how many items we can consume in parallel. (Note that I found it was best to set BoundedCapacity as well, to ensure that queued items don't get taken off the BufferBlock before they are ready to be processed.) Then we use LinkTo to connect the BufferBlock to the ActionBlock.

var consumerBlock = new ActionBlock<string>(
    message => ConsumeOneAsync(message),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = maxParallelConsume,
        MaxDegreeOfParallelism = maxParallelConsume
    });
buffer.LinkTo(consumerBlock,
    new DataflowLinkOptions { PropagateCompletion = true });

The PropagateCompletion flag means that the ActionBlock will report that it is complete once the BufferBlock has completed and it has run the actions for each item in the buffer. So now all we need to do is start producing and wait for the ActionBlock to complete, which we can do with the Completion property.

var producerTask = ProduceAsync(buffer);
await consumerBlock.Completion;

When we run this, we see that we are now consuming multiple items in parallel, and that we don't produce item 20 until we've consumed item 16. Even though we could have produced much faster than that, we were constrained by only allowing 4 items to be buffered at any one time.

Producing Item 1
Produced Item 1
Producing Item 2
Consuming Item 1
Produced Item 2
Producing Item 3
Consuming Item 2
Produced Item 3
Producing Item 4
Consuming Item 3
Produced Item 4
Producing Item 5
Consuming Item 4
Produced Item 5
Producing Item 6
Consumed Item 1
Consuming Item 5
Consumed Item 2
Produced Item 6
...

Real-world uses

This producer-consumer pattern isn't one I find myself reaching for very often, but there are situations where it is exactly what is needed. I hope to blog again soon about the situation I needed this pattern for, which was uploading a large file to blob storage. I wanted to send chunks of that file up in parallel to get faster upload speeds, but I didn't want the whole file to be loaded into memory, so the number of chunks ready to be sent needed to be constrained.
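
As a rough sketch of the shape that takes (this isn't my actual implementation; ReadChunksAsync and UploadChunkAsync are hypothetical helpers standing in for the real file-reading and upload code):

var chunkBuffer = new BufferBlock<byte[]>(
    new DataflowBlockOptions { BoundedCapacity = 4 }); // at most 4 chunks held in memory
var uploader = new ActionBlock<byte[]>(
    chunk => UploadChunkAsync(chunk), // hypothetical: uploads one chunk to blob storage
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 4,
        MaxDegreeOfParallelism = 4 // up to 4 uploads in flight at once
    });
chunkBuffer.LinkTo(uploader,
    new DataflowLinkOptions { PropagateCompletion = true });

await foreach (var chunk in ReadChunksAsync(filePath)) // hypothetical: reads the file chunk by chunk
{
    await chunkBuffer.SendAsync(chunk); // pauses reading while the buffer is full
}
chunkBuffer.Complete();
await uploader.Completion;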

Let me know in the comments what real-world situations you've used this pattern for.

Alternative approaches

Of course, TPL Dataflow isn't the only way to implement the producer-consumer pattern, although it's certainly much simpler than building it yourself. It looks like System.Threading.Channels would also allow us to implement the same thing, so I'd like to have a go at that as well (and of course I'll blog about it when I do).
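
I haven't tried this properly yet, but as a first sketch (reusing produceSpeed, produceCount, maxParallelConsume and ConsumeOneAsync from above), a bounded channel gives us the same back-pressure. This version has a single consumer; parallel consumption would need multiple reader tasks:

var channel = Channel.CreateBounded<string>(maxParallelConsume);

// WriteAsync waits when the channel is full, just as SendAsync did with the BufferBlock
async Task ProduceToChannelAsync(ChannelWriter<string> writer)
{
    for (int i = 0; i < produceCount; i++)
    {
        await Task.Delay(produceSpeed);
        await writer.WriteAsync($"Item {i + 1}");
    }
    writer.Complete(); // no more items
}

// ReadAllAsync ends once the writer has completed and the channel has drained
async Task ConsumeFromChannelAsync(ChannelReader<string> reader)
{
    await foreach (var message in reader.ReadAllAsync())
    {
        await ConsumeOneAsync(message);
    }
}

await Task.WhenAll(
    ProduceToChannelAsync(channel.Writer),
    ConsumeFromChannelAsync(channel.Reader));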

You can access the demo LINQPad script for this blog post here.