One of the workflow patterns that Azure Durable Functions supports is waiting for an external event. This is ideal for scenarios like waiting for manual approval before a process can continue, or waiting for an external system to perform a task.

Whenever you implement this pattern, you will very likely also want a timeout in the workflow, so that you can respond in some way if the external event never arrives.

Orchestrator code - waiting for events

Say, for example, that we are implementing an order processing pipeline, and for orders over a certain value we need to get manual approval.

Here's an example orchestrator function that requires a manual approval process for orders over $1000.

[FunctionName("O_ProcessOrder")]
public static async Task<object> ProcessOrder([OrchestrationTrigger] DurableOrchestrationContext ctx, TraceWriter log)
{
    var order = ctx.GetInput<Order>();

    if (order.Amount > 1000)
    {
        await ctx.CallActivityAsync("A_RequestOrderApproval", order);
        using (var cts = new CancellationTokenSource())
        {
            var timeoutAt = ctx.CurrentUtcDateTime.AddHours(24);
            var timeoutTask = ctx.CreateTimer(timeoutAt, cts.Token);
            var approvalTask = ctx.WaitForExternalEvent<string>("OrderApprovalResult");

            var winner = await Task.WhenAny(approvalTask, timeoutTask);
            if (winner == approvalTask)
            {
                cts.Cancel(); // we should cancel the timeout task
            }
            else
            {
                // timed out
                await ctx.CallActivityAsync("A_SendNotApprovedEmail", order);
                return "Order not approved";
            }
        }
    }

    // ... the rest of the order processing is omitted here ...
}

First of all, if approval is needed we call the A_RequestOrderApproval activity function. This might send an email to someone who can review and approve the order.

Then we need a new CancellationTokenSource so that we can cancel the timer task if the approval arrives first. Next we use DurableOrchestrationContext.CreateTimer to create a task that will complete at a certain time. We might be tempted to use DateTime.UtcNow here, but that's against the rules for an orchestrator function, which must be strictly deterministic because its code is replayed against the recorded history. So instead we use DurableOrchestrationContext.CurrentUtcDateTime to calculate the timeout end time. In this example I've set it to 24 hours.

Notice we don't await the return of ctx.CreateTimer. That's because we're also going to call ctx.WaitForExternalEvent in parallel. This is the task that's waiting for the external event to be sent to the workflow.

Next, we use Task.WhenAny to see which task completes first. If it's the approvalTask that means we got an external event before the timeout completed. We should cancel the timeout task because we don't need it. If the timeout task is the winner, then we can proceed with whatever action we want to take in the case of a timeout.

Notice that there is no way to cancel the WaitForExternalEvent task. Should the event we were waiting for turn up in the future after the timeout has fired, then our orchestrator function will not take action on it, because the event sourcing history has already recorded that the timeout task won.

Simplifying waiting for timeout

One of the downsides of the orchestrator code shown above is that it is quite verbose, and (inspired by this tweet from Mikhail Shilkov) I wondered if it might be possible to create a helper overload of DurableOrchestrationContext.WaitForExternalEvent that took a timeout.

Imagine if we could simplify the orchestrator code down to something like this:

await ctx.CallActivityAsync("A_RequestOrderApproval", order);

var approvalResult = await ctx.WaitForExternalEvent<string>(
    "OrderApprovalResult", TimeSpan.FromHours(24));

if (approvalResult == null)
{
    // we timed out
    await ctx.CallActivityAsync("A_SendNotApprovedEmail", order);
    return "Order not approved";
}

This has the advantage not only of being more succinct and readable, but also of protecting developers from implementing this pattern incorrectly.

I wondered if this could be implemented as an extension method on DurableOrchestrationContext, but my initial attempts failed. It's harder to implement than you might think as the Durable Task framework (which is what Durable Functions is built on) places very strict constraints on your use of the await keyword. However, thanks to some expert advice from Chris Gillum I was able to create an extension method that gave me the behaviour I wanted.

The only question was how this method should signal a timeout. Chris rightly suggested that throwing a TaskCanceledException was the proper way to do it, but that does mean the orchestrator function needs a try/catch block. I was happy for my purposes for it to return null to simplify the orchestrator code (if only C# came with a built-in Option<T> type!). Either approach is possible, and in my code example below I show both ways:

public static class DurableOrchestrationContextExtensions
{
    public static Task<T> WaitForExternalEvent<T>(
        this DurableOrchestrationContext ctx, string name, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<T>();
        var cts = new CancellationTokenSource();

        var timeoutAt = ctx.CurrentUtcDateTime + timeout;
        var timeoutTask = ctx.CreateTimer(timeoutAt, cts.Token);
        var waitForEventTask = ctx.WaitForExternalEvent<T>(name);

        waitForEventTask.ContinueWith(t =>
        {
            using (cts)
            {
                if (t.Exception != null)
                {
                    tcs.TrySetException(t.Exception);
                }
                else
                {
                    tcs.TrySetResult(t.Result);
                }
                cts.Cancel();
            }
        }, TaskContinuationOptions.ExecuteSynchronously);

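        timeoutTask.ContinueWith(t =>
        {
            using (cts)
            {
                // If the event already won the race, tcs is complete and the call below has no effect.
                // Returns default(T) on timeout (null for reference types).
                // If you'd prefer to throw a TaskCanceledException instead, swap in:
                //tcs.TrySetCanceled();
                tcs.TrySetResult(default(T));
            }
        }, TaskContinuationOptions.ExecuteSynchronously);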

        return tcs.Task;
    }
}
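If you want to see the "first one to complete wins" behaviour without spinning up the Durable Functions runtime, here's a minimal sketch that uses only the base class library. TimeoutRaceDemo, FirstOf and the throwOnTimeout flag are hypothetical names of mine, and the two tasks are passed in rather than created from the orchestration context, so treat it as an illustration of the TaskCompletionSource technique rather than as orchestrator code.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutRaceDemo
{
    // Hypothetical stand-in for the extension method: the event and timer tasks
    // are supplied by the caller so the race can be driven by hand.
    public static Task<T> FirstOf<T>(Task<T> eventTask, Task timeoutTask, bool throwOnTimeout)
    {
        var tcs = new TaskCompletionSource<T>();

        eventTask.ContinueWith(t =>
        {
            if (t.Exception != null) tcs.TrySetException(t.Exception);
            else tcs.TrySetResult(t.Result);
        }, TaskContinuationOptions.ExecuteSynchronously);

        timeoutTask.ContinueWith(t =>
        {
            // The TrySet* methods return false (and do nothing) if tcs is already complete.
            if (throwOnTimeout) tcs.TrySetCanceled();
            else tcs.TrySetResult(default(T));
        }, TaskContinuationOptions.ExecuteSynchronously);

        return tcs.Task;
    }

    public static async Task Main()
    {
        // Case 1: the event arrives first, so a later timer completion is ignored.
        var evt = new TaskCompletionSource<string>();
        var timer = new TaskCompletionSource<bool>();
        var raced = FirstOf(evt.Task, timer.Task, throwOnTimeout: false);
        evt.SetResult("Approved");
        timer.SetResult(true);
        Console.WriteLine(await raced); // Approved

        // Case 2: the timer completes first and the caller sees null.
        evt = new TaskCompletionSource<string>();
        timer = new TaskCompletionSource<bool>();
        raced = FirstOf(evt.Task, timer.Task, throwOnTimeout: false);
        timer.SetResult(true);
        Console.WriteLine(await raced == null); // True

        // Case 3: the timer completes first and the caller gets an exception.
        evt = new TaskCompletionSource<string>();
        timer = new TaskCompletionSource<bool>();
        raced = FirstOf(evt.Task, timer.Task, throwOnTimeout: true);
        timer.SetResult(true);
        try { await raced; }
        catch (TaskCanceledException) { Console.WriteLine("Timed out"); }
    }
}
```

The third case is what the orchestrator would have to deal with if you went with the TrySetCanceled option: the await needs to sit inside a try/catch for TaskCanceledException.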

Hopefully something like this will officially become part of the Durable Functions extension in the future, but in the meantime it's very easy to use this extension method in your function app.

Sending external events to orchestrations

Durable Functions has a REST API that can be used to send an external event to an orchestration. You need to know the orchestration id, the name of the event, and the secure key to authorize the call. Then you post to the raiseEvent endpoint, and pass whatever JSON payload you want as the event body.
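As a rough sketch of what that call looks like from C#, the helper below posts a JSON body to the raiseEvent endpoint with HttpClient. It assumes you already have the sendEventPostUri value that the orchestration's status response hands back for an instance, which contains an {eventName} placeholder and the authorization key. RaiseEventOverHttp and its method names are hypothetical, not part of the Durable Functions API.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class RaiseEventOverHttp
{
    private static readonly HttpClient Http = new HttpClient();

    // Assumption: sendEventPostUri comes from the instance's management URLs
    // and still contains the literal "{eventName}" placeholder.
    public static Uri BuildRaiseEventUri(string sendEventPostUri, string eventName)
    {
        return new Uri(sendEventPostUri.Replace("{eventName}", Uri.EscapeDataString(eventName)));
    }

    public static async Task RaiseEventAsync(string sendEventPostUri, string eventName, string jsonBody)
    {
        // The event payload is sent as JSON, so a plain string must be quoted, e.g. "\"Approved\"".
        var content = new StringContent(jsonBody, Encoding.UTF8, "application/json");
        var response = await Http.PostAsync(BuildRaiseEventUri(sendEventPostUri, eventName), content);
        response.EnsureSuccessStatusCode(); // throws HttpRequestException on a non-success status
    }
}
```

For the approval example, a call would look something like RaiseEventAsync(sendEventPostUri, "OrderApprovalResult", "\"Approved\"").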

However, if you're waiting for manual approval, you're hardly going to expect the approver to crack open Postman and call the REST API directly. And most external systems will have their own way of communicating back to you, whether by a webhook, or posting a message to a queue, or some other mechanism.

So usually, you create another Azure Function that will be triggered by the external event, and from within that function, pass on the message to the orchestration. Within that function, you will discover the orchestration id you need to send the event to, and then use DurableOrchestrationClient.RaiseEventAsync to send the event to the orchestration.

In the following example, I have a regular HTTP-triggered Azure Function whose route contains the order id of the order to be approved (note this is not the same as the orchestration id). I've used a table storage binding to look up the orchestration id that relates to this order (the orchestrator wrote the details of this order to table storage before it started waiting for the external event). Then I simply pass on whatever was in the body of the HTTP request as the event data for an external event, using DurableOrchestrationClient.RaiseEventAsync.

[FunctionName("ApproveOrderById")]
public static async Task<IActionResult> ApproveOrderById(
    [HttpTrigger(AuthorizationLevel.Function, "post", Route = "approve/{id}")]HttpRequest req,
    [OrchestrationClient] DurableOrchestrationClient client,
    [Table("Orders", "ORDER", "{id}", Connection = "AzureWebJobsStorage")] OrderEntity order,
    TraceWriter log, string id)
{
    log.Info($"Setting approval status of order {id}");

    if (order == null)
    {
        return new NotFoundResult();
    }
    var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
    await client.RaiseEventAsync(order.OrchestrationId, "OrderApprovalResult", requestBody);
    return new OkResult();
}
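The OrderEntity type isn't shown above, so here's a sketch of the sort of shape it could have, assuming the table binding is given a plain class with PartitionKey and RowKey properties. The property names other than OrchestrationId, and the For factory method, are my guesses for illustration. The "ORDER" partition key matches the one used in the Table attribute.

```csharp
public class OrderEntity
{
    // Table storage keys: a fixed partition plus the order id as the row key.
    public string PartitionKey { get; set; }
    public string RowKey { get; set; }

    // The id of the orchestration instance that is waiting for the approval event.
    public string OrchestrationId { get; set; }

    // Hypothetical helper for building the row that gets written before the orchestrator starts waiting.
    public static OrderEntity For(string orderId, string orchestrationId)
    {
        return new OrderEntity
        {
            PartitionKey = "ORDER",
            RowKey = orderId,
            OrchestrationId = orchestrationId
        };
    }
}
```

The orchestrator can get its own id from ctx.InstanceId and hand it to whichever activity function saves the row, so that the lookup in ApproveOrderById finds it later.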

As you can see, the RaiseEventAsync method makes it really straightforward to pass on an event to an orchestration, whatever mechanism the external system you are waiting on actually uses to report back to you.

Postscript 1 - Durable Functions presentation

I'm really pleased to announce that I'm going to be speaking about Durable Functions at ProgNET London on September 12th 2018. I'd love to see you there. Let me know if you're planning on attending.

Postscript 2 - MVP Award

I should also take this opportunity to say how grateful and honoured I am to have been awarded the Microsoft MVP award for the second time. Last year was an amazing experience, especially making my first ever trip to the USA to attend the MVP Summit.

My award category is now Azure, which makes sense as that's been my main focus over the past few years. So I want to thank everyone at Microsoft for building awesome cloud based products and developer tools which I'm having loads of fun learning and teaching. Also, thanks to everyone who has followed my blog, watched my Pluralsight courses, and offered support and encouragement along the way - it means a lot to me. Finally, a huge thank you to everyone who is working hard at building developer community and sharing knowledge through blogs, books, videos, talks, and open source projects. Whether or not you've been recognized for your contribution with an award, it really is appreciated.

Want to learn more about how easy it is to get up and running with Durable Functions? Be sure to check out my Pluralsight course Azure Durable Functions Fundamentals.

Comments

Comment by scottrudy

Thank you for the statement

"So usually, you create another Azure Function that will be triggered by the external event, and from within that function, pass on the message to the orchestration."

It wasn't obvious in the Microsoft Docs. That said, I'm not quite sure what you meant by the next statement

"Within that function, you will discover the orchestration id you need to send the event to"

I am currently passing the instance id and event name as a parameter to the external process and back to the other Azure Function, which works. How else would I "discover" the id?

Comment by Mark Heath

All I mean is that if you didn't want your external process to be given the orchestration id, you could perform a lookup (e.g. against a database) yourself to find the orchestration id that the external event relates to.
