Implementing concurrent requests limit in ASP.NET Core for fun and education

In one of the projects I'm involved in we had to configure a limit on the number of concurrent requests being processed. The project is quite specific: processing a request is heavy and requires considerable resources, so when a high number of concurrent requests arrived the server was becoming unresponsive. I did some research on best practices in this area. "Stress your server and see" seems to be the best advice, but the number of possible approaches got me interested. In order to satisfy my curiosity I've decided to implement a couple of them myself.

General remark

Limiting concurrent requests is a server responsibility; there shouldn't be a need for the application to handle it. This is one of the reasons the ASP.NET team suggests running Kestrel behind a fully featured web server like IIS or nginx (although some limits support has arrived in 2.0). Despite that, here I'm implementing a middleware. This is not that bad: registering such a middleware at the beginning of the pipeline should reject excess requests before any heavy processing starts. Still, I'm doing this just for fun, so don't do it in production unless you have an unquestionable reason to.
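As a side note, this is roughly what the server-level limit added in 2.0 looks like. A minimal sketch only: the value of 100 is an arbitrary example and MaxConcurrentConnections is a connection-level limit rather than a per-request one.

public class Program
{
    public static void Main(string[] args)
    {
        BuildWebHost(args).Run();
    }

    public static IWebHost BuildWebHost(string[] args) =>
        WebHost.CreateDefaultBuilder(args)
            .UseKestrel(options =>
            {
                // Built-in Kestrel 2.0 connection limit; null means unlimited.
                options.Limits.MaxConcurrentConnections = 100;
            })
            .UseStartup<Startup>()
            .Build();
}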

Testing scenario

For testing purposes I've decided to set up a couple of integration tests using xUnit and Microsoft.AspNetCore.TestHost. The general setup is very well described in the documentation. As I intended to spawn multiple requests, I wanted to capture some rough timings for them, so I've prepared the following extension method.

internal class HttpResponseMessageWithTiming
{
    internal HttpResponseMessage Response { get; set; }

    internal TimeSpan Timing { get; set; }
}

internal static class HttpClientExtensions
{
    internal static async Task<HttpResponseMessageWithTiming> GetWithTimingAsync(this HttpClient client,
        string requestUri)
    {
        Stopwatch stopwatch = Stopwatch.StartNew();

        HttpResponseMessage response = await client.GetAsync(requestUri);
        TimeSpan timing = stopwatch.Elapsed;

        stopwatch.Stop();

        return new HttpResponseMessageWithTiming
        {
            Response = response,
            Timing = timing
        };
    }
}

This is not perfect (the timings carry a risk of being inaccurate) but it should be good enough. I also wanted easy access to the status code and timing during debugging, so I've introduced another intermediate representation.

private struct HttpResponseInformation
{
    public HttpStatusCode StatusCode { get; set; }

    public TimeSpan Timing { get; set; }

    public override string ToString()
    {
        return $"StatusCode: {StatusCode} | Timing {Timing}";
    }
}

I've also created a helper method for preparing the SUT (instantiating the TestServer).

private TestServer PrepareTestServer(IEnumerable<KeyValuePair<string, string>> configuration = null)
{
    IWebHostBuilder webHostBuilder = new WebHostBuilder()
        .UseStartup<Startup>();

    if (configuration != null)
    {
        ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
        configurationBuilder.AddInMemoryCollection(configuration);
        IConfiguration builtConfiguration = configurationBuilder.Build();

        webHostBuilder.UseConfiguration(builtConfiguration);
        webHostBuilder.ConfigureServices((services) =>
        {
            services.Configure<MaxConcurrentRequestsOptions>(options =>
                builtConfiguration.GetSection("MaxConcurrentRequestsOptions").Bind(options)
            );
        });
    }

    return new TestServer(webHostBuilder);
}

The MaxConcurrentRequestsOptions class (empty at this point) will be used for controlling the behavior of the middleware. The Startup looks like this (to simulate long request processing):

public class Startup
{
    public void Configure(IApplicationBuilder app)
    {
        app.Run(async (context) =>
        {
            await Task.Delay(500);

            await context.Response.WriteAsync("-- Demo.AspNetCore.MaxConcurrentConnections --");
        });
    }
}

With all those elements in place I've created a general method to be reused by all tests.

private HttpResponseInformation[] GetResponseInformation(Dictionary<string, string> configuration,
    int concurrentRequestsCount)
{
    HttpResponseInformation[] responseInformation;

    using (TestServer server = PrepareTestServer(configuration))
    {
        List<HttpClient> clients = new List<HttpClient>();
        for (int i = 0; i < concurrentRequestsCount; i++)
        {
            clients.Add(server.CreateClient());
        }

        List<Task<HttpResponseMessageWithTiming>> responsesWithTimingsTasks =
            new List<Task<HttpResponseMessageWithTiming>>();
        foreach (HttpClient client in clients)
        {
            responsesWithTimingsTasks.Add(Task.Run(async () => {
                return await client.GetWithTimingAsync("/");
            }));
        }
        Task.WaitAll(responsesWithTimingsTasks.ToArray());

        clients.ForEach(client => client.Dispose());

        responseInformation = responsesWithTimingsTasks.Select(task => new HttpResponseInformation
        {
            StatusCode = task.Result.Response.StatusCode,
            Timing = task.Result.Timing
        }).ToArray();
    }

    return responseInformation;
}

This (by providing different configuration and concurrentRequestsCount) will allow me to test different approaches I'm going to play with.
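The tests bind the "MaxConcurrentRequestsOptions" configuration section into the MaxConcurrentRequestsOptions class. For reference, here is a sketch of the shape that class grows into over the course of this post; the exact property names and default values are my assumptions, inferred from how the options are used later.

public enum MaxConcurrentRequestsLimitExceededPolicy
{
    Drop,
    FifoQueueDropTail,
    FifoQueueDropHead
}

public class MaxConcurrentRequestsOptions
{
    // Assumed sentinel value for "no time limit" (the constant itself appears later in the post).
    public const int MaxTimeInQueueUnlimited = -1;

    public int Limit { get; set; }

    public MaxConcurrentRequestsLimitExceededPolicy LimitExceededPolicy { get; set; }
        = MaxConcurrentRequestsLimitExceededPolicy.Drop;

    public int MaxQueueLength { get; set; }

    public int MaxTimeInQueue { get; set; } = MaxTimeInQueueUnlimited;
}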

Limiting concurrent requests through middleware

The first and most important thing which the middleware needs to support is a hard limit. The functionality is very simple in theory: if the application is already processing the maximum number of concurrent requests, every additional incoming request should immediately result in 503 Service Unavailable.

public class MaxConcurrentRequestsMiddleware
{
    private int _concurrentRequestsCount;

    private readonly RequestDelegate _next;
    private readonly MaxConcurrentRequestsOptions _options;

    public MaxConcurrentRequestsMiddleware(RequestDelegate next,
        IOptions<MaxConcurrentRequestsOptions> options)
    {
        _concurrentRequestsCount = 0;

        _next = next ?? throw new ArgumentNullException(nameof(next));
        _options = options?.Value ?? throw new ArgumentNullException(nameof(options));
    }

    public async Task Invoke(HttpContext context)
    {
        if (CheckLimitExceeded())
        {
            IHttpResponseFeature responseFeature = context.Features.Get<IHttpResponseFeature>();

            responseFeature.StatusCode = StatusCodes.Status503ServiceUnavailable;
            responseFeature.ReasonPhrase = "Concurrent request limit exceeded.";
        }
        else
        {
            await _next(context);

            // TODO: Decrement concurrent requests count
        }
    }

    private bool CheckLimitExceeded()
    {
        bool limitExceeded = false;

        // TODO: Check and increment concurrent requests count

        return limitExceeded;
    }
}

The challenge lies in maintaining the count of concurrent requests. It must be incremented and decremented in a thread-safe way while affecting performance as little as possible. The Interlocked class, with its atomic operations on shared variables, seems perfect for the job. I've decided to use the Interlocked.CompareExchange "do-while pattern" for the check-and-increment, which should ensure that the limit will not be exceeded.

public class MaxConcurrentRequestsMiddleware
{
    ...

    public async Task Invoke(HttpContext context)
    {
        if (CheckLimitExceeded())
        {
            ...
        }
        else
        {
            await _next(context);

            Interlocked.Decrement(ref _concurrentRequestsCount);
        }
    }

    private bool CheckLimitExceeded()
    {
        bool limitExceeded;

        int initialConcurrentRequestsCount, incrementedConcurrentRequestsCount;
        do
        {
            limitExceeded = true;

            initialConcurrentRequestsCount = _concurrentRequestsCount;
            if (initialConcurrentRequestsCount >= _options.Limit)
            {
                break;
            }

            limitExceeded = false;
            incrementedConcurrentRequestsCount = initialConcurrentRequestsCount + 1;
        }
        while (initialConcurrentRequestsCount != Interlocked.CompareExchange(
            ref _concurrentRequestsCount, incrementedConcurrentRequestsCount, initialConcurrentRequestsCount));

        return limitExceeded;
    }
}
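To add the middleware to the pipeline it can be registered at the very beginning of Configure. A sketch extending the Startup shown earlier; UseMiddleware is the standard registration mechanism, although a dedicated extension method would be nicer.

public class Startup
{
    public void Configure(IApplicationBuilder app)
    {
        // Register the middleware as early as possible so requests over the
        // limit are rejected before any heavy processing starts.
        app.UseMiddleware<MaxConcurrentRequestsMiddleware>();

        app.Run(async (context) =>
        {
            await Task.Delay(500);

            await context.Response.WriteAsync("-- Demo.AspNetCore.MaxConcurrentConnections --");
        });
    }
}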

After adding the middleware to the pipeline I've set up a test with 30 concurrent requests and MaxConcurrentRequestsOptions.Limit set to 10. The result is best observed through the responseInformation temporary value.

Visual Studio Locals Window - Content of responseInformation array for hard limit scenario

There are 10 requests which resulted in 200 OK and 20 requests which resulted in 503 Service Unavailable (red frames) - the desired effect.
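For illustration, a test exercising the hard limit could look roughly like this (the configuration key name is my assumption, matching the "MaxConcurrentRequestsOptions" section bound in PrepareTestServer).

[Fact]
public void HardLimit_30ConcurrentRequests_10Processed_20Rejected()
{
    Dictionary<string, string> configuration = new Dictionary<string, string>
    {
        { "MaxConcurrentRequestsOptions:Limit", "10" }
    };

    HttpResponseInformation[] responseInformation = GetResponseInformation(configuration, 30);

    // 10 requests should be processed, the remaining 20 should be rejected immediately.
    Assert.Equal(10, responseInformation.Count(r => r.StatusCode == HttpStatusCode.OK));
    Assert.Equal(20, responseInformation.Count(r => r.StatusCode == HttpStatusCode.ServiceUnavailable));
}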

Queueing additional requests

The hard limit approach gets the job done, but there are more flexible approaches. In general they are based on the assumption that the application can cheaply store more requests in memory than it's currently processing, and that the client can wait for the response a little bit longer. Additional requests can wait in a queue (typically a FIFO one) for resources to become available. The queue should have a size limit, otherwise the application might end up processing only queued requests with constantly growing latency.

Because of the size limit requirement this can't simply be implemented with ConcurrentQueue; a custom (again thread-safe) solution is needed. The middleware should also be able to await the queue. On a high level, a class looking like the one below should provide the desired interface.

internal class MaxConcurrentRequestsEnqueuer
{
    private static readonly Task<bool> _enqueueFailedTask = Task.FromResult(false);
    private readonly int _maxQueueLength;

    public MaxConcurrentRequestsEnqueuer(int maxQueueLength)
    {
        _maxQueueLength = maxQueueLength;
    }

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        // TODO: Check the size and enqueue

        return enqueueTask;
    }

    public bool Dequeue()
    {
        bool dequeued = false;

        // TODO: Dequeue

        return dequeued;
    }
}

EnqueueAsync returns a Task<bool> so the middleware can await it and use the result as an indicator of whether the request should be processed or not.

Internally the class will maintain a Queue<TaskCompletionSource<bool>>.

internal class MaxConcurrentRequestsEnqueuer
{
    ...
    private readonly object _lock = new object();
    private readonly Queue<TaskCompletionSource<bool>> _queue = new Queue<TaskCompletionSource<bool>>();

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    TaskCompletionSource<bool> enqueueTaskCompletionSource =
                        new TaskCompletionSource<bool>();

                    _queue.Enqueue(enqueueTaskCompletionSource);

                    enqueueTask = enqueueTaskCompletionSource.Task;
                }
            }
        }

        return enqueueTask;
    }

    public bool Dequeue()
    {
        bool dequeued = false;

        lock (_lock)
        {
            if (_queue.Count > 0)
            {
                _queue.Dequeue().SetResult(true);

                dequeued = true;
            }
        }

        return dequeued;
    }
}

Yes, this is locking. It's not perfect, but a request which ends up here is going to wait anyway, so it can be considered acceptable. Some consolation is the fact that this class can be used nicely from the middleware.

public class MaxConcurrentRequestsMiddleware
{
    ...
    private readonly MaxConcurrentRequestsEnqueuer _enqueuer;

    public MaxConcurrentRequestsMiddleware(RequestDelegate next,
        IOptions<MaxConcurrentRequestsOptions> options)
    {
        ...

        if (_options.LimitExceededPolicy != MaxConcurrentRequestsLimitExceededPolicy.Drop)
        {
            _enqueuer = new MaxConcurrentRequestsEnqueuer(_options.MaxQueueLength);
        }
    }

    public async Task Invoke(HttpContext context)
    {
        if (CheckLimitExceeded() && !(await TryWaitInQueue()))
        {
            ...
        }
        else
        {
            await _next(context);

            if (ShouldDecrementConcurrentRequestsCount())
            {
                Interlocked.Decrement(ref _concurrentRequestsCount);
            }
        }
    }

    ...

    private async Task<bool> TryWaitInQueue()
    {
        return (_enqueuer != null) && (await _enqueuer.EnqueueAsync());
    }

    private bool ShouldDecrementConcurrentRequestsCount()
    {
        return (_enqueuer == null) || !_enqueuer.Dequeue();
    }
}

Below is the result of another test. The number of concurrent requests is again 30 and the maximum number of concurrently processed requests is 10. The queue size has also been set to 10. The requests which have waited in the queue are the ones in orange.

Visual Studio Locals Window - Content of responseInformation array for hard limit with FIFO queue scenario

This approach is called drop tail and has an alternative in the form of drop head. In the case of drop head, when a new request arrives and the queue is full, the first request in the queue is dropped. This can often result in better latency for the waiting requests, at the price of the dropped requests having had to wait as well.

The MaxConcurrentRequestsEnqueuer can be easily modified to support drop head.

internal class MaxConcurrentRequestsEnqueuer
{
    public enum DropMode
    {
        Tail = MaxConcurrentRequestsLimitExceededPolicy.FifoQueueDropTail,
        Head = MaxConcurrentRequestsLimitExceededPolicy.FifoQueueDropHead
    }

    ...
    private readonly DropMode _dropMode;

    public MaxConcurrentRequestsEnqueuer(int maxQueueLength, DropMode dropMode)
    {
        ...
        _dropMode = dropMode;
    }

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    enqueueTask = InternalEnqueueAsync();
                }
                else if (_dropMode == DropMode.Head)
                {
                    _queue.Dequeue().SetResult(false);

                    enqueueTask = InternalEnqueueAsync();
                }
            }
        }

        return enqueueTask;
    }

    ...

    private Task<bool> InternalEnqueueAsync()
    {
        TaskCompletionSource<bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        _queue.Enqueue(enqueueTaskCompletionSource);

        return enqueueTaskCompletionSource.Task;
    }
}

My test scenario is not sophisticated enough (all requests arrive at roughly the same time) to show the difference between drop tail and drop head, but this article has a nice comparison.

Further improvements

There are two more things which would be "nice to have" for the queue. The first is support for request aborting. Right now, if a request gets cancelled by the client it will still wait in the queue, taking a spot which one of the incoming requests could use.

In order to implement support for request aborting, the internals of MaxConcurrentRequestsEnqueuer need to be changed in a way which enables removing a specific request. To achieve that, the Queue<TaskCompletionSource<bool>> can no longer be used; it will be replaced with a LinkedList<TaskCompletionSource<bool>>. This allows enqueue and dequeue to remain O(1) operations and adds the possibility of O(n) removal. The O(n) is a pessimistic value; in reality the aborted request will typically be near the beginning of the list, so it will be found quickly. Only a couple of changes to MaxConcurrentRequestsEnqueuer are needed.

internal class MaxConcurrentRequestsEnqueuer
{
    ...
    private readonly LinkedList<TaskCompletionSource<bool>> _queue =
        new LinkedList<TaskCompletionSource<bool>>();

    ...

    public Task<bool> EnqueueAsync()
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    enqueueTask = InternalEnqueueAsync();
                }
                else if (_dropMode == DropMode.Head)
                {
                    InternalDequeue(false);

                    enqueueTask = InternalEnqueueAsync();
                }
            }
        }

        return enqueueTask;
    }

    public bool Dequeue()
    {
        bool dequeued = false;

        lock (_lock)
        {
            if (_queue.Count > 0)
            {
                InternalDequeue(true);

                dequeued = true;
            }
        }

        return dequeued;
    }

    private Task<bool> InternalEnqueueAsync()
    {
        TaskCompletionSource<bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        _queue.AddLast(enqueueTaskCompletionSource);

        return enqueueTaskCompletionSource.Task;
    }

    private void InternalDequeue(bool result)
    {
        TaskCompletionSource<bool> enqueueTaskCompletionSource = _queue.First.Value;

        _queue.RemoveFirst();

        enqueueTaskCompletionSource.SetResult(result);
    }
}

Now the support for CancellationToken (which will carry the information about the request being aborted) can be added.

internal class MaxConcurrentRequestsEnqueuer
{
    ...

    public Task<bool> EnqueueAsync(CancellationToken cancellationToken)
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        if (_maxQueueLength > 0)
        {
            lock (_lock)
            {
                if (_queue.Count < _maxQueueLength)
                {
                    enqueueTask = InternalEnqueueAsync(cancellationToken);
                }
                else if (_dropMode == DropMode.Head)
                {
                    InternalDequeue(false);

                    enqueueTask = InternalEnqueueAsync(cancellationToken);
                }
            }
        }

        return enqueueTask;
    }

    ...

    private Task<bool> InternalEnqueueAsync(CancellationToken cancellationToken)
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        TaskCompletionSource<bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        cancellationToken.Register(CancelEnqueue, enqueueTaskCompletionSource);

        if (!cancellationToken.IsCancellationRequested)
        {
            _queue.AddLast(enqueueTaskCompletionSource);
            enqueueTask = enqueueTaskCompletionSource.Task;
        }

        return enqueueTask;
    }

    private void CancelEnqueue(object state)
    {
        bool removed = false;

        TaskCompletionSource<bool> enqueueTaskCompletionSource = ((TaskCompletionSource<bool>)state);
        lock (_lock)
        {
            removed = _queue.Remove(enqueueTaskCompletionSource);
        }

        if (removed)
        {
            enqueueTaskCompletionSource.SetResult(false);
        }
    }

    ...
}

The middleware can pass HttpContext.RequestAborted as the CancellationToken (it is also a good idea to wrap the code setting the response status in an IsCancellationRequested check), which should ensure that all aborted requests are removed from the queue quickly.
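A sketch of how the Invoke method could pass the token and guard the response, assuming the rest of the middleware stays as shown earlier:

public async Task Invoke(HttpContext context)
{
    if (CheckLimitExceeded() && !(await TryWaitInQueue(context.RequestAborted)))
    {
        // The request might have been aborted while waiting in the queue,
        // in which case there is no point in writing a response.
        if (!context.RequestAborted.IsCancellationRequested)
        {
            IHttpResponseFeature responseFeature = context.Features.Get<IHttpResponseFeature>();

            responseFeature.StatusCode = StatusCodes.Status503ServiceUnavailable;
            responseFeature.ReasonPhrase = "Concurrent request limit exceeded.";
        }
    }
    else
    {
        await _next(context);

        if (ShouldDecrementConcurrentRequestsCount())
        {
            Interlocked.Decrement(ref _concurrentRequestsCount);
        }
    }
}

private async Task<bool> TryWaitInQueue(CancellationToken requestAborted)
{
    return (_enqueuer != null) && (await _enqueuer.EnqueueAsync(requestAborted));
}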

The second useful thing is limiting the time a request can spend in the queue. This is just additional protection for cases where the queue gets stuck and clients don't implement their own timeouts. As cancellation is already supported, this can be nicely introduced with the help of CancellationTokenSource.CancelAfter and CancellationTokenSource.CreateLinkedTokenSource.

internal class MaxConcurrentRequestsEnqueuer
{
    ...
    private readonly int _maxTimeInQueue;

    public MaxConcurrentRequestsEnqueuer(int maxQueueLength, DropMode dropMode, int maxTimeInQueue)
    {
        ...
        _maxTimeInQueue = maxTimeInQueue;
    }

    ...

    private Task<bool> InternalEnqueueAsync(CancellationToken cancellationToken)
    {
        Task<bool> enqueueTask = _enqueueFailedTask;

        TaskCompletionSource<bool> enqueueTaskCompletionSource = new TaskCompletionSource<bool>();

        CancellationToken enqueueCancellationToken =
            GetEnqueueCancellationToken(enqueueTaskCompletionSource, cancellationToken);

        if (!enqueueCancellationToken.IsCancellationRequested)
        {
            _queue.AddLast(enqueueTaskCompletionSource);
            enqueueTask = enqueueTaskCompletionSource.Task;
        }

        return enqueueTask;
    }

    private CancellationToken GetEnqueueCancellationToken(
        TaskCompletionSource<bool> enqueueTaskCompletionSource, CancellationToken cancellationToken)
    {
        CancellationToken enqueueCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken,
            GetTimeoutToken(enqueueTaskCompletionSource)
        ).Token;

        enqueueCancellationToken.Register(CancelEnqueue, enqueueTaskCompletionSource);

        return enqueueCancellationToken;
    }

    private CancellationToken GetTimeoutToken(TaskCompletionSource<bool> enqueueTaskCompletionSource)
    {
        CancellationToken timeoutToken = CancellationToken.None;

        if (_maxTimeInQueue != MaxConcurrentRequestsOptions.MaxTimeInQueueUnlimited)
        {
            CancellationTokenSource timeoutTokenSource = new CancellationTokenSource();

            timeoutToken = timeoutTokenSource.Token;
            timeoutToken.Register(CancelEnqueue, enqueueTaskCompletionSource);

            timeoutTokenSource.CancelAfter(_maxTimeInQueue);
        }

        return timeoutToken;
    }

    ...
}

The timeout can be clearly shown in a test by setting the max time in queue to a value lower than the processing time (for example 300 ms).

Visual Studio Locals Window - Content of responseInformation array for hard limit with FIFO queue and max time in queue scenario

As expected there are two groups of dropped requests: the ones which were dropped immediately and the ones which were dropped after the max time in queue.
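For reference, the configuration driving that scenario could look like this (key names are my assumptions, matching the options sketched earlier; 300 ms is below the 500 ms simulated processing time).

Dictionary<string, string> configuration = new Dictionary<string, string>
{
    { "MaxConcurrentRequestsOptions:Limit", "10" },
    { "MaxConcurrentRequestsOptions:LimitExceededPolicy", "FifoQueueDropTail" },
    { "MaxConcurrentRequestsOptions:MaxQueueLength", "10" },
    // Lower than the simulated processing time, so queued requests time out.
    { "MaxConcurrentRequestsOptions:MaxTimeInQueue", "300" }
};

HttpResponseInformation[] responseInformation = GetResponseInformation(configuration, 30);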

There is more

This (quite long) post doesn't fully explore the subject. There are other approaches, for example ones based on a LIFO queue. If this post got you interested in the subject, I suggest further research.

More "polished" version of MaxConcurrentRequestsEnqueuer and MaxConcurrentRequestsMiddleware with demo project and tests can be found on GitHub.