Revisiting Various Change Feeds Consumption in .NET

The first time I wrote about change feed consumption from .NET (ASP.NET Core, to be more precise) was back in 2018, in the context of RethinkDB. It has always been a very powerful concept. Having access to an ordered flow of information about changes to items is a low-entry enabler for various event-driven, stream processing, or data movement scenarios. As a result, over the years, this capability (under various name variations around the words change, stream, and feed) has found its way into many databases and sometimes even other storage services. The list includes (but is not limited to) MongoDB, RavenDB, Cosmos DB, DynamoDB, and Azure Blob Storage (in preview).

As I was cleaning up and updating a demo application that shows how to consume and expose various change feeds from ASP.NET Core, I decided to write down some notes to refresh the content from my previous posts.

IAsyncEnumerable as Universal Change Feed Abstraction

When I started working with change feeds over 5 years ago, I initially didn't put them behind any abstraction. I like to think that I was smart and avoided premature generalization. The abstraction came after a couple of months, when I could clearly see that I was implementing the same concepts through similar components in different projects, where teams were using RethinkDB, MongoDB, or Cosmos DB. The abstraction that I started advocating back then usually looked like this.

public interface IChangeFeed<T>
{
    T CurrentChange { get; }

    Task<bool> MoveNextAsync(CancellationToken cancelToken = default(CancellationToken));
}

In retrospect, I'm happy with this abstraction, because when, two or more years later, those teams and projects started to adopt C# 8 and .NET Core 3 (or later versions), refactoring all those implementations was a lot easier. C# 8 brought async streams, a natural programming model for asynchronous streaming data sources. An asynchronous streaming data source is exactly what a change feed is, and modeling it through IAsyncEnumerable results in nice and clean consumption patterns. This is why I currently advocate for using IAsyncEnumerable as a universal change feed abstraction.

The trick to properly using that abstraction is defining the right change representation to be returned. That should depend on the change feed's capabilities and the actual needs in a given context. Not all change feeds are the same. Some of them can provide information on all operations performed on an item, and some only on a subset. Some can provide the old and the new value, and some only the new one. Your representation of a change should take all of that into account. In the samples ahead, I'm sidestepping this problem by reducing the change representation to the changed version of the item.
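For illustration, consumption behind such an abstraction can boil down to a single await foreach loop (IChangeFeedService&lt;T&gt; and ConsumeAsync are hypothetical names used only for this sketch, not part of any SDK discussed below):

```csharp
// Hypothetical abstraction: a change feed exposed as an async stream.
public interface IChangeFeedService<T>
{
    IAsyncEnumerable<T> FetchFeed(CancellationToken cancellationToken = default);
}

public static async Task ConsumeAsync<T>(
    IChangeFeedService<T> changeFeedService,
    CancellationToken cancellationToken)
{
    // C# 8 async streams reduce consumption to a single loop.
    await foreach (T item in changeFeedService.FetchFeed(cancellationToken))
    {
        // Process the changed item here.
    }
}
```

The same loop works regardless of which database is behind the abstraction, which is exactly the point.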

Azure Cosmos DB Change Feed

Azure Cosmos DB change feed is the second one (after RethinkDB's) I've been writing about in the past. It's also the one whose consumption model has seen the most evolution over time.

The first consumption model was quite complicated. It required going through partition key ranges, building document change feed queries for them, and then obtaining enumerators. The whole process required managing its own state, which resulted in non-trivial code. It's good that it has been deprecated as part of Azure Cosmos DB .NET SDK V2, which is going out of support in August 2024.

Azure Cosmos DB .NET SDK V3 brought the second consumption model, based on the change feed processor. The whole inner workings of consuming the change feed are enclosed within a single class, which reduces the amount of code required. But the change feed processor has its oddities. It requires an additional container - a lease container, which takes care of the previously described state management. This is beneficial in complex scenarios, as it allows for coordinated processing by multiple workers, but it becomes an unnecessary complication in simple ones. It also provides only a push-based programming model - the consumer must provide a delegate to receive changes. Once again, this is great for certain scenarios, but it leads to awkward implementations when you want to abstract the change feed as a stream.
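For context, a minimal change feed processor setup looks roughly like this (the processor name, instance name, and leaseContainer variable are assumptions made for the sketch):

```csharp
// Sketch of the push-based model: changes are delivered to a delegate,
// and the lease container coordinates state between worker instances.
ChangeFeedProcessor changeFeedProcessor = container
    .GetChangeFeedProcessorBuilder<T>(
        processorName: "itemsProcessor",
        onChangesDelegate: (IReadOnlyCollection<T> changes, CancellationToken cancellationToken) =>
        {
            foreach (T item in changes)
            {
                // Process the changed item here.
            }

            return Task.CompletedTask;
        })
    .WithInstanceName("singleWorker")
    .WithLeaseContainer(leaseContainer)
    .Build();

await changeFeedProcessor.StartAsync();
```

Notice that you never pull anything - the processor invokes the delegate as changes arrive, which is precisely what makes wrapping it in an IAsyncEnumerable awkward.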

The story doesn't end there. Version 3.20.0 of the Azure Cosmos DB .NET SDK introduced the third consumption model, based on the change feed iterator. It provides a pull-based alternative to the change feed processor for scenarios where that is more appropriate. With the change feed iterator, control over the pace of consuming changes is given back to the consumer. State management is also optional, but it becomes the consumer's responsibility to persist continuation tokens if needed. Additionally, the change feed iterator brings the option of obtaining a change feed for a specific partition key.

The below snippet shows a very simple consumer implementation of the change feed iterator model - no state management, just starting the consumption from a certain point in time and waiting one second before polling for new changes.

public async IAsyncEnumerable<T> FetchFeed(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // Start consuming from the current point in time, without any persisted state.
    FeedIterator<T> changeFeedIterator = _container.GetChangeFeedIterator<T>(
        ChangeFeedStartFrom.Time(DateTime.UtcNow),
        ChangeFeedMode.LatestVersion
    );

    while (changeFeedIterator.HasMoreResults && !cancellationToken.IsCancellationRequested)
    {
        FeedResponse<T> changeFeedResponse = await changeFeedIterator
            .ReadNextAsync(cancellationToken);

        if (changeFeedResponse.StatusCode == HttpStatusCode.NotModified)
        {
            // 304 means there are no new changes - wait before polling again.
            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
        else
        {
            foreach (T item in changeFeedResponse)
            {
                yield return item;
            }
        }
    }
}
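If you do need to resume after a restart, the continuation token can be captured from every response and fed back via ChangeFeedStartFrom.ContinuationToken. How the token is persisted is up to you - SaveContinuationTokenAsync below is just a hypothetical helper:

```csharp
// Resume from a previously persisted token, or start from the beginning.
FeedIterator<T> changeFeedIterator = _container.GetChangeFeedIterator<T>(
    continuationToken is null
        ? ChangeFeedStartFrom.Beginning()
        : ChangeFeedStartFrom.ContinuationToken(continuationToken),
    ChangeFeedMode.LatestVersion
);

while (changeFeedIterator.HasMoreResults)
{
    FeedResponse<T> changeFeedResponse = await changeFeedIterator
        .ReadNextAsync(cancellationToken);

    if (changeFeedResponse.StatusCode != HttpStatusCode.NotModified)
    {
        // Process changes here.
    }

    // Persist the token so consumption can survive a restart
    // (SaveContinuationTokenAsync is a hypothetical helper).
    continuationToken = changeFeedResponse.ContinuationToken;
    await SaveContinuationTokenAsync(continuationToken);
}
```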

MongoDB Change Feed

MongoDB is probably the most popular NoSQL choice among the teams I've been working with that don't use cloud PaaS databases for their needs. Among its many features, it has quite a powerful change feed (a.k.a. Change Streams) capability.

The incoming change information can cover a wide spectrum of operations and can come from a single collection, a database, or an entire deployment. If the operation relates to a document, the change feed can provide the current version, the previous version, and the delta. There is also support for resume tokens, which can be used to manage state if needed.

One unintuitive thing about the MongoDB change feed is that it's only available when you are running a replica set or a sharded cluster. This doesn't mean that you have to run a cluster - you can run a single instance as a replica set (even in a container), you just need the right configuration (you will find a workflow that handles such a deployment to Azure Container Instances in the demo repository).
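A minimal sketch of such a single-instance setup could look like this (the replica set name, port, and data path are assumptions):

```shell
# Start a single mongod instance as a one-member replica set.
mongod --replSet rs0 --port 27017 --dbpath /data/db

# One-time operation, from a separate shell: initiate the replica set.
mongosh --eval "rs.initiate()"
```

Once rs.initiate() completes, the instance elects itself primary and change streams become available.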

Consumption of the MongoDB change feed is exposed through the Watch and WatchAsync methods on IMongoCollection, IMongoDatabase, and IMongoClient instances. The below snippet watches a single collection and configures the change feed to return the current version of the document. You can also provide a pipeline definition when calling Watch or WatchAsync to filter the change feed (for example, to monitor only specific operation types).

public async IAsyncEnumerable<T> FetchFeed(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    // UpdateLookup makes the change stream return the current version of the document.
    IAsyncCursor<ChangeStreamDocument<T>> changefeed = await _collection.WatchAsync(
        new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup },
        cancellationToken: cancellationToken
    );

    while (!cancellationToken.IsCancellationRequested)
    {
        while (await changefeed.MoveNextAsync(cancellationToken))
        {
            foreach (ChangeStreamDocument<T> change in changefeed.Current)
            {
                if (change.OperationType == ChangeStreamOperationType.Insert)
                {
                    yield return change.FullDocument;
                }

                // Handle other operation types (e.g. Update, Replace, Delete) here.
            }
        }

        await Task.Delay(_moveNextDelay, cancellationToken);
    }
}
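As an example of such filtering, a pipeline that limits the change feed to insert operations could be sketched like this:

```csharp
// Build a pipeline that lets only insert operations through.
PipelineDefinition<ChangeStreamDocument<T>, ChangeStreamDocument<T>> pipeline =
    new EmptyPipelineDefinition<ChangeStreamDocument<T>>()
        .Match(change => change.OperationType == ChangeStreamOperationType.Insert);

// Filtering happens server-side, before the changes reach the consumer.
IAsyncCursor<ChangeStreamDocument<T>> changefeed = await _collection.WatchAsync(
    pipeline,
    new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup },
    cancellationToken: cancellationToken
);
```

Filtering server-side like this avoids shipping changes over the wire only to discard them in the consumer.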

Azure Blob Storage Change Feed

Azure Blob Storage is the odd one on this list because it's object storage, not a database. Its change feed provides information about changes to blobs and blob metadata across an entire storage account. Under the hood, the change feed is implemented as a special container (yes, it's visible; yes, you can take a look) which is created once you enable the feature. As it is a container, you should pay attention to the retention period configuration, as it will affect your costs.

There is one more important aspect of the Azure Blob Storage change feed to consider before using it - latency. It's pretty slow; it can take minutes for changes to appear.

From the consumption perspective, it follows the enumerator approach. You can obtain the enumerator by calling BlobChangeFeedClient.GetChangesAsync. The enumerator is not infinite - it returns the changes currently available, and once you process them, you have to poll for new ones. This makes managing continuation tokens necessary even for local state. What is unique is that you can request changes within a specified time window.

The change feed supports six event types in the latest schema version. In addition to expected ones, like created or deleted, there are some interesting ones, like tier changed. The change information never contains the item itself, which shouldn't be surprising - in the context of object storage, including the content would be quite risky.

The below snippet streams the change feed by managing the continuation token locally. For changes that represent blob creation, it downloads the current version of the item.

public async IAsyncEnumerable<T> FetchFeed(
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    string? continuationToken = null;

    TokenCredential azureCredential = new DefaultAzureCredential();

    BlobServiceClient blobServiceClient = new BlobServiceClient(_serviceUri, azureCredential);
    BlobChangeFeedClient changeFeedClient = blobServiceClient.GetChangeFeedClient();

    while (!cancellationToken.IsCancellationRequested)
    {
        // Resume from the last continuation token once one has been captured.
        IAsyncEnumerator<Page<BlobChangeFeedEvent>> changeFeedEnumerator = (continuationToken is null
                ? changeFeedClient.GetChangesAsync()
                : changeFeedClient.GetChangesAsync(continuationToken))
            .AsPages()
            .GetAsyncEnumerator();

        while (await changeFeedEnumerator.MoveNextAsync())
        {
            foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEnumerator.Current.Values)
            {
                if ((changeFeedEvent.EventType == BlobChangeFeedEventType.BlobCreated)
                    && changeFeedEvent.Subject.StartsWith($"/blobServices/default/containers/{_container}"))
                {
                    BlobClient createdBlobClient = new BlobClient(
                        changeFeedEvent.EventData.Uri,
                        azureCredential);

                    // The blob may have been deleted since the event was emitted.
                    if (await createdBlobClient.ExistsAsync())
                    {
                        MemoryStream blobContentStream =
                            new MemoryStream((int)changeFeedEvent.EventData.ContentLength);
                        await createdBlobClient.DownloadToAsync(blobContentStream);
                        blobContentStream.Seek(0, SeekOrigin.Begin);

                        yield return JsonSerializer.Deserialize<T>(blobContentStream);
                    }
                }
            }

            // Capture the token so the next poll resumes after the last processed page.
            continuationToken = changeFeedEnumerator.Current.ContinuationToken;
        }

        await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
    }
}
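The time window capability mentioned earlier comes down to a different GetChangesAsync overload taking start and end times (the 24-hour window below is just an example; note that the change feed stores events in hourly segments, so the boundaries are effectively rounded to the hour):

```csharp
// Request only the changes that occurred within roughly the last 24 hours.
AsyncPageable<BlobChangeFeedEvent> changeFeedEvents = changeFeedClient.GetChangesAsync(
    start: DateTimeOffset.UtcNow.AddDays(-1),
    end: DateTimeOffset.UtcNow);

await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedEvents)
{
    // Process the event here.
}
```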

There Is More

The above samples are by no means exhaustive. They don't show all the features of the discussed change feeds, nor do they cover all the change feeds out there. But they are a good start - that's why I've been evolving them for the past five years.