0 Comments Posted in:

In this series:

In an earlier post, we saw some basic code examples of how to send and receive messages from an Azure Service Bus queue. In this post, we're going to look at some options we can configure when receiving messages.

MessageHandlerOptions

If you recall, our basic message receiving code looked a bit like this, where we passed a message handler function to RegisterMessageHandler.

static void Main(string[] args)
{
    var connectionString = args[0];
    var queueName = "queue1";
    var queueClient = new QueueClient(connectionString, queueName);
    var messageHandlerOptions = new MessageHandlerOptions(OnException);
    queueClient.RegisterMessageHandler(OnMessage, messageHandlerOptions);
    Console.WriteLine("Listening, press any key");
    Console.ReadKey();
}

But you can also see that we can pass an instance of MessageHandlerOptions, which allows us to control the way we receive messages. It only has four properties, so let's look at each of them.

ExceptionReceivedHandler

We've already seen that we're required to provide an exception handler function. This will be called if there are problems communicating with the Service Bus. We not only get access to the exception that was thrown, but the ExceptionReceivedContext tells us a little bit more about what exactly it was trying to do when the problem occurred. In our demo we just write to the console, but here you'd typically write out to logs, and maybe even do something that will cause an alert to be raised if this is a critical application running in production.

static Task OnException(ExceptionReceivedEventArgs args)
{
    Console.WriteLine("Got an exception:");
    Console.WriteLine(args.Exception.Message);
    Console.WriteLine(args.ExceptionReceivedContext.Action);
    Console.WriteLine(args.ExceptionReceivedContext.ClientId);
    Console.WriteLine(args.ExceptionReceivedContext.Endpoint);
    Console.WriteLine(args.ExceptionReceivedContext.EntityPath);
    return Task.CompletedTask;
}

MaxConcurrentCalls

We can also specify how many messages we want to process concurrently. By default, this is set to 1, which means that we'll process 1 message off the queue, and only when that has finished will we move onto the next one.

Note that MaxConcurrentCalls applies to just the process that you are running. If for example you had three instances of the Receiver application each listening to the same queue with MaxConcurrentCalls set to 1, then you would process three messages in parallel.

In most situations, a certain amount of concurrent message handling is desirable, since it allows you to work through a queue more quickly, but as we mentioned earlier in this series, there are some trade-offs. If you handle too many messages in parallel, you could overload downstream services such as a database. Also, if all the messages relate to the same entity in a database, you could find that each handler tries to make conflicting updates to the same row in a database table, resulting in concurrency errors.

I updated my Receiver demo application to optionally sleep for 5 seconds in the message handler, and then set MaxConcurrentCalls to 4. Then when I ran it with 5 messages already in the queue, it was clear that the first four were handled concurrently, and then the fifth was handled afterwards. Feel free to download the demo application and experiment with it yourself.

AutoComplete

One nice feature of Service Bus is that when you retrieve a message from the queue, it's not necessarily removed instantaneously. Instead, that message is "locked" for a time period, so it's not visible to anyone else. When you've finished handling the message you then "complete" that message which tells Service Bus you've finished handling the message, and it can safely delete it. This is called the "peek-lock" mode, and is the default behaviour with the SDK.

However, you can also "abandon" the message - in other words you tell Service Bus that for whatever reason, you were not able to process the message. In that case the message becomes available again to be retried.

If AutoComplete is set to true (which is the default), then assuming your handler completes successfully, the Service Bus SDK will automatically complete the message for you. I recommend you use this behaviour, but in some more advanced scenarios you might want to take control yourself.

If you are completing messages yourself, then you do so by calling CompleteAsync (or AbandonAsync) on the QueueClient, and you need to pass in the LockToken for the message, which is available on the SystemProperties of the Message.

await queueClient.CompleteAsync(message.SystemProperties.LockToken);

If you never get back to the Service Bus to complete or abandon the message, then Service Bus will eventually time out the "lock" you have on the message and make it visible again, which is what the next property we'll discuss is about.

MaxAutoRenewDuration

Ideally, your message handler should complete quite quickly, before Service Bus decides to time it out. But sometimes your message handler might legitimately need to perform a long-running task. In this case, you need to keep checking in with Service Bus to tell it that you're still working on the message, and that it should retain the lock. With the Service Bus SDK, you can do this with MaxAutoRenewDuration.

The default is five minutes, meaning that periodically, the SDK will talk back to Service Bus asking to "renew" the lock. It might do this every 30 seconds or so. But after five minutes, it will no longer renew the lock, so it's important if you have a handler that might take an hour or more, to extend this duration to greater than the longest time you might need. Otherwise, while one instance of your message handling microservice is processing the message, it could become visible again, and another instance might concurrently work on the same message, which can often result in bugs that are hard to track down.

Summary

The default message receive options are fine for many use cases, but as you build more complex applications, you will find that you need to fine tune some of these parameters. In particular I've found that it is very useful to make MaxAutoRenewDuration and MaxConcurrentCalls adjustable through configuration in your application, so if you find in production that there is a problem with throughput for a queue, or with messages taking longer to process than expected, you can tweak the values without needing to update source code.

