Experimenting with System.IO.Pipelines for high performance audio
Audio pipelines
In audio programming, you often need to process buffers of audio rapidly. When you record audio, a new buffer of audio arrives several times a second, and is usually saved to a file or fed into an audio processing pipeline. And when you play audio, several times a second you pull audio out of an audio processing pipeline to provide new buffers to the soundcard.
There are some scenarios in which both happen at the same time. You are receiving audio, either by capturing it from a soundcard input or by receiving it over the network (e.g. in an online voice chat), and placing that audio into an audio processing pipeline. But at the same time you are playing audio, reading it out of that pipeline.
This requires a high-performance pipeline to avoid glitches or dropouts in the audio, and in NAudio, `BufferedWaveProvider` provides this functionality. It's backed by a circular memory buffer (avoiding memory allocations - an important consideration for high performance code), and is thread-safe, meaning it can safely be read from and written to on different threads.
System.IO.Pipelines.Pipe
`BufferedWaveProvider` has worked well for many years, but I was very interested to see that the new `System.IO.Pipelines.Pipe` class solves a very similar problem. It's intended for situations where you need to efficiently parse or process data while it is being received, and is especially useful when one thread is writing to the pipe and another thread is reading, and the reading thread might need to pause and wait for more data to be written to the pipe before it can continue.

The new `Pipe` class lets you work with `Span<T>`, which I've written about before from an audio processing perspective - it offers a very efficient way of working with memory, helping you to reduce allocations and copies.

Let's see how to work with a `Pipe`.
Writing to a Pipe
Let's start by creating a new `Pipe` and getting hold of its `PipeWriter`, which we can use for writing. The `Pipe` constructor does have some configuration options to fine-tune exactly how it works, but I'm just using the defaults here.
```csharp
var pipe = new Pipe();
var writer = pipe.Writer;
```
Now, whenever we get some new audio data, we can write it into the pipe by calling `WriteAsync`. Here I'm just filling a byte array with random data, but normally you'd be writing the audio buffer received from the soundcard or over the network.
```csharp
// just for demo purposes - get some random data to add to our pipe
var r = new Random();
var buffer = new byte[1024];
r.NextBytes(buffer);
// write it to the pipe
await writer.WriteAsync(buffer);
```
We can keep writing to the pipe without anything reading from it, but once a certain (configurable) threshold of unread data is reached, the call to `WriteAsync` will not complete until the reader has caught up.
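To illustrate, those limits can be set via `PipeOptions` when constructing the pipe (the specific byte thresholds here are just made-up values for the example):

```csharp
using System.IO.Pipelines;

// illustrative thresholds: WriteAsync will not complete while more than
// 64KB of unread data is buffered, and writing resumes once the reader
// has drained it back below 32KB
var pipe = new Pipe(new PipeOptions(
    pauseWriterThreshold: 64 * 1024,
    resumeWriterThreshold: 32 * 1024));
var writer = pipe.Writer;
```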
Reading from a Pipe
Now, we can read from the pipe on another thread, and the nice thing about the pipe model is that read sizes do not have to match write sizes. This is very helpful when dealing with audio, as you often want to read a number of samples that is a power of two, to simplify passing audio through algorithms like a Fast Fourier Transform (FFT), but the audio doesn't typically arrive in conveniently sized buffers.
Reading is simply a matter of calling `ReadAsync`, which returns a `ReadResult` containing a `Buffer` property. The `Buffer` is not actually a `Span<T>` as you might expect. Instead it's a `ReadOnlySequence<byte>`, because the data returned might be backed by more than one non-contiguous block of memory. There are a variety of ways to access the data in the sequence - you can enumerate through it as a sequence of `ReadOnlyMemory<T>` segments, or you can slice it and copy it into a byte array or `Span` as I show below:
```csharp
var res = await pipe.Reader.ReadAsync();
// to slice the returned buffer (startPos and length are whatever range you need)
var slice = res.Buffer.Slice(startPos, length);
// to copy the returned buffer into a byte array / span:
slice.CopyTo(buffer);
```
Just because you've read from the pipe doesn't mean the read position has advanced, so you need to explicitly call `AdvanceTo` to move forward in the pipe. This can be a bit confusing, as you can specify two positions here: the "consumed" position, which indicates the end of the data you've actually consumed and don't want to see again, and the "examined" position, which is how far you've looked. The distinction is relevant if you are parsing data and the end of the buffer might contain an incomplete data structure, so you want to wait for more data to be available before continuing to parse.
To indicate that we've consumed everything we read from the pipe, we can just do this:
```csharp
pipe.Reader.AdvanceTo(res.Buffer.End);
```
Or in the case of audio, where I want to read in certain block sizes (`BlockSize` in my example), I'd work through as many blocks as are available, and then tell the pipe that I've consumed up to the start of the first incomplete block, but examined everything:
```csharp
var res = await pipe.Reader.ReadAsync();
var buffer = res.Buffer;
while (buffer.Length >= BlockSize)
{
    var block = buffer.Slice(0, BlockSize);
    // TODO process this block of BlockSize bytes here
    // slice to get the next block
    buffer = buffer.Slice(BlockSize);
}
// advance the pipe to the start of the first incomplete block
pipe.Reader.AdvanceTo(buffer.Start, res.Buffer.End);
```
One thing to be aware of is that after you call `AdvanceTo`, you will not get anything back from the pipe reader until there has been another write (even if you didn't "examine" all the way to the end of the buffer). That's why my example above has a loop, to process all the blocks in the data returned by `ReadAsync`.
Limitations
Could `Pipe` be used instead of the existing circular buffer approach in NAudio's `BufferedWaveProvider`? Well, there are a few differences and limitations.
First, `Pipe` does not give you any indication of how many bytes are currently buffered. In NAudio, I often use that information to decide whether to pause audio playback, to avoid stuttering in internet radio streaming scenarios where the network connection is poor.
Another feature `BufferedWaveProvider` offers is the option to discard incoming audio if the buffer is full. The `Pipe` class does not anticipate that you would want to do this (which makes sense for its intended use case of parsing received data), so you'd probably need to track externally how many bytes were buffered to replicate this behaviour.
`BufferedWaveProvider` also has a "pad with silence" option, so you can always read your desired number of bytes, with silence appended to fill in any missing data. This is good for live chat scenarios, where network issues may mean there is no buffered audio available to play, but we don't want to pause the playback device. Although `Pipe` doesn't offer such a feature, it wouldn't be too hard to replicate by adding the padding yourself after calling `ReadAsync`.
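Here's a rough sketch of how that padding might look. Note that I've used the non-blocking `TryRead` here rather than `ReadAsync`, so the playback thread never waits; `ReadPadded` is a hypothetical helper name of my own:

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;

static class PipeReaderExtensions
{
    // fill 'block' with whatever audio is currently buffered, then
    // pad the remainder with zeroes (silence for PCM audio)
    public static int ReadPadded(this PipeReader reader, Span<byte> block)
    {
        int copied = 0;
        // TryRead completes synchronously with whatever is buffered (possibly nothing)
        if (reader.TryRead(out var result))
        {
            copied = (int)Math.Min(result.Buffer.Length, block.Length);
            result.Buffer.Slice(0, copied).CopyTo(block.Slice(0, copied));
            reader.AdvanceTo(result.Buffer.GetPosition(copied));
        }
        // anything we couldn't fill becomes silence
        block.Slice(copied).Clear();
        return copied;
    }
}
```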
Finally, `Pipe` has an async programming model - you `await` the `ReadAsync` and `WriteAsync` methods. In NAudio, although there are multiple threads involved, you tend to prefer to do all your audio processing on a single thread for performance reasons (and certain audio APIs require you to always call them from the same thread). I believe that `Pipe` will try to use the `SynchronizationContext` if one is available, but usually there isn't one on an audio playback thread, so code using `Pipe` would end up hopping between thread pool threads as it worked through the pipe.
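If that mattered, the `PipeOptions` scheduling settings look like they could help; for example (a configuration sketch on my part, not something I benchmarked here):

```csharp
using System.IO.Pipelines;

// run reader and writer continuations inline on the thread that
// completed the operation, instead of dispatching to the thread pool
var pipe = new Pipe(new PipeOptions(
    readerScheduler: PipeScheduler.Inline,
    writerScheduler: PipeScheduler.Inline,
    useSynchronizationContext: false));
```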
Performance
I was interested to test how well `Pipe` performs compared to NAudio's `BufferedWaveProvider`.
It was a little tricky to come up with a fair benchmark, since `BufferedWaveProvider` is designed so that the call to `Read` is non-blocking - you're expected to call it periodically when you need the next buffer of audio to play, and if the required amount of audio isn't present, it usually pads with silence. `Pipe`, on the other hand, will block on a call to `ReadAsync` until more data has been written. So as a compromise, my benchmark was single-threaded with alternating writes and reads, using read block sizes that differed from the write block sizes. I then measured how quickly I could get about an hour's worth of (random) audio through.
Here's the code for the `BufferedWaveProvider` benchmark:
```csharp
private void ReadWriteBufferedWaveProvider()
{
    var r = new Random();
    var writeBuffer = new byte[WriteSize];
    var readBuffer = new byte[ReadSize];
    var bytesWritten = 0;
    var bytesRead = 0;
    while (bytesRead < TotalBytes)
    {
        // fill the buffer with random data
        r.NextBytes(writeBuffer);
        // write a block into the BufferedWaveProvider
        bufferedWaveProvider.AddSamples(writeBuffer, 0, writeBuffer.Length);
        bytesWritten += writeBuffer.Length;
        // read as many full blocks as we can
        while (bufferedWaveProvider.BufferedBytes >= ReadSize)
        {
            var read = bufferedWaveProvider.Read(readBuffer, 0, ReadSize);
            bytesRead += read;
        }
    }
}
```
And for the `Pipe` benchmark:
```csharp
private async Task ReadWritePipe()
{
    var r = new Random();
    var writeBuffer = new byte[WriteSize];
    var bytesWritten = 0;
    var bytesRead = 0;
    while (bytesRead < TotalBytes)
    {
        // fill the buffer with random data
        r.NextBytes(writeBuffer);
        // write it into the pipe
        await pipe.Writer.WriteAsync(writeBuffer);
        bytesWritten += writeBuffer.Length;
        // perform a single read from the pipe
        var res = await pipe.Reader.ReadAsync();
        // process as many read blocks as we can
        var buffer = res.Buffer;
        while (buffer.Length >= ReadSize)
        {
            // here's where we'd process a single block
            // var currentBlock = buffer.Slice(0, ReadSize);
            buffer = buffer.Slice(ReadSize);
            bytesRead += ReadSize;
        }
        // tell the pipe we've "consumed" up to the start of the first incomplete block
        // and we've "examined" the whole thing
        pipe.Reader.AdvanceTo(buffer.Start, res.Buffer.End);
    }
}
```
I used BenchmarkDotNet to compare these approaches at different read and write block sizes, and the results were very close, with NAudio's `BufferedWaveProvider` slightly faster:
| Method | TotalBytes | ReadSize | WriteSize | Mean | Error | StdDev | Gen 0 | Gen 1 | Gen 2 | Allocated |
|--------------------- |----------- |--------- |---------- |--------:|---------:|---------:|----------:|------:|------:|-----------:|
| Pipe | 600000000 | 1000 | 1500 | 4.341 s | 0.0455 s | 0.0403 s | 2000.0000 | - | - | 9376.77 KB |
| BufferedWaveProvider | 600000000 | 1000 | 1500 | 4.163 s | 0.0501 s | 0.0469 s | - | - | - | 2.77 KB |
| Pipe | 600000000 | 1000 | 6000 | 4.202 s | 0.0616 s | 0.0576 s | 1000.0000 | - | - | 4693.66 KB |
| BufferedWaveProvider | 600000000 | 1000 | 6000 | 4.181 s | 0.0593 s | 0.0555 s | - | - | - | 7.16 KB |
| Pipe | 600000000 | 5000 | 1500 | 4.342 s | 0.0570 s | 0.0505 s | 1000.0000 | - | - | 7501.77 KB |
| BufferedWaveProvider | 600000000 | 5000 | 1500 | 4.323 s | 0.0993 s | 0.1326 s | - | - | - | 6.67 KB |
| Pipe | 600000000 | 5000 | 6000 | 4.157 s | 0.0596 s | 0.0498 s | - | - | - | 2818.66 KB |
| BufferedWaveProvider | 600000000 | 5000 | 6000 | 4.140 s | 0.0631 s | 0.0591 s | - | - | - | 11.06 KB |
I had mixed feelings about these results. On the one hand, I like that they validate that what I initially created some 15 years ago in NAudio actually performs pretty well. I was already following the best practices of avoiding allocations and minimising the use of locks, so there aren't too many obvious ways it could be optimised further. On the other hand, I was expecting that `Pipe` might be even faster. I suspect the main reason it isn't in this benchmark is simply that it has an async API, whereas `BufferedWaveProvider` doesn't use `await` at all. The `Pipe` benchmark also allocates a fair bit more memory than I was expecting.
Summary
`Pipe` is another great addition to the high performance toolbox for .NET developers, and it's particularly good for scenarios where you consume data at a different rate, or in different block sizes, than the rate at which you receive it. Although it serves a similar need to `BufferedWaveProvider` in NAudio, the specific requirements of audio pipelines are perhaps not quite the right fit for `Pipe`, which was designed more with parsing HTTP requests in mind.
The other takeaway from this experiment is that for high performance scenarios, taking care to write allocation-free code (and to minimise the use of locking) goes a long way.
Comments
try named pipes over WCF for a way cleaner approach

**Digital Elvis**: Did you try TPL Dataflow, or compared those two?

**szymon warda**: Edit: When scanning the article, `AdvanceTo` looked very similar to `LinkTo`, but after a careful read those libraries are not similar. Good post. Thanks. I guess that:

**Yorie**: First of all, you're not completely correct that pipes don't give any indication of data received or being read - see `PauseWriterThreshold` and `ResumeWriterThreshold`. I would guess they would fit perfectly into your model when you need to block data transmission on reaching some playback buffer size.

Second, and most alarming, your performance comparison is totally unfair. You are comparing two things that were created for completely different purposes.

Circular buffers are extremely good when you work with limited buffering. You can just make an assumption about average data size and avoid lots of allocations and garbage collection pressure altogether.

Pipelines, meanwhile, were created to automate a decent workaround for memory allocations and the corresponding techniques for data sequences whose properties cannot be known beforehand, for highly scalable multiuser high performance IO services.

These two techniques are both good, but each in its own place. Don't mix them in the way you did in this comparison.

Hi! I want to create a voice chat app with a backend in C#. Can you please share some knowledge with me? What should I learn and what resources would you recommend? Thank you in advance!

**zoom Zoom Kabare**: `PipeReader` does have a non-blocking `TryRead` method, maybe that way the comparison would be fairer?