Receiving JSON Objects Stream (NDJSON) in ASP.NET Core MVC

In this post, I'm continuing (and probably wrapping up) my small series about async streams and NDJSON in .NET. The last subject needed to complete the circle is receiving NDJSON in ASP.NET Core and working with it asynchronously.

Formatting Incoming NDJSON Into an Async Stream

What do I mean by working with an incoming NDJSON request asynchronously? I mean a controller action that accepts IAsyncEnumerable<T> as a parameter and can access the elements as soon as they are received, without waiting for the entire request.

[ApiController]
public class WeatherForecastsController : Controller
{
    ...

    [HttpPost("stream")]
    public async Task<IActionResult> PostStream(IAsyncEnumerable<WeatherForecast> weatherForecasts)
    {
        await foreach (WeatherForecast weatherForecast in weatherForecasts)
        {
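            // Message template with arguments (instead of string interpolation) keeps the log entry structured.
            _logger.LogInformation("{Summary} ({ReceivedAt})", weatherForecast.Summary, DateTime.UtcNow);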
        }

        return Ok();
    }
}

To achieve that, we first need to add support for the application/x-ndjson content type. As with the HttpClient implementation, I'm going to limit the support to UTF-8. I'm also going to make IAsyncEnumerable<T> the only supported type.

public class SystemTextNdjsonInputFormatter : TextInputFormatter, IInputFormatterExceptionPolicy
{
    ...

    private static readonly Type _asyncEnumerableType = typeof(IAsyncEnumerable<>);

    public InputFormatterExceptionPolicy ExceptionPolicy => InputFormatterExceptionPolicy.MalformedInputExceptions;

    public JsonSerializerOptions SerializerOptions { get; }

    public SystemTextNdjsonInputFormatter()
    {
        SerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);

        SupportedEncodings.Add(UTF8EncodingWithoutBOM);

        SupportedMediaTypes.Add(new MediaTypeHeaderValue("application/x-ndjson"));
    }

    protected override bool CanReadType(Type type)
    {
        return type.IsGenericType && (type.GetGenericTypeDefinition() == _asyncEnumerableType);
    }

    ...
}
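
The formatter only takes effect once it is added to the MVC options. Registration isn't shown in the snippets above, so here is a minimal sketch. It assumes the .NET 6+ minimal hosting model with implicit usings; in a Startup-based project the same AddControllers call would go into ConfigureServices.

```csharp
// Program.cs - a sketch, assuming the minimal hosting model and implicit usings.
var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers(options =>
{
    // Make MVC consider the NDJSON formatter for application/x-ndjson request bodies.
    options.InputFormatters.Add(new SystemTextNdjsonInputFormatter());
});

var app = builder.Build();

app.MapControllers();

app.Run();
```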

The second thing is to make sure that elements are deserialized and made available as soon as they are received. This is where things get a little bit challenging. To generate an async stream, we need an async method that returns IAsyncEnumerable<T>. This can't be an instance method of our formatter, because the formatter is not generic and the element type is known only at runtime. This means we need to place that method in a separate generic type and create its instance through reflection. At the same time, that separate type must implement a non-generic interface that exposes this method, so it can be called from the formatter.

public interface IAsyncEnumerableModelReader
{
    object ReadModel(Stream inputStream, JsonSerializerOptions serializerOptions);
}

public class AsyncEnumerableModelReader<T> : IAsyncEnumerableModelReader
{
    public object ReadModel(Stream inputStream, JsonSerializerOptions serializerOptions)
    {
        return ReadModelInternal(inputStream, serializerOptions);
    }

    private static async IAsyncEnumerable<T> ReadModelInternal(Stream inputStream,
        JsonSerializerOptions serializerOptions)
    {
        ...
    }
}

Returning IAsyncEnumerable<T> as object is perfectly fine in this case, because InputFormatterResult.Success() takes the model as object anyway. Now, for the reflection part.

public class SystemTextNdjsonInputFormatter : TextInputFormatter, IInputFormatterExceptionPolicy
{
    ...

    public override Task<InputFormatterResult> ReadRequestBodyAsync(InputFormatterContext context, Encoding encoding)
    {
        ...

        Type modelReaderType = typeof(AsyncEnumerableModelReader<>)
            .MakeGenericType(context.ModelType.GetGenericArguments()[0]);
        IAsyncEnumerableModelReader modelReader = (IAsyncEnumerableModelReader)Activator.CreateInstance(modelReaderType);

        return Task.FromResult(InputFormatterResult.Success(
            modelReader.ReadModel(context.HttpContext.Request.Body, SerializerOptions)
        ));
    }
}

The final remaining part is deserialization. There is one small trap here as well: we shouldn't use EndOfStream to check whether we have read everything. EndOfStream can perform a synchronous read, and synchronous I/O is disallowed by default in ASP.NET Core. Instead, we can check the value returned from ReadLineAsync() for null, which signals the end of the stream.

public class AsyncEnumerableModelReader<T> : IAsyncEnumerableModelReader
{
    ...

    private static async IAsyncEnumerable<T> ReadModelInternal(Stream inputStream,
        JsonSerializerOptions serializerOptions)
    {
        using StreamReader inputStreamReader = new(inputStream);

        string valueUtf8Json = await inputStreamReader.ReadLineAsync();
        while (valueUtf8Json is not null)
        {
            yield return JsonSerializer.Deserialize<T>(valueUtf8Json, serializerOptions);

            valueUtf8Json = await inputStreamReader.ReadLineAsync();
        }
    }
}
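
To try the reading loop outside of ASP.NET Core, the following console sketch feeds it a MemoryStream instead of a request body. The Forecast record, the NdjsonSketch class, and the sample payload are hypothetical names introduced only for this illustration. The sketch also skips blank lines, which is an assumption on my part and not something the formatter above does.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical element type, for illustration only.
public record Forecast(string Summary, int TemperatureC);

public static class NdjsonSketch
{
    // Reads the stream line by line and yields each element as soon as its line is available.
    public static async IAsyncEnumerable<T> ReadAsync<T>(Stream input, JsonSerializerOptions options)
    {
        using StreamReader reader = new(input, Encoding.UTF8);

        string? line;
        // ReadLineAsync() returns null at the end of the stream, so EndOfStream is never touched.
        while ((line = await reader.ReadLineAsync()) is not null)
        {
            // Assumption of this sketch: blank lines are ignored instead of failing deserialization.
            if (line.Length == 0)
            {
                continue;
            }

            yield return JsonSerializer.Deserialize<T>(line, options)!;
        }
    }

    public static async Task Main()
    {
        string ndjson = "{\"summary\":\"Cool\",\"temperatureC\":10}\n\n{\"summary\":\"Hot\",\"temperatureC\":35}\n";

        using MemoryStream body = new(Encoding.UTF8.GetBytes(ndjson));
        JsonSerializerOptions options = new(JsonSerializerDefaults.Web);

        await foreach (Forecast forecast in ReadAsync<Forecast>(body, options))
        {
            Console.WriteLine($"{forecast.Summary}: {forecast.TemperatureC}");
        }
    }
}
```

Running it should print one line per non-empty NDJSON line ("Cool: 10", then "Hot: 35").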

What About Model Binding and Validation?

A question you may have here is how this plays with model binding and validation. The short answer is that it doesn't. ASP.NET Core doesn't contain a model binder and validation strategy for IAsyncEnumerable<T>, so any attributes put on the element type will be ignored. Trying to make it work could, in my opinion, do more harm than good. Having support for model binding could be valuable, but model validation is problematic even at the conceptual level. The typical way to use model validation is to skip any execution if there is anything invalid in the model. That approach would require reading the whole request first, which would negate the benefits of an async stream. If you want to validate individual elements as they arrive, this is still possible by calling TryValidateModel.
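
As a rough sketch of that last option, the action below validates each element as it arrives. The controller name, the routes, the [Required] attribute on Summary, and the "count the rejected elements" policy are all assumptions made for illustration. Note that TryValidateModel reports ModelState.IsValid, and ModelState keeps errors from earlier calls, so the sketch clears it before each element.

```csharp
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;

// Hypothetical element type with a validation attribute, for illustration only.
public class WeatherForecast
{
    [Required]
    public string Summary { get; set; }
}

[ApiController]
[Route("weather-forecasts")]
public class ValidatingWeatherForecastsController : ControllerBase
{
    [HttpPost("validated-stream")]
    public async Task<IActionResult> PostValidatedStream(IAsyncEnumerable<WeatherForecast> weatherForecasts)
    {
        int rejected = 0;

        await foreach (WeatherForecast weatherForecast in weatherForecasts)
        {
            // ModelState accumulates errors across calls, so reset it before validating the next element.
            ModelState.Clear();

            // A line containing the JSON literal null deserializes to null; treat it as invalid.
            if (weatherForecast is null || !TryValidateModel(weatherForecast))
            {
                rejected++;
                continue;
            }

            // ... process the valid element here ...
        }

        // Assumed policy: report how many elements were skipped instead of failing the whole request.
        return Ok(new { rejected });
    }
}
```

Whether an invalid element should be skipped, logged, or should abort the whole request depends on the application; skipping is just the simplest choice for a sketch.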

Give Me the Code

Of course, you can find ready-to-use code here. I've also made some packages available on NuGet, and you can find a demo project here.