Recently I've been looking into using some "special tasks" databases with ASP.NET Core. One of them was RethinkDB. In general, RethinkDB is a NoSQL database which stores schemaless JSON documents. What's special about it is its real-time capability: changefeeds. Changefeeds continuously push updated query results to applications in real time. The data can come to RethinkDB from various sources and be perceived as a stream of events. I wanted to expose that stream through the application to the clients.

Setting up RethinkDB connection

There is a .NET Standard 2.0 driver for RethinkDB available as an open source project (free to use if SSL/TLS encryption is not required). Interaction with the driver goes through the globally provided singleton RethinkDb.Driver.RethinkDB.R. This singleton, together with an open connection, allows for using the database (the driver supports connection pooling with a couple of possible strategies, but I'm not going to use it here). Establishing a connection requires a hostname or IP and, optionally, a port and timeout.

internal class RethinkDbOptions
{
    public string HostnameOrIp { get; set; }

    public int? DriverPort { get; set; }

    public int? Timeout { get; set; }
}

The connection object is expensive to create and it is advised to use only one in the application. To achieve that, the connection object can be encapsulated (together with the global singleton) in a dedicated service.

internal class RethinkDbSingletonProvider : IRethinkDbSingletonProvider
{
    public RethinkDb.Driver.RethinkDB RethinkDbSingleton { get; }

    public RethinkDb.Driver.Net.Connection RethinkDbConnection { get; }

    public RethinkDbSingletonProvider(IOptions<RethinkDbOptions> options)
    {
        if (options == null)
        {
            throw new ArgumentNullException(nameof(options));
        }

        if (String.IsNullOrWhiteSpace(options.Value.HostnameOrIp))
        {
            throw new ArgumentNullException(nameof(RethinkDbOptions.HostnameOrIp));
        }

        var rethinkDbSingleton = RethinkDb.Driver.RethinkDB.R;

        var rethinkDbConnection = rethinkDbSingleton.Connection()
            .Hostname(options.Value.HostnameOrIp);

        if (options.Value.DriverPort.HasValue)
        {
            rethinkDbConnection.Port(options.Value.DriverPort.Value);
        }

        if (options.Value.Timeout.HasValue)
        {
            rethinkDbConnection.Timeout(options.Value.Timeout.Value);
        }

        RethinkDbConnection = rethinkDbConnection.Connect();

        RethinkDbSingleton = rethinkDbSingleton;
    }
}

To ensure a single instance, the service should be registered as a singleton.

internal static class ServiceCollectionExtensions
{
    public static IServiceCollection AddRethinkDb(this IServiceCollection services,
        Action<RethinkDbOptions> configureOptions)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }
        if (configureOptions == null)
        {
            throw new ArgumentNullException(nameof(configureOptions));
        }

        services.Configure(configureOptions);
        services.TryAddSingleton<IRethinkDbSingletonProvider, RethinkDbSingletonProvider>();
        services.TryAddTransient<IRethinkDbService, RethinkDbService>();

        return services;
    }
}

The second service (RethinkDbService) will handle the RethinkDB specific logic.

Creating a data source

The changefeeds capability is great for solving specific use cases, but for demo purposes something simple is better for brevity. A good enough example is an IHostedService gathering ThreadPool statistics.

internal class ThreadStats
{
    public int WorkerThreads { get; set; }

    public int MinThreads { get; set; }

    public int MaxThreads { get; set; }

    public override string ToString()
    {
        return $"Available: {WorkerThreads}, Active: {MaxThreads - WorkerThreads}, Min: {MinThreads}, Max: {MaxThreads}";
    }
}

internal class ThreadStatsService : IHostedService
{
    private readonly IRethinkDbService _rethinkDbService;

    ...

    private async Task GatherThreadStatsAsync(CancellationToken cancellationToken)
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            ThreadPool.GetAvailableThreads(out var workerThreads, out var _);
            ThreadPool.GetMinThreads(out var minThreads, out var _);
            ThreadPool.GetMaxThreads(out var maxThreads, out var _);

            _rethinkDbService.InsertThreadStats(new ThreadStats
            {
                WorkerThreads = workerThreads,
                MinThreads = minThreads,
                MaxThreads = maxThreads
            });

            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }
}

If this code looks familiar, it is because it's based on one of David Fowler's demos from NDC London 2018. Adding the controller from that demo to the application provides an easy way to use up some threads.

The programming model of the RethinkDB driver is a little specific. In general, one builds the query and at the end runs it, providing the connection. This makes inserting the stats look like below.

internal class RethinkDbService : IRethinkDbService
{
    private const string DATABASE_NAME = "Demo_AspNetCore_RethinkDB";
    private const string THREAD_STATS_TABLE_NAME = "ThreadStats";

    private readonly RethinkDb.Driver.RethinkDB _rethinkDbSingleton;
    private readonly Connection _rethinkDbConnection;

    public RethinkDbService(IRethinkDbSingletonProvider rethinkDbSingletonProvider)
    {
        if (rethinkDbSingletonProvider == null)
        {
            throw new ArgumentNullException(nameof(rethinkDbSingletonProvider));
        }

        _rethinkDbSingleton = rethinkDbSingletonProvider.RethinkDbSingleton;
        _rethinkDbConnection = rethinkDbSingletonProvider.RethinkDbConnection;
    }

    public void InsertThreadStats(ThreadStats threadStats)
    {
        _rethinkDbSingleton.Db(DATABASE_NAME).Table(THREAD_STATS_TABLE_NAME)
            .Insert(threadStats).Run(_rethinkDbConnection);
    }
}

With the data source in place, the changefeed can be exposed.

Exposing changefeed with Server-Sent Events

Server-Sent Events feel like a natural choice here; this is exactly the scenario the technology has been designed for. Also, with a ready-to-use library, the implementation should be straightforward. First, the required service and middleware need to be registered.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddRethinkDb(options =>
        {
            options.HostnameOrIp = "127.0.0.1";
        });

        services.AddServerSentEvents();

        services.AddSingleton<IHostedService, ThreadStatsService>();

        services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);
    }

    public void Configure(IApplicationBuilder app)
    {
        app.MapServerSentEvents("/thread-stats-changefeed");

        app.UseStaticFiles();

        app.UseMvc();

        app.Run(async (context) =>
        {
            await context.Response.WriteAsync("-- Demo.AspNetCore.RethinkDB --");
        });
    }
}

In order to acquire the changefeed from RethinkDB one needs to build the query, call Changes() on it and then run it by calling RunChanges()/RunChangesAsync(). This returns a cursor which can be iterated later.

internal class RethinkDbService : IRethinkDbService
{
    ...

    public Task<Cursor<Change<ThreadStats>>> GetThreadStatsChangefeedAsync(CancellationToken cancellationToken)
    {
        return _rethinkDbSingleton.Db(DATABASE_NAME).Table(THREAD_STATS_TABLE_NAME)
            .Changes().RunChangesAsync<ThreadStats>(_rethinkDbConnection, cancellationToken);
    }
}

With all the pieces in place, the application can start sending events. Again IHostedService helps here by providing a nice way of utilizing the changefeed in the background.

internal class ThreadStatsChangefeedService : IHostedService
{
    private readonly IRethinkDbService _rethinkDbService;
    private readonly IServerSentEventsService _serverSentEventsService;

    ...

    private async Task ExposeThreadStatsChangefeedAsync(CancellationToken cancellationToken)
    {
        var threadStatsChangefeed = await _rethinkDbService
            .GetThreadStatsChangefeedAsync(cancellationToken);

        while (!cancellationToken.IsCancellationRequested
               && (await threadStatsChangefeed.MoveNextAsync(cancellationToken)))
        {
            string newThreadStats = threadStatsChangefeed.Current.NewValue.ToString();
            await _serverSentEventsService.SendEventAsync(newThreadStats);
        }
    }
}

This is it. All that is left is registering the service and adding some UI. The complete demo can be found here.

WebSockets fallback

The sad truth is that not all major browsers support Server-Sent Events. A fallback might not always be needed (in the context mentioned at the beginning, the target clients were known to use browsers with support), but it might be nice to add one for the remaining 11% (at the moment of writing). Such a fallback can be implemented based on WebSockets. I've written about using WebSockets in ASP.NET Core several times before and I have a demo project up on GitHub, so there is no need to repeat any of that here. For completeness, the demo for this post contains the fallback, which is a simple WebSockets implementation capable only of broadcasting text messages.

Why I didn't use SignalR

This question will most likely come up. The honest answer is that I didn't need it. Server-Sent Events fit the use case perfectly and the implementation was even shorter than it would be with SignalR (that's no longer true with the WebSockets fallback, but even then the code is still pretty simple). Also, the usage of SignalR wouldn't be typical here: it would either have to handle the changefeed in a Hub or attempt to access clients from an IHostedService. In my opinion this approach is cleaner.

I would say that the best way of hosting static files is a CDN. But sometimes there are valid reasons not to use a CDN and to serve static files directly from the application. In such a scenario it's important to think about performance. I'm not going to write about caching and cache busting; for those subjects I suggest this post by Andrew Lock. What I want to focus on is eliminating unnecessary request bytes and processing on the server side.

Let's imagine an ASP.NET Core MVC application which uses cookie based authentication. The Startup may look more or less like below.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        ...

        services.AddAuthentication(CookieAuthenticationDefaults.AuthenticationScheme)
            .AddCookie();

        services.AddMvc();
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        ...

        app.UseStaticFiles();

        app.UseAuthentication();

        app.UseMvc();
    }
}

After login, this configuration will result in about 600 bytes of state coming from cookies (it can easily be more). This data is sent to the server with every request, and the server must parse it and store it in memory. In the case of publicly available static files this is a waste of resources. Web development has a generic solution to this problem: using a cookie-free origin.

Setting up a subdomain to act as a cookie-free origin doesn't require anything ASP.NET Core specific. All that is needed is a subdomain configured to the same IP address as the main domain of the application, for example example.com and static.example.com (in development you can set this up by editing the hosts file). As a result, the cookies set by example.com will not be sent to static.example.com. But just configuring the subdomain is not enough. Left like that, it would expose the whole application under static.example.com. Certainly this is not desired; we want to branch the pipeline.
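
For local development, both names can be pointed at the loopback address in the hosts file (example.com is used purely as a stand-in domain):

```
127.0.0.1    example.com
127.0.0.1    static.example.com
```

With that in place, requests to example.com and static.example.com both reach the locally running application, while cookies stay scoped to example.com.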

Subdomain based pipeline branching

ASP.NET Core provides a couple of ways to branch the pipeline. The domain information is available to the application through the Host header, and the predicate-based MapWhen() method allows examining it. A simple extension could look like this.

public static class MapSubdomainExtensions
{
    public static IApplicationBuilder MapSubdomain(this IApplicationBuilder app,
        string subdomain, Action<IApplicationBuilder> configuration)
    {
        // Parameters validation removed for brevity.
        ...

        return app.MapWhen(GetSubdomainPredicate(subdomain), configuration);
    }

    private static Func<HttpContext, bool> GetSubdomainPredicate(string subdomain)
    {
        return (HttpContext context) =>
        {
            string hostSubdomain = context.Request.Host.Host.Split('.')[0];

            return (subdomain == hostSubdomain);
        };
    }
}

This extension compares the provided subdomain with whatever is in the Host header before the first dot. This can be considered a simplification, but an acceptable one. With the extension we can separate the pipeline for static files from the rest of the application.

public class Startup
{
    ...

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        ...

        app.MapSubdomain("static", ConfigureStaticSubdomain);

        app.UseAuthentication();

        app.UseMvc();
    }

    private static void ConfigureStaticSubdomain(IApplicationBuilder app)
    {
        app.UseStaticFiles();
    }
}

Running the application now would result in static files not loading. The reason is simple: the generated content URLs are incorrect. It would be nice to modify a single place to handle correct URL generation.

Prefixing content URLs

As far as I know, all static file URLs (unless entered manually) are generated by calling the IUrlHelper.Content() method. Luckily, ASP.NET Core makes it relatively easy to replace the default implementation of IUrlHelper. The first thing needed is a custom implementation. It can be derived from the default UrlHelper.

public class ContentSubdomainUrlHelper : UrlHelper
{
    private readonly string _contentSubdomain;

    public ContentSubdomainUrlHelper(ActionContext actionContext, string contentSubdomain)
        : base(actionContext)
    {
        if (String.IsNullOrWhiteSpace(contentSubdomain))
        {
            throw new ArgumentNullException(nameof(contentSubdomain));
        }

        _contentSubdomain = contentSubdomain;
    }

    public override string Content(string contentPath)
    {
        if (String.IsNullOrEmpty(contentPath))
        {
            return null;
        }

        if (contentPath[0] == '~')
        {
            PathString hostPathString = new PathString("//" + _contentSubdomain + "."
                + HttpContext.Request.Host);
            PathString applicationPathString = HttpContext.Request.PathBase;
            PathString contentPathString = new PathString(contentPath.Substring(1));

            return HttpContext.Request.Scheme + ":"
                + hostPathString.Add(applicationPathString).Add(contentPathString).Value;
        }

        return contentPath;
    }
}

Once again this makes an important assumption: that it's enough to prefix the current request host with the subdomain in order to create a correct URL.
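
The assumed transformation can be illustrated with plain string operations (a simplified sketch with a hypothetical helper name; the real override also prepends the PathBase):

```csharp
using System;

static class ContentUrlSketch
{
    // Simplified sketch of the override's effect: an app-relative path
    // ("~/...") gets the scheme, the subdomain-prefixed host, and the
    // rest of the path glued together; anything else passes through.
    public static string PrefixWithSubdomain(string contentPath, string scheme,
        string host, string subdomain)
    {
        if (String.IsNullOrEmpty(contentPath))
        {
            return null;
        }

        if (contentPath[0] != '~')
        {
            return contentPath;
        }

        return scheme + "://" + subdomain + "." + host + contentPath.Substring(1);
    }
}
```

So a request to example.com over HTTPS referencing "~/css/site.css" ends up pointing at https://static.example.com/css/site.css.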

The second thing is an IUrlHelperFactory implementation. This service is responsible for creating IUrlHelper instances and has a single method. One important thing is to keep the same caching capability as the default implementation, otherwise it can cause a performance regression.

public class ContentSubdomainUrlHelperFactory : IUrlHelperFactory
{
    private string _contentSubdomain;

    public ContentSubdomainUrlHelperFactory(string contentSubdomain)
    {
        if (String.IsNullOrWhiteSpace(contentSubdomain))
        {
            throw new ArgumentNullException(nameof(contentSubdomain));
        }

        _contentSubdomain = contentSubdomain;
    }

    public IUrlHelper GetUrlHelper(ActionContext context)
    {
        // Parameters validation removed for brevity.
        ...

        object value;
        if (context.HttpContext.Items.TryGetValue(typeof(IUrlHelper), out value)
            && value is IUrlHelper)
        {
            return (IUrlHelper)value;
        }

        IUrlHelper urlHelper = new ContentSubdomainUrlHelper(context, _contentSubdomain);
        context.HttpContext.Items[typeof(IUrlHelper)] = urlHelper;

        return urlHelper;
    }
}

Replacing the default implementation should be done after calling AddMvc().

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        ...

        services.AddMvc();

        services.Replace(new ServiceDescriptor(typeof(IUrlHelperFactory),
            new ContentSubdomainUrlHelperFactory("static")));
    }

    ...
}

Running the application now would result in it almost working...

Enabling CORS for static files

The part that doesn't work is dynamically requested resources (for example fonts in the case of Bootstrap, or lazy loaded scripts). The introduction of a dedicated subdomain has turned those requests into cross-origin ones. This can be fixed by adding the CORS middleware to the branch.

public class Startup
{
    ...

    private static void ConfigureStaticSubdomain(IApplicationBuilder app)
    {
        app.UseCors(builder =>
        {
            builder.WithOrigins("https://example.com");
        });

        app.UseStaticFiles();
    }
}

Now everything works as it should.

As mentioned, the above implementation is simplified based on a few assumptions. In most cases those should hold, but if not, it shouldn't be hard to modify the code to handle more complex ones.

Sometimes there is a need for a web application to acquire the client's originating IP address (location dependent content, audit requirements etc.). ASP.NET Core provides the HttpContext.Connection.RemoteIpAddress property, which exposes the originating IP address of the connection. In today's web, the origin of the connection as seen by the web server is rarely the client; more likely it's the last proxy on the path. So, in order to attempt acquiring the client's originating IP address, a developer needs to work a little harder than reading a property.

The most common way of preserving the client IP address is the X-Forwarded-For header (it's even more popular than the standardized Forwarded header). It should contain a comma-separated list of addresses representing the request's path through the network, with the first entry being the client address. The X-Forwarded-For header is supported in ASP.NET Core out of the box by the Forwarded Headers Middleware; it just needs to be added to the pipeline.
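
As an illustration of the header's shape, a naive parse (a hypothetical helper, not the middleware's actual code) would be:

```csharp
using System;
using System.Linq;

static class XForwardedForSketch
{
    // X-Forwarded-For: client, proxy1, proxy2
    // Each proxy appends the address it received the request from,
    // so the first entry should be the original client.
    public static string GetClientAddress(string xForwardedFor)
    {
        if (String.IsNullOrWhiteSpace(xForwardedFor))
        {
            return null;
        }

        return xForwardedFor.Split(',').Select(address => address.Trim()).First();
    }
}
```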

Unfortunately, the X-Forwarded-For header is often not implemented correctly by proxies (for example, they might override the value instead of appending to it). In this context (and given the fact that the Forwarded header is not picking up as quickly as it should), the global reverse proxy providers like Akamai or CloudFlare have done what typically happens in software in such cases: they have introduced their own headers. Akamai is using True-Client-IP (to be precise, it's not unique to Akamai; there are others using it, but Akamai is the biggest) and CloudFlare is using CF-Connecting-IP.

So, how can all of those be handled within an application if needed? Luckily, the Forwarded Headers Middleware can be registered multiple times.

Stacking Forwarded Headers Middleware

The Forwarded Headers Middleware supports multiple headers with different responsibilities. The options allow for providing different names for those headers and choosing which ones should be processed. Assuming that part of the stack will be a Forwarded Headers Middleware configured to handle its default headers, two more registrations need to be added. For each of them, an extension method can be created to encapsulate the configuration.

For Akamai the ForwardedForHeaderName property should be set to True-Client-IP and processing limited to XForwardedFor.

public static class ForwardedHeadersExtensions
{
    public static IApplicationBuilder UseAkamaiTrueClientIp(this IApplicationBuilder app)
    {
        if (app == null)
        {
            throw new ArgumentNullException(nameof(app));
        }

        return app.UseForwardedHeaders(new ForwardedHeadersOptions
        {
            ForwardedForHeaderName = "True-Client-IP",
            ForwardedHeaders = ForwardedHeaders.XForwardedFor
        });
    }
}

The only difference in the case of the CloudFlare configuration is the ForwardedForHeaderName property value.

public static class ForwardedHeadersExtensions
{
    ...

    public static IApplicationBuilder UseCloudFlareConnectingIp(this IApplicationBuilder app)
    {
        if (app == null)
        {
            throw new ArgumentNullException(nameof(app));
        }

        return app.UseForwardedHeaders(new ForwardedHeadersOptions
        {
            ForwardedForHeaderName = "CF-Connecting-IP",
            ForwardedHeaders = ForwardedHeaders.XForwardedFor
        });
    }
}

This makes it easy to stack the middleware.

public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
    app.UseForwardedHeaders(new ForwardedHeadersOptions
    {
        ForwardedHeaders = ForwardedHeaders.All
    });
    app.UseAkamaiTrueClientIp();
    app.UseCloudFlareConnectingIp();

    ...
}

This works, but you might want something more. One of the issues with forwarded headers is trust: they can be easily spoofed. In the case of global providers there are options to protect against that. For simplicity, I will focus on CloudFlare from this point.

Preventing CloudFlare Connecting IP spoofing

CloudFlare makes its IP ranges available here. This list can be used to validate whether an incoming request is allowed to carry the CF-Connecting-IP header, as CloudFlare should be the last intermediary in front of the server. To perform that validation, one can start by wrapping the ForwardedHeadersMiddleware.

public class CloudFlareConnectingIpMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ForwardedHeadersMiddleware _forwardedHeadersMiddleware;

    public CloudFlareConnectingIpMiddleware(RequestDelegate next, ILoggerFactory loggerFactory)
    {
        _next = next ?? throw new ArgumentNullException(nameof(next));

        _forwardedHeadersMiddleware = new ForwardedHeadersMiddleware(next, loggerFactory,
            Options.Create(new ForwardedHeadersOptions
        {
            ForwardedForHeaderName = "CF-Connecting-IP",
            ForwardedHeaders = ForwardedHeaders.XForwardedFor
        }));
    }

    public Task Invoke(HttpContext context)
    {
        return _forwardedHeadersMiddleware.Invoke(context);
    }
}

To validate the request's originating IP address, the list provided by CloudFlare must be parsed into some usable form. There are existing parsers available, for example IPAddressRange.
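
For readers who would rather avoid a dependency, an IPv4-only containment check can also be sketched with just the BCL (illustrative code; IPAddressRange additionally covers IPv6 and other range notations):

```csharp
using System;
using System.Net;

static class CidrSketch
{
    // IPv4 containment check: mask both the network address and the
    // candidate with the prefix length and compare the network parts.
    public static bool IsInRange(IPAddress address, string cidr)
    {
        string[] parts = cidr.Split('/');
        byte[] network = IPAddress.Parse(parts[0]).GetAddressBytes();
        byte[] candidate = address.GetAddressBytes();
        int prefixLength = Int32.Parse(parts[1]);

        for (int i = 0; i < network.Length; i++)
        {
            // Number of prefix bits that fall into this byte (0 to 8).
            int bitsInThisByte = Math.Min(8, Math.Max(0, prefixLength - (i * 8)));
            byte mask = (byte)(0xFF << (8 - bitsInThisByte));

            if ((network[i] & mask) != (candidate[i] & mask))
            {
                return false;
            }
        }

        return true;
    }
}
```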

public class CloudFlareConnectingIpMiddleware
{
    private static readonly IPAddressRange[] _cloudFlareIpAddressRanges = new IPAddressRange[]
    {
        IPAddressRange.Parse("103.21.244.0/22"),
        ...
        IPAddressRange.Parse("2a06:98c0::/29")
    };

    ...

    private bool IsCloudFlareIp(IPAddress ipAddress)
    {
        bool isCloudFlareIp = false;

        for (int i = 0; i < _cloudFlareIpAddressRanges.Length; i++)
        {
            isCloudFlareIp = _cloudFlareIpAddressRanges[i].Contains(ipAddress);
            if (isCloudFlareIp)
            {
                break;
            }
        }

        return isCloudFlareIp;
    }
}

With the ability to check whether a request is coming from CloudFlare, the ForwardedHeadersMiddleware can be called only when needed.

public class CloudFlareConnectingIpMiddleware
{
    ...

    public Task Invoke(HttpContext context)
    {
        if (context.Request.Headers.ContainsKey("CF-Connecting-IP")
            && IsCloudFlareIp(context.Connection.RemoteIpAddress))
        {
            return _forwardedHeadersMiddleware.Invoke(context);
        }

        return _next(context);
    }

    ...
}

Complete code can be found here.

There is one drawback to this approach: what if the IP ranges change? This can be handled as well. CloudFlare provides two endpoints which return text lists, one for IPv4 and one for IPv6. Those two endpoints can be used by an IHostedService based background task to update the ranges on startup or periodically. I'll leave this as an additional exercise.
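
The parsing half of that exercise can be sketched as below (the helper name is illustrative, and downloading the body, for example with HttpClient, is deliberately left out):

```csharp
using System;
using System.Collections.Generic;

static class IpListParsingSketch
{
    // The endpoints return one CIDR range per line, so parsing comes down
    // to splitting on newlines and trimming; the result could then replace
    // the hardcoded range array.
    public static IReadOnlyList<string> ParseRanges(string responseBody)
    {
        var ranges = new List<string>();

        foreach (string line in responseBody.Split(new[] { '\r', '\n' },
            StringSplitOptions.RemoveEmptyEntries))
        {
            ranges.Add(line.Trim());
        }

        return ranges;
    }
}
```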

I have a demo project on GitHub which accompanies my blog series about Web Push based push notifications in ASP.NET Core. There is one thing in that project which I've wanted to "fix" for some time. That thing is requesting delivery of notifications, which is being done inside an action.

public class PushNotificationsApiController : Controller
{
    ...

    [HttpPost("notifications")]
    public async Task<IActionResult> SendNotification([FromBody]PushMessageViewModel message)
    {
        PushMessage pushMessage = new PushMessage(message.Notification)
        {
            Topic = message.Topic,
            Urgency = message.Urgency
        };

        await _subscriptionStore.ForEachSubscriptionAsync((PushSubscription subscription) =>
        {
            _notificationService.SendNotificationAsync(subscription, pushMessage);
        });

        return NoContent();
    }
}

If you have read the post about requesting delivery, you know it's an expensive operation. Taking into consideration the possibly high number of subscriptions, this is something which shouldn't be done in the context of a request. It would be much better to queue it in the background, independent of any request. Back in ASP.NET days this could be done with the QueueBackgroundWorkItem method, but it's not available in ASP.NET Core (at least not yet). However, there is a prototype implementation based on IHostedService which can be used as is or adjusted to a specific case. I've decided to take the second path. The first step on that path is the queue itself.

Creating the queue

The queue interface should be simple. Only two operations are needed: enqueue and dequeue. Dequeue should return a Task so the dequeuer can wait for new items. It should also accept a CancellationToken so the dequeuer can be stopped while it's waiting on dequeue.

internal interface IPushNotificationsQueue
{
    void Enqueue(PushMessage message);

    Task<PushMessage> DequeueAsync(CancellationToken cancellationToken);
}

The implementation is based on ConcurrentQueue and SemaphoreSlim. The SemaphoreSlim is where the magic happens. DequeueAsync waits on that semaphore. When a new message is enqueued, the semaphore is released, which allows DequeueAsync to continue. If the semaphore is released more than once, subsequent calls to DequeueAsync will not wait; they will just decrement the internal count of the semaphore until it's back at 0 again.

internal class PushNotificationsQueue : IPushNotificationsQueue
{
    private readonly ConcurrentQueue<PushMessage> _messages = new ConcurrentQueue<PushMessage>();
    private readonly SemaphoreSlim _messageEnqueuedSignal = new SemaphoreSlim(0);

    public void Enqueue(PushMessage message)
    {
        if (message == null)
        {
            throw new ArgumentNullException(nameof(message));
        }

        _messages.Enqueue(message);

        _messageEnqueuedSignal.Release();
    }

    public async Task<PushMessage> DequeueAsync(CancellationToken cancellationToken)
    {
        await _messageEnqueuedSignal.WaitAsync(cancellationToken);

        _messages.TryDequeue(out PushMessage message);

        return message;
    }
}
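
Reduced to a generic, self-contained sketch (the type name is illustrative), the same pattern can be exercised directly: a dequeue completes per enqueued item, in FIFO order.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Generic version of the same ConcurrentQueue + SemaphoreSlim pattern.
class BackgroundQueue<T>
{
    private readonly ConcurrentQueue<T> _items = new ConcurrentQueue<T>();
    private readonly SemaphoreSlim _itemEnqueuedSignal = new SemaphoreSlim(0);

    public void Enqueue(T item)
    {
        _items.Enqueue(item);

        // Each Release() bumps the semaphore count, letting exactly one
        // waiting (or future) DequeueAsync call through per enqueued item.
        _itemEnqueuedSignal.Release();
    }

    public async Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        await _itemEnqueuedSignal.WaitAsync(cancellationToken);

        _items.TryDequeue(out T item);

        return item;
    }
}
```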

With the queue in place, the next step is implementing the dequeuer.

Implementing the dequeuer

The dequeuer is an implementation of IHostedService. In general, it should wait on DequeueAsync and perform the same logic as the action does. But there are two important differences from the code in the action. First, a service scope needs to be created. The reason is IPushSubscriptionStore: by itself it's transient, so it wouldn't cause any issues, but its Sqlite implementation depends on a DbContext, which is scoped. Second, the whole processing must support cancellation in order for the host to be able to shut down gracefully.

internal class PushNotificationsDequeuer : IHostedService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly IPushNotificationsQueue _messagesQueue;
    private readonly IPushNotificationService _notificationService;
    private readonly CancellationTokenSource _stopTokenSource = new CancellationTokenSource();

    private Task _dequeueMessagesTask;

    public PushNotificationsDequeuer(IServiceProvider serviceProvider,
        IPushNotificationsQueue messagesQueue, IPushNotificationService notificationService)
    {
        _serviceProvider = serviceProvider;
        _messagesQueue = messagesQueue;
        _notificationService = notificationService;
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _dequeueMessagesTask = Task.Run(DequeueMessagesAsync);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _stopTokenSource.Cancel();

        return Task.WhenAny(_dequeueMessagesTask, Task.Delay(Timeout.Infinite, cancellationToken));
    }

    private async Task DequeueMessagesAsync()
    {
        while (!_stopTokenSource.IsCancellationRequested)
        {
            PushMessage message = await _messagesQueue.DequeueAsync(_stopTokenSource.Token);

            if (!_stopTokenSource.IsCancellationRequested)
            {
                using (IServiceScope serviceScope = _serviceProvider.CreateScope())
                {
                    IPushSubscriptionStore subscriptionStore =
                        serviceScope.ServiceProvider.GetRequiredService<IPushSubscriptionStore>();

                    await subscriptionStore.ForEachSubscriptionAsync(
                        (PushSubscription subscription) =>
                        {
                            _notificationService.SendNotificationAsync(subscription, message,
                                _stopTokenSource.Token);
                        },
                        _stopTokenSource.Token
                    );
                }

            }
        }

    }
}

Now the queue and dequeuer just need to be registered (both as singletons).

public static class ServiceCollectionExtensions
{
    ...

    public static IServiceCollection AddPushNotificationsQueue(this IServiceCollection services)
    {
        services.AddSingleton<IPushNotificationsQueue, PushNotificationsQueue>();
        services.AddSingleton<IHostedService, PushNotificationsDequeuer>();

        return services;
    }
}

Queueing requesting delivery

With queue and dequeuer available the action can be changed to pass the message to the background.

public class PushNotificationsApiController : Controller
{
    ...

    [HttpPost("notifications")]
    public IActionResult SendNotification([FromBody]PushMessageViewModel message)
    {
        _pushNotificationsQueue.Enqueue(new PushMessage(message.Notification)
        {
            Topic = message.Topic,
            Urgency = message.Urgency
        });

        return NoContent();
    }
}

It is important to note that the dequeuer is sequential. If one wanted to parallelize it, there are two ways. One is to use the dequeuer implementation as a base and register multiple derived dequeuers. The other is to introduce parallelization inside the dequeuer; in this approach a single instance would manage multiple reading threads. It's also easy to achieve; only proper synchronization inside the StopAsync method is needed. I prefer the second approach, as the first is rather ugly.
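
The shape of that second approach can be sketched with plain tasks (the type name and reader count are illustrative, not taken from the demo project): StartAsync would start several identical reader loops, and StopAsync would cancel and wait for all of them.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Sketch of one dequeuer instance managing multiple reader loops.
class ParallelReaders
{
    private readonly CancellationTokenSource _stopTokenSource = new CancellationTokenSource();
    private Task[] _readerTasks = Array.Empty<Task>();

    public void Start(int readersCount, Func<CancellationToken, Task> readLoop)
    {
        // All readers run the same dequeue loop; the queue itself
        // provides the synchronization between them.
        _readerTasks = Enumerable.Range(0, readersCount)
            .Select(_ => Task.Run(() => readLoop(_stopTokenSource.Token)))
            .ToArray();
    }

    public Task StopAsync()
    {
        _stopTokenSource.Cancel();

        // Completes only when every reader has observed the cancellation.
        return Task.WhenAll(_readerTasks);
    }
}
```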

ASP.NET Core comes with out-of-the-box support for server-side response caching. It's easy to use and (when configured properly) can give you a nice performance boost. But it also has some shortcomings.

Under the hood it utilizes in-memory caching, which means that the cache has low latency at the price of increased memory usage. In high-load scenarios this can lead to memory pressure, and memory pressure can lead to entries being evicted prior to their expiration. This also means that the cache is not durable - if the process goes down for any reason, the cache needs to be repopulated. Last but not least, it provides no support for load balancing scenarios: every node has to keep its own full cache.

Load balancing with in-memory response cache

None of those limitations may be a problem for you, but if they are, you might want to trade some cache latency to address them. One approach is to use a distributed cache like Redis. This way the nodes are no longer responsible for holding the cache, memory usage is lower, and when an instance recycles it doesn't have to warm the cache up again.

Load balancing with Redis backed response cache

Implementing Redis backed IResponseCache

The heart of server-side response caching in ASP.NET Core is ResponseCachingMiddleware. It orchestrates the entire process and makes the other components (IResponseCachingPolicyProvider, IResponseCachingKeyProvider and IResponseCache) talk to each other. The component which needs to be implemented in order to switch caching from in-memory to Redis is IResponseCache, as it represents the storage for entries. It needs to be able to set and get an IResponseCacheEntry by a string key. IResponseCacheEntry doesn't make any assumptions about the shape of an entry (it's an empty interface), so the only thing that could be done with an instance of it is a blind attempt at binary serialization. That's not a good idea, so it's better to focus on its implementations: CachedResponse and CachedVaryByRules. They can be stored in Redis using Hashes. I'm going to focus only on CachedResponse, as CachedVaryByRules is simpler and can be handled with the same approach.

An instance of CachedResponse can't be represented by a single hash because it contains a headers collection. What can be done is to represent it by two separate hashes which share a key pattern. First, some helper methods which will take care of the conversion (I will be using StackExchange.Redis).

internal class RedisResponseCache : IResponseCache
{
    ...

    private HashEntry[] CachedResponseToHashEntryArray(CachedResponse cachedResponse)
    {
        MemoryStream bodyStream = new MemoryStream();
        cachedResponse.Body.CopyTo(bodyStream);

        return new HashEntry[]
        {
            new HashEntry("Type", nameof(CachedResponse)),
            new HashEntry(nameof(cachedResponse.Created), cachedResponse.Created.ToUnixTimeMilliseconds()),
            new HashEntry(nameof(cachedResponse.StatusCode), cachedResponse.StatusCode),
            new HashEntry(nameof(cachedResponse.Body), bodyStream.ToArray())
        };
    }

    private HashEntry[] HeaderDictionaryToHashEntryArray(IHeaderDictionary headerDictionary)
    {
        HashEntry[] headersHashEntries = new HashEntry[headerDictionary.Count];

        int headersHashEntriesIndex = 0;
        foreach (KeyValuePair<string, StringValues> header in headerDictionary)
        {
            headersHashEntries[headersHashEntriesIndex++] = new HashEntry(header.Key, (string)header.Value);
        }

        return headersHashEntries;
    }
}

With the conversion in place, an entry can be set in the cache (I will show only the async version). It is important to set the expiration for both hashes.

internal class RedisResponseCache : IResponseCache
{
    private ConnectionMultiplexer _redis;

    public RedisResponseCache(string redisConnectionMultiplexerConfiguration)
    {
        if (String.IsNullOrWhiteSpace(redisConnectionMultiplexerConfiguration))
        {
            throw new ArgumentNullException(nameof(redisConnectionMultiplexerConfiguration));
        }

        _redis = ConnectionMultiplexer.Connect(redisConnectionMultiplexerConfiguration);
    }

    ...

    public async Task SetAsync(string key, IResponseCacheEntry entry, TimeSpan validFor)
    {
        if (entry is CachedResponse cachedResponse)
        {
            string headersKey = key + "_Headers";

            IDatabase redisDatabase = _redis.GetDatabase();

            await redisDatabase.HashSetAsync(key, CachedResponseToHashEntryArray(cachedResponse));
            await redisDatabase.HashSetAsync(headersKey, HeaderDictionaryToHashEntryArray(cachedResponse.Headers));

            await redisDatabase.KeyExpireAsync(headersKey, validFor);
            await redisDatabase.KeyExpireAsync(key, validFor);
        }
        else if (entry is CachedVaryByRules cachedVaryByRules)
        {
            ...
        }
    }

    ...
}

Getting an entry from the cache is similar. The opposite conversion methods are needed.

internal class RedisResponseCache : IResponseCache
{
    ...

    private CachedResponse CachedResponseFromHashEntryArray(HashEntry[] hashEntries)
    {
        CachedResponse cachedResponse = new CachedResponse();

        foreach (HashEntry hashEntry in hashEntries)
        {
            switch (hashEntry.Name)
            {
                case nameof(cachedResponse.Created):
                    cachedResponse.Created = DateTimeOffset.FromUnixTimeMilliseconds((long)hashEntry.Value);
                    break;
                case nameof(cachedResponse.StatusCode):
                    cachedResponse.StatusCode = (int)hashEntry.Value;
                    break;
                case nameof(cachedResponse.Body):
                    cachedResponse.Body = new MemoryStream(hashEntry.Value);
                    break;
            }
        }

        return cachedResponse;
    }

    private IHeaderDictionary HeaderDictionaryFromHashEntryArray(HashEntry[] headersHashEntries)
    {
        IHeaderDictionary headerDictionary = new HeaderDictionary();

        foreach (HashEntry headersHashEntry in headersHashEntries)
        {
            headerDictionary.Add(headersHashEntry.Name, (string)headersHashEntry.Value);
        }

        return headerDictionary;
    }
}

So the hashes can be retrieved and the entry recreated (again, only the async version).

internal class RedisResponseCache : IResponseCache
{
    ...

    public async Task<IResponseCacheEntry> GetAsync(string key)
    {
        IResponseCacheEntry responseCacheEntry = null;

        IDatabase redisDatabase = _redis.GetDatabase();

        HashEntry[] hashEntries = await redisDatabase.HashGetAllAsync(key);

        string type = hashEntries.FirstOrDefault(e => e.Name == "Type").Value;
        if (type == nameof(CachedResponse))
        {
            HashEntry[] headersHashEntries = await redisDatabase.HashGetAllAsync(key + "_Headers");

            if ((headersHashEntries != null) && (headersHashEntries.Length > 0)
                && (hashEntries != null) && (hashEntries.Length > 0))
            {
                CachedResponse cachedResponse = CachedResponseFromHashEntryArray(hashEntries);
                cachedResponse.Headers = HeaderDictionaryFromHashEntryArray(headersHashEntries);

                responseCacheEntry = cachedResponse;
            }
        }
        else if (type == nameof(CachedVaryByRules))
        {
            ...
        }

        return responseCacheEntry;
    }

    ...
}

At this point, adding the sync versions and the code for CachedVaryByRules shouldn't be hard.
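For example, a sync Set can mirror the SetAsync shown above by using the blocking StackExchange.Redis calls (a sketch based on the async version):

internal class RedisResponseCache : IResponseCache
{
    ...

    // Sync counterpart of SetAsync - same two hashes, same expiration,
    // just the synchronous StackExchange.Redis calls.
    public void Set(string key, IResponseCacheEntry entry, TimeSpan validFor)
    {
        if (entry is CachedResponse cachedResponse)
        {
            string headersKey = key + "_Headers";

            IDatabase redisDatabase = _redis.GetDatabase();

            redisDatabase.HashSet(key, CachedResponseToHashEntryArray(cachedResponse));
            redisDatabase.HashSet(headersKey, HeaderDictionaryToHashEntryArray(cachedResponse.Headers));

            redisDatabase.KeyExpire(headersKey, validFor);
            redisDatabase.KeyExpire(key, validFor);
        }
        else if (entry is CachedVaryByRules cachedVaryByRules)
        {
            ...
        }
    }

    ...
}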

Having the implementation is step one, step two is using it.

Using custom IResponseCache

Back in ASP.NET Core 1.1, ResponseCachingMiddleware had a constructor which allowed providing your own implementation of IResponseCache. This constructor is gone in 2.0, in order to guarantee a limit on memory usage by making ResponseCachingMiddleware use its own private instance of the cache. The IResponseCache implementation can still be replaced by (please don't throw rocks at me) using reflection. Yes, reflection is not a perfect solution. It results in less readable and harder to maintain code. But here very little of it is needed, just enough to gain access to a single field: _cache. A custom middleware can be derived from ResponseCachingMiddleware which sets this field through a property.

internal class RedisResponseCachingMiddleware : ResponseCachingMiddleware
{
    private RedisResponseCache Cache
    {
        set
        {
            FieldInfo cacheFieldInfo = typeof(ResponseCachingMiddleware)
                .GetField("_cache", BindingFlags.NonPublic | BindingFlags.Instance);

            cacheFieldInfo.SetValue(this, value);
        }
    }

    public RedisResponseCachingMiddleware(RequestDelegate next, IOptions<RedisResponseCachingOptions> options,
        ILoggerFactory loggerFactory, IResponseCachingPolicyProvider policyProvider,
        IResponseCachingKeyProvider keyProvider)
        : base(next, options, loggerFactory, policyProvider, keyProvider)
    {
        Cache = new RedisResponseCache(options.Value.RedisConnectionMultiplexerConfiguration);
    }
}

The RedisResponseCachingOptions extends ResponseCachingOptions by adding option needed to establish connection to Redis. Now this all can be put together by providing the options and registering custom middleware instead of calling UseResponseCaching method.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddResponseCaching();
        services.Configure<RedisResponseCachingOptions>((options) =>
        {
            ...
            options.RedisConnectionMultiplexerConfiguration = "localhost";
        });
    }

    public void Configure(IApplicationBuilder app)
    {
        ...

        app.UseMiddleware<RedisResponseCachingMiddleware>();

        ...
    }
}

This is a fully working solution. Redis is used just as an example; you can use a distributed cache of your choosing, or even a database if you wish (although I wouldn't recommend that). It's all about implementing IResponseCache and following the pattern for replacing it.
