Azure Functions 2.0 extensibility - triggers
This is my second post about Azure Functions extensibility. At the end of the previous one I mentioned that I had been lucky enough to find a real context for my learning adventure. This context came up in a chat with colleagues from a "friendly" project. They were looking to integrate Azure Functions with RethinkDB. The obvious question is: why? Cosmos DB is a natural choice for a greenfield project which wants to use a NoSQL database and a serverless approach. Unfortunately, not all projects out there are greenfield. There are a lot of brownfield projects, and this "friendly" project of mine fits into this category. They simply can't drop RethinkDB without a huge investment (which is planned for a later time), so they've decided to use a lift and shift approach for migrating existing functionality to Azure. As part of that migration RethinkDB will be containerized and deployed to Azure Container Instances. The desire to be able to use Azure Functions with this setup shouldn't be a surprise, as it adds a lot of value to the migration. The goal is to have the architecture below available.
I've offered to work on the Azure Functions extensions which would make the integration possible. So far I have been able to create a trigger binding, and in this post I would like to share what I've learned.
Trigger mapping and configuration
If you've read my previous post, you already know that at the root of every binding (input, output or trigger) there is an attribute. This attribute maps the binding to a function parameter and provides a way to configure it. Take a look at the example below.
[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class RethinkDbTriggerAttribute : Attribute
{
    public string DatabaseName { get; private set; }

    public string TableName { get; private set; }

    [AppSetting]
    public string HostnameSetting { get; set; }

    ...

    public bool IncludeTypes { get; set; } = false;

    public RethinkDbTriggerAttribute(string databaseName, string tableName)
    {
        ...

        DatabaseName = databaseName;
        TableName = tableName;
    }
}
The properties of the attribute can be split into three groups.
The first are read-only properties set through the constructor. Those represent aspects which need to be configured for every function separately. In the case of the RethinkDB trigger these are the DatabaseName and TableName properties, as it's reasonable to expect that every function will be observing a different table.
The second group are standard properties. Those represent function-specific but optional settings. In the case of the RethinkDB trigger this is represented by the IncludeTypes property.
The last group is the most interesting one: properties decorated with AppSettingAttribute. Those allow for pointing to values from application settings, which makes them perfect for settings which might be shared between many functions. This is also why they are usually optional, which leaves room for a global option used by all functions and overridden only by specific ones. In the case of the RethinkDB trigger this is the HostnameSetting property, as most functions will be connecting to the same server or cluster.
Providing binding
The root (attribute) is in place, so the trigger binding can be built on top of it. This requires implementations of ITriggerBindingProvider and ITriggerBinding.
The implementation of ITriggerBindingProvider is called when functions are being discovered. At this point the framework calls the TryCreateAsync method to acquire an ITriggerBinding instance. The most important part of the passed context is the information about the parameter for which that instance should be created. It's important to remember that this doesn't have to be a parameter with the trigger attribute. This is why the parameter's custom attributes should be searched for the trigger one and, if it's missing, a Task containing a null result should be returned. The fact that ITriggerBindingProvider is looking for the trigger attribute instance also makes it a natural place to read settings from the attribute. This is why, usually, the constructor takes IConfiguration as a parameter, along with trigger-specific options and services (grabbing those from DI can be done as part of trigger registration).
internal class RethinkDbTriggerAttributeBindingProvider : ITriggerBindingProvider
{
    ...

    private readonly Task<ITriggerBinding> _nullTriggerBindingTask =
        Task.FromResult<ITriggerBinding>(null);

    public RethinkDbTriggerAttributeBindingProvider(IConfiguration configuration,
        RethinkDbOptions options, IRethinkDBConnectionFactory rethinkDBConnectionFactory,
        INameResolver nameResolver, ILoggerFactory loggerFactory)
    {
        ...
    }

    public Task<ITriggerBinding> TryCreateAsync(TriggerBindingProviderContext context)
    {
        ...

        ParameterInfo parameter = context.Parameter;

        RethinkDbTriggerAttribute triggerAttribute =
            parameter.GetCustomAttribute<RethinkDbTriggerAttribute>(inherit: false);
        if (triggerAttribute is null)
        {
            return _nullTriggerBindingTask;
        }

        string triggerHostname = ResolveTriggerAttributeHostname(triggerAttribute);
        Task<Connection> triggerConnectionTask =
            _rethinkDBConnectionFactory.GetConnectionAsync(triggerHostname);

        TableOptions triggerTableOptions = ResolveTriggerTableOptions(triggerAttribute);

        return Task.FromResult<ITriggerBinding>(
            new RethinkDbTriggerBinding(parameter, triggerConnectionTask,
                triggerTableOptions, triggerAttribute.IncludeTypes)
        );
    }

    private string ResolveTriggerAttributeHostname(RethinkDbTriggerAttribute triggerAttribute)
    {
        string hostname = null;

        if (!String.IsNullOrEmpty(triggerAttribute.HostnameSetting))
        {
            hostname = _configuration.GetConnectionStringOrSetting(triggerAttribute.HostnameSetting);
            if (String.IsNullOrEmpty(hostname))
            {
                throw new InvalidOperationException("...");
            }

            return hostname;
        }

        hostname = _options.Hostname;
        if (String.IsNullOrEmpty(hostname))
        {
            throw new InvalidOperationException("...");
        }

        return hostname;
    }

    private TableOptions ResolveTriggerTableOptions(RethinkDbTriggerAttribute triggerAttribute)
    {
        return new TableOptions(
            ResolveAttributeValue(triggerAttribute.DatabaseName),
            ResolveAttributeValue(triggerAttribute.TableName)
        );
    }

    private string ResolveAttributeValue(string attributeValue)
    {
        return _nameResolver.Resolve(attributeValue) ?? attributeValue;
    }
}
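The attribute lookup at the start of TryCreateAsync can be tried out in isolation, without the Azure Functions runtime. The sketch below is only an illustration of that discovery step: DemoTriggerAttribute, DemoFunctions and AttributeDiscoveryDemo are hypothetical names invented for this example, and the attribute merely mirrors the shape of RethinkDbTriggerAttribute. It uses plain reflection to show that only the decorated parameter yields a result, while any other parameter yields null.

```csharp
using System;
using System.Reflection;

// Hypothetical, simplified attribute (not part of the extension); it only
// mirrors the shape of RethinkDbTriggerAttribute for demonstration purposes.
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class DemoTriggerAttribute : Attribute
{
    public string TableName { get; }

    public string HostnameSetting { get; set; }

    public DemoTriggerAttribute(string tableName)
    {
        TableName = tableName;
    }
}

// Hypothetical function with one decorated and one undecorated parameter.
public static class DemoFunctions
{
    public static void Triggered(
        [DemoTrigger("ThreadStats", HostnameSetting = "RethinkDbHostname")] string change,
        int other)
    { }
}

public static class AttributeDiscoveryDemo
{
    // Returns a description for a parameter carrying the trigger attribute,
    // or null when the attribute is missing (the "not my parameter" case).
    public static string Describe(ParameterInfo parameter)
    {
        DemoTriggerAttribute attribute =
            parameter.GetCustomAttribute<DemoTriggerAttribute>(inherit: false);
        if (attribute is null)
        {
            return null;
        }

        string hostnameSource = attribute.HostnameSetting ?? "global hostname";
        return $"{parameter.Name} -> {attribute.TableName} ({hostnameSource})";
    }

    // Runs the lookup for every parameter of the demo function.
    public static string[] DescribeAll()
    {
        ParameterInfo[] parameters = typeof(DemoFunctions)
            .GetMethod(nameof(DemoFunctions.Triggered))
            .GetParameters();

        return Array.ConvertAll(parameters, parameter => Describe(parameter));
    }
}
```

Calling AttributeDiscoveryDemo.DescribeAll() returns one entry per parameter: a description for the decorated one and null for the other, which corresponds to the provider returning a Task with a null result.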
The created ITriggerBinding instance is used by only a single function. It has several responsibilities:
- Defining the type of value returned by the trigger.
- Binding with the different possible types for which the trigger can provide a value.
- Creating a listener for events which should result in function execution.
Defining the type of value returned by the trigger is simple: it only requires implementing the TriggerValueType property.
Providing binding with different types requires implementing the BindingDataContract property and the BindAsync method (which also means an ITriggerData implementation). This part is optional, as additional converters and value providers can be added through binding rules while registering the trigger.
Listener creation is the key aspect of the trigger binding, its heart. The listener instance should be fully initialized with function-specific settings and the ITriggeredFunctionExecutor available from the context.
internal class RethinkDbTriggerBinding : ITriggerBinding
{
    ...

    private readonly Task<ITriggerData> _emptyBindingDataTask =
        Task.FromResult<ITriggerData>(new TriggerData(null, new Dictionary<string, object>()));

    public Type TriggerValueType => typeof(DocumentChange);

    public IReadOnlyDictionary<string, Type> BindingDataContract { get; } =
        new Dictionary<string, Type>();

    public RethinkDbTriggerBinding(ParameterInfo parameter, Task<Connection>
        rethinkDbConnectionTask, TableOptions rethinkDbTableOptions, bool includeTypes)
    {
        ...
    }

    ...

    public Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
    {
        return _emptyBindingDataTask;
    }

    public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
    {
        ...

        return Task.FromResult<IListener>(
            new RethinkDbTriggerListener(context.Executor, _rethinkDbConnectionTask,
                _rethinkDbTable, _includeTypes)
        );
    }

    ...
}
Listener implementation is important enough to get its own section.
Listening for events
For those with ASP.NET Core experience, the listener implementation may look familiar. The reason is that the IListener interface resembles IHostedService. This is a useful observation, as it allows for using established patterns for dealing with asynchrony and long-running tasks, like BackgroundService. I've implemented an IHostedService listening for a RethinkDB changefeed in the past, so I've simply reused that knowledge here. The important part is to remember to call ITriggeredFunctionExecutor.TryExecuteAsync when a new change arrives.
internal class RethinkDbTriggerListener : IListener
{
    ...

    private Task _listenerTask;
    private CancellationTokenSource _listenerStoppingTokenSource;

    public RethinkDbTriggerListener(ITriggeredFunctionExecutor executor,
        Task<Connection> rethinkDbConnectionTask, Table rethinkDbTable,
        bool includeTypes)
    {
        ...
    }

    ...

    public void Cancel()
    {
        StopAsync(CancellationToken.None).Wait();
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _listenerStoppingTokenSource = new CancellationTokenSource();
        _listenerTask = ListenAsync(_listenerStoppingTokenSource.Token);

        return _listenerTask.IsCompleted ? _listenerTask : Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_listenerTask == null)
        {
            return;
        }

        try
        {
            _listenerStoppingTokenSource.Cancel();
        }
        finally
        {
            await Task.WhenAny(_listenerTask,
                Task.Delay(Timeout.Infinite, cancellationToken));
        }
    }

    public void Dispose()
    { }

    private async Task ListenAsync(CancellationToken listenerStoppingToken)
    {
        Connection rethinkDbConnection =
            (_rethinkDbConnectionTask.Status == TaskStatus.RanToCompletion)
            ? _rethinkDbConnectionTask.Result : (await _rethinkDbConnectionTask);

        Cursor<DocumentChange> changefeed = await _rethinkDbTable.Changes()
            .OptArg("include_types", _includeTypes)
            .RunCursorAsync<DocumentChange>(rethinkDbConnection, listenerStoppingToken);

        while (!listenerStoppingToken.IsCancellationRequested
               && (await changefeed.MoveNextAsync(listenerStoppingToken)))
        {
            await _executor.TryExecuteAsync(
                new TriggeredFunctionData() { TriggerValue = changefeed.Current },
                CancellationToken.None
            );
        }

        changefeed.Close();
    }
}
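The start/stop pattern used by the listener can be experimented with without a RethinkDB server or the Functions runtime. The following is a minimal sketch under stated assumptions, not the extension's code: ChannelListener and ListenerDemo are hypothetical names, a System.Threading.Channels channel stands in for the changefeed, and a plain delegate stands in for ITriggeredFunctionExecutor.TryExecuteAsync.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical stand-in for a trigger listener. It does not implement the
// WebJobs IListener interface; it only mirrors its start/stop shape.
public sealed class ChannelListener<T>
{
    private readonly ChannelReader<T> _source; // plays the role of the changefeed
    private readonly Func<T, Task> _execute;   // plays the role of TryExecuteAsync
    private CancellationTokenSource _stoppingTokenSource;
    private Task _listenerTask;

    public ChannelListener(ChannelReader<T> source, Func<T, Task> execute)
    {
        _source = source;
        _execute = execute;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _stoppingTokenSource = new CancellationTokenSource();
        // Start the loop without awaiting it, so StartAsync returns promptly.
        _listenerTask = ListenAsync(_stoppingTokenSource.Token);

        // If the loop already finished (or failed), hand that task back.
        return _listenerTask.IsCompleted ? _listenerTask : Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_listenerTask == null)
        {
            return;
        }

        _stoppingTokenSource.Cancel();
        // Wait for the loop to finish or for the caller to give up waiting.
        await Task.WhenAny(_listenerTask,
            Task.Delay(Timeout.Infinite, cancellationToken));
    }

    private async Task ListenAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (await _source.WaitToReadAsync(stoppingToken))
            {
                while (_source.TryRead(out T item))
                {
                    await _execute(item);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when StopAsync cancels a pending read.
        }
    }
}

public static class ListenerDemo
{
    // Pushes two fake changes through the listener and returns what it saw.
    public static async Task<List<string>> RunAsync()
    {
        Channel<string> channel = Channel.CreateUnbounded<string>();
        var received = new List<string>();
        var bothSeen = new TaskCompletionSource<bool>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var listener = new ChannelListener<string>(channel.Reader, item =>
        {
            received.Add(item);
            if (received.Count == 2)
            {
                bothSeen.TrySetResult(true);
            }
            return Task.CompletedTask;
        });

        await listener.StartAsync(CancellationToken.None);
        await channel.Writer.WriteAsync("change-1");
        await channel.Writer.WriteAsync("change-2");
        await bothSeen.Task;
        await listener.StopAsync(CancellationToken.None);

        return received;
    }
}
```

The demo waits until both items have been handled before stopping, because cancelling earlier could end the loop before pending items are read. A real listener faces the same trade-off when it is stopped while changes are still arriving.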
Now all needed classes are ready.
Trigger registration
In my previous post I mentioned that bindings are registered inside the Initialize method of an IExtensionConfigProvider implementation. The IExtensionConfigProvider implementation is also a gateway to DI: all needed dependencies can be injected through the constructor. This includes services provided by the Azure Functions runtime, binding-specific global options, and binding-specific services registered in IWebJobsStartup.
The registration itself boils down to creating a binding rule based on the trigger attribute, mapping that rule to the ITriggerBindingProvider and adding any additional converters.
[Extension("RethinkDB")]
internal class RethinkDbExtensionConfigProvider : IExtensionConfigProvider
{
    ...

    public RethinkDbExtensionConfigProvider(IConfiguration configuration,
        IOptions<RethinkDbOptions> options,
        IRethinkDBConnectionFactory rethinkDBConnectionFactory,
        INameResolver nameResolver,
        ILoggerFactory loggerFactory)
    {
        ...
    }

    public void Initialize(ExtensionConfigContext context)
    {
        ...

        var triggerAttributeBindingRule = context.AddBindingRule<RethinkDbTriggerAttribute>();
        triggerAttributeBindingRule.BindToTrigger<DocumentChange>(
            new RethinkDbTriggerAttributeBindingProvider(_configuration, _options,
                _rethinkDBConnectionFactory, _nameResolver, _loggerFactory)
        );
        triggerAttributeBindingRule.AddConverter<..., ...>(...);
    }
}
Trigger usage
A properly registered trigger can be used in exactly the same way as the built-in ones (or extensions provided by Microsoft). One just needs to apply the trigger attribute to the proper parameter.
public static class RethinkDbTriggeredFunction
{
    [FunctionName("RethinkDbTriggeredFunction")]
    public static void Run([RethinkDbTrigger(
        databaseName: "Demo_AspNetCore_Changefeed_RethinkDB",
        tableName: "ThreadStats",
        HostnameSetting = "RethinkDbHostname")]DocumentChange change,
        ILogger log)
    {
        log.LogInformation("[ThreadStats Change Received] " + change.GetNewValue<ThreadStats>());
    }
}
This function will be triggered every time a change occurs in the specified table.
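Because HostnameSetting points to an application setting rather than holding the hostname itself, a setting named RethinkDbHostname has to exist wherever the function runs. For local development this would typically go into local.settings.json. The fragment below is a sketch: the "localhost" value and the storage connection string are assumptions for a local setup, not values from the project.

```json
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "RethinkDbHostname": "localhost"
  }
}
```

In Azure the same name would be defined as an application setting of the Function App, with the address of the RethinkDB container as its value.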