If there is one thing that is certain for non-trivial code running in production, it is errors. Sooner or later they will happen, and they don't even have to result from bugs. Some resources might be down, there can be a network problem, a hardware failure, and many other short-lived issues. This is why resiliency is a key aspect of every application. In this post, I want to focus on handling transient errors in a very specific context - Durable Functions.

A Word of Caution - Don’t Use Polly

The fact that Durable Functions provide their own mechanism for automatic retry on failure should be a strong suggestion that it's the way it should be done. But we are creatures of habit, and we often bring those habits from other types of applications to serverless ones. One such habit is using Polly for retries. In the case of Durable Functions (or Azure Functions in general), this can have serious consequences. The time spent at awaits is counted as execution time, and this includes the delays between retries. Yes, the rules are different in orchestrator functions, but you can't perform I/O operations there, and those are the ones you will most likely want to retry. So, the built-in automatic retry is the way to go, but sometimes it might be tricky to achieve some scenarios.

Built-in Automatic Retry

To discuss the built-in automatic retry in Durable Functions, I'm going to use the below activity as an example.

[FunctionName(nameof(MessageOfTheDayActivity))]
public async Task<string> MessageOfTheDayActivity([ActivityTrigger] IDurableActivityContext activityContext)
{
    HttpClient httpClient = _httpClientFactory.CreateClient();

    return await httpClient.GetStringAsync("https://host/message-of-the-day");
}

Yes, I know that performing an HTTP request from an activity is probably not the best choice. In simple cases, it is better to use the built-in Durable Functions HTTP features. That said, HttpClient allows me to easily show various scenarios.

In the above code, I'm using GetStringAsync, which will throw an exception whenever an unsuccessful response is received. If this activity is called via CallActivityWithRetryAsync, Durable Functions will retry it according to a policy defined with RetryOptions. The minimum which must be provided is the maximum number of attempts and the first retry interval.

[FunctionName(nameof(MessageOfTheDayOrchestration))]
public async Task<string> MessageOfTheDayOrchestration([OrchestrationTrigger] IDurableOrchestrationContext orchestrationContext)
{
    string messageOfTheDay = await orchestrationContext.CallActivityWithRetryAsync<string>(
        nameof(MessageOfTheDayActivity),
        new RetryOptions(TimeSpan.FromSeconds(5), 3),
        null);

    return messageOfTheDay;
}

So this will retry all requests which have resulted in a response with an unsuccessful status code. Is this truly the correct behavior? I would say no. There are unsuccessful status codes that shouldn't be retried. A good example is 403 Forbidden. With this status code, the service is telling us that we are not allowed to make the request, so retrying won't change anything. Can the policy be adjusted to not retry when this happens? Yes, but first we need to modify the activity so it throws an exception which propagates the information we need.

[FunctionName(nameof(MessageOfTheDayActivity))]
public async Task<string> MessageOfTheDayActivity([ActivityTrigger] IDurableActivityContext activityContext)
{
    HttpClient httpClient = _httpClientFactory.CreateClient();

    using HttpResponseMessage response = await httpClient.GetAsync("https://host/message-of-the-day");

    if (!response.IsSuccessStatusCode)
    {
        throw new MessageOfTheDayApiException(response.StatusCode);
    }

    return await response.Content.ReadAsStringAsync();
}
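
The MessageOfTheDayApiException used above is a custom exception, not something provided by Durable Functions. A minimal sketch could look like below (the type and its members are my assumption of what the post implies; the optional Retry-After value only becomes relevant later in this post):

```csharp
using System;
using System.Net;

// Hypothetical custom exception carrying the HTTP status code (and optionally
// the Retry-After value) from the activity to the orchestrator's retry logic.
public class MessageOfTheDayApiException : Exception
{
    public HttpStatusCode StatusCode { get; }

    public TimeSpan? RetryAfter { get; }

    public MessageOfTheDayApiException(HttpStatusCode statusCode, TimeSpan? retryAfter = null)
        : base($"Message of the day API responded with {(int)statusCode} ({statusCode}).")
    {
        StatusCode = statusCode;
        RetryAfter = retryAfter;
    }
}
```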

There is a property on RetryOptions which allows for providing a callback to determine whether an activity should be retried. The important thing to remember here is that exceptions are wrapped, and to get the original one you need to look at the inner exception.

[FunctionName(nameof(MessageOfTheDayOrchestration))]
public async Task<string> MessageOfTheDayOrchestration([OrchestrationTrigger] IDurableOrchestrationContext orchestrationContext)
{
    RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 3)
    {
        Handle = (Exception ex) =>
        {
            MessageOfTheDayApiException messageOfTheDayApiException = ex.InnerException as MessageOfTheDayApiException;

            if (messageOfTheDayApiException != null && messageOfTheDayApiException.StatusCode == HttpStatusCode.Forbidden)
            {
                return false;
            }

            return true;
        }
    };

    string messageOfTheDay = await orchestrationContext.CallActivityWithRetryAsync<string>(
        nameof(MessageOfTheDayActivity),
        retryOptions,
        null);

    return messageOfTheDay;
}

Avoiding unnecessary retries is just one aspect of a mature retry policy. There are more sophisticated scenarios which can't be handled by the built-in automatic retry.

Handling More Sophisticated Cases

Another interesting HTTP response status code is 429 Too Many Requests. Receiving this status code means that you are being rate limited. You should wait and retry, but waiting is a little trickier in this case. The information on how long to wait usually comes in the form of the Retry-After header value. So the waiting period can be different every time and is known only after execution. How can we deal with that? First, we need to make sure that our exception propagates not only the status code but also the extracted Retry-After value (if it has been provided).

[FunctionName(nameof(MessageOfTheDayActivity))]
public async Task<string> MessageOfTheDayActivity([ActivityTrigger] IDurableActivityContext activityContext)
{
    HttpClient httpClient = _httpClientFactory.CreateClient();

    using HttpResponseMessage response = await httpClient.GetAsync("https://host/message-of-the-day");

    if (!response.IsSuccessStatusCode)
    {
        throw new MessageOfTheDayApiException(response.StatusCode, GetRetryAfter(response));
    }

    return await response.Content.ReadAsStringAsync();
}
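
GetRetryAfter isn't shown in the post; a possible implementation (the helper name matches the call above, the rest is my sketch) would normalize both forms the header can take - a delay in seconds and an absolute date - to a TimeSpan:

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;

public static class RetryAfterParser
{
    // Hypothetical helper extracting the Retry-After value from a response.
    // Returns null when the header is absent or carries no usable value.
    public static TimeSpan? GetRetryAfter(HttpResponseMessage response)
    {
        RetryConditionHeaderValue retryAfter = response.Headers.RetryAfter;

        if (retryAfter is null)
        {
            return null;
        }

        // "Retry-After: 30" - a delay in seconds.
        if (retryAfter.Delta.HasValue)
        {
            return retryAfter.Delta.Value;
        }

        // "Retry-After: <HTTP-date>" - an absolute point in time.
        if (retryAfter.Date.HasValue)
        {
            return retryAfter.Date.Value - DateTimeOffset.UtcNow;
        }

        return null;
    }
}
```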

Now, having the necessary information, we can try to do something useful with it. Of course, we can't use it as a retry interval, so the built-in capabilities will not help us. What we can do is exclude this scenario from automatic retry and use two Durable Functions concepts to handle it ourselves: timers and the orchestrator's capability to restart itself. Once we "catch" a 429, we can create a timer to wait for the needed period and after that use IDurableOrchestrationContext.ContinueAsNew to restart the orchestration.

[FunctionName(nameof(MessageOfTheDayOrchestration))]
public async Task<string> MessageOfTheDayOrchestration([OrchestrationTrigger] IDurableOrchestrationContext orchestrationContext)
{
    string messageOfTheDay = String.Empty;

    RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 3)
    {
        Handle = (Exception ex) =>
        {
            MessageOfTheDayApiException messageOfTheDayApiException = ex.InnerException as MessageOfTheDayApiException;

            if (messageOfTheDayApiException != null && (
                messageOfTheDayApiException.StatusCode == HttpStatusCode.Forbidden ||
                messageOfTheDayApiException.StatusCode == HttpStatusCode.TooManyRequests
                ))
            {
                return false;
            }

            return true;
        }
    };

    try
    {
        messageOfTheDay = await orchestrationContext.CallActivityWithRetryAsync<string>(
            nameof(MessageOfTheDayActivity),
            retryOptions,
            null);
    }
    catch (Exception ex)
    {
        MessageOfTheDayApiException messageOfTheDayApiException = ex.InnerException as MessageOfTheDayApiException;

        if (messageOfTheDayApiException != null &&
            messageOfTheDayApiException.StatusCode == HttpStatusCode.TooManyRequests &&
            messageOfTheDayApiException.RetryAfter.HasValue)
        {
            DateTime retryAt = orchestrationContext.CurrentUtcDateTime.Add(messageOfTheDayApiException.RetryAfter.Value);
            await orchestrationContext.CreateTimer(retryAt, CancellationToken.None);

            orchestrationContext.ContinueAsNew(null);
        }
    }

    return messageOfTheDay;
}

This will allow for the proper handling of 429 Too Many Requests. This is also the way to build more complex failure handling - by leveraging Durable Functions specific mechanisms.

Be Thoughtful

Proper error handling requires consideration. You need to think about how you want to react to different situations so you don't end up with pointless retries, infinite retries, or retry storms. You should also use the right tools for the job, especially in Durable Functions, which have a very specific programming model where wrong patterns will cost you money or cause timeouts.

Back in 2017, I wrote about WebSocket per-message compression in ASP.NET Core. Back then the built-in support was deep in the backlog with no foreseeable delivery, so I created my own hacky implementation. Luckily it's 2021 and the built-in support is about to arrive with ASP.NET Core 6. I was very eager to archive my repository, so I decided to quickly take a look at what changes are needed to benefit from this new capability.

Enabling and Configuring Compression Extension Negotiation

In order to allow compression extension negotiation between client and server, you need to use the AcceptWebSocketAsync overload which takes WebSocketAcceptContext as a parameter and set DangerousEnableCompression to true.

Let's say you have a WebSocket implementation which supports subprotocol negotiation.

WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(textSubProtocol?.SubProtocol);

The version with compression negotiation enabled would look like below.

WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(new WebSocketAcceptContext
{
    SubProtocol = textSubProtocol?.SubProtocol,
    DangerousEnableCompression = true
});

You might be wondering why the property has such a "scary" name. It's not because things may fail. If the client doesn't support compression (or doesn't support compression with specific parameters), the negotiated connection will simply have compression disabled. It's about security. Similarly to HTTPS, encrypted WebSocket connections are subject to CRIME/BREACH attacks. If you are using encryption, you should be very cautious and not compress sensitive messages.

Regarding parameters, you can set no context takeover and max window bits through the DisableServerContextTakeover and ServerMaxWindowBits properties of WebSocketAcceptContext. The documentation states that they are advanced options that control how the compression works. I gave a high-level description of them in my previous post, and going deeper into the details of sliding-window compression is beyond the scope of this post. So, without going too deep, I'm going to say that they allow for tuning between memory overhead and compression ratio.
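
Putting both parameters together with the earlier snippet, a server tuning them might look like below (the specific values are just an illustration, not a recommendation):

```csharp
WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(new WebSocketAcceptContext
{
    SubProtocol = textSubProtocol?.SubProtocol,
    DangerousEnableCompression = true,
    // Reset the compression context after every message - lower memory
    // overhead on the server at the cost of compression ratio.
    DisableServerContextTakeover = true,
    // Shrink the sliding window used for compression - the same trade-off.
    ServerMaxWindowBits = 10
});
```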

Per-Message Compression

As we already know, using encryption and compression together is dangerous from a security perspective. But what if only some specific messages are of a sensitive nature? There is an option to disable compression for a specific message.

The most commonly used overload of SendAsync is the one which takes a bool for the endOfMessage parameter.

await _webSocket.SendAsync(messageSegment, messageType, true, cancellationToken);

In order to be able to disable compression on a per-message basis, you need to use the overload which accepts WebSocketMessageFlags instead.

await _webSocket.SendAsync(messageSegment, messageType, WebSocketMessageFlags.EndOfMessage, cancellationToken);

Now you can disable compression for a specific message by adding the DisableCompression flag.

await _webSocket.SendAsync(messageSegment, messageType, WebSocketMessageFlags.EndOfMessage | WebSocketMessageFlags.DisableCompression, cancellationToken);

Of course, this approach can be used any time you want to disable compression per message, not only in security related scenarios.

Summary

WebSocket per-message compression in ASP.NET Core 6 is feature complete and I couldn't be happier to stop using my own implementation. If you want to play with it, I've updated my WebSocket demo to use it.

Recently I've published code and written posts about working with asynchronous streaming data sources over HTTP using NDJSON. One of the follow-up questions I've received was how it relates to the async streaming coming in ASP.NET Core 6. As the answer isn't obvious, I've decided to describe the subject in more detail.

ASP.NET Core and IAsyncEnumerable

Since ASP.NET Core 3, IAsyncEnumerable can be returned directly from controller actions.

[Route("api/[controller]")]
[ApiController]
public class WeatherForecastsController : Controller
{
    ...

    [HttpGet("weather-forecast")]
    public IAsyncEnumerable<WeatherForecast> GetWeatherForecastStream()
    {
        async IAsyncEnumerable<WeatherForecast> streamWeatherForecastsAsync()
        {
            for (int daysFromToday = 1; daysFromToday <= 10; daysFromToday++)
            {
                WeatherForecast weatherForecast = await _weatherForecaster.GetWeatherForecastAsync(daysFromToday);

                yield return weatherForecast;
            }
        }

        return streamWeatherForecastsAsync();
    }
}

ASP.NET Core will iterate the IAsyncEnumerable in an asynchronous manner, buffer the result, and send it down the wire. The gain here is no blocking calls and no risk of thread pool starvation, but there is no streaming of data to the client. If one would like to test this by making some requests, a possible first attempt could look like this.

private static async Task ConsumeJsonStreamAsync()
{
    Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Receiving weather forecasts . . .");

    using HttpClient httpClient = new();

    using HttpResponseMessage response = await httpClient.GetAsync(
        "...",
        HttpCompletionOption.ResponseHeadersRead
    ).ConfigureAwait(false);

    response.EnsureSuccessStatusCode();

    IAsyncEnumerable<WeatherForecast> weatherForecasts = await response.Content
        .ReadFromJsonAsync<IAsyncEnumerable<WeatherForecast>>().ConfigureAwait(false);

    await foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] {weatherForecast.Summary}");
    }

    Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Weather forecasts have been received.");
}

This will fail. Until .NET 6, System.Text.Json doesn't support IAsyncEnumerable; the only option is to use IEnumerable.

private static async Task ConsumeJsonStreamAsync()
{
    ...

    IEnumerable<WeatherForecast> weatherForecasts = await response.Content
        .ReadFromJsonAsync<IEnumerable<WeatherForecast>>().ConfigureAwait(false);

    foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        ...
    }

    ...
}

The output will look like below (assuming it takes around 100ms to generate a single forecast).

[08:12:59.184] Receiving weather forecasts . . .
[08:13:01.380] Cool
[08:13:01.381] Warm
[08:13:01.381] Sweltering
[08:13:01.381] Hot
[08:13:01.381] Chilly
[08:13:01.382] Scorching
[08:13:01.382] Hot
[08:13:01.382] Freezing
[08:13:01.382] Chilly
[08:13:01.382] Bracing
[08:13:01.382] Weather forecasts have been received.

Here the gain of properly using NDJSON is clear, as in that case the output would look more like this.

[08:13:01.400] Receiving weather forecasts . . .
[08:13:01.538] Mild
[08:13:01.633] Freezing
[08:13:01.755] Mild
[08:13:01.862] Warm
[08:13:01.968] Warm
[08:13:02.075] Sweltering
[08:13:02.184] Freezing
[08:13:02.294] Chilly
[08:13:02.401] Freezing
[08:13:02.506] Hot
[08:13:02.513] Weather forecasts have been received.

Async Streaming in ASP.NET Core 6

One of the ASP.NET Core improvements in .NET 6 is support for async streaming of IAsyncEnumerable. In .NET 6, System.Text.Json can serialize an incoming IAsyncEnumerable in an asynchronous manner. Thanks to that, ASP.NET Core no longer buffers IAsyncEnumerable at the ObjectResult level; the decision is made at the output formatter level, and buffering occurs only in the case of the Newtonsoft.Json-based one.

Also, deserialization to IAsyncEnumerable is now supported by System.Text.Json, so the client code which was failing before now works.

private static async Task ConsumeJsonStreamAsync()
{
    ...

    IAsyncEnumerable<WeatherForecast> weatherForecasts = await response.Content
        .ReadFromJsonAsync<IAsyncEnumerable<WeatherForecast>>().ConfigureAwait(false);

    await foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        ...
    }

    ...
}

Unfortunately, the result of running that code is disappointing. There is no difference from deserialization to IEnumerable. That shouldn't be a surprise, as the signature of the DeserializeAsync method (which is used under the hood) doesn't allow streaming. This is why a new API, the DeserializeAsyncEnumerable method, has been introduced to handle streaming deserialization.

private static async Task ConsumeJsonStreamAsync()
{
    ...

    Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
    IAsyncEnumerable<WeatherForecast> weatherForecasts = JsonSerializer.DeserializeAsyncEnumerable<WeatherForecast>(
        responseStream,
        new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
            DefaultBufferSize = 128
        });

    await foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        ...
    }

    ...
}

This time I was surprised, because the result still didn't change. As I had no idea if my expectations about the behavior were correct, I decided to ask. It turns out I had hit a bug. I quickly downgraded from Preview 6 to Preview 4 and finally achieved the result I was fishing for.

[08:28:51.662] Receiving weather forecasts . . .
[08:28:51.967] Cool
[08:28:52.068] Sweltering
[08:28:52.288] Cool
[08:28:52.289] Freezing
[08:28:52.396] Freezing
[08:28:52.614] Cool
[08:28:52.614] Cool
[08:28:52.723] Cool
[08:28:52.851] Cool
[08:28:52.851] Chilly
[08:28:52.854] Weather forecasts have been received.

You may have noticed that I've set DefaultBufferSize while passing JsonSerializerOptions to the DeserializeAsyncEnumerable method. This is very important if one wants to achieve streaming behavior. Internally, DeserializeAsyncEnumerable will read from the stream until the buffer is full or the stream has ended. If the buffer size is large (and the default is 16 KB), there will be a significant delay in the asynchronous iteration (in fact, you can see irregularity in the above output resulting from exactly that).

The Conclusion

Async streaming in ASP.NET Core 6 allows achieving effects similar to NDJSON, if you understand your data very well and know how to configure the deserialization. That's the main difference from NDJSON. In the case of NDJSON, the streaming capability is a consequence of the format. It is possible to implement it on top of the JSON serializers/deserializers available on different platforms and in different languages. In the case of async streaming, it's a consequence of the serializer/deserializer's internal nature. That internal nature will be different on different platforms. The first example coming to mind is browsers.

The good thing is that in ASP.NET Core 6 one doesn't have to choose between async streaming and NDJSON. Because ObjectResult is no longer buffering IAsyncEnumerable, content negotiation becomes possible. I'm showing exactly that capability in the ASP.NET Core 6 branch of my demo project (which will become main after GA).

In this post, I'm continuing (and probably wrapping up) my small series about async streams and NDJSON in .NET. The last subject remaining to complete the circle is receiving NDJSON in ASP.NET Core and working with it in an asynchronous manner.

Formatting Incoming NDJSON Into an Async Stream

What do I mean by working with an incoming NDJSON request in an asynchronous manner? I mean a controller action that can accept IAsyncEnumerable&lt;T&gt; as a parameter and is able to access the elements as soon as they are received - without waiting for the entire request.

[ApiController]
public class WeatherForecastsController : Controller
{
    ...

    [HttpPost("stream")]
    public async Task<IActionResult> PostStream(IAsyncEnumerable<WeatherForecast> weatherForecasts)
    {
        await foreach (WeatherForecast weatherForecast in weatherForecasts)
        {
            _logger.LogInformation($"{weatherForecast.Summary} ({DateTime.UtcNow})");
        }

        return Ok();
    }
}

In order to achieve that, first we need to add support for the application/x-ndjson content type. Similarly to the HttpClient implementation, I'm going to limit the support to UTF-8. Also, I'm going to make IAsyncEnumerable&lt;T&gt; the only supported type.

public class SystemTextNdjsonInputFormatter : TextInputFormatter, IInputFormatterExceptionPolicy
{
    ...

    private static readonly Type _asyncEnumerableType = typeof(IAsyncEnumerable<>);

    public InputFormatterExceptionPolicy ExceptionPolicy => InputFormatterExceptionPolicy.MalformedInputExceptions;

    public JsonSerializerOptions SerializerOptions { get; }

    public SystemTextNdjsonInputFormatter()
    {
        SerializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);

        SupportedEncodings.Add(UTF8EncodingWithoutBOM);

        SupportedMediaTypes.Add(new MediaTypeHeaderValue("application/x-ndjson"));
    }

    protected override bool CanReadType(Type type)
    {
        return type.IsGenericType && (type.GetGenericTypeDefinition() == _asyncEnumerableType);
    }

    ...
}

The second thing is to make sure that elements are deserialized and made available as soon as they are received. This is where things get a little bit challenging. In order to generate an async stream, we need an async method that returns IAsyncEnumerable&lt;T&gt;. This can't be an instance method of our formatter, as the formatter isn't generic. This means we need to place that method in a separate type and create its instance through reflection. At the same time, that separate type must implement a non-generic interface that exposes this method so it can be called from the formatter.

public interface IAsyncEnumerableModelReader
{
    object ReadModel(Stream inputStream, JsonSerializerOptions serializerOptions);
}

public class AsyncEnumerableModelReader<T> : IAsyncEnumerableModelReader
{
    public object ReadModel(Stream inputStream, JsonSerializerOptions serializerOptions)
    {
        return ReadModelInternal(inputStream, serializerOptions);
    }

    private static async IAsyncEnumerable<T> ReadModelInternal(Stream inputStream,
        JsonSerializerOptions serializerOptions)
    {
        ...
    }
}

Returning IAsyncEnumerable&lt;T&gt; as an object is perfectly fine in this case, because InputFormatterResult.Success() would cause this cast anyway. Now, for the reflection part.

public class SystemTextNdjsonInputFormatter : TextInputFormatter, IInputFormatterExceptionPolicy
{
    ...

    public override Task<InputFormatterResult> ReadRequestBodyAsync(InputFormatterContext context, Encoding encoding)
    {
        ...

        Type modelReaderType = typeof(AsyncEnumerableModelReader<>)
            .MakeGenericType(context.ModelType.GetGenericArguments()[0]);
        IAsyncEnumerableModelReader modelReader = (IAsyncEnumerableModelReader)Activator.CreateInstance(modelReaderType);

        return Task.FromResult(InputFormatterResult.Success(
            modelReader.ReadModel(context.HttpContext.Request.Body, SerializerOptions)
        ));
    }
}

The final remaining part is deserialization. There is one small trap here as well - we shouldn't use EndOfStream to check if we have read everything. This is because EndOfStream can perform a synchronous read, and synchronous reads are disallowed in ASP.NET Core. Instead, we can check the value returned from ReadLineAsync() for null.

public class AsyncEnumerableModelReader<T> : IAsyncEnumerableModelReader
{
    ...

    private static async IAsyncEnumerable<T> ReadModelInternal(Stream inputStream,
        JsonSerializerOptions serializerOptions)
    {
        using StreamReader inputStreamReader = new(inputStream);

        string valueUtf8Json = await inputStreamReader.ReadLineAsync();
        while (!(valueUtf8Json is null))
        {
            yield return JsonSerializer.Deserialize<T>(valueUtf8Json, serializerOptions);

            valueUtf8Json = await inputStreamReader.ReadLineAsync();
        }
    }
}

What About Model Binding and Validation

A question one may have here is how this plays with model binding and validation. The short answer is it doesn't. ASP.NET Core doesn't contain a model binder and validation strategy for IAsyncEnumerable&lt;T&gt;, so any attributes put on the element type will be ignored. Trying to make it work, in my opinion, could do more harm than good. Having support for model binding could be valuable, but model validation is problematic even at the conceptual level. The typical way to use model validation is to skip any execution if there is anything invalid in the model. This approach would negate the benefits of an async stream. If you want to validate individual elements as they come, this is still possible by calling TryValidateModel.
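
As a framework-free illustration of that last option, individual elements can also be validated against their data annotations with the Validator class (inside a controller you would call TryValidateModel instead; the WeatherForecast attribute below is a hypothetical example):

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

public class WeatherForecast
{
    // Hypothetical constraint, just for illustration.
    [Required, StringLength(50)]
    public string Summary { get; set; }
}

public static class ElementValidator
{
    // Evaluates the data annotations of a single element as it arrives from
    // the async stream, collecting failures instead of rejecting the request.
    public static bool TryValidate<T>(T element, out List<ValidationResult> results)
    {
        results = new List<ValidationResult>();

        return Validator.TryValidateObject(element, new ValidationContext(element), results, validateAllProperties: true);
    }
}
```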

Give Me the Code

Of course, you can find ready-to-use code here. I've also made some packages available on NuGet, and you can find a demo project here.

My first post on working with NDJSON streams in .NET was the result of a use case I needed to satisfy in one of the projects I'm involved with (and it worked great). Since then, I've received some questions on the subject. I've already answered the most common one on this blog, but there were a couple more that were really frequent. So I've decided to add a few more posts to this series.

Streaming NDJSON With HttpClient

When I was writing my first post about NDJSON, I didn't know there were services out there using it as an input format. The fact is that they do, which creates a need for a way to stream NDJSON with HttpClient. The natural expectation is to be able to POST an async stream, preferably with simple and elegant code.

async IAsyncEnumerable<WeatherForecast> streamWeatherForecastsAsync()
{
    for (int daysFromToday = 1; daysFromToday <= 10; daysFromToday++)
    {
        WeatherForecast weatherForecast = await GetWeatherForecastAsync(daysFromToday).ConfigureAwait(false);

        yield return weatherForecast;
    }
}

using HttpClient httpClient = new();

using HttpResponseMessage response = await httpClient.PostAsync("...",
    new NdjsonAsyncEnumerableContent<WeatherForecast>(streamWeatherForecastsAsync())
    ).ConfigureAwait(false);

response.EnsureSuccessStatusCode();

The way to achieve the elegance of the above snippet is through a custom implementation of HttpContent. The first part of it is the boilerplate - setting up the media type and charset and initializing the serializer. I'm not going to dive into transcoding and will limit the implementation to UTF-8.

public class NdjsonAsyncEnumerableContent<T> : HttpContent
{
    private static readonly JsonSerializerOptions _defaultJsonSerializerOptions = new(JsonSerializerDefaults.Web);

    private readonly IAsyncEnumerable<T> _values;
    private readonly JsonSerializerOptions _jsonSerializerOptions;

    public NdjsonAsyncEnumerableContent(IAsyncEnumerable<T> values, JsonSerializerOptions? jsonSerializerOptions = null)
    {
        _values = values ?? throw new ArgumentNullException(nameof(values));
        _jsonSerializerOptions = jsonSerializerOptions ?? _defaultJsonSerializerOptions;

        Headers.ContentType = new MediaTypeHeaderValue("application/x-ndjson")
        {
            CharSet = Encoding.UTF8.WebName
        };
    }

    ...

    protected override bool TryComputeLength(out long length)
    {
        length = -1;

        return false;
    }
}

The actual work happens in SerializeToStreamAsync. Here the goal is to enumerate the async stream, immediately serialize every incoming object and flush it over the wire together with the delimiter.

public class NdjsonAsyncEnumerableContent<T> : HttpContent
{
    private static readonly byte[] _newlineDelimiter = Encoding.UTF8.GetBytes("\n");
    ...

    protected override async Task SerializeToStreamAsync(Stream stream, TransportContext? context)
    {
        await foreach (T value in _values.ConfigureAwait(false))
        {
            await JsonSerializer.SerializeAsync<T>(stream, value, _jsonSerializerOptions).ConfigureAwait(false);
            await stream.WriteAsync(_newlineDelimiter).ConfigureAwait(false);
            await stream.FlushAsync().ConfigureAwait(false);
        }
    }

    ...
}

This way one can fully utilize async streams when working with services that accept NDJSON. If you are looking for something ready to use, I've made a more polished version available here, with some extensions to make it even more elegant to use.
