Exposing RethinkDB changefeed from ASP.NET Core

Recently I've been looking into using some special-purpose databases with ASP.NET Core. One of them was RethinkDB. RethinkDB is a NoSQL database that stores schemaless JSON documents. What makes it special is its real-time capability: changefeeds. Changefeeds continuously push updated query results to applications in real time. Data can reach RethinkDB from various sources and be perceived as a stream of events. I wanted to expose that stream through the application to the clients.

Setting up RethinkDB connection

There is a .NET Standard 2.0 driver for RethinkDB available as an open source project (free to use if SSL/TLS encryption is not required). All interaction with the driver goes through the globally provided singleton RethinkDb.Driver.RethinkDB.R. This singleton, together with an open connection, is what allows using the database (the driver supports connection pooling with a couple of possible strategies, but I'm not going to use it here). Establishing a connection requires a hostname or IP address and, optionally, a port and a timeout.

internal class RethinkDbOptions
{
    public string HostnameOrIp { get; set; }

    public int? DriverPort { get; set; }

    public int? Timeout { get; set; }
}

The connection object is expensive to create, and it is advised to use only one in the application. To achieve that, the connection object can be encapsulated (together with the global singleton) in a dedicated service.

internal class RethinkDbSingletonProvider : IRethinkDbSingletonProvider
{
    public RethinkDb.Driver.RethinkDB RethinkDbSingleton { get; }

    public RethinkDb.Driver.Net.Connection RethinkDbConnection { get; }

    public RethinkDbSingletonProvider(IOptions<RethinkDbOptions> options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (String.IsNullOrWhiteSpace(options.Value.HostnameOrIp))
        {
            throw new ArgumentNullException(nameof(RethinkDbOptions.HostnameOrIp));
        }

        var rethinkDbSingleton = RethinkDb.Driver.RethinkDB.R;

        var rethinkDbConnection = rethinkDbSingleton.Connection()
            .Hostname(options.Value.HostnameOrIp);

        if (options.Value.DriverPort.HasValue)
        {
            rethinkDbConnection.Port(options.Value.DriverPort.Value);
        }

        if (options.Value.Timeout.HasValue)
        {
            rethinkDbConnection.Timeout(options.Value.Timeout.Value);
        }

        RethinkDbConnection = rethinkDbConnection.Connect();

        RethinkDbSingleton = rethinkDbSingleton;
    }
}
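The container's singleton lifetime is what keeps a single connection in this setup. Outside a container, the same create-once, thread-safe guarantee can be sketched with Lazy<T>. ExpensiveResource is a hypothetical stand-in for the driver's Connection, and its constructor counter exists only to make the guarantee observable.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for an expensive object such as a database connection.
internal sealed class ExpensiveResource
{
    private static int _instancesCreated;

    // Number of times the constructor has run; used only to observe the guarantee.
    public static int InstancesCreated => Volatile.Read(ref _instancesCreated);

    public ExpensiveResource()
    {
        Interlocked.Increment(ref _instancesCreated);
        Thread.Sleep(50); // simulate a slow connection handshake
    }
}

internal static class ResourceHolder
{
    // ExecutionAndPublication: the factory runs on exactly one thread,
    // concurrent callers block until that single instance is available.
    private static readonly Lazy<ExpensiveResource> _instance =
        new Lazy<ExpensiveResource>(() => new ExpensiveResource(),
            LazyThreadSafetyMode.ExecutionAndPublication);

    public static ExpensiveResource Instance => _instance.Value;
}
```

One caveat of this approach: in ExecutionAndPublication mode Lazy<T> caches an exception thrown by the factory, so a failed first connection attempt would not be retried.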

To ensure a single instance, the service should be registered as a singleton.

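using System;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions; // TryAddSingleton, TryAddTransient

internal static class ServiceCollectionExtensions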
{
    public static IServiceCollection AddRethinkDb(this IServiceCollection services,
        Action<RethinkDbOptions> configureOptions)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }
        if (configureOptions == null)
        {
            throw new ArgumentNullException(nameof(configureOptions));
        }

        services.Configure(configureOptions);
        services.TryAddSingleton<IRethinkDbSingletonProvider, RethinkDbSingletonProvider>();
        services.TryAddTransient<IRethinkDbService, RethinkDbService>();

        return services;
    }
}

The second service (RethinkDbService) will handle the RethinkDB-specific logic.

Creating a data source

Changefeeds are great for solving specific use cases, but for demo purposes something simple is better. A good enough example is an IHostedService that gathers ThreadPool statistics.

internal class ThreadStats
{
    public int WorkerThreads { get; set; }

    public int MinThreads { get; set; }

    public int MaxThreads { get; set; }

    public override string ToString()
    {
        return $"Available: {WorkerThreads}, Active: {MaxThreads - WorkerThreads}, Min: {MinThreads}, Max: {MaxThreads}";
    }
}
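The service that fills ThreadStats reads three ThreadPool counters with separate calls, so a snapshot is not atomic: the numbers can shift between calls. A minimal sketch of keeping those reads in one place and naming the derived "active" value could look like this; Capture() and ActiveThreads are hypothetical additions, not part of the demo.

```csharp
using System;
using System.Threading;

internal class ThreadStats
{
    public int WorkerThreads { get; set; }

    public int MinThreads { get; set; }

    public int MaxThreads { get; set; }

    // Busy worker threads = configured maximum minus currently available.
    public int ActiveThreads => MaxThreads - WorkerThreads;

    // Hypothetical helper: reads the three worker-thread counters together.
    public static ThreadStats Capture()
    {
        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        ThreadPool.GetMinThreads(out int minThreads, out _);
        ThreadPool.GetMaxThreads(out int maxThreads, out _);

        return new ThreadStats
        {
            WorkerThreads = workerThreads,
            MinThreads = minThreads,
            MaxThreads = maxThreads
        };
    }

    public override string ToString()
    {
        return $"Available: {WorkerThreads}, Active: {ActiveThreads}, Min: {MinThreads}, Max: {MaxThreads}";
    }
}
```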

internal class ThreadStatsService : IHostedService
{
    private readonly IRethinkDbService _rethinkDbService;

    ...

    private async Task GatherThreadStatsAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            ThreadPool.GetAvailableThreads(out var workerThreads, out var _);
            ThreadPool.GetMinThreads(out var minThreads, out var _);
            ThreadPool.GetMaxThreads(out var maxThreads, out var _);

            _rethinkDbService.InsertThreadStats(new ThreadStats
            {
                WorkerThreads = workerThreads,
                MinThreads = minThreads,
                MaxThreads = maxThreads
            });

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }
}
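The StartAsync/StopAsync parts of ThreadStatsService are elided above. One common way to drive such a loop, sketched here as an assumption rather than the demo's actual code, is to start it without awaiting and cancel it on stop; ASP.NET Core 2.1 packages this pattern as the BackgroundService base class. To stay self-contained, the hypothetical PeriodicWorker below only mirrors the IHostedService method shapes and does not reference Microsoft.Extensions.Hosting.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical class mirroring the StartAsync/StopAsync shape of IHostedService.
internal sealed class PeriodicWorker
{
    private readonly Action _work;
    private readonly TimeSpan _interval;
    private CancellationTokenSource _stoppingCts;
    private Task _executingTask;

    public PeriodicWorker(Action work, TimeSpan interval)
    {
        _work = work ?? throw new ArgumentNullException(nameof(work));
        _interval = interval;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _stoppingCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Start the loop without awaiting it, so the caller (the host) is not blocked.
        _executingTask = RunLoopAsync(_stoppingCts.Token);

        return Task.CompletedTask;
    }

    public async Task StopAsync(CancellationToken cancellationToken)
    {
        if (_executingTask == null)
        {
            return;
        }

        _stoppingCts.Cancel();

        // Wait for the loop to finish, but give up if the caller's token fires first.
        await Task.WhenAny(_executingTask, Task.Delay(Timeout.Infinite, cancellationToken));
    }

    private async Task RunLoopAsync(CancellationToken stoppingToken)
    {
        try
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                _work();
                await Task.Delay(_interval, stoppingToken);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected: Task.Delay throws when stopping is requested.
        }
    }
}
```

Catching the cancellation inside the loop keeps a normal shutdown from surfacing as a faulted task.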

If the ThreadStatsService code looks familiar, it's because it is based on one of David Fowler's demos from NDC London 2018. Adding the controller from that demo to the application provides an easy way to use up some threads.
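The controller from that demo is not reproduced here. As a rough stand-in, the hypothetical helper below blocks a few thread-pool threads on an event, which is enough to see the Available and Active numbers move.

```csharp
using System.Threading;

internal static class ThreadPoolLoad
{
    // Hypothetical helper: queues `count` work items that block until `release` is set
    // and returns only once all of them are running on thread-pool threads.
    public static void OccupyWorkers(int count, ManualResetEventSlim release)
    {
        var started = new CountdownEvent(count);

        for (int i = 0; i < count; i++)
        {
            ThreadPool.QueueUserWorkItem(_ =>
            {
                started.Signal();
                release.Wait(); // blocking wait keeps this pool thread busy
            });
        }

        started.Wait();
    }

    public static int AvailableWorkers()
    {
        ThreadPool.GetAvailableThreads(out int workerThreads, out _);
        return workerThreads;
    }
}
```

Blocking pool threads like this is exactly what a real application should avoid; it is useful here only because it makes the statistics change visibly.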

The programming model of the RethinkDB driver is a little specific. In general, one builds the query and, at the end, runs it by providing the connection. With that model, inserting the stats looks like this.

using RethinkDb.Driver.Net; // Connection

internal class RethinkDbService: IRethinkDbService
{
    private const string DATABASE_NAME = "Demo_AspNetCore_RethinkDB";
    private const string THREAD_STATS_TABLE_NAME = "ThreadStats";

    private readonly RethinkDb.Driver.RethinkDB _rethinkDbSingleton;
    private readonly Connection _rethinkDbConnection;

    public RethinkDbService(IRethinkDbSingletonProvider rethinkDbSingletonProvider)
    {
        if (rethinkDbSingletonProvider == null)
        {
            throw new ArgumentNullException(nameof(rethinkDbSingletonProvider));
        }

        _rethinkDbSingleton = rethinkDbSingletonProvider.RethinkDbSingleton;
        _rethinkDbConnection = rethinkDbSingletonProvider.RethinkDbConnection;
    }

    public void InsertThreadStats(ThreadStats threadStats)
    {
        _rethinkDbSingleton.Db(DATABASE_NAME).Table(THREAD_STATS_TABLE_NAME)
            .Insert(threadStats).Run(_rethinkDbConnection);
    }
}

With the data source in place, the changefeed can be exposed.

Exposing changefeed with Server-Sent Events

Server-Sent Events feel like a natural choice here. This is exactly the scenario the technology was designed for. Also, with a ready-to-use library, the implementation should be straightforward. First, the required service and middleware need to be registered.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddRethinkDb(options =>
        {
            options.HostnameOrIp = "127.0.0.1";
        });

        services.AddServerSentEvents();

        services.AddSingleton<IHostedService, ThreadStatsService>();

        services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
    }

    public void Configure(IApplicationBuilder app)
    {
        app.MapServerSentEvents("/thread-stats-changefeed");

        app.UseStaticFiles();

        app.UseMvc();

        app.Run(async (context) =>
        {
            await context.Response.WriteAsync("-- Demo.AspNetCore.RethinkDB --");
        });
    }
}
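The library takes care of the wire format, but it helps to know what a single event looks like on the text/event-stream response: optional event and id fields, one data: line per payload line, and a blank line that ends the event. SseFormatter is a hypothetical helper that only illustrates this framing; it is not the library's API.

```csharp
using System.Text;

internal static class SseFormatter
{
    // Hypothetical helper showing the text/event-stream framing of a single event.
    public static string Format(string data, string eventType = null, string id = null)
    {
        var builder = new StringBuilder();

        if (!string.IsNullOrEmpty(eventType))
        {
            builder.Append("event: ").Append(eventType).Append('\n');
        }

        if (!string.IsNullOrEmpty(id))
        {
            builder.Append("id: ").Append(id).Append('\n');
        }

        // Normalize CRLF, then give every payload line its own "data:" field;
        // the browser joins them back together with "\n".
        string normalized = (data ?? string.Empty).Replace("\r\n", "\n");
        foreach (string line in normalized.Split('\n', '\r'))
        {
            builder.Append("data: ").Append(line).Append('\n');
        }

        // A blank line terminates the event and triggers dispatch on the client.
        builder.Append('\n');

        return builder.ToString();
    }
}
```

For example, a ThreadStats string such as "Available: 10" becomes "data: Available: 10" followed by an empty line.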

To acquire the changefeed from RethinkDB, one needs to build the query, call Changes() on it, and then run it by calling RunChanges()/RunChangesAsync(). This returns a cursor which can be iterated later.

internal class RethinkDbService: IRethinkDbService
{
    ...

    public Task<Cursor<Change<ThreadStats>>> GetThreadStatsChangefeedAsync(CancellationToken cancellationToken)
    {
        return _rethinkDbSingleton.Db(DATABASE_NAME).Table(THREAD_STATS_TABLE_NAME)
            .Changes().RunChangesAsync<ThreadStats>(_rethinkDbConnection, cancellationToken);
    }
}
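Each item the cursor yields describes one change as a pair of old and new values (the changefeed service reads NewValue). For inserts the old value is null, for deletes the new value is null, and updates carry both. The hypothetical helper below spells out that rule in plain C#, independent of the driver.

```csharp
using System;

internal enum ChangeKind
{
    Insert,
    Update,
    Delete
}

internal static class ChangeClassifier
{
    // Hypothetical helper: classifies a changefeed item from its old and new values.
    public static ChangeKind Classify<T>(T oldValue, T newValue) where T : class
    {
        if (oldValue == null && newValue == null)
        {
            throw new ArgumentException("A change needs an old value, a new value, or both.");
        }

        if (oldValue == null)
        {
            return ChangeKind.Insert; // nothing existed before
        }

        if (newValue == null)
        {
            return ChangeKind.Delete; // nothing exists after
        }

        return ChangeKind.Update; // both states present
    }
}
```

Note that when a changefeed is opened with the include_initial option, existing documents also arrive with only a new value, so this rule would report them as inserts.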

With all the pieces in place, the application can start sending events. Once again IHostedService helps here, providing a nice way of consuming the changefeed in the background.

internal class ThreadStatsChangefeedService : IHostedService
{
    private readonly IRethinkDbService _rethinkDbService;
    private readonly IServerSentEventsService _serverSentEventsService;

    ...

    private async Task ExposeThreadStatsChangefeedAsync(CancellationToken cancellationToken)
    {
        // Dispose the cursor when done so the changefeed is closed
        using (var threadStatsChangefeed = await _rethinkDbService
            .GetThreadStatsChangefeedAsync(cancellationToken))
        {
            while (!cancellationToken.IsCancellationRequested
                   && (await threadStatsChangefeed.MoveNextAsync(cancellationToken)))
            {
                // NewValue is null when a document is deleted, so skip such changes
                ThreadStats newThreadStats = threadStatsChangefeed.Current.NewValue;
                if (newThreadStats != null)
                {
                    await _serverSentEventsService.SendEventAsync(newThreadStats.ToString());
                }
            }
        }
    }
}

That's it. All that is left is registering the service and adding some UI. The complete demo can be found here.

WebSockets fallback

The sad truth is that not all major browsers support Server-Sent Events. A fallback might not always be needed (in the context mentioned at the beginning, the target clients were known to use browsers with support), but it might be nice to add one for the remaining 11% (at the time of writing). Such a fallback can be implemented based on WebSockets. I've written about using WebSockets in ASP.NET Core several times before and I have a demo project up on GitHub, so there is no need to repeat any of that here. For completeness, the demo for this post contains the fallback, which is a simple WebSockets implementation capable only of broadcasting text messages.

Why I didn't use SignalR

This question will most likely come up. The honest answer is that I didn't need it. Server-Sent Events fit the use case perfectly, and the implementation was even shorter than it would have been with SignalR (that's no longer true with the WebSockets fallback, but even then the code is still pretty simple). Also, SignalR usage here wouldn't be typical: the changefeed would have to be handled either in a Hub or by accessing clients from an IHostedService. In my opinion, this approach is cleaner.