A couple of weeks ago the Cosmos DB team announced support for patching documents. This is quite a useful and long-awaited feature, as up to this point the only way to change a stored document was to completely replace it. This feature opens up new scenarios. For example, if you are providing a Web API on top of Cosmos DB, you can now directly implement support for the PATCH request method. I happen to have a small demo of such a Web API, so I've decided to refresh it and play with leveraging this new capability.

Adding PATCH Requests Support to an ASP.NET Core Web API

An important part of handling a PATCH request is deciding on the request body format. The standard approach for JSON-based APIs is JSON Patch. In fact, Cosmos DB also uses JSON Patch, which should make it a natural fit here.

ASP.NET Core provides support for JSON Patch, but I've decided not to go with it. Why? It's designed around applying operations to an instance of an object and, as a result, has internal assumptions about the supported list of operations. In the case of Cosmos DB, the supported operations are different, and that "applying" capability is not needed (it was a great way to implement PATCH when Cosmos DB provided only the replace option). I figured it would be better to start fresh, with a lightweight model.

public class JsonPatchOperation
{
    [Required]
    public string Op { get; set; }

    [Required]
    public string Path { get; set; }

    public object Value { get; set; }
}

public class JsonPatch : List<JsonPatchOperation>
{ }

This class is quite generic and should allow for handling any request whose body complies with the JSON Patch structure. The first concretization I want to introduce is the list of available operations. Cosmos DB currently supports five operations: Add, Set, Remove, Replace, and Increment. In this demo (at least for now) I'm skipping Increment because it would require slightly different handling than the others.

public enum JsonPatchOperationType
{
    Add,
    Set,
    Remove,
    Replace,
    Invalid
}

As you can see, in the above enumeration I've also added an Invalid value. This gives me a way to represent the operations I don't intend to support with a specific value instead of, for example, throwing an exception.

public class JsonPatchOperation
{
    private string _op;
    private JsonPatchOperationType _operationType;

    [Required]
    public string Op
    {
        get { return _op; }

        set
        {
            JsonPatchOperationType operationType;
            if (!Enum.TryParse(value, ignoreCase: true, result: out operationType))
            {
                operationType = JsonPatchOperationType.Invalid;
            }

            _operationType = operationType;

            _op = value;
        }
    }

    public JsonPatchOperationType OperationType => _operationType;

    ...
}

Having unsupported operations represented by a specific value also allows me to implement IValidatableObject, which checks for them. This way, if the class is used as an action model, making a request with an unsupported operation will trigger a 400 Bad Request response.

public class JsonPatchOperation : IValidatableObject
{
    ...

    public IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
    {
        if (OperationType == JsonPatchOperationType.Invalid)
        {
            yield return new ValidationResult($"Not supported operation: {Op}.", new[] { nameof(Op) });
        }
    }
}

Now all that's needed is an action that supports the PATCH method.

[Route("api/[controller]")]
[ApiController]
public class CharactersController : Controller
{
    ...

    [HttpPatch("{id}")]
    public async Task<ActionResult<Character>> Patch(string id, JsonPatch update)
    {
        ...
    }

    ...
}

The next step is proxying the deserialized JSON Patch to the Cosmos DB .NET SDK.

Utilizing Cosmos DB Partial Updates

The Cosmos DB .NET SDK exposes partial document updates through the PatchItemAsync method on a container. This method expects a collection of PatchOperation instances. Instances representing specific operations can be created through static methods on PatchOperation whose names correspond to the operation names. So the conversion from JsonPatch to a PatchOperation collection requires calling the appropriate method for every JsonPatchOperation. This is something a simple extension method should handle.

public static class JsonPatchExtensions
{
    public static IReadOnlyList<PatchOperation> ToCosmosPatchOperations(this JsonPatch jsonPatchOperations)
    {

        List<PatchOperation> cosmosPatchOperations = new List<PatchOperation>(jsonPatchOperations.Count);
        foreach (JsonPatchOperation jsonPatchOperation in jsonPatchOperations)
        {
            switch (jsonPatchOperation.OperationType)
            {
                case JsonPatchOperationType.Add:
                    cosmosPatchOperations.Add(PatchOperation.Add(jsonPatchOperation.Path, jsonPatchOperation.Value));
                    break;
                case JsonPatchOperationType.Remove:
                    cosmosPatchOperations.Add(PatchOperation.Remove(jsonPatchOperation.Path));
                    break;
                case JsonPatchOperationType.Replace:
                    cosmosPatchOperations.Add(PatchOperation.Replace(jsonPatchOperation.Path, jsonPatchOperation.Value));
                    break;
                case JsonPatchOperationType.Set:
                    cosmosPatchOperations.Add(PatchOperation.Set(jsonPatchOperation.Path, jsonPatchOperation.Value));
                    break;
            }
        }

        return cosmosPatchOperations;
    }
}

Making the proper call in our action.

[Route("api/[controller]")]
[ApiController]
public class CharactersController : Controller
{
    ...

    [HttpPatch("{id}")]
    public async Task<ActionResult<Character>> Patch(string id, JsonPatch update)
    {
        ...

        ItemResponse<Character> characterItemResponse = await _starWarsCosmosClient.Characters.PatchItemAsync<Character>(
            id,
            PartitionKey.None,
            update.ToCosmosPatchOperations());

        return characterItemResponse.Resource;
    }

    ...
}

And we have some testable code. In order to test it, I've decided to attempt two operations: Set on /height and Add on /weight. This is represented by the JSON Patch body below.

[
  {
    "op": "set",
    "path": "/height",
    "value": 195
  },
  {
    "op": "add",
    "path": "/weight",
    "value": 90
  }
]

I've hit F5, created the request in Postman, and clicked Send. What I received was a 500 Internal Server Error with a weird Newtonsoft.Json deserialization exception. A quick look into Cosmos DB Explorer revealed that the document now looked like this.

{
    ...
    "height": {
        "valueKind": 4
    },
    "weight": {
        "valueKind": 4
    },
    ...
}

This is not what I was expecting. What happened? The fact that PatchItemAsync completed without an exception suggested that this must be how the Cosmos DB .NET SDK interpreted JsonPatchOperation.Value. Through debugging I quickly discovered that JsonPatchOperation.Value actually holds a System.Text.Json.JsonElement. The Cosmos DB .NET SDK has no way of dealing with that other than serializing its public properties - regardless of how smart the implementation is. This is because the Cosmos DB .NET SDK (at least for now) is based on Newtonsoft.Json.

So, this is going to be a little harder. Conversion from JsonPatch to a PatchOperation collection will require deserializing the value along the way. As this is still part of deserializing the request, I figured it would be best to put it into JsonPatchOperation.

public class JsonPatchOperation : IValidatableObject
{
    ...

    public T GetValue<T>()
    {
        return ((JsonElement)Value).Deserialize<T>();
    }
}

This will need to be called with the right type parameter, so a mapping between paths and types needs to be obtained. I've decided to make this information available through JsonPatch by making it generic and spicing it with reflection.

public class JsonPatch<T> : List<JsonPatchOperation>
{
    private static readonly IDictionary<string, Type> _pathsTypes;

    static JsonPatch()
    {
        _pathsTypes = typeof(T).GetProperties().ToDictionary(p => $"/{Char.ToLowerInvariant(p.Name[0]) + p.Name[1..]}", p => p.PropertyType);
    }

    public Type GetTypeForPath(string path)
    {
        return _pathsTypes[path];
    }
}

A disclaimer is in order. This simple code will work only in this simple case. I'm looking only at top-level properties because my document only has top-level properties. But, in general, a path can target a nested property, e.g. /mother/familyName. The code will have to get more complicated to handle such cases.
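For completeness, a recursive variant could walk the path segments to resolve nested property types. The helper below is purely my sketch (it's not in the demo code) and it assumes the same camelCase naming convention; array indices and dictionaries are deliberately not handled.

```csharp
using System;
using System.Reflection;

// Hypothetical helper: resolves the type of a nested path like /mother/familyName
// by walking property segments on the document type.
public static class PathTypeResolver
{
    public static Type GetTypeForNestedPath(Type rootType, string path)
    {
        Type currentType = rootType;

        foreach (string segment in path.Split('/', StringSplitOptions.RemoveEmptyEntries))
        {
            // Path segments are camelCase, properties are PascalCase.
            string propertyName = Char.ToUpperInvariant(segment[0]) + segment[1..];

            PropertyInfo property = currentType.GetProperty(propertyName)
                ?? throw new ArgumentException($"Unknown path segment: {segment}");

            currentType = property.PropertyType;
        }

        return currentType;
    }
}
```
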

To make the conversion from JsonPatch to PatchOperation work correctly, sadly, more reflection is needed, as those generic calls have to be made with the right type parameters at runtime. The needed pieces can live together with the conversion extension method.

public static class JsonPatchExtensions
{
    private static MethodInfo _createAddPatchOperationMethodInfo = typeof(JsonPatchExtensions)
        .GetMethod(nameof(JsonPatchExtensions.CreateAddPatchOperation), BindingFlags.NonPublic | BindingFlags.Static);
    private static MethodInfo _createReplacePatchOperationMethodInfo = typeof(JsonPatchExtensions)
        .GetMethod(nameof(JsonPatchExtensions.CreateReplacePatchOperation), BindingFlags.NonPublic | BindingFlags.Static);
    private static MethodInfo _createSetPatchOperationMethodInfo = typeof(JsonPatchExtensions)
        .GetMethod(nameof(JsonPatchExtensions.CreateSetPatchOperation), BindingFlags.NonPublic | BindingFlags.Static);

    ...

    private static PatchOperation CreateAddPatchOperation<T>(JsonPatchOperation jsonPatchOperation)
    {
        return PatchOperation.Add(jsonPatchOperation.Path, jsonPatchOperation.GetValue<T>());
    }

    private static PatchOperation CreateReplacePatchOperation<T>(JsonPatchOperation jsonPatchOperation)
    {
        return PatchOperation.Replace(jsonPatchOperation.Path, jsonPatchOperation.GetValue<T>());
    }

    private static PatchOperation CreateSetPatchOperation<T>(JsonPatchOperation jsonPatchOperation)
    {
        return PatchOperation.Set(jsonPatchOperation.Path, jsonPatchOperation.GetValue<T>());
    }

    private static PatchOperation CreatePatchOperation<T>(
        MethodInfo createSpecificPatchOperationMethodInfo,
        JsonPatch<T> jsonPatchOperations,
        JsonPatchOperation jsonPatchOperation)
    {
        Type jsonPatchOperationValueType = jsonPatchOperations.GetTypeForPath(jsonPatchOperation.Path);

        MethodInfo createSpecificPatchOperationWithValueTypeMethodInfo =
            createSpecificPatchOperationMethodInfo.MakeGenericMethod(jsonPatchOperationValueType);

        return (PatchOperation)createSpecificPatchOperationWithValueTypeMethodInfo.Invoke(null, new object[] { jsonPatchOperation });
    }
}

The code above has been structured to isolate the reflection-related parts and cache the well-known pieces. Of course, this is a matter of personal preference, but I hope it's readable.

The last remaining thing is modifying the extension method.

public static class JsonPatchExtensions
{
    ...

    public static IReadOnlyList<PatchOperation> ToCosmosPatchOperations<T>(this JsonPatch<T> jsonPatchOperations)
    {
        List<PatchOperation> cosmosPatchOperations = new List<PatchOperation>(jsonPatchOperations.Count);
        foreach (JsonPatchOperation jsonPatchOperation in jsonPatchOperations)
        {
            switch (jsonPatchOperation.OperationType)
            {
                case JsonPatchOperationType.Add:
                    cosmosPatchOperations.Add(CreatePatchOperation(_createAddPatchOperationMethodInfo, jsonPatchOperations, jsonPatchOperation));
                    break;
                case JsonPatchOperationType.Remove:
                    cosmosPatchOperations.Add(PatchOperation.Remove(jsonPatchOperation.Path));
                    break;
                case JsonPatchOperationType.Replace:
                    cosmosPatchOperations.Add(CreatePatchOperation(_createReplacePatchOperationMethodInfo, jsonPatchOperations, jsonPatchOperation));
                    break;
                case JsonPatchOperationType.Set:
                    cosmosPatchOperations.Add(CreatePatchOperation(_createSetPatchOperationMethodInfo, jsonPatchOperations, jsonPatchOperation));
                    break;
            }
        }

        return cosmosPatchOperations;
    }

    ...
}

Adjusting the action code, building the demo, running it, going to Postman, sending a request... and it works!

What’s Missing?

This is demoware, so there is always something missing. I've already mentioned that parts of the code handle only simple cases. But there are two additional subjects which require more attention.

One is validation. This code is not validating whether the paths provided for operations represent valid properties. It's also not validating whether the values can be converted to the right types, or whether the operations result in an invalid state of the entity. The first two should be achievable at the JsonPatch level. The last one will require additional code in the action.
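For the path validation part, one possible direction is making JsonPatch<T> itself an IValidatableObject that checks every operation's path against the known paths. This is a hedged sketch of that idea, not code from the demo; it reuses the same dictionary built in the static constructor shown earlier, and JsonPatchOperation is the class defined at the beginning of the post.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;

// Sketch: JsonPatch<T> rejecting operations whose paths don't map to a property
// of T, so model binding produces a 400 Bad Request instead of a runtime failure.
public class JsonPatch<T> : List<JsonPatchOperation>, IValidatableObject
{
    private static readonly IDictionary<string, Type> _pathsTypes = typeof(T).GetProperties()
        .ToDictionary(p => $"/{Char.ToLowerInvariant(p.Name[0]) + p.Name[1..]}", p => p.PropertyType);

    public Type GetTypeForPath(string path) => _pathsTypes[path];

    public IEnumerable<ValidationResult> Validate(ValidationContext validationContext)
    {
        foreach (JsonPatchOperation operation in this.Where(o => !_pathsTypes.ContainsKey(o.Path)))
        {
            yield return new ValidationResult($"Unknown path: {operation.Path}.", new[] { nameof(JsonPatchOperation.Path) });
        }
    }
}
```
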

The other subject is the performance of the reflection code. It can be improved by caching the closed generic methods for specific types, but as the solution grows that might not be enough. It is worth thinking about other options here.
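One way the caching could look (my own sketch, not the demo code) is memoizing the closed generic methods, so MakeGenericMethod runs only once per combination of open method and value type:

```csharp
using System;
using System.Collections.Concurrent;
using System.Reflection;

// Sketch: cache closed generic MethodInfo instances per (open method, value type)
// pair, instead of calling MakeGenericMethod for every operation.
public static class CachedReflectionSketch
{
    private static readonly ConcurrentDictionary<(MethodInfo, Type), MethodInfo> _closedMethodsCache =
        new ConcurrentDictionary<(MethodInfo, Type), MethodInfo>();

    public static MethodInfo GetOrAddClosedMethod(MethodInfo openMethod, Type valueType)
    {
        return _closedMethodsCache.GetOrAdd(
            (openMethod, valueType),
            key => key.Item1.MakeGenericMethod(key.Item2));
    }
}
```

The CreatePatchOperation helper shown earlier could then look up the closed method through this cache instead of constructing it on every call.
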

I might tackle those two subjects, but I don't know if or when. You can keep an eye on the demo project, because if I do, the code will certainly end up there.

One of the common requests for my Server-Sent Events library is the ability to disconnect clients from the server. My usual answer was that this is best implemented as a logical operation, where the server sends an event with a specific type and clients react to it by closing the connection. I was avoiding putting disconnect functionality into the library because it's not fully defined by the protocol.

Disconnecting a Client in Server-Sent Events

Disconnecting a client from a server is a little tricky when it comes to Server-Sent Events. The reason for that is automatic reconnect. If the server simply closes the connection, the client will wait a defined period of time and reconnect. The only way to prevent the client from reconnecting is to respond with a 204 No Content status code. So the complete flow of disconnecting a client should look like the diagram below.

Server-Sent Events Client Disconnect Flow Diagram

There is just one challenge here - the server needs to be able to tell that it is the same client trying to reconnect.

Identifying Clients in Server-Sent Events

Server-Sent Events doesn't provide any specific way of identifying clients; there is no dedicated session mechanism. This is where choices and opinions start, and where a library should try to stay generic. I've given a lot of thought to whether I want to include any specific implementation with the library, and I've decided to provide only the contract and a no-op implementation.

public interface IServerSentEventsClientIdProvider
{
    Guid AcquireClientId(HttpContext context);

    void ReleaseClientId(Guid clientId, HttpContext context);
}

This gives consumers full freedom (and responsibility) to implement this aspect in whichever way they prefer. For example, below is a simple cookie-based implementation. For a production scenario, you probably want to think a little more about cookie options, whether the value should be protected, etc. Also, remember that cookies come with legal requirements.

internal class CookieBasedServerSentEventsClientIdProvider : IServerSentEventsClientIdProvider
{
    private const string COOKIE_NAME = ".ServerSentEvents.Guid";

    public Guid AcquireClientId(HttpContext context)
    {
        Guid clientId;

        string cookieValue = context.Request.Cookies[COOKIE_NAME];
        if (String.IsNullOrWhiteSpace(cookieValue) || !Guid.TryParse(cookieValue, out clientId))
        {
            clientId = Guid.NewGuid();

            context.Response.Cookies.Append(COOKIE_NAME, clientId.ToString());
        }

        return clientId;
    }

    public void ReleaseClientId(Guid clientId, HttpContext context)
    {
        context.Response.Cookies.Delete(COOKIE_NAME);
    }
}

Tracking Disconnected Server-Sent Events Clients

Being able to identify a client is only the first step. The second required piece is tracking the disconnected clients, so that when they attempt to reconnect the server can respond with 204 No Content.

public interface IServerSentEventsNoReconnectClientsIdsStore
{
    Task AddClientId(Guid clientId);

    Task<bool> ContainsClientId(Guid clientId);

    Task RemoveClientId(Guid clientId);
}

This part doesn't seem complicated until you consider scaling out. When there are multiple instances behind a load balancer, there is a possibility that a reconnect attempt will reach a different instance than the one to which the client was previously connected. This is why I've decided to include two implementations of the above store in the library. One simply keeps the identifiers in memory, while the other is backed by a distributed cache.
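For illustration, an in-memory implementation can be little more than a concurrent set keyed by the client identifier. This is my sketch of the idea, implementing the contract shown above; the library's actual implementation may differ.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Sketch: in-memory store, suitable for a single instance only. The byte value
// is a dummy; ConcurrentDictionary is used as a thread-safe set.
internal class InMemoryServerSentEventsNoReconnectClientsIdsStore : IServerSentEventsNoReconnectClientsIdsStore
{
    private readonly ConcurrentDictionary<Guid, byte> _clientsIds = new ConcurrentDictionary<Guid, byte>();

    public Task AddClientId(Guid clientId)
    {
        _clientsIds.TryAdd(clientId, 0);
        return Task.CompletedTask;
    }

    public Task<bool> ContainsClientId(Guid clientId)
    {
        return Task.FromResult(_clientsIds.ContainsKey(clientId));
    }

    public Task RemoveClientId(Guid clientId)
    {
        _clientsIds.TryRemove(clientId, out _);
        return Task.CompletedTask;
    }
}
```
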

Putting Things Together

The high-level flow which the library currently performs to handle Server-Sent Events requests looks like this.

public class ServerSentEventsMiddleware<TServerSentEventsService> ...
{
    ...

    public async Task Invoke(HttpContext context, IPolicyEvaluator policyEvaluator)
    {
        if (CheckAcceptHeader(context.Request.Headers))
        {
            if (!await AuthorizeAsync(context, policyEvaluator))
            {
                return;
            }

            ...

            await context.Response.AcceptAsync(_serverSentEventsOptions.OnPrepareAccept);

            ServerSentEventsClient client = new ServerSentEventsClient(clientId, context.User, context.Response, _clientDisconnectServicesAvailable);

            ...

            await ConnectClientAsync(context.Request, client);

            await context.RequestAborted.WaitAsync();

            await DisconnectClientAsync(context.Request, client);
        }
        else
        {
            await _next(context);
        }
    }

    ...
}

To enable the disconnect capability, this flow needs to be adjusted to acquire the client identifier as soon as possible and to prevent the connection (by responding with 204 No Content) if the identifier represents a client which shouldn't be allowed to reconnect.

public class ServerSentEventsMiddleware<TServerSentEventsService> ...
{
    ...

    public async Task Invoke(HttpContext context, IPolicyEvaluator policyEvaluator)
    {
        if (CheckAcceptHeader(context.Request.Headers))
        {
            if (!await AuthorizeAsync(context, policyEvaluator))
            {
                return;
            }

            Guid clientId = _serverSentEventsClientIdProvider.AcquireClientId(context);

            if (await PreventReconnectAsync(clientId, context))
            {
                return;
            }

            ...

            await context.Response.AcceptAsync(_serverSentEventsOptions.OnPrepareAccept);

            ...

            await DisconnectClientAsync(context.Request, client);
        }
        else
        {
            await _next(context);
        }
    }

    ...

    private async Task<bool> PreventReconnectAsync(Guid clientId, HttpContext context)
    {
        if (!await _serverSentEventsNoReconnectClientsIdsStore.ContainsClientId(clientId))
        {
            return false;
        }

        context.Response.StatusCode = StatusCodes.Status204NoContent;

        _serverSentEventsClientIdProvider.ReleaseClientId(clientId, context);

        await _serverSentEventsNoReconnectClientsIdsStore.RemoveClientId(clientId);

        return true;
    }

    ...
}

You can also see that as part of reconnecting prevention I'm releasing the client identifier and removing it from the "no reconnect" identifiers store. The goal is to allow any underlying implementations to clear any data and make sure that a stale identifier doesn't "stick" with the client.

One more thing which requires adjustment is the set of operations performed when a client is being disconnected. If the client is being disconnected by the server, its identifier should be added to the store. If the disconnect happens on the client side, the client identifier should be released to avoid staleness.

public class ServerSentEventsMiddleware<TServerSentEventsService> ...
{
    ...

    private async Task DisconnectClientAsync(HttpRequest request, ServerSentEventsClient client)
    {
        ...

        if (client.PreventReconnect)
        {
            await _serverSentEventsNoReconnectClientsIdsStore.AddClientId(client.Id);
        }
        else
        {
            _serverSentEventsClientIdProvider.ReleaseClientId(client.Id, request.HttpContext);
        }

        ...
    }

    ...
}

Coming Soon to a NuGet Feed Near You

The code is ready and I'm going to push it out with the next release of Lib.AspNetCore.ServerSentEvents. So, if you have been waiting for disconnect capabilities in the library, they will soon be there. That said, it still leaves some decisions and implementation to consumers. By sharing parts of my thought process and some implementation details, I wanted to make it clear why that is.

One of the few things certain in the case of non-trivial code running in production is errors. Sooner or later they will happen, and they don't even have to result from bugs. Some resource might be down, there can be a network problem, a hardware failure, and many other short-lived issues. This is why resiliency is a key aspect of every application. In this post, I want to focus on handling transient errors in a very specific context - Durable Functions.

A Word of Caution - Don’t Use Polly

The fact that Durable Functions provide their own mechanism for automatic retry on failure should be a strong suggestion that this is the way it should be done. But we are creatures of habit, and we often bring habits from other types of applications to serverless ones. One such habit is using Polly for retries. In the case of Durable Functions (or Azure Functions in general), this can have serious consequences, because the time spent at awaits counts as execution time, and that includes the delays between retries. Yes, the rules are different in orchestrator functions, but you can't perform I/O operations there, and those are the ones you will most likely want to retry. So, the built-in automatic retry is the way to go, although sometimes it might be tricky to achieve certain scenarios.

Built-in Automatic Retry

To discuss the built-in automatic retry in Durable Functions, I'm going to use the activity below as an example.

[FunctionName(nameof(MessageOfTheDayActivity))]
public async Task<string> MessageOfTheDayActivity([ActivityTrigger] IDurableActivityContext activityContext)
{
    HttpClient httpClient = _httpClientFactory.CreateClient();

    return await httpClient.GetStringAsync("https://host/message-of-the-day");
}

Yes, I know that performing an HTTP request is probably not the best choice here. In simple cases, it is better to use the Durable Functions HTTP features. That said, HttpClient allows me to easily show various scenarios.

In the above code, I'm using GetStringAsync, which throws an exception whenever an unsuccessful response is received. If this activity is called via CallActivityWithRetryAsync, Durable Functions will retry it according to a policy defined with RetryOptions. The minimum which must be provided is the maximum number of attempts and the first retry interval.

[FunctionName(nameof(MessageOfTheDayOrchestration))]
public async Task<string> MessageOfTheDayOrchestration([OrchestrationTrigger] IDurableOrchestrationContext orchestrationContext)
{
    string messageOfTheDay = await orchestrationContext.CallActivityWithRetryAsync<string>(
        nameof(MessageOfTheDayActivity),
        new RetryOptions(TimeSpan.FromSeconds(5), 3),
        null);

    return messageOfTheDay;
}

So this will retry all requests which resulted in a response with an unsuccessful status code. Is this truly the correct behavior? I would say no. There are unsuccessful status codes that shouldn't be retried. A good example is 403 Forbidden. With this status code, the service is telling us that we are not allowed to make the request. Can the policy be adjusted not to retry when this happens? Yes, but first we need to modify the activity so that it throws an exception which propagates the information we need.

[FunctionName(nameof(MessageOfTheDayActivity))]
public async Task<string> MessageOfTheDayActivity([ActivityTrigger] IDurableActivityContext activityContext)
{
    HttpClient httpClient = _httpClientFactory.CreateClient();

    using HttpResponseMessage response = await httpClient.GetAsync("https://host/message-of-the-day");

    if (!response.IsSuccessStatusCode)
    {
        throw new MessageOfTheDayApiException(response.StatusCode);
    }

    return await response.Content.ReadAsStringAsync();
}
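The post doesn't show MessageOfTheDayApiException itself; a minimal hypothetical shape (including the RetryAfter property that will become useful later for 429 handling) could look like this:

```csharp
using System;
using System.Net;

// Hypothetical shape of the custom exception. Keeping it serializable matters,
// as Durable Functions persists failure details in the orchestration history.
[Serializable]
public class MessageOfTheDayApiException : Exception
{
    public HttpStatusCode StatusCode { get; }

    public TimeSpan? RetryAfter { get; }

    public MessageOfTheDayApiException(HttpStatusCode statusCode, TimeSpan? retryAfter = null)
        : base($"Message of the day API responded with {(int)statusCode}.")
    {
        StatusCode = statusCode;
        RetryAfter = retryAfter;
    }
}
```
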

There is a property on RetryOptions which allows for providing a callback that determines whether an activity should be retried. The important thing to remember here is that exceptions are wrapped, so to get the original one you need to look at the inner exception.

[FunctionName(nameof(MessageOfTheDayOrchestration))]
public async Task<string> MessageOfTheDayOrchestration([OrchestrationTrigger] IDurableOrchestrationContext orchestrationContext)
{
    RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 3)
    {
        Handle = (Exception ex) =>
        {
            MessageOfTheDayApiException messageOfTheDayApiException = ex.InnerException as MessageOfTheDayApiException;

            if (messageOfTheDayApiException != null && messageOfTheDayApiException.StatusCode == HttpStatusCode.Forbidden)
            {
                return false;
            }

            return true;
        }
    };

    string messageOfTheDay = await orchestrationContext.CallActivityWithRetryAsync<string>(
        nameof(MessageOfTheDayActivity),
        retryOptions,
        null);

    return messageOfTheDay;
}

Avoiding unnecessary retries is just one aspect of a mature retry policy. There are more sophisticated scenarios which can't be handled by the built-in automatic retry.

Handling More Sophisticated Cases

Another interesting HTTP response status code is 429 Too Many Requests. Receiving this status code means that you are being rate limited. You should wait and retry, but waiting is a little more tricky in this case. The information on how long to wait usually comes in the form of the Retry-After header value. So the waiting period can be different every time and is known only after execution. How can we deal with that? First, we need to make sure that our exception propagates not only the status code but also the extracted Retry-After value (if it has been provided).

[FunctionName(nameof(MessageOfTheDayActivity))]
public async Task<string> MessageOfTheDayActivity([ActivityTrigger] IDurableActivityContext activityContext)
{
    HttpClient httpClient = _httpClientFactory.CreateClient();

    using HttpResponseMessage response = await httpClient.GetAsync("https://host/message-of-the-day");

    if (!response.IsSuccessStatusCode)
    {
        throw new MessageOfTheDayApiException(response.StatusCode, GetRetryAfter(response));
    }

    return await response.Content.ReadAsStringAsync();
}
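The GetRetryAfter helper isn't shown in the post; a possible implementation (my sketch) builds on HttpResponseMessage.Headers.RetryAfter, which already parses both the delta-seconds and the HTTP-date forms of the header:

```csharp
using System;
using System.Net.Http;
using System.Net.Http.Headers;

public static class RetryAfterHelper
{
    // Sketch: normalize Retry-After to a TimeSpan, whichever form it came in.
    public static TimeSpan? GetRetryAfter(HttpResponseMessage response)
    {
        RetryConditionHeaderValue retryAfter = response.Headers.RetryAfter;

        if (retryAfter == null)
        {
            return null;
        }

        if (retryAfter.Delta.HasValue)
        {
            return retryAfter.Delta;
        }

        if (retryAfter.Date.HasValue)
        {
            // Absolute date form: convert to a delay, never negative.
            TimeSpan delay = retryAfter.Date.Value - DateTimeOffset.UtcNow;
            return delay > TimeSpan.Zero ? delay : TimeSpan.Zero;
        }

        return null;
    }
}
```
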

Now, having the necessary information, we can try to do something useful with it. Of course, we can't use it as a retry interval, so the built-in capabilities will not help us. What we can do is exclude this scenario from automatic retry and use two Durable Functions concepts to handle it ourselves: timers and the orchestrator's capability to restart itself. Once we "catch" a 429, we can create a timer to wait for the needed period and then use IDurableOrchestrationContext.ContinueAsNew to restart the orchestration.

[FunctionName(nameof(MessageOfTheDayOrchestration))]
public async Task<string> MessageOfTheDayOrchestration([OrchestrationTrigger] IDurableOrchestrationContext orchestrationContext)
{
    string messageOfTheDay = String.Empty;

    RetryOptions retryOptions = new RetryOptions(TimeSpan.FromSeconds(5), 3)
    {
        Handle = (Exception ex) =>
        {
            MessageOfTheDayApiException messageOfTheDayApiException = ex.InnerException as MessageOfTheDayApiException;

            if (messageOfTheDayApiException != null && (
                messageOfTheDayApiException.StatusCode == HttpStatusCode.Forbidden ||
                messageOfTheDayApiException.StatusCode == HttpStatusCode.TooManyRequests
                ))
            {
                return false;
            }

            return true;
        }
    };

    try
    {
        messageOfTheDay = await orchestrationContext.CallActivityWithRetryAsync<string>(
            nameof(MessageOfTheDayActivity),
            retryOptions,
            null);
    }
    catch (Exception ex)
    {
        MessageOfTheDayApiException messageOfTheDayApiException = ex.InnerException as MessageOfTheDayApiException;

        if (messageOfTheDayApiException != null &&
            messageOfTheDayApiException.StatusCode == HttpStatusCode.TooManyRequests &&
            messageOfTheDayApiException.RetryAfter.HasValue)
        {
            DateTime retryAt = orchestrationContext.CurrentUtcDateTime.Add(messageOfTheDayApiException.RetryAfter.Value);
            await orchestrationContext.CreateTimer(retryAt, CancellationToken.None);

            orchestrationContext.ContinueAsNew(null);
        }
    }

    return messageOfTheDay;
}

This allows for proper handling of 429 Too Many Requests. It is also the way to build more complex failure handling - by leveraging Durable Functions specific mechanisms.

Be Thoughtful

Proper error handling requires consideration. You need to think about how you want to react to different situations, so you don't end up with pointless retries, infinite retries, or retry storms. You should also use the right tools for the job, especially in Durable Functions, which have a very specific programming model where wrong patterns will cost you money or cause timeouts.

Back in 2017, I wrote about WebSocket per-message compression in ASP.NET Core. Back then the built-in support was deep in the backlog with no foreseeable delivery, so I created my own hacky implementation. Luckily it's 2021 and the built-in support is about to arrive with ASP.NET Core 6. Being very eager to archive my repository, I've decided to quickly take a look at what changes are needed to benefit from this new capability.

Enabling and Configuring Compression Extension Negotiation

In order to allow compression extension negotiation between client and server, you need to use the AcceptWebSocketAsync overload which takes a WebSocketAcceptContext as a parameter, and set its DangerousEnableCompression property to true.

Let's say you have a WebSocket implementation which supports subprotocol negotiation.

WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(textSubProtocol?.SubProtocol);

The version with compression negotiation enabled would look like this.

WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(new WebSocketAcceptContext
{
    SubProtocol = textSubProtocol?.SubProtocol,
    DangerousEnableCompression = true
});

You might be wondering why such a "scary" property name. It's not because things may fail - if the client doesn't support compression (or doesn't support compression with the specific parameters), the negotiated connection will simply have compression disabled. It's about security. Similarly to HTTPS, encrypted WebSocket connections are subject to CRIME/BREACH attacks. If you are using encryption, you should be very cautious and not compress sensitive messages.

Regarding parameters, you can request no context takeover and set the max window bits through the DisableServerContextTakeover and ServerMaxWindowBits properties of WebSocketAcceptContext. The documentation states that these are advanced options that control how the compression works. I gave a high-level description of them in my previous post, and going deeper into the details of sliding-window compression is beyond the scope of this one. So, without going too deep, I'm going to say that they allow for tuning the tradeoff between memory overhead and compression ratio.
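For example, a server that wants to lower its per-connection memory overhead might shrink the window and disable context takeover. The values below are illustrative, not recommendations:

```csharp
WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync(new WebSocketAcceptContext
{
    DangerousEnableCompression = true,
    // A smaller sliding window lowers memory overhead at the cost of compression ratio.
    ServerMaxWindowBits = 10,
    // Don't carry the compression window over between messages.
    DisableServerContextTakeover = true
});
```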

Per-Message Compression

As we already know, using encryption and compression together is dangerous from a security perspective. But what if only some specific messages are of a sensitive nature? There is an option to disable compression for a specific message.

The most commonly used overload of SendAsync is the one which takes a bool for the endOfMessage parameter.

await _webSocket.SendAsync(messageSegment, messageType, true, cancellationToken);

In order to be able to disable compression on a per-message basis, you need to use the overload which accepts WebSocketMessageFlags instead.

await _webSocket.SendAsync(messageSegment, messageType, WebSocketMessageFlags.EndOfMessage, cancellationToken);

Now you can disable compression for a specific message by adding the DisableCompression flag.

await _webSocket.SendAsync(messageSegment, messageType, WebSocketMessageFlags.EndOfMessage | WebSocketMessageFlags.DisableCompression, cancellationToken);

Of course this approach can be used any time you want to disable compression for a particular message, not only in security-related scenarios.

Summary

WebSocket per-message compression in ASP.NET Core 6 is feature complete and I couldn't be happier to stop using my own implementation. If you want to play with it, I've updated my WebSocket demo to use it.

Recently I've published code and written posts about working with asynchronous streaming data sources over HTTP using NDJSON. One of the follow-up questions I've received was how it relates to the async streaming support coming in ASP.NET Core 6. As the answer isn't obvious, I've decided to describe the subject in more detail.

ASP.NET Core and IAsyncEnumerable

Since ASP.NET Core 3, IAsyncEnumerable can be returned directly from controller actions.

[Route("api/[controller]")]
[ApiController]
public class WeatherForecastsController : Controller
{
    ...

    [HttpGet("weather-forecast")]
    public IAsyncEnumerable<WeatherForecast> GetWeatherForecastStream()
    {
        async IAsyncEnumerable<WeatherForecast> streamWeatherForecastsAsync()
        {
            for (int daysFromToday = 1; daysFromToday <= 10; daysFromToday++)
            {
                WeatherForecast weatherForecast = await _weatherForecaster.GetWeatherForecastAsync(daysFromToday);

                yield return weatherForecast;
            }
        }

        return streamWeatherForecastsAsync();
    }
}

ASP.NET Core will iterate the IAsyncEnumerable in an asynchronous manner, buffer the result, and send it down the wire. The gain here is no blocking calls and no risk of thread pool starvation, but there is no streaming of data to the client. If one would like to test this by making some requests, a possible first attempt could look like this.

private static async Task ConsumeJsonStreamAsync()
{
    Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Receiving weather forecasts . . .");

    using HttpClient httpClient = new();

    using HttpResponseMessage response = await httpClient.GetAsync(
        "...",
        HttpCompletionOption.ResponseHeadersRead
    ).ConfigureAwait(false);

    response.EnsureSuccessStatusCode();

    IAsyncEnumerable<WeatherForecast> weatherForecasts = await response.Content
        .ReadFromJsonAsync<IAsyncEnumerable<WeatherForecast>>().ConfigureAwait(false);

    await foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] {weatherForecast.Summary}");
    }

    Console.WriteLine($"[{DateTime.UtcNow:hh:mm:ss.fff}] Weather forecasts have been received.");
}

This will fail. Until .NET 6, System.Text.Json doesn't support IAsyncEnumerable, so the only option is to use IEnumerable.

private static async Task ConsumeJsonStreamAsync()
{
    ...

    IEnumerable<WeatherForecast> weatherForecasts = await response.Content
        .ReadFromJsonAsync<IEnumerable<WeatherForecast>>().ConfigureAwait(false);

    foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        ...
    }

    ...
}

The output will look like this (assuming it takes around 100 ms to generate a single forecast).

[08:12:59.184] Receiving weather forecasts . . .
[08:13:01.380] Cool
[08:13:01.381] Warm
[08:13:01.381] Sweltering
[08:13:01.381] Hot
[08:13:01.381] Chilly
[08:13:01.382] Scorching
[08:13:01.382] Hot
[08:13:01.382] Freezing
[08:13:01.382] Chilly
[08:13:01.382] Bracing
[08:13:01.382] Weather forecasts have been received.

Here the gain of properly using NDJSON is clear, as in that case the output would look more like this.

[08:13:01.400] Receiving weather forecasts . . .
[08:13:01.538] Mild
[08:13:01.633] Freezing
[08:13:01.755] Mild
[08:13:01.862] Warm
[08:13:01.968] Warm
[08:13:02.075] Sweltering
[08:13:02.184] Freezing
[08:13:02.294] Chilly
[08:13:02.401] Freezing
[08:13:02.506] Hot
[08:13:02.513] Weather forecasts have been received.
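Part of NDJSON's appeal is that the client side needs no serializer-specific API, because every line of the response body is a standalone JSON document. A minimal sketch of such a reader (the type and helper names here are illustrative, not taken from my demo project):

```csharp
using System.Collections.Generic;
using System.IO;
using System.Text.Json;
using System.Threading.Tasks;

public record WeatherForecast(string Summary);

public static class NdjsonReader
{
    // Yields items one by one, as soon as each complete line has arrived.
    public static async IAsyncEnumerable<T> ReadAsync<T>(Stream stream)
    {
        JsonSerializerOptions options = new() { PropertyNameCaseInsensitive = true };

        using StreamReader reader = new(stream);
        string? line;
        while ((line = await reader.ReadLineAsync()) is not null)
        {
            if (line.Length == 0)
            {
                continue; // tolerate blank lines between documents
            }

            yield return JsonSerializer.Deserialize<T>(line, options)!;
        }
    }
}
```

Combined with HttpCompletionOption.ResponseHeadersRead, the stream obtained from ReadAsStreamAsync can be passed directly to such a reader.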

Async Streaming in ASP.NET Core 6

One of the ASP.NET Core improvements in .NET 6 is support for async streaming of IAsyncEnumerable. In .NET 6, System.Text.Json can serialize an incoming IAsyncEnumerable in an asynchronous manner. Thanks to that, ASP.NET Core no longer buffers IAsyncEnumerable at the ObjectResult level; the decision is made at the output formatter level, and buffering occurs only in the case of the Newtonsoft.Json-based one.

Also, deserialization to IAsyncEnumerable is now supported by System.Text.Json, so the client code which was previously failing now works.

private static async Task ConsumeJsonStreamAsync()
{
    ...

    IAsyncEnumerable<WeatherForecast> weatherForecasts = await response.Content
        .ReadFromJsonAsync<IAsyncEnumerable<WeatherForecast>>().ConfigureAwait(false);

    await foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        ...
    }

    ...
}

Unfortunately, the result of running that code is disappointing. There is no difference from deserialization to IEnumerable. That shouldn't be a surprise, as the signature of the DeserializeAsync method (which is used under the hood) doesn't allow streaming. This is why a new API, the DeserializeAsyncEnumerable method, has been introduced to handle streaming deserialization.

private static async Task ConsumeJsonStreamAsync()
{
    ...

    Stream responseStream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
    IAsyncEnumerable<WeatherForecast> weatherForecasts = JsonSerializer.DeserializeAsyncEnumerable<WeatherForecast>(
        responseStream,
        new JsonSerializerOptions
        {
            PropertyNameCaseInsensitive = true,
            DefaultBufferSize = 128
        });

    await foreach (WeatherForecast weatherForecast in weatherForecasts)
    {
        ...
    }

    ...
}

This time I was surprised, because the result still didn't change. As I had no idea if my expectations about the behavior were correct, I decided to ask. It turns out I had hit a bug. I quickly downgraded from Preview 6 to Preview 4 and finally achieved the result I was fishing for.

[08:28:51.662] Receiving weather forecasts . . .
[08:28:51.967] Cool
[08:28:52.068] Sweltering
[08:28:52.288] Cool
[08:28:52.289] Freezing
[08:28:52.396] Freezing
[08:28:52.614] Cool
[08:28:52.614] Cool
[08:28:52.723] Cool
[08:28:52.851] Cool
[08:28:52.851] Chilly
[08:28:52.854] Weather forecasts have been received.

You may have noticed that I've set DefaultBufferSize while passing JsonSerializerOptions to the DeserializeAsyncEnumerable method. This is very important if one wants to achieve streaming behavior. Internally, DeserializeAsyncEnumerable will read from the stream until the buffer is full or the stream has ended. If the buffer size is large (and the default is 16 KB), there will be a significant delay in the asynchronous iteration (in fact, you can see irregularity in the above output resulting from exactly that).

The Conclusion

Async streaming in ASP.NET Core 6 will allow achieving effects similar to NDJSON, if you understand your data very well and know how to configure the deserialization. That's the main difference from NDJSON. In the case of NDJSON, the streaming capability is a consequence of the format; it is possible to implement it on top of the JSON serializers/deserializers available in different platforms and languages. In the case of async streaming, it's a consequence of the serializer/deserializer's internal nature, and that internal nature will differ between platforms. The first example coming to mind is browsers.

The good thing is that in ASP.NET Core 6 one doesn't have to choose between async streaming and NDJSON. Because ObjectResult is no longer buffering IAsyncEnumerable, content negotiation becomes possible. I'm showing exactly that capability in the ASP.NET Core 6 branch of my demo project (which will become main after GA).
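To illustrate the idea, here is a sketch of what such negotiation could look like. NdjsonOutputFormatter and StreamWeatherForecastsAsync are hypothetical names used only for this example; the actual registration depends on the NDJSON formatter implementation you use.

```csharp
// Hypothetical registration of an NDJSON output formatter next to the built-in JSON ones.
builder.Services.AddControllers(options =>
{
    options.OutputFormatters.Add(new NdjsonOutputFormatter()); // hypothetical type
});

// One action, two representations; the client's Accept header picks one.
[HttpGet("weather-forecast")]
[Produces("application/json", "application/x-ndjson")]
public IAsyncEnumerable<WeatherForecast> GetWeatherForecastStream()
{
    return _weatherForecaster.StreamWeatherForecastsAsync(); // hypothetical helper
}
```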
