Recently I wrote about how AzCopy can upload blobs rapidly by uploading multiple chunks in parallel. And I also showed how to use TPL Dataflow to implement the producer consumer pattern. Let's put these two bits of information together to build our own blob uploader in C# using the Azure SDK and see how fast it can go.

Producer and Consumer

The producer consumer pattern is ideal for this scenario as we want to sequentially work through the file, producing blocks to upload, but to upload them in parallel for maximum throughput. We also don't want to hold the whole file in memory, so we should only produce blocks at a fast enough rate to keep all the consumers occupied.

The producer will produce "blocks" of data, each of which has an id, a buffer holding the block data, and a count of how many bytes in that buffer are valid.

record Block(string Id, byte[] Data, int Length);

Our producer function is simply going to read buffers of 8MB at a time (because that's the block size that AzCopy seems to use) and pass them to the ITargetBlock (with SendAsync which enables us to apply back-pressure).
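One thing to watch when reading fixed-size buffers: ReadAsync is allowed to return fewer bytes than you asked for, even before the end of the stream. A local FileStream will usually fill the buffer, but a network stream often won't, and that would result in lots of undersized blocks. Here's a minimal sketch of a helper that keeps reading until the buffer is full or the stream ends. FillBufferAsync is a hypothetical name of my own, and the MemoryStream in the demo is just a stand-in for a real file (it always fills the buffer, so the demo only shows the short final block).

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

// Hypothetical helper: reads until the buffer is full or the stream ends.
// Returns the number of bytes placed in the buffer (0 means end of stream).
async Task<int> FillBufferAsync(Stream source, byte[] buffer)
{
    var total = 0;
    while (total < buffer.Length)
    {
        var read = await source.ReadAsync(buffer, total, buffer.Length - total);
        if (read == 0) break;   // end of stream reached
        total += read;
    }
    return total;
}

// Demo: 10 bytes read through a 4 byte buffer gives blocks of 4, 4 and 2.
var source = new MemoryStream(new byte[10]);
var buffer = new byte[4];
var sizes = new List<int>();
int n;
while ((n = await FillBufferAsync(source, buffer)) > 0)
{
    sizes.Add(n);
}
Console.WriteLine(string.Join(", ", sizes));
```

On .NET 7 and later, Stream.ReadAtLeastAsync offers similar behaviour out of the box, so a helper like this may not be needed there.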

We need to give each block a base 64 encoded identifier which is simply a GUID. The producer function keeps track of all the block ids, so that we can use the full list at the end to construct a block blob out of all the staged blocks. It's important that we keep this list of ids in the correct order or the file will be corrupt.

async Task<IReadOnlyCollection<string>> Produce(ITargetBlock<Block> target, 
                                                Stream file)
{
    var blockIds = new List<string>();
    var blockSize = 8 * 1024 * 1024;

    while (true)
    {
        var buffer = new byte[blockSize];
        var read = await file.ReadAsync(buffer, 0, buffer.Length);
        if (read == 0) break;

        string blockId = Convert.ToBase64String(Guid.NewGuid().ToByteArray());
        blockIds.Add(blockId);
        await target.SendAsync(new Block(blockId, buffer, read));
    }
    target.Complete();
    return blockIds;
}
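A note on the block ids: within a single blob, every block id needs to be the same length, which a base 64 encoded GUID always satisfies (16 bytes encode to 24 characters). If you would rather have ids that record their own position, here's a rough sketch of an alternative that encodes a zero-padded sequence number instead. MakeBlockId and ParseBlockId are hypothetical helpers of my own, not part of the Azure SDK, and the six-digit padding is an assumption based on the limit of 50,000 blocks per blob.

```csharp
using System;
using System.Linq;
using System.Text;

// Hypothetical helper: encodes a zero-based block index as a fixed-width id.
// Padding to 6 digits keeps every id the same length.
string MakeBlockId(int index) =>
    Convert.ToBase64String(Encoding.UTF8.GetBytes(index.ToString("D6")));

// Hypothetical helper: recovers the index from an id made by MakeBlockId.
int ParseBlockId(string blockId) =>
    int.Parse(Encoding.UTF8.GetString(Convert.FromBase64String(blockId)));

// The GUID approach used in this post, for comparison.
var guidId = Convert.ToBase64String(Guid.NewGuid().ToByteArray());

var ids = Enumerable.Range(0, 3).Select(i => MakeBlockId(i)).ToList();

Console.WriteLine($"GUID id length: {guidId.Length}");
Console.WriteLine(string.Join(", ", ids));
Console.WriteLine($"Index of last id: {ParseBlockId(ids.Last())}");
```

Either scheme works for staging; the list passed to the commit call is still what determines the final order of the blocks.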

Consuming each block is relatively easy. We just call the StageBlockAsync method on a block blob client for each one.

async Task StageBlock(Block block, BlockBlobClient blobClient)
{
    using var ms = new MemoryStream(block.Data, 0, block.Length);
    await blobClient.StageBlockAsync(block.Id, ms);
}

Starting the copy

To start the copy, we first need to connect to blob storage and get a container client:

var blobServiceClient = new BlobServiceClient(connectionString);
var containerName = "uploads";
var containerClient = blobServiceClient.GetBlobContainerClient(containerName);

And then we'll get a block blob client for our upload, specifying the blob name. I'll also open the file that we're going to upload.

var uploadBlobClient = containerClient.GetBlockBlobClient("chunked-upload.mp4");
await uploadBlobClient.DeleteIfExistsAsync();
using var file = File.OpenRead(fileName);

And now it's just a case of connecting the producer to the consumer by means of a BufferBlock with a bounded capacity (as I explained in my article about the producer consumer pattern with TPL Dataflow), linked to an ActionBlock that stages each block with a configured degree of parallelism. I've set maxParallelConsume to 8 by default, but you can try different values to see what speed increase you can get.

var buffer = new BufferBlock<Block>(new DataflowBlockOptions() 
                { BoundedCapacity = maxParallelConsume });

var consumerBlock = new ActionBlock<Block>(
    block => StageBlock(block,uploadBlobClient),
    new ExecutionDataflowBlockOptions 
                { BoundedCapacity = maxParallelConsume,
                   MaxDegreeOfParallelism = maxParallelConsume });

buffer.LinkTo(consumerBlock, new DataflowLinkOptions() 
                { PropagateCompletion = true });

And then we just need to start producing blocks, and wait for the consumers to complete.

var producerTask = Produce(buffer,file);
await consumerBlock.Completion;
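If you want to see the back-pressure and parallelism limits in action without touching blob storage, here's a small self-contained sketch of the same pipeline shape. It swaps the real staging call for a fake that just waits 20ms and records how many calls are running at once. FakeStageBlock, ProduceFakeBlocks and the specific numbers are all made up for this demo.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

const int maxParallelConsume = 4;   // assumed value for this demo
const int totalBlocks = 20;         // assumed number of fake blocks

var gate = new object();
int inFlight = 0, peak = 0, staged = 0;

// Stand-in for the real staging call: records concurrency instead of uploading.
async Task FakeStageBlock(int blockNumber)
{
    lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
    await Task.Delay(20);           // pretend to upload
    lock (gate) { inFlight--; staged++; }
}

var buffer = new BufferBlock<int>(new DataflowBlockOptions
    { BoundedCapacity = maxParallelConsume });

var consumerBlock = new ActionBlock<int>(
    n => FakeStageBlock(n),
    new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = maxParallelConsume,
        MaxDegreeOfParallelism = maxParallelConsume
    });

buffer.LinkTo(consumerBlock, new DataflowLinkOptions { PropagateCompletion = true });

// SendAsync only completes once the bounded buffer has room,
// so the producer cannot race ahead of the consumers.
async Task ProduceFakeBlocks()
{
    for (var i = 0; i < totalBlocks; i++)
    {
        await buffer.SendAsync(i);
    }
    buffer.Complete();
}

await Task.WhenAll(ProduceFakeBlocks(), consumerBlock.Completion);
Console.WriteLine($"staged {staged} blocks, peak concurrency {peak}");
```

The peak concurrency should never exceed maxParallelConsume. With both blocks bounded like this, the number of blocks held in memory at once is roughly twice maxParallelConsume plus the one the producer is waiting to send, which is worth bearing in mind when each block is 8MB.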

Finally, we need to commit the block list, which we can get to as it's the result of the producer task. When we commit the block list we can also set various options such as tags and metadata, but here I'm simply committing the block list.

var blockIds = await producerTask;
var opts = new CommitBlockListOptions()
{
    // could set tags, metadata, mime type etc here
};

var info = await uploadBlobClient.CommitBlockListAsync(blockIds, opts);

It is possible to commit block lists multiple times, to gradually build up a blob by appending blocks one at a time, but I prefer for a file I'm uploading to only exist as a blob once it is uploaded in its entirety. That means you never have to be concerned about the possibility of a half-uploaded file being visible in blob storage.

Test by hashing

Of course, fast upload is no good if we corrupt the file on the way up, so let's test by hashing. Here's a simple function that can calculate a hash for a stream.

string HashStream(HashAlgorithm hasher, Stream stream)
{
    hasher.Initialize();
    var buffer = new byte[4*1024*1024];
    while (true)
    {
        int read = stream.Read(buffer, 0, buffer.Length);
        if (read == 0) break;
        hasher.TransformBlock(buffer, 0, read, null, 0);
    }

    hasher.TransformFinalBlock(new byte[0], 0, 0);
    var hash = hasher.Hash;
    return BitConverter.ToString(hash).Replace("-", "");
}

We could then calculate the hash of the file we're uploading...

using var file = File.OpenRead(fileName);
var expectedHash = HashStream(SHA512.Create(), file);

And then use our blob client to re-download the file we just uploaded and calculate the hash to be compared with the expected value.

using var readStream = uploadBlobClient.OpenRead();
var uploadedHash = HashStream(SHA512.Create(),readStream);

Results

With this code I was able to achieve upload speeds in C# on a par with AzCopy, which (depending on the file size and available upload bandwidth) were up to 10 times faster than simply uploading a file one chunk at a time. You can try out the LINQPad script I used for this demo here.

Finally, an important note. You won't normally need to do this with the Azure SDK, as the UploadAsync method on the BlobClient already lets you specify a maximum degree of concurrency. I blogged about how to do this here. The reason I went through this exercise is that I sometimes need to deal with chunked files being uploaded via an external API, and the staged block approach is ideal for that. (And it was also fun to find a problem that was a good fit for the producer consumer pattern with TPL Dataflow.)