My Server-Sent Events Middleware seems to be a mine of interesting issues. The latest one was about events being delivered with a delay (in general one event behind) under specific conditions.

The nature of the issue

Initially there was no hint at what conditions were required for the issue to manifest itself. The demo application was working correctly, while the one the person who reported the issue was working on didn't. Luckily the reporter was extremely helpful in diagnosing the issue and devoted some time to finding the difference between his code and mine. The difference between the working and the non-working scenario was the presence of the Response Compression Middleware, which I had added while working on a previous issue.

My first thought was that Response Compression Middleware must be writing to the response stream differently than my code (when Response Compression Middleware is present it wraps the original response stream). I went through the source code of BodyWrapperStream and found nothing. I went deeper and analyzed DeflateStream, also without finding anything specific.

At this point I decided to change my approach and use Fiddler to see what was happening on the wire. To my surprise, the first thing I noticed was that the response was still gzipped. That really baffled me, so I double-checked that the Response Compression Middleware was removed. It was, so it must have been something external to my application. The only external component I was able to identify was IIS Express, so I quickly changed the launch dropdown option to run on Kestrel only. That was it, without IIS in front everything was working as expected, which meant that IIS (serving as a reverse proxy) was compressing the response on its own, which resulted in delayed delivery of events.

Preventing IIS from compressing the response

The first obvious option was changing the IIS configuration. This would certainly work, but I would have to put the details into documentation and leave it as a trap for others using the same deployment scenario. I wanted to avoid that, so I started researching other solutions. The general conclusion from the materials I found was that IIS will compress the response if it doesn't detect the Content-Encoding header. That gave me an idea. One of the valid values for Content-Encoding is identity, which indicates no compression/modification; it might be enough to prevent IIS from adding compression. I've added the code for setting the header to the middleware.

public class ServerSentEventsMiddleware
{
    ...

    public async Task Invoke(HttpContext context)
    {
        if (context.Request.Headers[Constants.ACCEPT_HTTP_HEADER] == Constants.SSE_CONTENT_TYPE)
        {
            DisableResponseBuffering(context);

            context.Response.Headers.Append("Content-Encoding", "identity");

            ...
        }
        else
        {
            await _next(context);
        }
    }

    ...
}

Running the demo application without Response Compression Middleware and behind IIS confirmed that the solution was working. Now I had to make sure that I hadn't broken anything else.

Maintaining compatibility with Response Compression Middleware

As the middleware is now setting the Content-Encoding header, it could somehow interfere with Response Compression Middleware. I've re-enabled it and run the test again. The screenshot below shows the result.

Chrome Developer Tools Network Tab - Multiple Content-Encoding

The response contains two Content-Encoding headers. The reason for this is that Response Compression Middleware is also blindly using IHeaderDictionary.Append. Unfortunately, the fact that the header is present twice confuses the browser. The response comes compressed but the browser treats it as not compressed. I couldn't change how Response Compression Middleware works, so I had to be smarter when setting the header. Simply checking if the header is already present didn't work because Response Compression Middleware sets it upon the first attempt to write. I was saved by HttpResponse.OnStarting, which allows for interacting with the response just before the headers are sent. I've replaced my header setting code with the following method.

private void HandleContentEncoding(HttpContext context)
{
    context.Response.OnStarting(() =>
    {
        if (!context.Response.Headers.ContainsKey("Content-Encoding"))
        {
            context.Response.Headers.Append("Content-Encoding", "identity");
        }

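        // _completedTask is a cached completed Task (e.g. Task.CompletedTask),
        // as OnStarting expects a Func<Task> callback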
        return _completedTask;
    });
}

This fixed the problem with two headers and allowed me to close the issue. The approach is universal and can be used in other scenarios with the same requirement.

In the last couple of weeks I've been playing with an ASP.NET Core MVC powered Web API. One of the things I wanted to dig deeper into is support for the HEAD method. The specification says that "The HEAD method is identical to GET except that the server MUST NOT return a message-body in the response. The metainformation contained in the HTTP headers in response to a HEAD request SHOULD be identical to the information sent in response to a GET request.". In practice the HEAD method is often used for performing "exists" requests.
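For example, such an "exists" check done with HttpClient might look like this (a minimal sketch; the URL and identifier are hypothetical):

using (HttpClient client = new HttpClient())
{
    // HEAD should return the same status code and headers as GET, just without the body
    HttpRequestMessage request = new HttpRequestMessage(HttpMethod.Head,
        "http://localhost:5000/api/characters/1");

    HttpResponseMessage response = await client.SendAsync(request);

    bool characterExists = response.IsSuccessStatusCode;
}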

How ASP.NET Core is handling HEAD at the server level

Before looking at higher layers it is worth understanding how the underlying server behaves in case of a HEAD request. The sample Web API mentioned in the beginning has the following end-middleware for handling cases when none of the routes has been hit; it will be perfect for this task.

public class Startup
{
    ...

    public void Configure(IApplicationBuilder app)
    {
        ...

        app.Run(async (context) =>
        {
            context.Response.ContentLength = 34;
            await context.Response.WriteAsync("-- Demo.AspNetCore.Mvc.CosmosDB --");
        });
    }
}

The first testing environment will be Kestrel. The response to a GET request (which will be used as a baseline) is shown below.

HTTP/1.1 200 OK
Content-Length: 34
Date: Mon, 02 Oct 2017 19:22:38 GMT
Server: Kestrel

-- Demo.AspNetCore.Mvc.CosmosDB --

Switching the method to HEAD (without any changes to the code) results in the following.

HTTP/1.1 200 OK
Content-Length: 34
Date: Mon, 02 Oct 2017 19:22:38 GMT
Server: Kestrel

This shows that Kestrel handles a HEAD request quite nicely out of the box. All the headers are there and the write to the response body has been ignored. This is the exact behavior one should expect.

With this positive outcome the application can be switched to the second testing environment, which will be the HTTP.sys server. Here the response to a HEAD request is different.

HTTP/1.1 200 OK
Content-Length: 34
Date: Mon, 02 Oct 2017 19:25:43 GMT
Server: Microsoft-HTTPAPI/2.0

-- Demo.AspNetCore.Mvc.CosmosDB --

Unfortunately this is a malformed response as it contains a body, which is incorrect from the specification perspective and also removes the performance gain which a HEAD request offers. This is something that should be addressed, but before that let's take a look at a more complex scenario.

Adding ASP.NET Core MVC on top

Knowing how the servers handle the HEAD method, the scenario can be extended by adding MVC to the mix. For this purpose a simple GET action which takes an identifier as a parameter can be used. The important part is that the action should return 404 Not Found for an identifier which doesn't exist.

[Route("api/[controller]")]
public class CharactersController : Controller
{
    private readonly IMediator _mediator;

    public CharactersController(IMediator mediator)
    {
        _mediator = mediator;
    }

    ...

    [HttpGet("{id}")]
    public async Task<IActionResult> Get(string id)
    {
        Character character = await _mediator.Send(new GetSingleRequest<Character>(id));
        if (character == null)
        {
            return NotFound();
        }

        return new ObjectResult(character);
    }

    ...
}

In the context of previous discoveries the testing environment can be limited to Kestrel only. Making a GET request with a valid identifier results in a response with a JSON body.

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Mon, 02 Oct 2017 19:40:25 GMT
Server: Kestrel
Transfer-Encoding: chunked

{"id":"1ba6271109d445c8972542985b2d3e96","createdDate":"2017-09-24T21:08:50.9990689Z","lastUpdatedDate":"2017-09-24T21:08:50.9990693Z","name":"Leia Organa","gender":"Female","height":150,"weight":49,"birthYear":"19BBY","skinColor":"Light","hairColor":"Brown","eyeColor":"Brown"}

Switching to HEAD produces a response which might be a little surprising.

HTTP/1.1 200 OK
Content-Length: 34
Date: Mon, 02 Oct 2017 19:42:10 GMT
Server: Kestrel

The presence of Content-Length and absence of Content-Type suggest this is not the response from the intended endpoint. In fact it looks like a response from the end-middleware. A request with an invalid identifier returns exactly the same response instead of the expected 404. Taking one more look at the code should reveal why this shouldn't be a surprise. The action is decorated with HttpGetAttribute, which makes it unreachable by a HEAD request; as a result the application has indeed defaulted to the end-middleware. Adding HttpHeadAttribute should solve the problem.

[Route("api/[controller]")]
public class CharactersController : Controller
{
    ...

    [HttpGet("{id}")]
    [HttpHead("{id}")]
    public async Task<IActionResult> Get(string id)
    {
        ...
    }

    ...
}

After this change both HEAD requests (with a valid and an invalid identifier) return the expected responses.

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Mon, 02 Oct 2017 19:44:23 GMT
Server: Kestrel

HTTP/1.1 404 Not Found
Date: Mon, 02 Oct 2017 19:48:07 GMT
Server: Kestrel

This means that an action needs to be decorated with two attributes. Separating GET and HEAD makes perfect sense when it's possible to optimize the HEAD request handling on the server side, but for a simple scenario like this one it seems unnecessary. One possible improvement is a custom HttpMethodAttribute which would allow both methods.

public class HttpGetOrHeadAttribute : HttpMethodAttribute
{
    private static readonly IEnumerable<string> _supportedMethods = new[] { "GET", "HEAD" };

    public HttpGetOrHeadAttribute()
        : base(_supportedMethods)
    { }

    public HttpGetOrHeadAttribute(string template)
        : base(_supportedMethods, template)
    {
        if (template == null)
        {
            throw new ArgumentNullException(nameof(template));
        }
    }
}
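With this attribute in place the action declaration gets a little simpler (a sketch of the usage):

[Route("api/[controller]")]
public class CharactersController : Controller
{
    ...

    [HttpGetOrHead("{id}")]
    public async Task<IActionResult> Get(string id)
    {
        ...
    }

    ...
}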

Still, anybody who will be working on the project in the future will have to know that a custom attribute must be used. It might be preferable to have a solution which can be applied once, especially keeping in mind that there is also the HTTP.sys issue to be solved.

Solving the problems in one place

In the context of ASP.NET Core, "one place" typically ends up being some kind of middleware. In this case a middleware could be used to perform an old trick of switching an incoming HEAD request to GET. The switch should be only temporary, otherwise the Kestrel integrity checks might fail due to Content-Length being different from the actual number of bytes written. There is also one important thing to remember. After switching the method Kestrel will stop ignoring writes to the body. The easiest solution to this is to change the body stream to Stream.Null (this will also fix the problem observed in case of the HTTP.sys server).

public class HeadMethodMiddleware
{
    private readonly RequestDelegate _next;

    public HeadMethodMiddleware(RequestDelegate next)
    {
        _next = next ?? throw new ArgumentNullException(nameof(next));
    }

    public async Task Invoke(HttpContext context)
    {
        bool methodSwitched = false;

        if (HttpMethods.IsHead(context.Request.Method))
        {
            methodSwitched = true;

            context.Request.Method = HttpMethods.Get;
            context.Response.Body = Stream.Null;
        }

        await _next(context);

        if (methodSwitched)
        {
            context.Request.Method = HttpMethods.Head;
        }
    }
}

This middleware should be applied with caution. Some middlewares (for example StaticFiles) have their own optimized handling of the HEAD method. It is also possible that in case of some middlewares switching the method can result in undesired side effects.
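With that caveat in mind, registering it at the beginning of the pipeline could look like this (a sketch):

public class Startup
{
    ...

    public void Configure(IApplicationBuilder app)
    {
        // Switch HEAD to GET before anything else writes to the response
        app.UseMiddleware<HeadMethodMiddleware>();

        app.UseMvc();

        ...
    }
}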

A few weeks ago I received a question under my Server-Sent Events middleware for ASP.NET Core repository. I was quite busy at the time so I only provided a short answer, but I also promised myself to describe the problem and solution in detail as soon as possible. This post is me fulfilling that promise.

The problem

The question was about using Server-Sent Events in a load balancing scenario. Under the hood Server-Sent Events uses a long-lived HTTP connection for delivering the messages. This means that a client is connected to a specific instance of the application behind the load balancer. It can look like the diagram below, where Client A is connected to Instance 1 and Client B to Instance 2.

Server-Sent Events with Load Balancing Diagram

The problem arises when a message resulting from an operation performed on Instance 1 needs to be broadcast to all clients (so also to Client B).

The solution

In order to solve the problem a communication channel is required which the instances can use to redistribute the messages. One way of achieving such a communication channel is the publish-subscribe pattern. A typical topology of a publish-subscribe pattern implementation introduces a message broker.

Server-Sent Events with Load Balancing and Publish-Subscribe Pattern Diagram

Instead of sending the message directly to the clients, the application sends it to the broker. Then the broker sends the message to all subscribers (which may include the original sender) and they send it to the clients.

One example of such a broker is Redis with its Pub/Sub functionality.

The implementation

The starting point for the implementation will be the demo project from my original post about Server-Sent Events. It has a notification functionality which allows a client to send messages to other clients.

public class NotificationsController : Controller
{
    private INotificationsServerSentEventsService _serverSentEventsService;

    public NotificationsController(INotificationsServerSentEventsService serverSentEventsService)
    {
        _serverSentEventsService = serverSentEventsService;
    }

    ...

    [ActionName("sse-notifications-sender")]
    [AcceptVerbs("POST")]
    public async Task<IActionResult> Sender(NotificationsSenderViewModel viewModel)
    {
        if (!String.IsNullOrEmpty(viewModel.Notification))
        {
            await _serverSentEventsService.SendEventAsync(new ServerSentEvent
            {
                Type = viewModel.Alert ? "alert" : null,
                Data = new List<string>(viewModel.Notification.Split(new string[] { "\r\n", "\n" },
                    StringSplitOptions.None))
            });
        }

        ModelState.Clear();

        return View("Sender", new NotificationsSenderViewModel());
    }
}

The controller is directly interacting with the Server-Sent Events middleware. This is the part which should be abstracted to allow using Redis when desired. A simple service for sending messages can be extracted.

public interface INotificationsService
{
    Task SendNotificationAsync(string notification, bool alert);
}

internal class LocalNotificationsService : INotificationsService
{
    private INotificationsServerSentEventsService _notificationsServerSentEventsService;

    public LocalNotificationsService(INotificationsServerSentEventsService notificationsServerSentEventsService)
    {
        _notificationsServerSentEventsService = notificationsServerSentEventsService;
    }

    public Task SendNotificationAsync(string notification, bool alert)
    {
        return _notificationsServerSentEventsService.SendEventAsync(new ServerSentEvent
        {
            Type = alert ? "alert" : null,
            Data = new List<string>(notification.Split(new string[] { "\r\n", "\n" },
                StringSplitOptions.None))
        });
    }
}

With the service in place the controller can be refactored.

public class NotificationsController : Controller
{
    private INotificationsService _notificationsService;

    public NotificationsController(INotificationsService notificationsService)
    {
        _notificationsService = notificationsService;
    }

    ...

    [ActionName("sse-notifications-sender")]
    [AcceptVerbs("POST")]
    public async Task<IActionResult> Sender(NotificationsSenderViewModel viewModel)
    {
        if (!String.IsNullOrEmpty(viewModel.Notification))
        {
            await _notificationsService.SendNotificationAsync(viewModel.Notification, viewModel.Alert);
        }

        ModelState.Clear();

        return View("Sender", new NotificationsSenderViewModel());
    }
}

Now the Redis-based implementation of INotificationsService can be created. I've decided to use StackExchange.Redis, which is a very popular Redis client for .NET (it's also being used by ASP.NET Core) with good documentation. The implementation is straightforward; the only challenge is distinguishing regular notifications from alerts. In the context of the publish-subscribe pattern one of the approaches is filtering based on topics. With this approach the application should use different channels for different types of messages.

internal class RedisNotificationsService : INotificationsService
{
    private const string NOTIFICATIONS_CHANNEL = "NOTIFICATIONS";
    private const string ALERTS_CHANNEL = "ALERTS";

    private ConnectionMultiplexer _redis;
    private INotificationsServerSentEventsService _notificationsServerSentEventsService;

    public RedisNotificationsService(INotificationsServerSentEventsService notificationsServerSentEventsService)
    {
        _redis = ConnectionMultiplexer.Connect("localhost");
        _notificationsServerSentEventsService = notificationsServerSentEventsService;

        ISubscriber subscriber = _redis.GetSubscriber();

        subscriber.Subscribe(NOTIFICATIONS_CHANNEL, async (channel, message) =>
        {
            await SendSseEventAsync(message, false);
        });

        subscriber.Subscribe(ALERTS_CHANNEL, async (channel, message) =>
        {
            await SendSseEventAsync(message, true);
        });
    }

    public Task SendNotificationAsync(string notification, bool alert)
    {
        ISubscriber subscriber = _redis.GetSubscriber();

        return subscriber.PublishAsync(alert ? ALERTS_CHANNEL : NOTIFICATIONS_CHANNEL, notification);
    }

    private Task SendSseEventAsync(string notification, bool alert)
    {
        return _notificationsServerSentEventsService.SendEventAsync(new ServerSentEvent
        {
            Type = alert ? "alert" : null,
            Data = new List<string>(notification.Split(new string[] { "\r\n", "\n" },
                StringSplitOptions.None))
        });
    }
}

The implementation can be further improved by extracting a base class with Server-Sent Events related functionality, but the service is ready to be used (it just needs to be registered).
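A sketch of that registration, assuming a singleton lifetime so that the Redis connection and subscriptions are created only once per application instance:

public void ConfigureServices(IServiceCollection services)
{
    ...

    // Singleton: one ConnectionMultiplexer and one set of subscriptions per instance
    services.AddSingleton<INotificationsService, RedisNotificationsService>();
}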

The demo available on GitHub also provides configuration options for the Redis connection and for choosing which INotificationsService implementation to use.

This approach can be used in exactly the same way if WebSockets are used instead of Server-Sent Events, or in any other scenario which requires a similar communication pattern.

Recently I needed to add support for SSL Acceleration (Offloading) in one of the projects I'm working on. In ASP.NET MVC this usually meant a custom RequireHttpsAttribute, URL generator and IsHttps method. The whole team needed to be aware that the custom components must be used instead of the ones provided by the framework, otherwise things would break. This is no longer the case for ASP.NET Core; thanks to low-level APIs like request features there is a more elegant way.

SSL Acceleration (Offloading)

SSL Acceleration is a process of using a hardware accelerator for performing SSL encryption and/or decryption. The process usually takes place on a load balancer or firewall, in which case it's called SSL Offloading. There are two flavors of SSL Offloading: SSL Bridging and SSL Termination. SSL Bridging usually doesn't require anything specific from the application, but SSL Termination does. In case of SSL Termination the SSL connection doesn't go beyond the SSL Accelerator. There are two main benefits from SSL Termination:

  • Improved performance (the web servers don't have to use resources for SSL processing)
  • Simplified certificate management (the certificates are managed on a single device instead of every web server in cluster)

The drawback is that HTTPS traffic is not reaching the application. In this context the performance benefit can be questioned. The application is no longer able to fully utilize some of the HTTP/2 features (for example Server Push), while the resource gain might not be that significant as modern CPUs have good support for encryption/decryption.

Despite the fact that SSL is being terminated, the application still must be able to verify if the original request was made over HTTPS (otherwise it could lower the application security). Typically the SSL Accelerators provide information about the original protocol through a dedicated HTTP header (one quite popular is X-Forwarded-Proto) which the application needs to properly interpret.

Making ASP.NET Core understand SSL Acceleration

The "properly interpret" means that application needs to detect the presence of the header and if the value indicates that original request was over HTTPS it should be treated as such. In case of ASP.NET Core the perfect behavior would be for HttpContext.Request.IsHttps to return true. This would automatically make RequireHttpsAttribute and AddRedirectToHttps from URL Rewriting Middleware behave correctly. Also any other code which depends on that property will keep working as expected.

Luckily the value of HttpContext.Request.IsHttps is based on the IHttpRequestFeature.Scheme property, whose value can be changed by the application. Assuming that the header name is X-Forwarded-Proto and its value is the original scheme in lower case, the following snippet is exactly what is needed.

if (!context.Request.IsHttps)
{
    if (context.Request.Headers.ContainsKey("X-Forwarded-Proto")
        && context.Request.Headers["X-Forwarded-Proto"].Equals("https"))
    {
        IHttpRequestFeature httpRequestFeature = context.Features.Get<IHttpRequestFeature>();
        httpRequestFeature.Scheme = "https";
    }
}

This snippet can be easily wrapped inside a reusable and parametrized middleware like the one here.
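A minimal sketch of such a middleware (the class name and the constructor parameter are my own; the linked version is more complete):

public class SslOffloadingMiddleware
{
    private readonly RequestDelegate _next;
    private readonly string _forwardedProtoHeaderName;

    public SslOffloadingMiddleware(RequestDelegate next,
        string forwardedProtoHeaderName = "X-Forwarded-Proto")
    {
        _next = next ?? throw new ArgumentNullException(nameof(next));
        _forwardedProtoHeaderName = forwardedProtoHeaderName;
    }

    public Task Invoke(HttpContext context)
    {
        if (!context.Request.IsHttps
            && context.Request.Headers[_forwardedProtoHeaderName].Equals("https"))
        {
            // Changing the scheme at the feature level makes IsHttps return true
            context.Features.Get<IHttpRequestFeature>().Scheme = "https";
        }

        return _next(context);
    }
}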

This scenario is a nice example of how ASP.NET Core is layered and how much power the access to the low-level building blocks gives.

In one of the projects I'm involved in we had to configure a limit for concurrent requests being processed. The project is quite specific; processing of a request is quite heavy and requires considerable resources. Because of that, when there was a high number of concurrent requests to process, the server was becoming unresponsive. I did some research on best practices in this area. "Stress your server and see" seems to be the best advice, but the number of possible different approaches got me interested. In order to satisfy my curiosity I've decided to implement a couple of them myself.

General remark

Limiting concurrent requests is a server responsibility; there shouldn't be a need for the application to handle it. This is one of the reasons for the suggestion from the ASP.NET Team to run Kestrel behind a fully featured web server like IIS or nginx (some limits support has come in 2.0). Despite that, here I'm implementing a middleware. This is not that bad, as registering such middleware at the beginning of the pipeline should prevent heavy processing early enough. Still, I'm doing this just for fun; don't do it in production unless you have an unquestionable reason to do so.

Testing scenario

For testing purposes I've decided to set up a couple of integration tests using xUnit and Microsoft.AspNetCore.TestHost. The general setup is very well described in the documentation. As I intended to spawn multiple requests I wanted to have some rough timings captured for them, so I've prepared the following extension method.

internal class HttpResponseMessageWithTiming
{
    internal HttpResponseMessage Response { get; set; }

    internal TimeSpan Timing { get; set; }
}

internal static class HttpClientExtensions
{
    internal static async Task<HttpResponseMessageWithTiming> GetWithTimingAsync(this HttpClient client,
        string requestUri)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();

        HttpResponseMessage response = await client.GetAsync(requestUri);
        TimeSpan timing = stopwatch.Elapsed;

        stopwatch.Stop();

        return new HttpResponseMessageWithTiming
        {
            Response = response,
            Timing = timing
        };
    }
}

This is not perfect (the timings have a risk of not being accurate) but should be good enough. I also wanted to have easy access to the status code and timing during debugging, so I've introduced another intermediate representation.

private struct HttpResponseInformation
{
    public HttpStatusCode StatusCode { get; set; }

    public TimeSpan Timing { get; set; }

    public override string ToString()
    {
        return $"StatusCode: {StatusCode} | Timing {Timing}";
    }
}

I've also created a "prepare SUT" method for instantiating the TestServer.

private TestServer PrepareTestServer(IEnumerable<KeyValuePair<string, string>> configuration = null)
{
    IWebHostBuilder webHostBuilder = new WebHostBuilder()
        .UseStartup<Startup>();

    if (configuration != null)
    {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.AddInMemoryCollection(configuration);
        IConfiguration builtConfiguration = configurationBuilder.Build();

        webHostBuilder.UseConfiguration(builtConfiguration);
        webHostBuilder.ConfigureServices((services) =>
        {
            services.Configure<MaxConcurrentRequestsOptions>(options =>
                builtConfiguration.GetSection("MaxConcurrentRequestsOptions").Bind(options)
            );
        });
    }

    return new TestServer(webHostBuilder);
}

The MaxConcurrentRequestsOptions class (empty at this point) will be used for controlling the behavior of the middleware. The Startup looks like this (the delay simulates long request processing):

public class Startup
{
    public void Configure(IApplicationBuilder app)
    {
        app.Run(async (context) =>
        {
            await Task.Delay(500);

            await context.Response.WriteAsync("-- Demo.AspNetCore.MaxConcurrentConnections --");
        });
    }
}

With all those elements in place I've created a general method to be reused by all tests.

private HttpResponseInformation[] GetResponseInformation(Dictionary<string, string> configuration,
    int concurrentRequestsCount)
{
    HttpResponseInformation[] responseInformation;

    using (TestServer server = PrepareTestServer(configuration))
    {
        List<HttpClient> clients = new List<HttpClient>();
        for (int i = 0; i < concurrentRequestsCount; i++)
        {
            clients.Add(server.CreateClient());
        }

        List<Task<HttpResponseMessageWithTiming>> responsesWithTimingsTasks =
            new List<Task<HttpResponseMessageWithTiming>>();
        foreach (HttpClient client in clients)
        {
            responsesWithTimingsTasks.Add(Task.Run(async () => {
                return await client.GetWithTimingAsync("/");
            }));
        }
        Task.WaitAll(responsesWithTimingsTasks.ToArray());

        clients.ForEach(client => client.Dispose());

        responseInformation = responsesWithTimingsTasks.Select(task => new HttpResponseInformation
        {
            StatusCode = task.Result.Response.StatusCode,
            Timing = task.Result.Timing
        }).ToArray();
    }

    return responseInformation;
}

This (by providing different configuration and concurrentRequestsCount values) will allow me to test the different approaches I'm going to play with.

Limiting concurrent requests through middleware

The first and most important thing which the middleware needs to support is a hard limit. The functionality is very simple in theory: if the application is processing the maximum number of concurrent requests, every incoming request should immediately result in a 503 Service Unavailable.

public class MaxConcurrentRequestsMiddleware
{
    private int _concurrentRequestsCount;

    private readonly RequestDelegate _next;
    private readonly MaxConcurrentRequestsOptions _options;

    public MaxConcurrentRequestsMiddleware(RequestDelegate next,
        IOptions<MaxConcurrentRequestsOptions> options)
    {
        _concurrentRequestsCount = 0;

        _next = next ?? throw new ArgumentNullException(nameof(next));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
    }

    public async Task Invoke(HttpContext context)
    {
        if (CheckLimitExceeded())
        {
            IHttpResponseFeature responseFeature = context.Features.Get<IHttpResponseFeature>();

            responseFeature.StatusCode = StatusCodes.Status503ServiceUnavailable;
            responseFeature.ReasonPhrase = "Concurrent request limit exceeded.";
        }
        else
        {
            await _next(context);

            // TODO: Decrement concurrent requests count
        }
    }

    private bool CheckLimitExceeded()
    {
        bool limitExceeded = false;

        // TODO: Check and increment concurrent requests count

        return limitExceeded;
    }
}

The challenge hides in maintaining the count of concurrent requests. It must be incremented and decremented in a thread-safe way while affecting performance as little as possible. The Interlocked class with its atomic operations on shared variables seems perfect for the job. I've decided to use the Interlocked.CompareExchange "do-while pattern" for check and increment, which should ensure that the value will not be exceeded.

public class MaxConcurrentRequestsMiddleware
{
    ...

    public async Task Invoke(HttpContext context)
    {
        if (CheckLimitExceeded())
        {
            ...
        }
        else
        {
            await _next(context);

            Interlocked.Decrement(ref _concurrentRequestsCount);
        }
    }

    private bool CheckLimitExceeded()
    {
        bool limitExceeded;

        int initialConcurrentRequestsCount, incrementedConcurrentRequestsCount;
        do
        {
            limitExceeded = true;

            initialConcurrentRequestsCount = _concurrentRequestsCount;
            if (initialConcurrentRequestsCount >= _options.Limit)
            {
                break;
            }

            limitExceeded = false;
            incrementedConcurrentRequestsCount = initialConcurrentRequestsCount + 1;
        }
        while (initialConcurrentRequestsCount != Interlocked.CompareExchange(
            ref _concurrentRequestsCount, incrementedConcurrentRequestsCount, initialConcurrentRequestsCount));

        return limitExceeded;
    }
}

After adding the middleware to the pipeline I've set up a test with 30 concurrent requests and 10 as MaxConcurrentRequestsOptions.Limit. The result is best observed through the responseInformation temporary value.

Visual Studio Locals Window - Content of responseInformation array for hard limit scenario

There are 10 requests which resulted in 200 OK and 20 requests which resulted in 503 Service Unavailable (red frames) - the desired effect.
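Such a test could look like this (a sketch; the configuration keys are an assumption based on the "MaxConcurrentRequestsOptions" section binding shown earlier):

[Fact]
public void HardLimit_30ConcurrentRequests_10Processed20Dropped()
{
    Dictionary<string, string> configuration = new Dictionary<string, string>
    {
        { "MaxConcurrentRequestsOptions:Limit", "10" }
    };

    HttpResponseInformation[] responseInformation = GetResponseInformation(configuration, 30);

    // 10 requests should be processed, the remaining 20 dropped immediately
    Assert.Equal(10, responseInformation.Count(r => r.StatusCode == HttpStatusCode.OK));
    Assert.Equal(20, responseInformation.Count(r => r.StatusCode == HttpStatusCode.ServiceUnavailable));
}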

Queueing additional requests

The hard limit approach gets the job done, but there are more flexible approaches. In general they are based on the assumption that the application can cheaply store more requests in memory than it's currently processing, and that the client can wait for the response a little bit longer. Additional requests can wait in a queue (typically a FIFO one) for resources to become available. The queue should have a size limit, otherwise the application might end up processing only the queued requests with constantly growing latency.

Because of the size limit requirement this can't be simply implemented with ConcurrentQueue; a custom (again thread-safe) solution is needed. The middleware should also be able to await the queue. On a high level a class looking like the one below should provide the desired interface.

internal class MaxConcurrentRequestsEnqueuer
{
    private static readonly Task<bool> _enqueueFailedTask = Task.FromResult(false);
    private readonly int _maxQueueLength;

    public MaxConcurrentRequestsEnqueuer(int maxQueueLength)
    {
        _maxQueueLength = maxQueueLength;
    }

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        // TODO: Check the size and enqueue

        return enqueueTask;
    }

    public bool Dequeue()
    {
        bool dequeued = false;

        // TODO: Dequeue

        return dequeued;
    }
}

EnqueueAsync returns a Task<bool> so the middleware can await it and use the result as an indicator of whether the request should be processed or not.

Internally the class will maintain a Queue<TaskCompletionSource<bool>>.

internal class MaxConcurrentRequestsEnqueuer
{
    ...
    private readonly object _lock = new object();
    private readonly Queue<TaskCompletionSource<bool>> _queue = new Queue<TaskCompletionSource<bool>>();

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    TaskCompletionSource<bool> enqueueTaskCompletionSource =
                        new TaskCompletionSource<bool>();

                    _queue.Enqueue(enqueueTaskCompletionSource);

                    enqueueTask = enqueueTaskCompletionSource.Task;
                }
            }
        }

        return enqueueTask;
    }

    public bool Dequeue()
    {
        bool dequeued = false;

        lock (_lock)
        {
            if (_queue.Count > 0)
            {
                _queue.Dequeue().SetResult(true);

                dequeued = true;
            }
        }

        return dequeued;
    }
}

Yes, this is locking. Not perfect, but the request which ends up here is going to wait anyway, so it can be considered acceptable. Some consolation can be the fact that this class can be nicely used from the middleware.

public class MaxConcurrentRequestsMiddleware
{
    ...
    private readonly MaxConcurrentRequestsEnqueuer _enqueuer;

    public MaxConcurrentRequestsMiddleware(RequestDelegate next,
        IOptions<MaxConcurrentRequestsOptions> options)
    {
        ...

        if (_options.LimitExceededPolicy != MaxConcurrentRequestsLimitExceededPolicy.Drop)
        {
            _enqueuer = new MaxConcurrentRequestsEnqueuer(_options.MaxQueueLength);
        }
    }

    public async Task Invoke(HttpContext context)
    {
        if (CheckLimitExceeded() && !(await TryWaitInQueue()))
        {
            ...
        }
        else
        {
            await _next(context);

            if (ShouldDecrementConcurrentRequestsCount())
            {
                Interlocked.Decrement(ref _concurrentRequestsCount);
            }
        }
    }

    ...

    private async Task<bool> TryWaitInQueue()
    {
        return (_enqueuer != null) && (await _enqueuer.EnqueueAsync());
    }

    private bool ShouldDecrementConcurrentRequestsCount()
    {
        return (_enqueuer == null) || !_enqueuer.Dequeue();
    }
}

Below is the result of another test. The number of concurrent requests is again 30 and the max number of concurrent requests is 10. The queue size has also been set to 10. The requests which have waited in the queue are the ones in orange.

Visual Studio Locals Window - Content of responseInformation array for hard limit with FIFO queue scenario

This approach is called drop tail and has an alternative in the form of drop head. In case of drop head, when a new request arrives (and the queue is full) the first request in the queue is dropped. This can often result in better latency for the waiting requests, at the price of the dropped requests having to wait as well.

The MaxConcurrentRequestsEnqueuer can be easily modified to support drop head.

internal class MaxConcurrentRequestsEnqueuer
{
    public enum DropMode
    {
        Tail = MaxConcurrentRequestsLimitExceededPolicy.FifoQueueDropTail,
        Head = MaxConcurrentRequestsLimitExceededPolicy.FifoQueueDropHead
    }

    ...
    private readonly DropMode _dropMode;

    public MaxConcurrentRequestsEnqueuer(int maxQueueLength, DropMode dropMode)
    {
        ...
        _dropMode = dropMode;
    }

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    enqueueTask = InternalEnqueueAsync();
                }
                else if (_dropMode == DropMode.Head)
                {
                    _queue.Dequeue().SetResult(false);

                    enqueueTask = InternalEnqueueAsync();
                }
            }
        }

        return enqueueTask;
    }

    ...

    private Task<bool> InternalEnqueueAsync()
    {
        TaskCompletionSource<bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        _queue.Enqueue(enqueueTaskCompletionSource);

        return enqueueTaskCompletionSource.Task;
    }
}
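For reference, the options class implied by the snippets so far might look like this (a sketch reconstructed from the properties and enum values used above; the actual version on GitHub may differ):

public class MaxConcurrentRequestsOptions
{
    public int Limit { get; set; }

    public MaxConcurrentRequestsLimitExceededPolicy LimitExceededPolicy { get; set; }

    public int MaxQueueLength { get; set; }
}

public enum MaxConcurrentRequestsLimitExceededPolicy
{
    Drop,
    FifoQueueDropTail,
    FifoQueueDropHead
}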

My test scenario is not sophisticated enough (all the requests arrive roughly at the same time) to show the difference between drop tail and drop head, but this article has a nice comparison.

Further improvements

There are two more things which would be "nice to have" for the queue. The first is support for request aborting. Right now, if the request gets cancelled by the client, it will still wait in the queue, taking a spot which one of the incoming requests could take.

In order to implement the support for request aborting, the internals of MaxConcurrentRequestsEnqueuer need to be changed in a way which enables removing a specific request. To achieve that, the Queue<TaskCompletionSource<bool>> can no longer be used; it will be replaced with LinkedList<TaskCompletionSource<bool>>. This allows enqueue and dequeue to remain O(1) operations and adds the possibility of O(n) removal. The O(n) is a pessimistic value; in reality the aborted request will typically be at the beginning of the list, so it will be found quickly. Only a couple of changes to MaxConcurrentRequestsEnqueuer are needed.

internal class MaxConcurrentRequestsEnqueuer
{
    ...
    private readonly LinkedList<TaskCompletionSource<bool>> _queue =
        new LinkedList<TaskCompletionSource<bool>>();

    ...

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    enqueueTask = InternalEnqueueAsync();
                }
                else if (_dropMode == DropMode.Head)
                {
                    InternalDequeue(false);

                    enqueueTask = InternalEnqueueAsync();
                }
            }
        }

        return enqueueTask;
    }

    public bool Dequeue()
    {
        bool dequeued = false;

        lock (_lock)
        {
            if (_queue.Count > 0)
            {
                InternalDequeue(true);

                dequeued = true;
            }
        }

        return dequeued;
    }

    private Task<bool> InternalEnqueueAsync()
    {
        TaskCompletionSource<bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        _queue.AddLast(enqueueTaskCompletionSource);

        return enqueueTaskCompletionSource.Task;
    }

    private void InternalDequeue(bool result)
    {
        TaskCompletionSource<bool> enqueueTaskCompletionSource = _queue.First.Value;

        _queue.RemoveFirst();

        enqueueTaskCompletionSource.SetResult(result);
    }
}

Now the support for a CancellationToken (which will carry the information about the request being aborted) can be added.

internal class MaxConcurrentRequestsEnqueuer
{
    ...

    public Task<bool> EnqueueAsync(CancellationToken cancellationToken)
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    enqueueTask = InternalEnqueueAsync(cancellationToken);
                }
                else if (_dropMode == DropMode.Head)
                {
                    InternalDequeue(false);

                    enqueueTask = InternalEnqueueAsync(cancellationToken);
                }
            }
        }

        return enqueueTask;
    }

    ...

    private Task<bool> InternalEnqueueAsync(CancellationToken cancellationToken)
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        TaskCompletionSource <bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        cancellationToken.Register(CancelEnqueue, enqueueTaskCompletionSource);

        if (!cancellationToken.IsCancellationRequested)
        {
            _queue.AddLast(enqueueTaskCompletionSource);
            enqueueTask = enqueueTaskCompletionSource.Task;
        }

        return enqueueTask;
    }

    private void CancelEnqueue(object state)
    {
        bool removed = false;

        TaskCompletionSource<bool> enqueueTaskCompletionSource = ((TaskCompletionSource<bool>)state);
        lock (_lock)
        {
            removed = _queue.Remove(enqueueTaskCompletionSource);
        }

        if (removed)
        {
            enqueueTaskCompletionSource.SetResult(false);
        }
    }

    ...
}

The middleware can pass HttpContext.RequestAborted as the CancellationToken (it is also a good idea to wrap the response status setting code in an IsCancellationRequested check), which should ensure that all aborted requests will be removed from the queue quickly.
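In the middleware, TryWaitInQueue then becomes something like this (a sketch based on the earlier version of the method; Invoke would pass its HttpContext down):

private async Task<bool> TryWaitInQueue(HttpContext context)
{
    // HttpContext.RequestAborted signals when the client disconnects
    return (_enqueuer != null) && (await _enqueuer.EnqueueAsync(context.RequestAborted));
}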

The second useful thing is a limit on the time which a request can spend in the queue. This is just additional protection for cases when the queue gets stuck and clients don't implement their own timeouts. As cancellation is already supported, this can be nicely introduced with the help of CancellationTokenSource.CancelAfter and CancellationTokenSource.CreateLinkedTokenSource.

internal class MaxConcurrentRequestsEnqueuer
{
    ...
    private readonly int _maxTimeInQueue;

    public MaxConcurrentRequestsEnqueuer(int maxQueueLength, DropMode dropMode, int maxTimeInQueue)
    {
        ...
        _maxTimeInQueue = maxTimeInQueue;
    }

    ...

    private Task<bool> InternalEnqueueAsync(CancellationToken cancellationToken)
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        TaskCompletionSource <bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        CancellationToken enqueueCancellationToken =
            GetEnqueueCancellationToken(enqueueTaskCompletionSource, cancellationToken);

        if (!enqueueCancellationToken.IsCancellationRequested)
        {
            _queue.AddLast(enqueueTaskCompletionSource);
            enqueueTask = enqueueTaskCompletionSource.Task;
        }

        return enqueueTask;
    }

    private CancellationToken GetEnqueueCancellationToken(
        TaskCompletionSource<bool> enqueueTaskCompletionSource, CancellationToken cancellationToken)
    {
        CancellationToken enqueueCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken,
            GetTimeoutToken(enqueueTaskCompletionSource)
        ).Token;

        enqueueCancellationToken.Register(CancelEnqueue, enqueueTaskCompletionSource);

        return enqueueCancellationToken;
    }

    private CancellationToken GetTimeoutToken(TaskCompletionSource<bool> enqueueTaskCompletionSource)
    {
        CancellationToken timeoutToken = CancellationToken.None;

        if (_maxTimeInQueue != MaxConcurrentRequestsOptions.MaxTimeInQueueUnlimited)
        {
            CancellationTokenSource timeoutTokenSource = new CancellationTokenSource();

            timeoutToken = timeoutTokenSource.Token;
            timeoutToken.Register(CancelEnqueue, enqueueTaskCompletionSource);

            timeoutTokenSource.CancelAfter(_maxTimeInQueue);
        }

        return timeoutToken;
    }

    ...
}

The timeout can be clearly shown through a test by setting the max time in queue to a value lower than the processing time (for example 300 ms).

Visual Studio Locals Window - Content of responseInformation array for hard limit with FIFO queue and max time in queue scenario

As expected there are two groups of dropped requests: the ones which were dropped immediately and the ones which were dropped after the max time in queue.

There is more

This (quite long) post doesn't fully explore the subject. There are other approaches, for example ones based on a LIFO queue. If this post got you interested in the subject, I suggest further research.

More "polished" version of MaxConcurrentRequestsEnqueuer and MaxConcurrentRequestsMiddleware with demo project and tests can be found on GitHub.
