When you use queues, messages are read off in the order they are placed into the queue. This means that if there are 1000 messages in your queue, and now you want to send another message that is top priority, there’s no easy way to force it to the front of the queue.

The solution to this problem is to use “priority queues”. This allows high priority messages to get serviced immediately, irrespective of how many low priority messages are waiting.

There’s a few different options for how to implement priority queues in Azure Service Bus. We can choose how we partition the messages into priorities, either by using multiple queues, or multiple subscriptions on a topic. And we can also choose how we read from the queues – either with multiple concurrent listeners, or with a round-robin technique.

Sending Technique 1: Multiple Queues

A very simple way to achieve priority queues is to have two (or more) queues. One queue is for high priority messages, and the other for low priority. Whenever you send a message, you pick which queue to send it to. So this technique assumes that the code sending the message knows whether it should be high priority or not, and also knows how many priority queues there are.

In this simple code sample, we send three messages to the low priority queue and two to the high. We need two queue clients and to know the names of both queues to achieve this:

var clientHigh = QueueClient.CreateFromConnectionString(connectionString, "HighPriorityQueue");
var clientLow = QueueClient.CreateFromConnectionString(connectionString, "LowPriorityQueue");
clientLow.Send(new BrokeredMessage("Low 1"));
clientLow.Send(new BrokeredMessage("Low 2"));
clientLow.Send(new BrokeredMessage("Low 3"));
clientHigh.Send(new BrokeredMessage("High 1"));
clientHigh.Send(new BrokeredMessage("High 2"));

Sending Technique 2: One Topic with Multiple Subscriptions

An alternative approach is to make use of Azure Service Bus topics and subscriptions. With this approach, the messages are all sent to the same topic. But a piece of metadata is included with the message that can be used to partition the messages into high and low priorities.

So in this case we need a bit more setup. We’ll need a method to send messages with a Priority property attached:

void SendMessage(string body, TopicClient client, int priority)
{
    var message = new BrokeredMessage(body);
    message.Properties["Priority"] = priority;    
    client.Send(message);
}

And this allows us to send messages with priorities attached. We’ll send a few with priority 1, and a couple with priority 10:

var topicClient = TopicClient.CreateFromConnectionString(connectionString, "MyTopic");

SendMessage("Low 1", topicClient, 1);
SendMessage("Low 2", topicClient, 1);
SendMessage("Low 3", topicClient, 1);
SendMessage("High 1", topicClient, 10);
SendMessage("High 2", topicClient, 10);

But for this to work, we also need to have pre-created some subscriptions, that are set up to filter based on the Priority property. Here’s a helper method to ensure a subscription exists and has a single rule set on it:

SubscriptionClient CreateFilteredSub(string topicName, string subscriptionName, RuleDescription rule)
{
    if (!namespaceManager.SubscriptionExists(topicName, subscriptionName))
    {
        namespaceManager.CreateSubscription(topicName, subscriptionName);
    }
    var rules = namespaceManager.GetRules(topicName, subscriptionName);
    var subClient = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionName);
    foreach (var ruleName in rules.Select(r => r.Name))
    {
        subClient.RemoveRule(ruleName);
    }
    subClient.AddRule(rule);
    return subClient;
}

Now we can use this method to create our two filtered subscriptions, one for messages whose priority is >= 5, and one for those whose priority is < 5:

var subHigh = CreateFilteredSub("MyTopic", "HighPrioritySub", new RuleDescription("High", new SqlFilter("Priority >= 5")));
var subLow = CreateFilteredSub("MyTopic", "LowPrioritySub", new RuleDescription("Low", new SqlFilter("Priority < 5")));

Note that you must take care that your filters result in every message going to one or the other of the subscriptions. It would be possible if you weren’t careful with your filter clauses to lose messages or to double-process them.

So this technique is more work to set up, but removes knowledge of how many priority queues there are from the sending code. You could partition the priorities into more subscriptions, or change the rules about how priorities were determined to use different message metadata without necessarily having to change the code that sends the messages.

Receiving Technique 1: Simultaneous Listeners

We’ve seen how to partition our messages into high and low priority queues or subscriptions, but how do we go about receiving those messages and processing them?

Well, the easiest approach by far is simply to simultaneously listen on both queues (or subscriptions). For example, one thread is listening on the high priority queue and working through that, while another thread is listening on the low priority queue. You could assign more threads, or even separate machines to service each queue, using the “competing consumer pattern”.

The advantage of this approach is that conceptually it’s very simple. The disadvantage is that if both high and low priority queues are full, we’ll be simultaneously doing some high and some low priority work. That might be fine, but if there could be database contention introduced by the low priority work, you might prefer that all high priority messages are handled first, before doing any low priority work.

Receiving Technique 2: Round Robin Listening

So the second technique is simply to check the high priority queue for messages, and if there are any, process them. Once the high priority queue is empty, check the low priority queue for a message and process it. Then go back and check the high priority queue again.

Here’s a very simplistic implementation for two QueueClients (but it would be exactly the same for two SubscriptionClients if you were using a topic)

void PollForMessages(QueueClient clientHigh, QueueClient clientLow)
{
    bool gotAMessage = false;
    do
    {
        var priorityMessage = clientHigh.Receive(gotAMessage ? TimeSpan.FromSeconds(1) : TimeSpan.FromSeconds(30));
        if (priorityMessage != null)
        {
            gotAMessage = true;
            HandleMessage(priorityMessage);
        }
        else
        {
            var secondaryMessage = clientLow.Receive(TimeSpan.FromSeconds(1));
            if (secondaryMessage == null)
            {
                gotAMessage = false;
            }
            else
            {
                HandleMessage(secondaryMessage);
                gotAMessage = true;
            }
        }
    } while (true);
}

The only complex thing here is that I’m trying to change the timeouts on calls to Receive to avoid spending too much money if both queues are empty for prolonged periods. With Azure Service Bus you pay (a very small amount) for every call you make, so checking both queues every second might get expensive.

No doubt this algorithm could be improved on, and would need to be fine tuned for the specific needs of your application, but it does show that it’s not too hard to set up listeners that guarantee to process all available high priority messages before they begin working on the low priority messages.

Vote on HN
comments powered by Disqus