Exposing Cosmos DB change feed from ASP.NET Core - Revisited

Over a year ago I wrote about exposing Cosmos DB change feed from ASP.NET Core. A lot has changed in the past 17 months, so I've decided it's high time I wrote an update to that post.

Updating the Abstraction

As the previous post was part of a series (and the demo project provides implementations for multiple databases), it introduced an abstraction for a real-time updates feed. That abstraction was nothing more than an asynchronous enumerator, and since C# 8 we have one out of the box: IAsyncEnumerable<T>.

public interface IChangefeed<out T>
{
    IAsyncEnumerable<T> FetchFeed(CancellationToken cancellationToken = default);
}
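To illustrate the consumer side of this abstraction, here is a minimal sketch (a self-contained console program, assuming .NET 5 or later for top-level statements). FetchFeed below is a hypothetical stand-in that yields fake change notifications instead of reading from Cosmos DB; the point is how a caller pulls items with await foreach and can stop whenever it wants.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for IChangefeed<T>.FetchFeed: an endless stream of fake
// change notifications, produced lazily, one per MoveNextAsync call.
static async IAsyncEnumerable<string> FetchFeed(CancellationToken cancellationToken = default)
{
    int version = 0;
    while (!cancellationToken.IsCancellationRequested)
    {
        await Task.Yield(); // simulate asynchronous work, such as a query round-trip
        yield return $"change #{++version}";
    }
}

// The consumer pulls changes with await foreach and can simply break out;
// leaving the loop disposes the enumerator, which ends the iterator.
var received = new List<string>();
await foreach (string change in FetchFeed())
{
    received.Add(change);
    if (received.Count == 3)
    {
        break;
    }
}

Console.WriteLine(string.Join(", ", received)); // change #1, change #2, change #3
```

Because the iterator only runs when the consumer asks for the next item, nothing is read ahead of the caller.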

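Switching to IAsyncEnumerable<T> makes the implementation more readable, because the flow is now explicit: read the partition key ranges, query the change feed of each range, yield the documents, and wait for the next poll.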

internal class CosmosDbChangefeed<T> : IChangefeed<T>
{
    ...

    public async IAsyncEnumerable<T> FetchFeed([EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await ReadCollectionPartitionKeyRanges();

            while (CreateDocumentChangeFeedQueryForNextPartitionKeyRange())
            {
                while (await ExecuteCollectionChangeFeedQueryNextResultAsync(cancellationToken))
                {
                    while (MoveCollectionChangeFeedEnumeratorNext())
                    {
                        yield return _collectionChangeFeedEnumerator.Current;
                    }
                }
            }

            await WaitForNextPoll(cancellationToken);
        }
    }
}
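The loop above can be reduced to a runnable sketch of the same poll-yield-wait shape (a self-contained console program, assuming .NET 5 or later). The pendingBatches queue is a hypothetical in-memory substitute for the change feed query, and Task.Delay plays the role of WaitForNextPoll. It also shows what [EnumeratorCancellation] buys: a token supplied through WithCancellation(...) reaches the iterator's cancellationToken parameter.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical in-memory substitute for the change feed query: each poll
// dequeues whatever batch of changes is "available" at that moment.
var pendingBatches = new Queue<string[]>(new[]
{
    new[] { "a", "b" },
    Array.Empty<string>(), // a poll that finds nothing new
    new[] { "c" }
});

// Same shape as FetchFeed above: read changes, yield them, wait for the next poll.
async IAsyncEnumerable<string> FetchFeed(
    TimeSpan pollInterval,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        if (pendingBatches.TryDequeue(out string[] batch))
        {
            foreach (string item in batch)
            {
                yield return item;
            }
        }

        // Stand-in for WaitForNextPoll.
        await Task.Delay(pollInterval, cancellationToken);
    }
}

using var cts = new CancellationTokenSource();
var received = new List<string>();
bool cancelled = false;
try
{
    // WithCancellation hands cts.Token to the [EnumeratorCancellation] parameter.
    await foreach (string item in FetchFeed(TimeSpan.FromMilliseconds(20)).WithCancellation(cts.Token))
    {
        received.Add(item);
        if (item == "c")
        {
            cts.Cancel();
        }
    }
}
catch (OperationCanceledException)
{
    // Task.Delay observed the cancelled token; for a feed that runs until
    // cancelled, this is how the enumeration ends.
    cancelled = true;
}

Console.WriteLine($"{string.Join(",", received)} cancelled: {cancelled}"); // a,b,c cancelled: True
```

Since Task.Delay observes the token, cancelling during the wait surfaces as an OperationCanceledException, so a caller of such a feed should be prepared to handle it.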

Unfortunately, it still isn't simple. If you have read the previous post, you know that the helper methods called from FetchFeed hide a lot of complexity. But there is an alternative approach, one I hinted at in the previous post: the change feed processor.

Change Feed Processor

The change feed processor available for Azure Cosmos DB .NET SDK V2 wasn't easy to use either. It required implementing several types and didn't provide a flexible programming model. Azure Cosmos DB .NET SDK V3 changed things quite a bit. First of all, the change feed processor has been built into the SDK. It has also been given a fluent API and a simpler programming model. Creating a change feed processor now requires only a monitored container, a lease container, a name for the instance, a delegate, and about 10 lines of code.

internal class CosmosDbChangefeed<T> : IChangefeed<T>
{
    private static readonly DateTime _startTime = DateTime.UtcNow;

    private readonly Container _container;
    private readonly Container _leaseContainer;
    private readonly TimeSpan _pollInterval;

    ...

    public async IAsyncEnumerable<T> FetchFeed([EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        ...

        ChangeFeedProcessor changeFeedProcessor = _container.GetChangeFeedProcessorBuilder<T>($"{typeof(T).Name}_ChangeFeedProcessor",
        async (newChanges, cancellationToken) =>
        {
            // newChanges holds the next batch of changed documents to handle.
        })
        .WithInstanceName("Demo.AspNetCore.Changefeed")
        .WithStartTime(_startTime)
        .WithPollInterval(_pollInterval)
        .WithLeaseContainer(_leaseContainer)
        .Build();

        ...
    }
}

One important remark regarding the lease container concerns its partitioning: it must be either non-partitioned or partitioned by id. This means that if you want to create the lease container through the SDK (which doesn't allow creating non-partitioned containers), there is only one way:

await database.CreateContainerIfNotExistsAsync(LEASE_CONTAINER_NAME, "/id");

Now, how can a delegate be used to provide an IAsyncEnumerable-based API? For simple scenarios, a semaphore-based producer-consumer implementation can be the solution.

A Semaphore-Based Producer-Consumer

Two semaphores can be used to synchronize the delegate (the producer) with the FetchFeed method that hosts it (the consumer).

internal class CosmosDbChangefeed<T> : IChangefeed<T>
{
    ...

    public async IAsyncEnumerable<T> FetchFeed([EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        IReadOnlyCollection<T> changesToProcess = null;
        SemaphoreSlim changesToProcessSignal = new SemaphoreSlim(0, 1);
        SemaphoreSlim changesProcessedSignal = new SemaphoreSlim(0, 1);

        ...
    }
}
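Both semaphores are created with an initial count of 0 and a maximum count of 1. The small sketch below (plain SemaphoreSlim in a self-contained console program, nothing Cosmos DB specific, assuming .NET 5 or later) shows what that means: a wait does not complete until someone calls Release, and a second Release while a signal is still pending throws SemaphoreFullException. This is one reason the approach suits simple scenarios: it expects only one batch to be in flight at a time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Same construction as changesToProcessSignal / changesProcessedSignal:
// initial count 0 (nothing signalled yet), maximum count 1 (one pending signal at most).
using var signal = new SemaphoreSlim(0, 1);

// With a count of 0, WaitAsync does not complete until Release is called.
Task waiter = signal.WaitAsync();
bool completedBeforeRelease = waiter.IsCompleted;

signal.Release(); // lets the pending waiter through; the count returns to 0
await waiter;

signal.Release(); // count 0 -> 1, a signal is now pending
bool fullDetected = false;
try
{
    signal.Release(); // would push the count to 2, above the maximum of 1
}
catch (SemaphoreFullException)
{
    fullDetected = true;
}

Console.WriteLine($"{completedBeforeRelease} {signal.CurrentCount} {fullDetected}");
```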

When the delegate is called with new changes, it sets the changesToProcess collection, releases the changesToProcessSignal semaphore, and waits on the changesProcessedSignal semaphore.

internal class CosmosDbChangefeed<T> : IChangefeed<T>
{
    ...

    public async IAsyncEnumerable<T> FetchFeed([EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        ...

        ChangeFeedProcessor changeFeedProcessor = _container.GetChangeFeedProcessorBuilder<T>($"{typeof(T).Name}_ChangeFeedProcessor",
        async (changes, cancellationToken) =>
        {
            changesToProcess = changes;

            changesToProcessSignal.Release();

            try
            {
                await changesProcessedSignal.WaitAsync(cancellationToken);
            }
            catch (OperationCanceledException)
            { }
        })
        ...
        .Build();

        await changeFeedProcessor.StartAsync();

        ...
    }
}

Once the change feed processor has started, the code "outside" the delegate waits on the changesToProcessSignal semaphore. When the semaphore is released, that code yields the changes one by one and then notifies the delegate by releasing the changesProcessedSignal semaphore.

internal class CosmosDbChangefeed<T> : IChangefeed<T>
{
    ...

    public async IAsyncEnumerable<T> FetchFeed([EnumeratorCancellation]CancellationToken cancellationToken = default)
    {
        ...

        try
        {
            while (!cancellationToken.IsCancellationRequested)
            {
                await changesToProcessSignal.WaitAsync(cancellationToken);

                foreach (T item in changesToProcess)
                {
                    yield return item;
                }
                changesToProcess = null;

                changesProcessedSignal.Release();
            }
        }
        finally
        {
            await changeFeedProcessor.StopAsync();
        }
    }
}

All this keeps running until the cancellation token is triggered (or the consumer stops enumerating), at which point the finally block stops the change feed processor.
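To see the two halves working together without a Cosmos DB account, the sketch below replaces the change feed processor with a hypothetical RunFakeProcessor local function that pushes batches to a delegate and awaits each call before sending the next (an assumption made for the demo, not the SDK's implementation). The delegate and the consuming loop follow the semaphore code above. The consumer stops after three items, which shows that breaking out of await foreach disposes the enumerator and runs the finally block that stops the processor. It is a self-contained console program assuming .NET 5 or later.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the change feed processor: pushes each batch to the
// delegate and awaits the call before moving on. Stopping it cancels the token
// that the delegate receives.
static async Task RunFakeProcessor(
    IEnumerable<IReadOnlyCollection<string>> batches,
    Func<IReadOnlyCollection<string>, CancellationToken, Task> onChanges,
    CancellationToken stopToken)
{
    foreach (IReadOnlyCollection<string> batch in batches)
    {
        if (stopToken.IsCancellationRequested)
        {
            return;
        }
        await onChanges(batch, stopToken);
    }
}

bool processorStopped = false;

async IAsyncEnumerable<string> FetchFeed(
    IEnumerable<IReadOnlyCollection<string>> batches,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    IReadOnlyCollection<string> changesToProcess = null;
    using var changesToProcessSignal = new SemaphoreSlim(0, 1);
    using var changesProcessedSignal = new SemaphoreSlim(0, 1);
    using var stopProcessor = new CancellationTokenSource();

    // Producer side: publish the batch, signal it, wait until it has been consumed.
    Task processor = RunFakeProcessor(batches, async (changes, token) =>
    {
        changesToProcess = changes;
        changesToProcessSignal.Release();
        try
        {
            await changesProcessedSignal.WaitAsync(token);
        }
        catch (OperationCanceledException)
        { }
    }, stopProcessor.Token);

    // Consumer side: wait for a batch, yield its items, then let the delegate return.
    try
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            await changesToProcessSignal.WaitAsync(cancellationToken);
            foreach (string item in changesToProcess)
            {
                yield return item;
            }
            changesToProcess = null;
            changesProcessedSignal.Release();
        }
    }
    finally
    {
        // Counterpart of changeFeedProcessor.StopAsync().
        stopProcessor.Cancel();
        await processor;
        processorStopped = true;
    }
}

var sampleBatches = new List<IReadOnlyCollection<string>>
{
    new[] { "a", "b" },
    new[] { "c", "d" },
    new[] { "e" }
};

var received = new List<string>();
await foreach (string item in FetchFeed(sampleBatches))
{
    received.Add(item);
    if (received.Count == 3)
    {
        break; // disposing the enumerator runs the finally block in FetchFeed
    }
}

Console.WriteLine($"{string.Join(",", received)} stopped: {processorStopped}"); // a,b,c stopped: True
```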

The semaphore-based implementation is a lot simpler than the one from the previous post. There are also more options now: if you need to, you can replace the semaphores with a queue or take a completely different processing approach.
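As one example of the queue option, here is a sketch that replaces both semaphores with a bounded System.Threading.Channels channel (a swapped-in technique, not the approach used above; a self-contained console program assuming .NET 5 or later). handleChanges is a hypothetical shape for the processor delegate, and the Task.Run producer is a hypothetical stand-in for the processor invoking it.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel replaces both semaphores: capacity 1 means at most one
// batch is buffered, and WriteAsync waits while the buffer is full.
Channel<IReadOnlyCollection<string>> channel =
    Channel.CreateBounded<IReadOnlyCollection<string>>(new BoundedChannelOptions(1)
    {
        SingleReader = true,
        SingleWriter = true,
        FullMode = BoundedChannelFullMode.Wait
    });

// Hypothetical shape of the change feed processor delegate with a channel.
Func<IReadOnlyCollection<string>, CancellationToken, Task> handleChanges =
    (changes, cancellationToken) => channel.Writer.WriteAsync(changes, cancellationToken).AsTask();

// Hypothetical producer standing in for the processor invoking the delegate.
Task producer = Task.Run(async () =>
{
    await handleChanges(new[] { "a", "b" }, CancellationToken.None);
    await handleChanges(new[] { "c" }, CancellationToken.None);
    channel.Writer.Complete(); // only to end the demo; a running processor would keep writing
});

// Consumer side of FetchFeed: flatten batches into individual items.
var received = new List<string>();
await foreach (IReadOnlyCollection<string> batch in channel.Reader.ReadAllAsync())
{
    foreach (string item in batch)
    {
        received.Add(item);
    }
}
await producer;

Console.WriteLine(string.Join(",", received)); // a,b,c
```

One design difference worth noting: with WriteAsync the delegate returns as soon as the batch is buffered, not once it has been consumed, so anything the processor does after the delegate completes (such as checkpointing the lease) can run ahead of the consumer. The semaphore version keeps the delegate waiting until the batch has been yielded.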

New Version Coming?

As I'm writing this, V4 of the Azure Cosmos DB .NET SDK is already on the way. But it's not a revolution like V3 was; it's mostly about embracing .NET Core 3. This means that some time will pass before I have to revisit this post once again.