In our next post, we'll start looking at the concepts of Topics and Subscriptions.


0 Comments Posted in:

In this series:

In our last post, we saw some basic code examples of how to send and receive messages from an Azure Service Bus queue.

Today, I want to dig a bit deeper and explore some of the options we have for sending messages.

Message Metadata

In our demo to send a message, we simply created a Message object and set it's Body property to contain the payload of the message. But the Message object allows us to attach various additional metadata to our message. There are some standard properties, as well as the ability to specify arbitrary metadata as a series of key value pairs.

There's a helpful guide available here to many of the properties you can set on the Message object, so I won't try to exhaustively cover it, but I'll just pick out a few interesting properties.

1. ContentType

Every message has a ContentType, indicating what format the message payload is in. A typical value might be application/json as the format for this property should follow RFC2045.

2. Label

A message also has a Label property which you can set to whatever you like to indicate the "subject" of a message. It's completely up to you what you use this property for. You could use this to indicate what type of object was serialized into the message body, but I tend to use the user properties for that...

3. UserProperties

Each message also has a UserProperties dictionary of key value metadata, allowing you to attach any information to a message. Some examples of what you might store here are an identifier of what the message type is, allowing you to deserialize it into a strongly typed object, or even to set up filtered subscriptions that only listen for a specific message type (another nice feature of Service Bus). Or if you're implementing custom message compression or encryption, you could use the UserProperties metadata to indicate what algorithms or keys you are using to do this.

4. MessageId

Another interesting property is the MessageId - by default it will be set to a guid, but it is used for (optional) message de-duplication, so if you have a scenario where it's really important that the same message doesn't get sent twice by accident, then you should set the MessageId to something that would be the same for both "duplicate" messages. This isn't a feature I've made use of myself yet - generally I've tried to promote "idempotent" message handlers, as an alternative way of coping with the same message being delivered twice.

5. CorrelationId

Finally, I want to mention the CorrelationId - this can be used to identify that this message relates to some other message or long-running operation.

A common use of correlation ids is that when an end user initiates an action, a "correlation id" is generated - just a GUID, but from then on, all messages that are sent, or HTTP requests that are made as a result of that single action will use that same correlation id. That way, if all parts of the code ensure they write the correlation id to the logs, it's possible to piece together all the log messages relating to that single operation, even if they're spread out across multiple microservices.

It's a really useful and powerful technique, and I recommend you use it. Of course, you don't necessarily need to use this CorrelationId property - you might use a user property, or even put your correlation id in the message payload.

If you are using correlation ids, I recommend having a base message handler that automatically ensures the correlation id gets logged every time you log everything in that message handler, and also automatically attaches that correlation id to any outgoing messages. Some frameworks might be able to do this for you.

Scheduled Messages

One really powerful feature is the ability to schedule a message to be delivered in the future. This allows you to implement all kinds of interesting patterns like reminders, timeouts, or delayed retries. You can access this by setting the ScheduledEnqueueTimeUtc property on the Message object.

If you set the enqueue time to be say 1 hour in the future, that means that the message will not be visible to the receiver for an hour. Of course, you can't guarantee that the receiver will be listening in an hour - if the receiver happens to be offline then, the message will just be picked up when it starts polling again.

Demo - sending a scheduled message with metadata

I've updated my sender application (available here on GitHub) to set up a few message metadata properties and schedule the message to become visible five minutes into the future

var message = new Message(body);
message.CorrelationId = Guid.NewGuid().ToString();
message.Label = "My message";
message.UserProperties["Sender"] = "Mark";
message.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await queueClient.SendAsync(message);

And send it in the usual way:

dotnet run $connectionString "scheduled message"

I also updated my receiver application to output a few of the metadata properties

Console.WriteLine("Got a message:");
Console.WriteLine(messageText);

Console.WriteLine($"Enqueued at {m.SystemProperties.EnqueuedTimeUtc}");
Console.WriteLine($"CorrelationId: {m.CorrelationId}");
Console.WriteLine($"ContentType: {m.ContentType}");
Console.WriteLine($"Label: {m.Label}");
Console.WriteLine($"MessageId: {m.MessageId}");
foreach(var prop in m.UserProperties)
{
    Console.WriteLine($"{prop.Key}={prop.Value}");
}

And now when I run the receiver application and wait five minutes for the scheduled message to appear.

dotnet run $connectionString

We can see the correlation id, label and user metadata I supplied, as well as the fact that a MessageId was generated automatically for me:

Got a message:
scheduled message
Enqueued at 28/04/2020 06:54:33
CorrelationId: 872ecaeb-82ee-467e-b8ba-9a016f3a3668
ContentType:
Label: My message
MessageId: 22865392df49461f86fe39e9ff9fdf2c
Sender=Mark

Summary

As we've seen in this post, Azure Service Bus offers us several ways to attach useful metadata to the messages we send, which allow the message handler to know how to correctly interpret the message and provide us useful logging and diagnostic information. We also saw how the scheduled message capability can be really useful to request that an action is performed in the future. In the next installment, we'll look at some of the options for how we handle messages.


0 Comments Posted in:

In this series:

