Fetch API, Streams API, NDJSON, and ASP.NET Core MVC

I recently stumbled upon a blog post describing how to use the Fetch API, the Streams API, and NDJSON to improve user experience by streaming JSON objects from server to client. I immediately thought of a couple of places in projects I'm working on that could benefit from this. As a result, I decided to create a POC with ASP.NET Core as the backend.

That blog post describes the client side pretty well, so I won't focus on it too much. If you want to get familiar with it before reading about the server side, go ahead, I'll wait.

Initial Scenario

The thing I usually find hard when starting work on a POC is coming up with a scenario which doesn't obscure the actual task. In this case I've decided to use the weather forecasts idea from the "ASP.NET Core application with Angular" template. Imagine a controller which returns the weather forecast for the next couple of days.

[Route("api/[controller]")]
[ApiController]
public class WeatherForecastsController : Controller
{
    public class WeatherForecast
    {
        ...
    }

    [HttpGet]
    public async Task<IEnumerable<WeatherForecast>> Get()
    {
        List<WeatherForecast> weatherForecasts = new List<WeatherForecast>();

        Random rng = new Random();

        for (int index = 1; index <= 10; index++)
        {
            await Task.Delay(100);

            weatherForecasts.Add(CreateWeatherForecast(index, rng));
        }

        return weatherForecasts;
    }

    private static WeatherForecast CreateWeatherForecast(int index, Random rng)
    {
        return new WeatherForecast
        {
            ...
        };
    }
}

I've omitted some details which are not important here (don't worry, I will provide a link to the complete demo at the end). The Task.Delay call simulates the expensive server-side processing needed to generate each forecast.

On the client side, there is a function which fetches the data from the above action and renders a table based on it.

function fetchWeatherForecasts() {
    clearWeatherForecasts();

    fetch('api/WeatherForecasts')
        .then(function (response) {
            return response.json();
        })
        .then(function (weatherForecasts) {
            weatherForecasts.forEach(appendWeatherForecast);
        });
}

This function is called when a button is clicked. The result is easy to guess. First, the current content of the table is cleared, and then the new content takes about a second to appear (ten forecasts at roughly 100 ms each). Improving the user experience around that one-second delay is the goal of this POC.

Streaming NDJSON

What should an API for streaming objects from a controller action look like? There is one similar scenario in ASP.NET Core: streaming from a SignalR hub. An attempt to provide a similar API in a controller could look like this.

...
public class WeatherForecastsController : Controller
{
    ...

    [HttpGet("stream")]
    public NdjsonStreamResult GetStream()
    {
        NdjsonStreamResult result = new NdjsonStreamResult();

        _ = StreamAsync(result);

        return result;
    }

    private async Task StreamAsync(NdjsonStreamResult result)
    {
        Random rng = new Random();

        for (int index = 1; index <= 10; index++)
        {
            await Task.Delay(100);

            await result.WriteAsync(CreateWeatherForecast(index, rng));
        }

        result.Complete();
    }

    ...
}

In this approach, writing an object to the NdjsonStreamResult should result in that object being immediately serialized and sent to the client. Once all objects are written, the result needs to be notified that it should complete. Pretty simple; the only challenge is the NdjsonStreamResult implementation.

Typically, in ASP.NET Core, ActionResult implementations offload the actual processing to executors, which are usually singleton services. This case is different. There is response-specific state and constant interaction, so it seems better to make the result an orchestrator. The thing the result orchestrates should be a writer capable of serializing objects to the response. Instantiating that writer is not a trivial task, so it would be nice to separate that as well, for example by introducing a factory. Let's take a look at those three objects, starting from the bottom.

The writer should be a wrapper around the response stream writer with NDJSON serialization capabilities. NDJSON is still JSON; the writer just serializes every object separately, puts a newline after each one, and flushes after every object.
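To make the wire format concrete, here is a minimal sketch in JavaScript of what NDJSON serialization amounts to. The toNdjson helper and the sample objects are made up for illustration; they are not part of the POC or of any library.

```javascript
// Minimal sketch: serialize an array of values as NDJSON text.
// `toNdjson` is a hypothetical helper name used only for this illustration.
function toNdjson(values) {
    // JSON.stringify without an indent argument never emits raw newlines
    // (newlines inside strings are escaped), so each value stays on one line.
    return values
        .map(function (value) { return JSON.stringify(value) + '\n'; })
        .join('');
}

// Sample objects with made-up properties.
const ndjson = toNdjson([
    { id: 1, summary: 'Mild' },
    { id: 2, summary: 'Hot' }
]);

console.log(ndjson);
```

Every line of the output is a complete JSON document on its own, which is what allows a client to parse each object as soon as its line arrives.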

public class NdjsonTextWriter : INdjsonTextWriter
{
    private readonly TextWriter _textResponseStreamWriter;
    private readonly JsonTextWriter _jsonResponseStreamWriter;
    private readonly JsonSerializer _jsonSerializer;

    public NdjsonTextWriter(TextWriter textResponseStreamWriter, JsonSerializerSettings jsonSerializerSettings,
        JsonArrayPool<char> jsonArrayPool)
    {
        _textResponseStreamWriter = textResponseStreamWriter;

        _jsonResponseStreamWriter = new JsonTextWriter(textResponseStreamWriter)
        {
            ArrayPool = jsonArrayPool,
            CloseOutput = false,
            AutoCompleteOnClose = false
        };

        _jsonSerializer = JsonSerializer.Create(jsonSerializerSettings);
    }

    public async Task WriteAsync(object value)
    {
        _jsonSerializer.Serialize(_jsonResponseStreamWriter, value);
        await _textResponseStreamWriter.WriteAsync("\n");
        await _textResponseStreamWriter.FlushAsync();
    }

    public void Dispose()
    {
        _textResponseStreamWriter?.Dispose();
        ((IDisposable)_jsonResponseStreamWriter)?.Dispose();
    }
}

You might be wondering what JsonArrayPool is. It's the ASP.NET Core implementation of the Json.NET IArrayPool interface. After all, this is still JSON serialization and ASP.NET Core has a lot of infrastructure for it. It would be a waste not to use it. This also means that the factory should make use of MvcJsonOptions so that the serialization configuration is shared.

internal class NdjsonTextWriterFactory : INdjsonTextWriterFactory
{
    private static readonly string DEFAULT_CONTENT_TYPE = new MediaTypeHeaderValue("application/x-ndjson")
    {
        Encoding = Encoding.UTF8
    }.ToString();

    private readonly IHttpResponseStreamWriterFactory _httpResponseStreamWriterFactory;
    private readonly MvcJsonOptions _options;
    private readonly JsonArrayPool<char> _jsonArrayPool;

    public NdjsonTextWriterFactory(IHttpResponseStreamWriterFactory httpResponseStreamWriterFactory, IOptions<MvcJsonOptions> options,
        ArrayPool<char> innerJsonArrayPool)
    {
        // Null checks removed for brevity
        ...

        _jsonArrayPool = new JsonArrayPool<char>(innerJsonArrayPool);
    }

    public INdjsonTextWriter CreateWriter(ActionContext context, NdjsonStreamResult result)
    {
        // Null checks removed for brevity
        ...

        HttpResponse response = context.HttpContext.Response;

        ResponseContentTypeHelper.ResolveContentTypeAndEncoding(result.ContentType, response.ContentType, DEFAULT_CONTENT_TYPE,
            out var resolvedContentType, out var resolvedContentTypeEncoding);
        response.ContentType = resolvedContentType;

        if (result.StatusCode != null)
        {
            response.StatusCode = result.StatusCode.Value;
        }

        DisableResponseBuffering(context.HttpContext);

        return new NdjsonTextWriter(_httpResponseStreamWriterFactory.CreateWriter(response.Body, resolvedContentTypeEncoding),
            _options.SerializerSettings, _jsonArrayPool);
    }

    private static void DisableResponseBuffering(HttpContext context)
    {
        IHttpBufferingFeature bufferingFeature = context.Features.Get<IHttpBufferingFeature>();
        if (bufferingFeature != null)
        {
            bufferingFeature.DisableResponseBuffering();
        }
    }
}

The CreateWriter method does what you'd probably expect. It grabs the response, resolves the value of the Content-Type header, sets the status code, creates an instance of the response stream writer with the help of IHttpResponseStreamWriterFactory, and instantiates NdjsonTextWriter. There is one extra thing: disabling response buffering. This is important, because we don't want objects to get stuck in the response buffer instead of being sent to the client immediately.

With the writer and factory in place, it's time to put everything together by implementing the NdjsonStreamResult. There are some traps here. The first is waiting for information about completion. The ExecuteResultAsync method cannot finish before it's certain that all objects have been sent. The second trap is synchronization. It is possible that the first call to WriteAsync will happen before ExecuteResultAsync has been called, or before it has finished preparing the writer. In such a case, WriteAsync should wait. So there are two places where we need to wait for something. As we want to make the whole implementation thread pool friendly, it might be a good idea to use TaskCompletionSource for both.
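The two waiting points can be illustrated independently of ASP.NET Core. Below is a rough JavaScript analogy in which promises stand in for TaskCompletionSource. All names (createDeferred, createStreamResult, demo) are assumptions made for this sketch, and a plain array plays the role of the response writer.

```javascript
// Sketch of the two-signal handshake, with promises standing in for
// TaskCompletionSource. All names here are illustrative assumptions.
function createDeferred() {
    let resolve;
    const promise = new Promise(function (res) { resolve = res; });
    return { promise: promise, resolve: resolve };
}

function createStreamResult() {
    const ready = createDeferred();     // resolved once the writer exists
    const complete = createDeferred();  // resolved once the producer is done
    let writer = null;

    return {
        // Plays the role of ExecuteResultAsync: prepare the writer, signal
        // readiness, then stay pending until the producer calls complete().
        execute: async function (sink) {
            writer = sink;
            ready.resolve();
            await complete.promise;
        },
        // Plays the role of WriteAsync: wait for the writer before using it.
        write: async function (value) {
            await ready.promise;
            writer.push(value);
        },
        complete: function () { complete.resolve(); }
    };
}

// Usage: the producer starts writing before execute() has been called.
async function demo() {
    const result = createStreamResult();
    const sink = [];

    const producer = (async function () {
        await result.write('first');
        await result.write('second');
        result.complete();
    })();

    await result.execute(sink);
    await producer;
    return sink;
}

demo().then(function (sink) { console.log(sink); });
```

In this sketch the first write is issued before the writer exists, yet both values end up in the sink in order, and execute() only finishes after complete() has been called.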

public class NdjsonStreamResult : ActionResult, IStatusCodeActionResult
{
    private INdjsonTextWriter _ndjsonTextWriter;
    private readonly TaskCompletionSource<bool> _readyTaskCompletionSource = new TaskCompletionSource<bool>();
    private readonly TaskCompletionSource<bool> _completeTaskCompletionSource = new TaskCompletionSource<bool>();

    public string ContentType { get; set; }

    public int? StatusCode { get; set; }

    public override async Task ExecuteResultAsync(ActionContext context)
    {
        // Null checks removed for brevity
        ...

        INdjsonTextWriterFactory ndjsonTextWriterFactory = context.HttpContext.RequestServices.GetRequiredService<INdjsonTextWriterFactory>();
        using (_ndjsonTextWriter = ndjsonTextWriterFactory.CreateWriter(context, this))
        {
            _readyTaskCompletionSource.SetResult(true);

            await _completeTaskCompletionSource.Task;
        }
    }

    public async Task WriteAsync(object value)
    {
        if (!_readyTaskCompletionSource.Task.IsCompletedSuccessfully)
        {
            await _readyTaskCompletionSource.Task;
        }

        await _ndjsonTextWriter.WriteAsync(value);
    }

    public void Complete()
    {
        _completeTaskCompletionSource.SetResult(true);
    }
}

This should do the trick. The last thing left is modifying the client side code.

function fetchWeatherForecastsStream() {
    clearWeatherForecasts();

    fetch('api/WeatherForecasts/stream')
        .then(function (response) {
            const weatherForecasts = response.body
                .pipeThrough(new TextDecoderStream())
                .pipeThrough(parseNDJSON());

            readWeatherForecastsStream(weatherForecasts.getReader());
        });
}

function parseNDJSON() {
    let ndjsonBuffer = '';

    return new TransformStream({
        transform: function(ndjsonChunk, controller) {
            ndjsonBuffer += ndjsonChunk;

            const jsonValues = ndjsonBuffer.split('\n');
            jsonValues.slice(0, -1).forEach(function (jsonValue) { controller.enqueue(JSON.parse(jsonValue)); });

            ndjsonBuffer = jsonValues[jsonValues.length - 1];
        },
        flush: function(controller) {
            if (ndjsonBuffer) {
                controller.enqueue(JSON.parse(ndjsonBuffer));
            }
        }
    });
}

function readWeatherForecastsStream(weatherForecastsStreamReader) {
    weatherForecastsStreamReader.read()
        .then(function (result) {
            if (!result.done) {
                appendWeatherForecast(result.value);

                readWeatherForecastsStream(weatherForecastsStreamReader);
            }
        });
}

The code above is entirely based on the blog post I mentioned at the beginning.
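The buffering inside parseNDJSON matters because a network chunk can end in the middle of an object. The sketch below isolates that step as a pure function so the behavior is easier to see; splitNdjson is a hypothetical helper written for this illustration, not part of the demo.

```javascript
// Hypothetical pure helper mirroring the transform step of parseNDJSON.
function splitNdjson(buffer, chunk) {
    const lines = (buffer + chunk).split('\n');
    // The last element is whatever follows the final newline: either an
    // empty string or the beginning of a value that is not complete yet.
    const rest = lines.pop();
    return {
        values: lines.map(function (line) { return JSON.parse(line); }),
        rest: rest
    };
}

// Two chunks that split the second object in the middle.
const first = splitNdjson('', '{"id":1}\n{"id"');
const second = splitNdjson(first.rest, ':2}\n');

console.log(first.values, JSON.stringify(first.rest));
console.log(second.values, JSON.stringify(second.rest));
```

The first call yields one parsed object and keeps the incomplete fragment as the remainder; the second call completes that fragment and leaves an empty remainder.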

So what happens when the fetchWeatherForecastsStream function is called on a button click? It still takes about a second for the entire table to appear, but it starts appearing almost immediately. The user sees the data appearing row by row and is able to start reading it before the whole processing is done. This improves the user experience and the perceived performance of the application.
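The difference can be put into numbers with a back-of-the-envelope model. The sketch below assumes the ten items and the 100 ms per-item delay from the demo, and ignores network and rendering costs; rowVisibleTimes is a made-up helper for this calculation.

```javascript
// Back-of-the-envelope model; network and rendering costs are ignored.
// `rowVisibleTimes` is a hypothetical helper used only for this estimate.
function rowVisibleTimes(itemCount, perItemDelayMs, streamed) {
    const times = [];
    for (let index = 1; index <= itemCount; index++) {
        // Streamed: a row is visible as soon as its own item is generated.
        // Buffered: every row waits for the last item to be generated.
        times.push(streamed ? index * perItemDelayMs : itemCount * perItemDelayMs);
    }
    return times;
}

const buffered = rowVisibleTimes(10, 100, false);
const streamed = rowVisibleTimes(10, 100, true);

console.log('first row (buffered):', buffered[0], 'ms'); // 1000 ms
console.log('first row (streamed):', streamed[0], 'ms'); // 100 ms
console.log('last row (both):', buffered[9], streamed[9]); // 1000 1000
```

Under these assumptions the total time is unchanged, but the time until the first row appears drops from about 1000 ms to about 100 ms.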

This Is Cool

Yes, this is cool. Of course, it shouldn't be blindly used everywhere. The use case must be suitable (sometimes we need to wait for all the data) and there must be room for improvement. The streaming approach might bring some improvements if the response is simply large (by avoiding long serialization and deserialization). It will shine in scenarios with noticeable per-item fetching or processing. But for a small, quick-to-generate response it will probably be overkill.

The above implementation is a POC. There is probably room for improvement. There also might be a bug hiding somewhere. But if you want to play with it (which I strongly encourage), the complete source code is available on GitHub.