Exposing Cosmos DB change feed from ASP.NET Core

Some time ago I was working on exposing a RethinkDB change feed from ASP.NET Core. Recently I've been asked to prepare a proof of concept for achieving the same functionality on top of Azure Cosmos DB.

Defining an abstraction

First things first: what does "same functionality" mean here? It's not about a specific use case like my demo application, but about the idea of a real-time updates feed. In the terms we developers understand best, the goal is to implement the following interface.

public interface IChangeFeed<T>
{
    T CurrentNewValue { get; }

    Task<bool> MoveNextAsync(CancellationToken cancelToken = default(CancellationToken));
}
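To make the intent of the abstraction concrete, a consumer is just a loop that awaits the next change. The sketch below assumes a hypothetical `BroadcastAsync` helper (not part of this post) that pushes a value to connected clients, for example over Server-Sent Events or WebSockets.

```csharp
// Hypothetical consumer: drains the feed until cancellation is requested.
// BroadcastAsync is an assumed helper that pushes the value to connected
// clients; it is not part of the abstraction itself.
public async Task ConsumeAsync<T>(IChangeFeed<T> changeFeed, CancellationToken cancelToken)
{
    while (await changeFeed.MoveNextAsync(cancelToken))
    {
        await BroadcastAsync(changeFeed.CurrentNewValue, cancelToken);
    }
}
```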

The interface clearly derives from RethinkDB's capabilities, so implementing it for RethinkDB is straightforward.

internal class RethinkDbChangeFeed<T> : IChangeFeed<T>
{
    private readonly Cursor<Change<T>> _cursor;

    public T CurrentNewValue { get { return _cursor.Current.NewValue; } }

    public RethinkDbChangeFeed(Cursor<Change<T>> cursor)
    {
        _cursor = cursor;
    }

    public Task<bool> MoveNextAsync(CancellationToken cancelToken = default(CancellationToken))
    {
        return _cursor.MoveNextAsync(cancelToken);
    }
}

How about Cosmos DB?

Change feed in Cosmos DB

Azure Cosmos DB has built-in support for a change feed, but it's different from RethinkDB's. It doesn't have a real-time nature. It's a persisted, sorted list of changed documents which can be read up to the current moment and processed. In order to read the change feed, one must get the partition key ranges for the desired collection, create a change feed query for every partition key range, get all responses from every query, and iterate over the documents in the responses. To get a real-time feel, the entire process needs to be wrapped in a polling pattern based on checkpoints.
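The read process described above can be sketched as a one-shot pass over the feed. This is a simplified illustration against the Microsoft.Azure.Documents SDK; continuation of the partition key range feed, checkpointing, and error handling are omitted.

```csharp
// Simplified sketch: read the entire change feed once, up to the current
// moment. Assumes an initialized IDocumentClient and collection URI.
private async Task<List<T>> ReadChangeFeedOnceAsync<T>(
    IDocumentClient documentClient, Uri collectionUri)
{
    List<T> changes = new List<T>();

    // 1. Get the partition key ranges for the collection.
    FeedResponse<PartitionKeyRange> ranges =
        await documentClient.ReadPartitionKeyRangeFeedAsync(collectionUri);

    foreach (PartitionKeyRange range in ranges)
    {
        // 2. Create a change feed query for every partition key range.
        IDocumentQuery<Document> query = documentClient.CreateDocumentChangeFeedQuery(
            collectionUri,
            new ChangeFeedOptions
            {
                PartitionKeyRangeId = range.Id,
                StartFromBeginning = true
            });

        // 3. Get all responses from every query and iterate over the documents.
        while (query.HasMoreResults)
        {
            foreach (T document in await query.ExecuteNextAsync<T>())
            {
                changes.Add(document);
            }
        }
    }

    return changes;
}
```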

The implementation

Knowing how the change feed works in Cosmos DB, it shouldn't be a surprise that the implementation will be more complicated. In general the steps for reading the feed are not hard to implement if the goal is to read it completely. But the desired abstraction is in fact an enumerator. The feed should be represented as an endless stream which can be advanced by a single item. This calls for some kind of state machine. There is an easy way to implement state machines in C#: create a method which returns IEnumerable and use yield return in the right place. This would allow a straightforward implementation, but there is a catch. Reading the feed involves async calls, and IAsyncEnumerable is not here yet. The resulting implementation would be blocking, which is not desired (if you are interested in how it would look, check out BlockingCosmosDbChangefeed). For a correct asynchronous implementation, the "state" needs to be managed manually. The state in this case consists of the feed enumerator, the query, and the partition key ranges. Approaching the process "backwards" gives the following algorithm:

  1. If there is a change feed enumerator and it can be moved to the next value, move and return.
  2. If there is a query and it contains more results, obtain a new enumerator from the query and return to step 1.
  3. If there are partition key ranges and the current range is not the last one, create a new query for the next range and return to step 2.
  4. If there are partition key ranges and the current range is the last one, wait (polling delay).
  5. Read the partition key ranges.

This can be wrapped in a loop. Additionally there should be a cancellation token which allows breaking that loop. A simple implementation (it will go through an unnecessary check while returning to step 2) can look like below.

internal class CosmosDbChangefeed<T> : IChangeFeed<T>
{
    ...

    public T CurrentNewValue { get; private set; } = default(T);

    ...

    public async Task<bool> MoveNextAsync(CancellationToken cancelToken = default(CancellationToken))
    {
        while (!cancelToken.IsCancellationRequested)
        {
            if (MoveCollectionChangeFeedEnumeratorNext())
            {
                return true;
            }

            if (await ExecuteCollectionChangeFeedQueryNextResultAsync(cancelToken))
            {
                continue;
            }

            if (CreateDocumentChangeFeedQueryForNextPartitionKeyRange(cancelToken))
            {
                continue;
            }

            await WaitForNextPoll(cancelToken);

            await ReadCollectionPartitionKeyRanges(cancelToken);
        }

        return false;
    }
}

Let's go through the steps starting from the last one. Reading partition key ranges is a copy-paste from the documentation; the only important thing here is to reset the current index.

internal class CosmosDbChangefeed<T> : IChangeFeed<T>
{
    ...

    private int _collectionPartitionKeyRangeIndex;
    private List<PartitionKeyRange> _collectionPartitionKeyRanges;

    ...

    private async Task ReadCollectionPartitionKeyRanges(CancellationToken cancelToken)
    {
        if (!cancelToken.IsCancellationRequested)
        {
            List<PartitionKeyRange> collectionPartitionKeyRanges = new List<PartitionKeyRange>();

            string collectionPartitionKeyRangesResponseContinuation = null;
            do
            {
                FeedResponse<PartitionKeyRange> collectionPartitionKeyRangesResponse =
                    await _documentClient.ReadPartitionKeyRangeFeedAsync(_collectionUri, new FeedOptions
                    {
                        RequestContinuation = collectionPartitionKeyRangesResponseContinuation
                    });

                collectionPartitionKeyRanges.AddRange(collectionPartitionKeyRangesResponse);
                collectionPartitionKeyRangesResponseContinuation =
                    collectionPartitionKeyRangesResponse.ResponseContinuation;
            }
            while (collectionPartitionKeyRangesResponseContinuation != null);

            _collectionPartitionKeyRanges = collectionPartitionKeyRanges;
            _collectionPartitionKeyRangeIndex = -1;
        }
    }
}

Creating the query requires passing the partition key range identifier (we need to correctly advance the index here and check its bounds) and the point from which the feed should be queried. This can be the beginning of the feed, a point in time, or a checkpoint. All three options can be provided in any combination; in such a case the checkpoint takes precedence over the point in time, which takes precedence over the beginning. This implementation provides the point in time (the start of the application) and attempts to provide a checkpoint. If the checkpoint is present, it will win.

internal class CosmosDbChangefeed<T> : IChangeFeed<T>
{
    private static readonly DateTime _startTime = DateTime.Now;

    ...

    private IDocumentQuery<Document> _collectionChangeFeedQuery;
    private readonly Dictionary<string, string> _collectionPartitionKeyRangesCheckpoints =
        new Dictionary<string, string>();

    ...

    private bool CreateDocumentChangeFeedQueryForNextPartitionKeyRange(CancellationToken cancelToken)
    {
        if ((_collectionPartitionKeyRanges != null)
            && ((++_collectionPartitionKeyRangeIndex) < _collectionPartitionKeyRanges.Count)
            && !cancelToken.IsCancellationRequested)
        {
            string collectionPartitionKeyRangeCheckpoint = null;
            _collectionPartitionKeyRangesCheckpoints
                .TryGetValue(_collectionPartitionKeyRanges[_collectionPartitionKeyRangeIndex].Id,
                             out collectionPartitionKeyRangeCheckpoint);

            _collectionChangeFeedQuery = _documentClient.CreateDocumentChangeFeedQuery(_collectionUri,
            new ChangeFeedOptions
            {
                PartitionKeyRangeId = _collectionPartitionKeyRanges[_collectionPartitionKeyRangeIndex].Id,
                RequestContinuation = collectionPartitionKeyRangeCheckpoint,
                MaxItemCount = -1,
                StartTime = _startTime
            });

            return true;
        }

        return false;
    }
}

Obtaining the enumerator from the query is done by executing it for the next result. This is also the moment when the checkpoint can be stored; the response provides it in the form of the last logical sequence number of a document.

internal class CosmosDbChangefeed<T> : IChangeFeed<T>
{
    ...

    private IEnumerator<T> _collectionChangeFeedEnumerator;

    ...

    private async Task<bool> ExecuteCollectionChangeFeedQueryNextResultAsync(CancellationToken cancelToken)
    {
        if ((_collectionChangeFeedQuery != null)
            && _collectionChangeFeedQuery.HasMoreResults
            && !cancelToken.IsCancellationRequested)
        {
            FeedResponse<T> collectionChangeFeedResponse =
                await _collectionChangeFeedQuery.ExecuteNextAsync<T>(cancelToken);
            _collectionPartitionKeyRangesCheckpoints
                [_collectionPartitionKeyRanges[_collectionPartitionKeyRangeIndex].Id] =
                collectionChangeFeedResponse.ResponseContinuation;

            _collectionChangeFeedEnumerator = collectionChangeFeedResponse.GetEnumerator();

            return true;
        }

        return false;
    }
}

The enumerator is a standard .NET enumerator; we can go through it and dispose of it when done.

internal class CosmosDbChangefeed<T> : IChangeFeed<T>
{
    ...

    private bool MoveCollectionChangeFeedEnumeratorNext()
    {
        if (_collectionChangeFeedEnumerator != null)
        {
            if (_collectionChangeFeedEnumerator.MoveNext())
            {
                CurrentNewValue = _collectionChangeFeedEnumerator.Current;
                return true;
            }

            _collectionChangeFeedEnumerator.Dispose();
            _collectionChangeFeedEnumerator = null;
        }

        return false;
    }

    ...
}

That's it. I've skipped the wait step, but it's just a Task.Delay with a condition. The whole solution can be found here.
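For completeness, the skipped wait step could look roughly like the sketch below. The 5-second polling interval is an arbitrary assumption, not a value from the original solution.

```csharp
// Sketch of the skipped wait step: a cancellable delay between polls.
// The interval is an assumed placeholder value.
private static readonly TimeSpan _pollDelay = TimeSpan.FromSeconds(5);

private async Task WaitForNextPoll(CancellationToken cancelToken)
{
    if (!cancelToken.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(_pollDelay, cancelToken);
        }
        catch (TaskCanceledException)
        {
            // Cancellation during the delay simply ends the wait;
            // the caller's loop observes the cancellation token next.
        }
    }
}
```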

Cosmos DB change feed processor

I had a very specific abstraction to implement, which is why I've used the SDK directly. There is a way to avoid the low-level details by using the change feed processor library. It hides a lot of complexity and allows easy distribution across multiple consumers. All you need to do is provide two implementations (DocumentFeedObserver/IChangeFeedObserver and DocumentFeedObserverFactory/IChangeFeedObserverFactory) and a lease collection (to coordinate multiple clients). It's a great alternative in many cases.
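For comparison, a minimal observer for the change feed processor library might look like the sketch below. It follows the v1-style IChangeFeedObserver contract; exact signatures differ between library versions, so treat this as an illustration rather than a drop-in implementation.

```csharp
// Illustration only: a minimal observer for the change feed processor
// library (v1-style contract). The library delivers batches of changed
// documents; lease management and distribution across consumers are
// handled for you.
public class ConsoleFeedObserver : IChangeFeedObserver
{
    public Task OpenAsync(ChangeFeedObserverContext context)
    {
        return Task.CompletedTask;
    }

    public Task CloseAsync(ChangeFeedObserverContext context, ChangeFeedObserverCloseReason reason)
    {
        return Task.CompletedTask;
    }

    public Task ProcessChangesAsync(ChangeFeedObserverContext context, IReadOnlyList<Document> documents)
    {
        foreach (Document document in documents)
        {
            Console.WriteLine(document.Id);
        }

        return Task.CompletedTask;
    }
}
```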