In our last post, we saw how to create an Azure Service Bus namespace, get its connection string, and create a queue using the Azure CLI. In this post, we're going to send our first message to a queue, and see how to receive it, in a .NET Core application, using the Azure Service Bus SDK.

Step 1 - Creating a project

For this simple demo, we're going to create two .NET Core console applications - one that can send messages, and one that can receive messages. We'll make two separate applications to demonstrate that the sender and receiver are often completely different applications, and also that they don't need to be running at the same time. Assuming you have the .NET Core SDK installed, you can use dotnet new to create the console app, and the convenient dotnet add package command to reference the Microsoft.Azure.ServiceBus NuGet package which contains the Azure Service Bus SDK.

Here's what I typed in PowerShell to create the "sender project"

md sender
cd sender
dotnet new console
dotnet add package Microsoft.Azure.ServiceBus

Step 2 - Sending a message

I've kept the "Sender" project as simple as possible. I pass the connection string (which we saw how to get in the previous installment) as a command line parameter. Then we create a QueueClient for the queue that we want to send the message to. If you remember we already created a queue called queue1.

The message body is essentially the payload that we want to send. This is simply a byte array, so you can use any form of serialization you like. It's very common to serialize as JSON, and it's also possible to compress or encrypt the contents of a message. We'll perhaps look at those topics later in this series, but for now I'm just taking the message text (which is the second parameter to our console app), converting it into a byte array and using that as the message body. The most important thing about the message body is to ensure that it is in a format that receivers can easily interpret.

We then construct a Message object. This has some interesting additional properties we will explore later but for now we don't need any of them, and we'll send the message with SendAsync on the QueueClient.

static async Task Main(string[] args)
{
    var connectionString = args[0];
    var queueName = "queue1";
    var queueClient = new QueueClient(connectionString, queueName);
    var messageText = args[1];
    var body = Encoding.UTF8.GetBytes(messageText);
    var message = new Message(body);
    await queueClient.SendAsync(message);
    Console.WriteLine("Sent message");
}

We can run this with dotnet run $connectionString "hello world", and this will post a message containing "hello world" to queue1. Note that the receiving application doesn't need to be running. Azure Service Bus will hold onto this message until the receiver runs.

Step 3 - Receiving Messages

I've created a separate console application called Receiver for the purposes of receiving messages. Azure Service Bus does allow you to connect to a single queue and just ask for one message at a time, but the SDK encourages you to use a technique where you simply register a "message handler". That message handler will be called every time a message is received. The SDK behind the scenes uses an industry standard protocol called AMQP to receive those messages from Service Bus.

Let's look at the code for the Main method in our Receiver application, which was created in exactly the same way as we saw for the Sender application above. Notice again that we create a QueueClient using the Service Bus connection string that we pass in as a command line parameter. Then we use RegisterMessageHandler to tell it which method to call when we receive a message - in our case OnMessage. And there are also some custom options we can provide (we'll look in more detail at those later in this series), which requires us to also provide an exception handler - ours is called OnException. Then we just need to block which we're doing by waiting for a keypress, as the message pump runs in the background. In a real-world application that is processing queue messages, you'd probably want to consider using the .NET Core "worker service" template which is ideal for this kind of use case.

static void Main(string[] args)
{
    var connectionString = args[0];
    var queueName = "queue1";
    var queueClient = new QueueClient(connectionString, queueName);
    var messageHandlerOptions = new MessageHandlerOptions(OnException);
    queueClient.RegisterMessageHandler(OnMessage, messageHandlerOptions);
    Console.WriteLine("Listening, press any key");
    Console.ReadKey();
}

Let's look at the OnMessage message handler. We get passed a Message object, which allows us to access its Body property, which in our case just contains a string in UTF8 encoding, so reading that string out is easy. Notice that I'm also accessing one of the SystemProperties of a message - in this case we can see the EnqueuedTimeUtc which tells us when this particular message was sent.

static Task OnMessage(Message m, CancellationToken ct)
{
    var messageText = Encoding.UTF8.GetString(m.Body);
    Console.WriteLine("Got a message:");
    Console.WriteLine(messageText);
    Console.WriteLine($"Enqueued at {m.SystemProperties.EnqueuedTimeUtc}");
    return Task.CompletedTask;
}

The final bit of code I've not shown is the OnException method. This allows us to gracefully handle any issues receiving messages. I'm just outputting information to the console in this example for now.

static Task OnException(ExceptionReceivedEventArgs args)
{
    Console.WriteLine("Got an exception:");
    Console.WriteLine(args.Exception.Message);
    Console.WriteLine(args.ExceptionReceivedContext.ToString());
    return Task.CompletedTask;
}

When I run this on my machine, after having sent two messages in advance with my Sender application, I see the following output:

$> dotnet run $connectionString
Listening, press any key
Got a message:
message 1
Enqueued at 24/04/2020 08:48:47
Got a message:
message 2
Enqueued at 24/04/2020 08:48:52

We've seen in this post how the Azure Service Bus SDK makes it very straightforward to send and receive messages. In the next post we'll dive a bit deeper into some of the options we can configure when sending and receiving messages.

The code for these demos can be found in this GitHub repository