Waiting for External Events with Timeouts in Durable Functions
One of the workflow patterns that Azure Durable Functions supports is waiting for an external event. This is ideal for scenarios like waiting for manual approval before a process can continue, or waiting for an external system to perform a task.
Whenever you implement this pattern, it's very likely that you will also want to put a timeout into the workflow, so if the external event is not received, you can respond in some way.
Orchestrator code - waiting for events
Say, for example, that we are implementing an order processing pipeline, and orders over a certain value need manual approval.
Here's an example orchestrator function that requires a manual approval process for orders over $1000.
    [FunctionName("O_ProcessOrder")]
    public static async Task<object> ProcessOrder(
        [OrchestrationTrigger] DurableOrchestrationContext ctx, TraceWriter log)
    {
        var order = ctx.GetInput<Order>();
        if (order.Amount > 1000)
        {
            await ctx.CallActivityAsync("A_RequestOrderApproval", order);
            using (var cts = new CancellationTokenSource())
            {
                var timeoutAt = ctx.CurrentUtcDateTime.AddHours(24);
                var timeoutTask = ctx.CreateTimer(timeoutAt, cts.Token);
                var approvalTask = ctx.WaitForExternalEvent<string>("OrderApprovalResult");
                var winner = await Task.WhenAny(approvalTask, timeoutTask);
                if (winner == approvalTask)
                {
                    cts.Cancel(); // we should cancel the timeout task
                }
                else
                {
                    // timed out
                    await ctx.CallActivityAsync("A_SendNotApprovedEmail", order);
                    return "Order not approved";
                }
            }
        }
        // ... (the remainder of the orchestrator is omitted from this listing)
    }
First of all, if approval is needed, we call the A_RequestOrderApproval activity function. This might send an email to someone who can review and approve the order.
Then we need a new CancellationTokenSource so that we can cancel the timer task once it is no longer needed. Next we use DurableOrchestrationContext.CreateTimer to create a task that will complete at a certain time. We might be tempted to use DateTime.UtcNow here, but that's against the rules for an orchestrator function, which must be strictly deterministic. So instead we use DurableOrchestrationContext.CurrentUtcDateTime to calculate the timeout end time. In this example I've set it to 24 hours.
Notice we don't await the return of ctx.CreateTimer. That's because we're also going to call ctx.WaitForExternalEvent in parallel. This is the task that's waiting for the external event to be sent to the workflow.
Next, we use Task.WhenAny to see which task completes first. If it's the approvalTask, that means we got an external event before the timeout completed. We should cancel the timeout task because we don't need it. If the timeout task is the winner, then we can proceed with whatever action we want to take in the case of a timeout.
Notice that there is no way to cancel the WaitForExternalEvent task. Should the event we were waiting for turn up in the future after the timeout has fired, then our orchestrator function will not take action on it, because the event sourcing history has already recorded that the timeout task won.
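To experiment with the shape of this race outside of a function app, the same pattern can be sketched in plain .NET. This is only an analogue: Task.Delay stands in for ctx.CreateTimer and an ordinary task stands in for the external event. Task.Delay must never be used inside a real orchestrator, and the RaceDemo class and its method name are made up for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class RaceDemo
{
    // Waits for eventTask, but gives up after the timeout.
    // Returns the event payload, or null if the timeout won the race.
    public static async Task<string> WaitWithTimeoutAsync(
        Task<string> eventTask, TimeSpan timeout)
    {
        using (var cts = new CancellationTokenSource())
        {
            // Stand-in for ctx.CreateTimer (NOT allowed in an orchestrator).
            var timeoutTask = Task.Delay(timeout, cts.Token);
            var winner = await Task.WhenAny(eventTask, timeoutTask);
            if (winner == eventTask)
            {
                cts.Cancel(); // the timer is no longer needed
                return await eventTask;
            }
            // Timed out: an event that arrives later is ignored by this method.
            return null;
        }
    }
}
```

As in the orchestrator, the event task itself is never cancelled. Only the timer is.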
Simplifying waiting for timeout
One of the downsides of the orchestrator code shown above is that it is quite verbose, and (inspired by this tweet from Mikhail Shilkov) I wondered if it might be possible to create a helper overload of DurableOrchestrationContext.WaitForExternalEvent that took a timeout.
Imagine if we could simplify the orchestrator code down to something like this:
    await ctx.CallActivityAsync("A_RequestOrderApproval", order);
    var approvalResult = await ctx.WaitForExternalEvent<string>(
        "OrderApprovalResult", TimeSpan.FromHours(24));
    if (approvalResult == null)
    {
        // we timed out
        await ctx.CallActivityAsync("A_SendNotApprovedEmail", order);
        return "Order not approved";
    }
This has the advantage not only of being more succinct and readable, but also protecting developers from implementing this pattern incorrectly.
I wondered if this could be implemented as an extension method on DurableOrchestrationContext, but my initial attempts failed. It's harder to implement than you might think, as the Durable Task framework (which is what Durable Functions is built on) places very strict constraints on your use of the await keyword. However, thanks to some expert advice from Chris Gillum I was able to create an extension method that gave me the behaviour I wanted.
The only question was how this method should signal a timeout. Chris rightly suggested that throwing a TaskCanceledException was the proper way to do it, but that does mean the orchestrator function needs a try/catch block. I was happy for my purposes for it to return null to simplify the orchestrator code (if only C# came with a built-in Option<T> type!). Either approach is possible, and in my code example below I show both ways:
    public static class DurableOrchestrationContextExtensions
    {
        public static Task<T> WaitForExternalEvent<T>(
            this DurableOrchestrationContext ctx, string name, TimeSpan timeout)
        {
            var tcs = new TaskCompletionSource<T>();
            var cts = new CancellationTokenSource();
            var timeoutAt = ctx.CurrentUtcDateTime + timeout;
            var timeoutTask = ctx.CreateTimer(timeoutAt, cts.Token);
            var waitForEventTask = ctx.WaitForExternalEvent<T>(name);
            waitForEventTask.ContinueWith(t =>
            {
                using (cts)
                {
                    if (t.Exception != null)
                    {
                        tcs.TrySetException(t.Exception);
                    }
                    else
                    {
                        tcs.TrySetResult(t.Result);
                    }
                    cts.Cancel();
                }
            }, TaskContinuationOptions.ExecuteSynchronously);
            timeoutTask.ContinueWith(t =>
            {
                using (cts)
                {
                    //tcs.TrySetCanceled(); - if you'd prefer to throw a TaskCanceled exception
                    tcs.TrySetResult(default(T));
                }
            }, TaskContinuationOptions.ExecuteSynchronously);
            return tcs.Task;
        }
    }
Hopefully something like this will officially become part of the Durable Functions extension in the future, but in the meantime it's very easy to use this extension method in your function app.
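The difference between the two ways of signalling a timeout can be seen with nothing more than a TaskCompletionSource. The following is a small sketch in plain .NET, with no Durable Functions types involved. The TimeoutSignalDemo class and its members are hypothetical names used only to show what the calling code observes in each case.

```csharp
using System;
using System.Threading.Tasks;

public static class TimeoutSignalDemo
{
    // Option 1: signal the timeout by completing with default(T).
    public static Task<T> TimedOutWithDefault<T>()
    {
        var tcs = new TaskCompletionSource<T>();
        tcs.TrySetResult(default(T));
        return tcs.Task;
    }

    // Option 2: signal the timeout by cancelling the task.
    public static Task<T> TimedOutWithCancel<T>()
    {
        var tcs = new TaskCompletionSource<T>();
        tcs.TrySetCanceled();
        return tcs.Task;
    }

    // What the awaiting code has to do to cope with both options.
    public static async Task<string> DescribeAsync(Task<string> approval)
    {
        try
        {
            var result = await approval;
            return result == null ? "timed out (null result)" : "received: " + result;
        }
        catch (TaskCanceledException)
        {
            // Awaiting a cancelled TaskCompletionSource task throws here.
            return "timed out (cancelled)";
        }
    }
}
```

Note that default(T) is only null for reference types. With a value type such as int, a timeout would be indistinguishable from an event whose payload was 0, which is one argument in favour of the exception-based approach.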
Sending external events to orchestrations
Durable Functions has a REST API that can be used to send an external event to an orchestration. You need to know the orchestration id, the name of the event, and the secure key to authorize the call. Then you POST to the raiseEvent endpoint, and pass whatever JSON payload you want as the event body.
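As a rough sketch of what such a call could look like from C#, the helper below assumes you already hold the sendEventPostUri template that Durable Functions hands out in the payload from CreateCheckStatusResponse, which (as far as I'm aware) contains a literal {eventName} placeholder. The RaiseEventRestExample class and its method names are made up for illustration, and the exact route differs between versions of the Functions runtime, so treat any URL you plug in as something to confirm against your own app.

```csharp
using System;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;

public static class RaiseEventRestExample
{
    // Fills in the {eventName} placeholder of a sendEventPostUri template.
    public static string BuildRaiseEventUri(string sendEventPostUri, string eventName)
    {
        return sendEventPostUri.Replace("{eventName}", Uri.EscapeDataString(eventName));
    }

    // POSTs a JSON payload to the raiseEvent endpoint and throws on a non-success status.
    public static async Task RaiseEventAsync(
        HttpClient http, string sendEventPostUri, string eventName, string jsonPayload)
    {
        var uri = BuildRaiseEventUri(sendEventPostUri, eventName);
        using (var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json"))
        using (var response = await http.PostAsync(uri, content))
        {
            response.EnsureSuccessStatusCode();
        }
    }
}
```

Because the body is JSON, an orchestrator waiting on WaitForExternalEvent<string> would presumably need a quoted JSON string such as "Approved" (including the quotes) rather than bare text.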
However, if you're waiting for manual approval, you're hardly going to expect the approver to crack open Postman and call the REST API directly. And most external systems will have their own way of communicating back to you, whether by a webhook, or posting a message to a queue, or some other mechanism.
So usually, you create another Azure Function that will be triggered by the external event, and from within that function, pass on the message to the orchestration. Within that function, you will discover the orchestration id you need to send the event to, and then use DurableOrchestrationClient.RaiseEventAsync to send the event to the orchestration.
In the following example, I have a regular HTTP triggered Azure Function, whose route contains the order id of the order to be approved (note this is not the same as the orchestration id). I've used a table storage binding to look up the orchestration id that relates to this order (the orchestrator has already written details of this order to table storage before it started waiting for this external event). And then I simply pass on whatever was in the body of the HTTP request as the event data for an external event, using DurableOrchestrationClient.RaiseEventAsync.
    [FunctionName("ApproveOrderById")]
    public static async Task<IActionResult> ApproveOrderById(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = "approve/{id}")]HttpRequest req,
        [OrchestrationClient] DurableOrchestrationClient client,
        [Table("Orders", "ORDER", "{id}", Connection = "AzureWebJobsStorage")] OrderEntity order,
        TraceWriter log, string id)
    {
        log.Info($"Setting approval status of order {id}");
        if (order == null)
        {
            return new NotFoundResult();
        }
        var requestBody = await new StreamReader(req.Body).ReadToEndAsync();
        await client.RaiseEventAsync(order.OrchestrationId, "OrderApprovalResult", requestBody);
        return new OkResult();
    }
As you can see, the RaiseEventAsync method makes it really straightforward to pass on an event to an orchestration, whatever mechanism the external system you are waiting on actually uses to report back to you.
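The OrderEntity type used by the table storage binding isn't shown in this post. A minimal sketch of what it might look like follows, assuming a plain class whose partition key matches the "ORDER" value in the binding and whose row key is the order id. Only OrchestrationId is actually referenced by the function above, so the rest of the shape is a guess.

```csharp
// Hypothetical shape of the entity stored in the "Orders" table.
public class OrderEntity
{
    // Matches the fixed partition key used in the [Table] binding.
    public string PartitionKey { get; set; } = "ORDER";

    // The order id, which the binding fills from the {id} route parameter.
    public string RowKey { get; set; }

    // The id of the orchestration instance that is waiting for approval.
    public string OrchestrationId { get; set; }
}
```

With something like this in place, the orchestrator (or an activity it calls) would write one row per order before it starts waiting, so that the HTTP function can later map an order id back to its orchestration.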
Postscript 1 - Durable Functions presentation
I'm really pleased to announce that I'm going to be speaking about Durable Functions at ProgNET London on September 12th 2018. I'd love to see you there. Let me know if you're planning on attending.
Postscript 2 - MVP Award
I should also take this opportunity to say how grateful and honoured I am to have been awarded the Microsoft MVP award for the second time. Last year was an amazing experience, especially making my first ever trip to the USA to attend the MVP Summit.
My award category is now Azure, which makes sense as that's been my main focus over the past few years. So I want to thank everyone at Microsoft for building awesome cloud based products and developer tools which I'm having loads of fun learning and teaching. Also, thanks to everyone who has followed my blog, watched my Pluralsight courses, and offered support and encouragement along the way - it means a lot to me. Finally, a huge thank you to everyone who is working hard at building developer community and sharing knowledge through blogs, books, videos, talks, and open source projects. Whether or not you've been recognized for your contribution with an award, it really is appreciated.
Comments
Thank you for the statement
It wasn't obvious in the Microsoft Docs. That said, I'm not quite sure what you meant by the next statement
I am currently passing the instance id and event name as a parameter to the external process and back to the other Azure Function, which works. How else would I "discover" the id?
scottrudy
All I mean is that if you didn't want your external process to be given the orchestration id, you could perform a lookup (e.g. against a database) yourself to find the orchestration id that the external event relates to.
Mark Heath
Hello, has this been incorporated into the source code itself?
John
https://github.com/Azure/az...