Recently I've received a question regarding my previous post on using HttpClient with Azure Functions. It made me realize that I'm long overdue with a follow-up because that's not the best approach anymore.

Moving to Dependency Injection

In my previous approach, I was creating an extension to leverage the internally available dependency injection for the purpose of using HttpClientFactory. Back then it was the only way to use dependency injection in Azure Functions, but that's no longer the case. A few months ago, Azure Functions gained first-class support for dependency injection. This means that the extension is no longer needed; HttpClientFactory can be registered and used directly at the functions project level.

Registering services with Azure Functions dependency injection requires a startup class. This class must inherit from FunctionsStartup, which allows for overriding the Configure method. The Configure method will be passed an instance of IFunctionsHostBuilder by the host, with which we can register HttpClientFactory. In order for the host to find the startup class, there must be an assembly level FunctionsStartupAttribute pointing to it.

[assembly: FunctionsStartup(typeof(Startup))]

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.Services.AddHttpClient();
    }
}

Consuming a service available through dependency injection requires an instance-based approach to functions implementation. This is because Azure Functions supports constructor-based dependency injection. The original version of this post suggested having HttpClient as a constructor parameter, but as pointed out in the comments, that doesn't work. What needs to be injected is the registered HttpClientFactory, which can later be used to create an instance of HttpClient.

public class PeriodicHealthCheckFunction
{
    private readonly IHttpClientFactory _httpClientFactory;

    public PeriodicHealthCheckFunction(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
    }

    [FunctionName("PeriodicHealthCheckFunction")]
    public async Task Run(
        [TimerTrigger("0 */1 * * * *")]TimerInfo healthCheckTimer,
        ILogger log)
    {
        HttpClient httpClient = _httpClientFactory.CreateClient();

        string status = await httpClient.GetStringAsync("https://localhost:5001/healthcheck");

        log.LogInformation($"Health check performed at: {DateTime.UtcNow} | Status: {status}");
    }
}

This works exactly the same as my extension from the previous post but it's much simpler.

Greater Flexibility

This approach, in addition to being simpler, also provides greater flexibility. Using named or typed clients with the previous approach was possible, but not that easy. Now it just requires changing the registration code and the constructor parameter.
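For instance, a named client only needs a name at registration time and the same name when calling CreateClient. A minimal sketch (the "health-check" name and the base address are just examples):

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        // Every HttpClient created under this name shares these defaults
        builder.Services.AddHttpClient("health-check", client =>
        {
            client.BaseAddress = new Uri("https://localhost:5001/");
        });
    }
}

The function would then call _httpClientFactory.CreateClient("health-check") instead of the parameterless CreateClient. A typed client is even more convenient. For example, we might want to encapsulate HttpClient and instead use a service that exposes methods for handling specific calls (this will also allow us to avoid the issue mentioned above).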

public interface IHealthCheckClient
{
    Task<String> GetStatusAsync();
}

public class HealthCheckClient: IHealthCheckClient
{
    private readonly HttpClient _client;

    public HealthCheckClient(HttpClient client)
    {
        _client = client;
    }

    public Task<String> GetStatusAsync()
    {
        return _client.GetStringAsync("healthcheck");
    }
}

We can use one of many AddHttpClient overloads to register that service with some default configuration.

[assembly: FunctionsStartup(typeof(Startup))]

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.Services.AddHttpClient<IHealthCheckClient, HealthCheckClient>(client =>
        {
            client.BaseAddress = new Uri("https://localhost:5001/");
        });
    }
}

Now we only need to change the constructor parameter to the interface of the service.

public class PeriodicHealthCheckFunction
{
    private readonly IHealthCheckClient _healthCheckClient;

    public PeriodicHealthCheckFunction(IHealthCheckClient healthCheckClient)
    {
        _healthCheckClient = healthCheckClient ?? throw new ArgumentNullException(nameof(healthCheckClient));
    }

    [FunctionName("PeriodicHealthCheckFunction")]
    public async Task Run(
        [TimerTrigger("0 */1 * * * *")]TimerInfo healthCheckTimer,
        ILogger log)
    {
        string status = await _healthCheckClient.GetStatusAsync();

        log.LogInformation($"Health check performed at: {DateTime.UtcNow} | Status: {status}");
    }
}

This is much better than the previous approach, so I'm going to obsolete/archive my GitHub repository with sample code.

I first heard about RSocket about two years ago, from this short (20 minutes) talk. If you want a quick overview, you can go and watch the recording right now, I can wait.

RSocket

If you have watched the recording, you can skip this section. If you didn't, I'll try to explain RSocket in a few words. RSocket is a binary protocol inspired by the Reactive Manifesto. It was conceived by Netflix and Facebook in an effort to create an alternative protocol for microservices communication, one which would favor asynchronous messaging and provide more interaction models than request/response. Now the main contributor seems to be Netifi, which has built an entire platform on top of it.

Since I learned about RSocket, it has proven to be the right tool for the job several times. Unfortunately, in every one of those scenarios, we ended up using Java for the particular microservice, as rolling our own RSocket implementation was simply too expensive. I'm hoping this is about to change.

RSocket.NET

At the beginning of this year, Netifi started contributing a .NET implementation of RSocket. I had wanted to take a look at it for some time, but building a server on top of it requires providing a hosting layer. Because of that (and limited time) I kept postponing my exploration of this library until preview 8 of ASP.NET Core 3.0 was released. Preview 8 of ASP.NET Core 3.0 includes new networking primitives for non-HTTP servers, which should be a great hosting layer for RSocket.NET. I've simplified one of the Java clients I already had (because it was quick and Java is the most mature implementation of RSocket) and started working on a POC server.

Hosting RSocket.Net with Networking Primitives for Non-HTTP Servers

In the case of RSocket.NET, the server logic can be implemented by subclassing RSocketServer. The subclass can call a method like Respond or Stream in its constructor to provide the business logic behind the interactions. A simple echo server for the request/response and request/stream interactions can look like the one below.

internal class EchoServer : RSocketServer
{
    public EchoServer(IRSocketTransport transport, RSocketOptions options = default, int echoes = 2)
        : base(transport, options)
    {
        // Request/Response
        Respond(
            request => request,                                 // requestTransform
            request => AsyncEnumerable.Repeat(request, echoes), // producer
            result => result                                    // resultTransform
        );

        // Request/Stream
        Stream(
            request => request,                                 // requestTransform
            request => AsyncEnumerable.Repeat(request, echoes), // producer
            result => result                                    // resultTransform
        );
    }
}

An instance of RSocketServer represents a client connected through IRSocketTransport. This is the place where RSocket.NET and the networking primitives should be glued together. In the case of the new primitives provided by the ASP.NET Core team, a connected client is represented by ConnectionContext. ConnectionContext provides us with a PipeReader and PipeWriter for input and output. This is exactly what IRSocketTransport expects, so a wrapper should be all we need.

internal class ConnectionListenerTransport : IRSocketTransport
{
    public PipeReader Input { get; }

    public PipeWriter Output { get; }

    public ConnectionListenerTransport(ConnectionContext connection)
    {
        Input = connection.Transport.Input;
        Output = connection.Transport.Output;
    }

    public Task StartAsync(CancellationToken cancel = default)
    {
        return Task.CompletedTask;
    }

    public Task StopAsync()
    {
        return Task.CompletedTask;
    }
}

The StartAsync and StopAsync methods require an explanation. Why am I just returning completed tasks from them? ConnectionContext represents an already active connection, so StartAsync has nothing to do. The same goes for StopAsync, as the connection will be managed outside, by the hosting layer.

The hosting layer can be implemented in the same way as in ASP.NET Core 3.0 Preview 8 announcement post - as a BackgroundService.

internal class RSocketHost : BackgroundService
{
    private readonly IConnectionListenerFactory _connectionListenerFactory;
    private readonly ILogger<RSocketHost> _logger;

    private IConnectionListener _connectionListener;

    public RSocketHost(IConnectionListenerFactory connectionListenerFactory, ILogger<RSocketHost> logger)
    {
        _connectionListenerFactory = connectionListenerFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _connectionListener = await _connectionListenerFactory.BindAsync(new IPEndPoint(IPAddress.Loopback, 6000), stoppingToken);

        while (true)
        {
            ConnectionContext connection = await _connectionListener.AcceptAsync(stoppingToken);

            // AcceptAsync will return null upon disposing the listener
            if (connection == null)
            {
                break;
            }

            // TODO: Ensure all accepted connections are disposed prior to completing
            _ = Accept(connection);
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await _connectionListener.DisposeAsync();
    }

    private async Task Accept(ConnectionContext connection)
    {
        try
        {
            IRSocketTransport rsocketTransport = new ConnectionListenerTransport(connection);

            RSocketServer rsocketServer = new EchoServer(rsocketTransport);
            await rsocketServer.ConnectAsync();

            _logger.LogInformation("Connection {ConnectionId} connected", connection.ConnectionId);

            await connection.ConnectionClosed.WaitAsync();
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Connection {ConnectionId} threw an exception", connection.ConnectionId);
        }
        finally
        {
            await connection.DisposeAsync();

            _logger.LogInformation("Connection {ConnectionId} disconnected", connection.ConnectionId);
        }
    }
}

The key part is the Accept method. In that method, every new connection is wrapped in ConnectionListenerTransport and then used to instantiate our EchoServer. Then the method waits for the connection to terminate.

This implementation (as it's only a POC) doesn't take care of graceful server shutdown. A production-ready implementation should make sure that, when StopAsync on the BackgroundService is called, all connections are disposed before ExecuteAsync exits.
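A minimal sketch of what that could look like (my illustration on top of the RSocketHost above, not part of the POC) is to track the in-flight connections and await them during shutdown.

internal class RSocketHost : BackgroundService
{
    // Sketch: remember every accepted connection so shutdown can await them
    private readonly ConcurrentDictionary<string, Task> _acceptedConnections = new ConcurrentDictionary<string, Task>();

    ...

    // In ExecuteAsync, instead of discarding the task returned by Accept:
    // _acceptedConnections[connection.ConnectionId] = AcceptAndForget(connection);

    private async Task AcceptAndForget(ConnectionContext connection)
    {
        try
        {
            await Accept(connection);
        }
        finally
        {
            _acceptedConnections.TryRemove(connection.ConnectionId, out Task _);
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await _connectionListener.DisposeAsync();

        // Wait for all in-flight connections to complete before finishing the shutdown
        await Task.WhenAll(_acceptedConnections.Values);
    }
}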

The last thing to do is registering the above BackgroundService and one of the IConnectionListenerFactory implementations with a host.

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.TryAddSingleton<IConnectionListenerFactory, SocketTransportFactory>();

                services.AddHostedService<RSocketHost>();
            });
}

Now we can hit F5 and test it.

It Didn't Work...

The client didn't receive any response. From the logs I could see that the connection had been established. I could also verify that the client had sent a request (in my case a request/stream interaction), but there was no response from the server. I could have given up there, but I really wanted this to work. So I forked the RSocket.NET repository, opened the protocol specification, and started debugging. In the end (I think) I was able to identify the issue (a big-endian versus little-endian omission, typical for network-related code), fix it, and see the responses being logged by the client. Yes!
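I won't reproduce the actual fix here, but the general class of bug is easy to illustrate. Network protocols like RSocket put their values on the wire in big-endian order, so reading them with APIs that assume the platform's (little-endian) byte order yields garbage:

byte[] wireBytes = { 0x00, 0x00, 0x00, 0x2A }; // the value 42, big-endian on the wire

// Wrong on little-endian platforms - BitConverter assumes the platform's byte order
int wrong = BitConverter.ToInt32(wireBytes, 0);              // 704643072

// Correct - read explicitly as big-endian (System.Buffers.Binary)
int right = BinaryPrimitives.ReadInt32BigEndian(wireBytes);  // 42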

Conclusion

The new networking primitives for non-HTTP servers look great. They open up interesting capabilities and are easy to use. I'm not that happy with RSocket.NET. It seems to be missing some functionality and to have bugs lurking around. I hope this is only because it's still young. I would love to have RSocket available in my toolbox for .NET based microservices, so I will be keeping an eye on this project. Hopefully, the project will continue to grow and mature. If it does, I will be happy to contribute to it as well. If it doesn't, maybe I'll decide to work on my own implementation.

For those who want to play with what I've described here themselves, I've put the demo project and the client gist on GitHub.

Recently I've been tasked with building a PoC of an Azure Functions based GraphQL service. I like tasks like this, especially when I can share my experience. I hope someone will benefit from this one.

Probably the most popular implementation of GraphQL for .NET is graphql-dotnet. It doesn't have a ready-to-use integration with Azure Functions, but it does have one for ASP.NET Core. Taking into consideration that Azure Functions is built on top of ASP.NET Core and has recently been given support for dependency injection, that's the next best thing.

Building a GraphQL Azure Function

The core part of the graphql-dotnet server implementation is available as a separate package: GraphQL.Server.Core. This is great. It gives us all the needed services without the strictly ASP.NET Core specific stuff (like middleware). This means that the GraphQL setup for Azure Functions can be done in the same way as for ASP.NET Core (by registering the dependency resolver, schema, services, and types).

[assembly: FunctionsStartup(typeof(Demo.Azure.Functions.GraphQL.Startup))]

namespace Demo.Azure.Functions.GraphQL
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            builder.Services.AddScoped<IDependencyResolver>(serviceProvider => new FuncDependencyResolver(serviceProvider.GetRequiredService));
            builder.Services.AddScoped<StarWarsSchema>();

            builder.Services.AddGraphQL(options =>
            {
                options.ExposeExceptions = true;
            })
            .AddGraphTypes(ServiceLifetime.Scoped);
        }
    }
}

This makes the IGraphQLExecuter service available to us. This is the heart of GraphQL.Server.Core. If we provide an operation name, query, and variables to the ExecuteAsync method of this service, it will take care of all the processing. So, we can inject that service and build our function around it.

public class GraphQLFunction
{
    private readonly IGraphQLExecuter<StarWarsSchema> _graphQLExecuter;

    public GraphQLFunction(IGraphQLExecuter<StarWarsSchema> graphQLExecuter)
    {
        _graphQLExecuter = graphQLExecuter ?? throw new ArgumentNullException(nameof(graphQLExecuter));
    }

    [FunctionName("graphql")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
        ILogger logger)
    {
        ...

        ExecutionResult executionResult = await _graphQLExecuter.ExecuteAsync(
            operationName,
            query,
            variables?.ToInputs(),
            null,
            req.HttpContext.RequestAborted
        );

        ...
    }
}

Now we are faced with two challenges. One is getting the operation name, query, and variables out of the request. The other is writing the ExecutionResult to the response.

The challenging part of getting the operation name, query, and variables out of the request is that, depending on the request method and content type, there are four different ways to do it. Putting all that code into the function would be unnecessary noise. This is why I've decided to extract it into an extension method. This allows the function to be very clean.

public class GraphQLFunction
{
    ...

    [FunctionName("graphql")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
        ILogger logger)
    {
        ExecutionResult executionResult = await _graphQLExecuter.ExecuteAsync(req);

        ...
    }
}

There is nothing really special about the complete extension method - you can find it on GitHub.
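To give an idea of its shape, here is a rough sketch handling the most common cases (GraphQLExecuterExtensions is my hypothetical name; it assumes Newtonsoft.Json, leaves out error handling, and skips the form-based variant):

internal static class GraphQLExecuterExtensions
{
    public static async Task<ExecutionResult> ExecuteAsync<TSchema>(this IGraphQLExecuter<TSchema> executer, HttpRequest request)
        where TSchema : ISchema
    {
        string operationName = null, query = null;
        JObject variables = null;

        if (HttpMethods.IsGet(request.Method))
        {
            // GET: everything comes from the query string
            operationName = request.Query["operationName"];
            query = request.Query["query"];
            if (request.Query.ContainsKey("variables"))
            {
                variables = JObject.Parse(request.Query["variables"]);
            }
        }
        else if (request.ContentType != null && request.ContentType.StartsWith("application/json"))
        {
            // POST application/json: { "operationName": ..., "query": ..., "variables": ... }
            using (StreamReader requestReader = new StreamReader(request.Body))
            {
                JObject requestBody = JObject.Parse(await requestReader.ReadToEndAsync());

                operationName = requestBody["operationName"]?.ToString();
                query = requestBody["query"]?.ToString();
                variables = requestBody["variables"] as JObject;
            }
        }
        else if (request.ContentType != null && request.ContentType.StartsWith("application/graphql"))
        {
            // POST application/graphql: the body is the query itself
            using (StreamReader requestReader = new StreamReader(request.Body))
            {
                query = await requestReader.ReadToEndAsync();
            }
        }

        return await executer.ExecuteAsync(operationName, query, variables?.ToInputs(), null, request.HttpContext.RequestAborted);
    }
}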

When it comes to writing the ExecutionResult to the response, graphql-dotnet gives us the IDocumentWriter service. We could use this service in our function to write directly to the response and then return something weird like null or EmptyResult, but again this would be quite ugly. It's better to write a dedicated ActionResult. There is no problem with accessing services from an ActionResult, so the entire logic can be nicely encapsulated.

internal class GraphQLExecutionResult : ActionResult
{
    private const string CONTENT_TYPE = "application/json";

    private readonly ExecutionResult _executionResult;

    public GraphQLExecutionResult(ExecutionResult executionResult)
    {
        _executionResult = executionResult ?? throw new ArgumentNullException(nameof(executionResult));
    }

    public override Task ExecuteResultAsync(ActionContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        IDocumentWriter documentWriter = context.HttpContext.RequestServices.GetRequiredService<IDocumentWriter>();

        HttpResponse response = context.HttpContext.Response;
        response.ContentType = CONTENT_TYPE;
        response.StatusCode = StatusCodes.Status200OK;

        return documentWriter.WriteAsync(response.Body, _executionResult);
    }
}

As a result, the function is only a couple of lines long (even with some basic error logging).

public class GraphQLFunction
{
    ...

    [FunctionName("graphql")]
    public async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get", "post", Route = null)] HttpRequest req,
        ILogger logger)
    {
        ExecutionResult executionResult = await _graphQLExecuter.ExecuteAsync(req);

        if (executionResult.Errors != null)
        {
            logger.LogError("GraphQL execution error(s): {Errors}", executionResult.Errors);
        }

        return new GraphQLExecutionResult(executionResult);
    }
}

Nice, let's see if it works. In my case, it didn't...

After preparing a request in Postman and sending it, I received the following error as a response.

GraphQL.ExecutionError: Object reference not set to an instance of an object. ---> System.NullReferenceException: Object reference not set to an instance of an object.
    at GraphQL.DocumentExecuter.ExecuteAsync(ExecutionOptions options)

Well, this doesn't say much. I've spent some time going through the DocumentExecuter code and concluded that the most likely cause of the NullReferenceException is one of its constructor dependencies being null. The DocumentExecuter has two constructors: one allows for providing the dependencies and the other uses defaults. AddGraphQL registers DocumentExecuter, but none of its dependencies. This suggests that the parameterless constructor is expected to be used, but that's not what is happening. As AddGraphQL uses TryAddSingleton to register DocumentExecuter, it should be enough to register it earlier and enforce the correct constructor.

[assembly: FunctionsStartup(typeof(Demo.Azure.Functions.GraphQL.Startup))]

namespace Demo.Azure.Functions.GraphQL
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            ...

            builder.Services.AddSingleton<IDocumentExecuter>(new DocumentExecuter());
            builder.Services.AddGraphQL(options =>
            {
                options.ExposeExceptions = true;
            })
            .AddGraphTypes(ServiceLifetime.Scoped);
        }
    }
}

Another attempt at sending a request and I saw the desired response. It works!

Adding Cosmos DB to the Mix

A database which goes well with Azure Functions is Azure Cosmos DB. It would be nice if we could use it in our GraphQL service. After all, it should only be a matter of registering DocumentClient as a service. It would also be nice if we could reuse the Azure Cosmos DB bindings configuration. I ended up with the method below.

public class Startup : FunctionsStartup
{
    public override void Configure(IFunctionsHostBuilder builder)
    {
        builder.Services.AddSingleton<IDocumentClient>(serviceProvider => {
            DbConnectionStringBuilder cosmosDBConnectionStringBuilder = new DbConnectionStringBuilder
            {
                ConnectionString = serviceProvider.GetRequiredService<IConfiguration>()["CosmosDBConnection"]
            };

            if (cosmosDBConnectionStringBuilder.TryGetValue("AccountKey", out object accountKey)
                && cosmosDBConnectionStringBuilder.TryGetValue("AccountEndpoint", out object accountEndpoint))
            {
                return new DocumentClient(new Uri(accountEndpoint.ToString()), accountKey.ToString());

            }

            return null;
        });

        ...
    }
}

It grabs the setting with the connection string from IConfiguration, uses DbConnectionStringBuilder to get the needed attributes, and creates a new instance of DocumentClient. Now DocumentClient can be injected into a query class and used to fetch the data.

internal class PlanetQuery: ObjectGraphType
{
    ...

    public PlanetQuery(IDocumentClient documentClient)
    {
        Field<ListGraphType<PlanetType>>(
            "planets",
            resolve: context => documentClient.CreateDocumentQuery<Planet>(_planetsCollectionUri, _feedOptions)
        );
    }
}

What Else?

As part of my PoC I've also experimented with a data loader, as it's crucial for GraphQL performance. I didn't play with more complex queries and mutations, but it doesn't seem like there should be any issue. You can find the complete demo here.

Recently I've stumbled upon a blog post describing how to use the Fetch API, Streams API, and NDJSON to improve user experience by streaming JSON objects from the server to the client. I immediately thought about a couple of places in projects I'm working on which could benefit from that. As a result, I've decided to create a POC with ASP.NET Core as the backend.

The mentioned blog post describes the client side pretty well, so I won't be focusing on it too much. If you want to get familiar with it before reading about the server side, go ahead, I'll wait.

Initial Scenario

The thing I usually find hard when starting work on a POC is coming up with a scenario that doesn't obfuscate the actual task. In this case I've decided to use the weather forecasts idea from the "ASP.NET Core application with Angular" template. Imagine a controller which returns the weather forecast for the next couple of days.

[Route("api/[controller]")]
[ApiController]
public class WeatherForecastsController : Controller
{
    public class WeatherForecast
    {
        ...
    }

    [HttpGet]
    public async Task<IEnumerable<WeatherForecast>> Get()
    {
        List<WeatherForecast> weatherForecasts = new List<WeatherForecast>();

        Random rng = new Random();

        for (int index = 1; index <= 10; index++)
        {
            await Task.Delay(100);

            weatherForecasts.Add(CreateWeatherForecast(index, rng));
        };

        return weatherForecasts;
    }

    private static WeatherForecast CreateWeatherForecast(int index, Random rng)
    {
        return new WeatherForecast
        {
            ...
        };
    }
}

I've omitted some details which are not so important (don't worry, I will provide a link to the complete demo at the end). The Task.Delay call simulates the expensive server-side processing needed to generate every forecast.

On the client side, there is a function which fetches the data from the above action and renders a table based on it.

function fetchWeatherForecasts() {
    clearWeatherForecasts();

    fetch('api/WeatherForecasts')
        .then(function (response) {
            return response.json();
        })
        .then(function (weatherForecasts) {
            weatherForecasts.forEach(appendWeatherForecast);
        });
}

This function is being called on a click of a button. The result is easy to guess. First, the current content of the table is cleared, and then the new content takes about a second to appear. Improving the user experience around that one-second delay is the goal of this POC.

Streaming NDJSON

What should an API for streaming objects from a controller action look like? There is one similar scenario in ASP.NET Core - streaming from a SignalR hub.
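For reference, this is roughly what the SignalR counterpart looks like (a simplified illustration, reusing the CreateWeatherForecast helper from the controller above):

public class WeatherForecastsHub : Hub
{
    // SignalR streams items to the client as they are written to the channel
    public ChannelReader<WeatherForecast> GetForecastsStream()
    {
        Channel<WeatherForecast> channel = Channel.CreateUnbounded<WeatherForecast>();

        _ = WriteForecastsAsync(channel.Writer);

        return channel.Reader;
    }

    private static async Task WriteForecastsAsync(ChannelWriter<WeatherForecast> writer)
    {
        Random rng = new Random();

        for (int index = 1; index <= 10; index++)
        {
            await Task.Delay(100);

            await writer.WriteAsync(CreateWeatherForecast(index, rng));
        }

        writer.Complete();
    }
}

Attempting to provide a similar API in a controller could look like this.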

...
public class WeatherForecastsController : Controller
{
    ...

    [HttpGet("stream")]
    public NdjsonStreamResult GetStream()
    {
        NdjsonStreamResult result = new NdjsonStreamResult();

        _ = StreamAsync(result);

        return result;
    }

    private async Task StreamAsync(NdjsonStreamResult result)
    {
        Random rng = new Random();

        for (int index = 1; index <= 10; index++)
        {
            await Task.Delay(100);

            await result.WriteAsync(CreateWeatherForecast(index, rng));
        };

        result.Complete();
    }

    ...
}

In this approach, writing an object to the NdjsonStreamResult should result in that object being immediately serialized and sent to the client. Once all objects are written, the result needs to be notified that it should complete. Pretty simple - the only challenge is the NdjsonStreamResult implementation.

Typically, in ASP.NET Core, ActionResult implementations offload the actual processing to executors, which are usually singleton services. This case is different. There is response-specific state and constant interaction. It seems better to make the result an orchestrator. The thing the result will be orchestrating should be a writer capable of serializing the objects to the response. Instantiation of that writer is not a trivial task, so it would be nice to separate it as well, for example by introducing a factory. Let's take a look at those three objects, starting from the bottom.

The writer should be a wrapper around the response stream writer, with NDJSON serialization capabilities. NDJSON is still JSON - it just serializes every object separately, puts a new line between them, and flushes after every object.
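For example, a streamed response body carrying three forecasts might look like this (the field names are just an illustration):

{"dateFormatted":"21.06.2019","temperatureC":25,"summary":"Warm"}
{"dateFormatted":"22.06.2019","temperatureC":19,"summary":"Mild"}
{"dateFormatted":"23.06.2019","temperatureC":12,"summary":"Cool"}

With that format in mind, here is the writer.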

public class NdjsonTextWriter : INdjsonTextWriter
{
    private readonly TextWriter _textResponseStreamWriter;
    private readonly JsonTextWriter _jsonResponseStreamWriter;
    private readonly JsonSerializer _jsonSerializer;

    public NdjsonTextWriter(TextWriter textResponseStreamWriter, JsonSerializerSettings jsonSerializerSettings,
        JsonArrayPool<char> jsonArrayPool)
    {
        _textResponseStreamWriter = textResponseStreamWriter;

        _jsonResponseStreamWriter = new JsonTextWriter(textResponseStreamWriter)
        {
            ArrayPool = jsonArrayPool,
            CloseOutput = false,
            AutoCompleteOnClose = false
        };

        _jsonSerializer = JsonSerializer.Create(jsonSerializerSettings);
    }

    public async Task WriteAsync(object value)
    {
        _jsonSerializer.Serialize(_jsonResponseStreamWriter, value);
        await _textResponseStreamWriter.WriteAsync("\n");
        await _textResponseStreamWriter.FlushAsync();
    }

    public void Dispose()
    {
        _textResponseStreamWriter?.Dispose();
        ((IDisposable)_jsonResponseStreamWriter)?.Dispose();
    }
}

You might be wondering what JsonArrayPool is. It's an ASP.NET Core implementation of JSON.NET's IArrayPool. After all, this is still JSON serialization and ASP.NET Core has a lot of infrastructure for it - it would be a waste not to use it.
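Conceptually, it's just a thin adapter between the two pooling abstractions - something along these lines:

// Sketch: JSON.NET's IArrayPool<T> on top of System.Buffers.ArrayPool<T>
internal class JsonArrayPool<T> : IArrayPool<T>
{
    private readonly ArrayPool<T> _inner;

    public JsonArrayPool(ArrayPool<T> inner)
    {
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));
    }

    public T[] Rent(int minimumLength) => _inner.Rent(minimumLength);

    public void Return(T[] array) => _inner.Return(array);
}

This also means that the factory should make use of MvcJsonOptions, so the serialization configuration is shared.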

internal class NdjsonTextWriterFactory : INdjsonTextWriterFactory
{
    private static readonly string DEFAULT_CONTENT_TYPE = new MediaTypeHeaderValue("application/x-ndjson")
    {
        Encoding = Encoding.UTF8
    }.ToString();

    private readonly IHttpResponseStreamWriterFactory _httpResponseStreamWriterFactory;
    private readonly MvcJsonOptions _options;
    private readonly JsonArrayPool<char> _jsonArrayPool;

    public NdjsonTextWriterFactory(IHttpResponseStreamWriterFactory httpResponseStreamWriterFactory, IOptions<MvcJsonOptions> options,
        ArrayPool<char> innerJsonArrayPool)
    {
        // Null checks removed for brevity
        ...

        _jsonArrayPool = new JsonArrayPool<char>(innerJsonArrayPool);
    }

    public INdjsonTextWriter CreateWriter(ActionContext context, NdjsonStreamResult result)
    {
        // Null checks removed for brevity
        ...

        HttpResponse response = context.HttpContext.Response;

        ResponseContentTypeHelper.ResolveContentTypeAndEncoding(result.ContentType, response.ContentType, DEFAULT_CONTENT_TYPE,
            out var resolvedContentType, out var resolvedContentTypeEncoding);
        response.ContentType = resolvedContentType;

        if (result.StatusCode != null)
        {
            response.StatusCode = result.StatusCode.Value;
        }

        DisableResponseBuffering(context.HttpContext);

        return new NdjsonTextWriter(_httpResponseStreamWriterFactory.CreateWriter(response.Body, resolvedContentTypeEncoding),
            _options.SerializerSettings, _jsonArrayPool);
    }

    private static void DisableResponseBuffering(HttpContext context)
    {
        IHttpBufferingFeature bufferingFeature = context.Features.Get<IHttpBufferingFeature>();
        if (bufferingFeature != null)
        {
            bufferingFeature.DisableResponseBuffering();
        }
    }
}

The CreateWriter method does what you've probably already expected. It grabs the response, resolves the value of the Content-Type header, sets a status code, creates an instance of the response stream writer with the help of IHttpResponseStreamWriterFactory, and instantiates NdjsonTextWriter. There is one extra thing - disabling response buffering. This is important: we don't want objects to get stuck in the response buffer instead of being sent to the client immediately.

With the writer and factory in place, it's time to put everything together by implementing the NdjsonStreamResult. There are some traps here. The first is waiting for information about completion. The ExecuteResultAsync method cannot finish before it's certain that all objects have been sent. The second trap is synchronization. It is possible that the first call to WriteAsync will happen before ExecuteResultAsync has been called, or before it has finished preparing the writer. In such a case, WriteAsync should wait. So there are two places where we need to wait for something. As we want to make the whole implementation thread pool friendly, it might be a good idea to use a TaskCompletionSource for both.

public class NdjsonStreamResult : ActionResult, IStatusCodeActionResult
{
    private INdjsonTextWriter _ndjsonTextWriter;
    private readonly TaskCompletionSource<bool> _readyTaskCompletionSource = new TaskCompletionSource<bool>();
    private readonly TaskCompletionSource<bool> _completeTaskCompletionSource = new TaskCompletionSource<bool>();

    public string ContentType { get; set; }

    public int? StatusCode { get; set; }

    public override async Task ExecuteResultAsync(ActionContext context)
    {
        // Null checks removed for brevity
        ...

        INdjsonTextWriterFactory ndjsonTextWriterFactory = context.HttpContext.RequestServices.GetRequiredService<INdjsonTextWriterFactory>();
        using (_ndjsonTextWriter = ndjsonTextWriterFactory.CreateWriter(context, this))
        {
            _readyTaskCompletionSource.SetResult(true);

            await _completeTaskCompletionSource.Task;
        }
    }

    public async Task WriteAsync(object value)
    {
        if (!_readyTaskCompletionSource.Task.IsCompletedSuccessfully)
        {
            await _readyTaskCompletionSource.Task;
        }

        await _ndjsonTextWriter.WriteAsync(value);
    }

    public void Complete()
    {
        _completeTaskCompletionSource.SetResult(true);
    }
}

This should do the trick. The last thing left is modifying the client-side code.

function fetchWeatherForecastsStream() {
    clearWeatherForecasts();

    fetch('api/WeatherForecasts/stream')
        .then(function (response) {
            const weatherForecasts = response.body
                .pipeThrough(new TextDecoderStream())
                .pipeThrough(parseNDJSON());

            readWeatherForecastsStream(weatherForecasts.getReader());
        });
}

function parseNDJSON() {
    let ndjsonBuffer = '';

    return new TransformStream({
        transform: function(ndjsonChunk, controller) {
            ndjsonBuffer += ndjsonChunk;

            const jsonValues = ndjsonBuffer.split('\n');
            jsonValues.slice(0, -1).forEach(function (jsonValue) { controller.enqueue(JSON.parse(jsonValue)); });

            ndjsonBuffer = jsonValues[jsonValues.length - 1];
        },
        flush: function(controller) {
            if (ndjsonBuffer) {
                controller.enqueue(JSON.parse(ndjsonBuffer));
            }
        }
    });
}

function readWeatherForecastsStream(weatherForecastsStreamReader) {
    weatherForecastsStreamReader.read()
        .then(function (result) {
            if (!result.done) {
                appendWeatherForecast(result.value);

                readWeatherForecastsStream(weatherForecastsStreamReader);
            }
        });
}

The code above is entirely based on the blog post I mentioned at the beginning.

So what happens when the fetchWeatherForecastsStream function is called on a click of a button? It still takes about a second for the entire table to appear, but it starts appearing almost immediately. The user sees the data appearing row by row and is able to start reading it before the whole processing is done. This improves the user experience and the perceived performance of the application.

This Is Cool

Yes, this is cool. Of course, it shouldn't be blindly used everywhere. The use case must be suitable (sometimes we need to wait for all the data) and there must be a place for improvement. The streaming approach might bring some improvements if the response is simply large (by avoiding long serialization and deserialization). It will shine in scenarios with noticeable per-item fetching or processing cost. But for a small, quick-to-generate response it would probably be overkill.

The above implementation is a POC. There is probably room for improvement. There also might be a bug hiding somewhere. But if you want to play with it (which I strongly encourage), the complete source code is available on GitHub.

When using ASP.NET Core SignalR, we can perform invocations which don't return anything, invocations which return a result, and invocations which result in a stream of results. Unfortunately, invocations which return a result or a stream of results are available only when invoking the server from a client. If a server wants to invoke a client, the only option is an invocation which doesn't return anything. ASP.NET Core SignalR is supposed to bring streaming from client to server, but again only as part of an invocation from a client to the server. Sometimes there are scenarios where we would like a client to be able to respond to an invocation - a server-to-client remote procedure call (RPC) with a result.

What We Want

ASP.NET Core has a concept of strongly typed hubs, which allows for representing client methods as an interface. For the server-to-client RPC with a result scenario, such an interface would look like the one below.

public interface IRpc
{
    Task<MethodResponse> MethodCall(MethodParams methodParams);
}

With the following, corresponding, strongly typed hub.

public class RpcHub : Hub<IRpc>
{
    ...
}

The hub doesn't need to have any methods. Of course, we want to handle connection events to maintain a list of currently connected users, but if the trigger is something from our infrastructure (a message from a queue, a change in a database, etc.) that's all we need. Let's assume that's exactly the case and that client methods are being invoked from a BackgroundService which is listening for that trigger. Below is the code of such a BackgroundService, limited to the SignalR aspects.

public class RpcCallerBackgroundService : BackgroundService
{
    private readonly IHubContext<RpcHub, IRpc> _rpcHubContext;

    public RpcCallerBackgroundService(IHubContext<RpcHub, IRpc> rpcHubContext)
    {
        _rpcHubContext = rpcHubContext;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (...)
        {
            ...

            MethodResponse response = await _rpcHubContext.Clients.User(userId).MethodCall(new MethodParams
            {
                ...
            });
        }
    }
}

We can write all the code so far right now. It will even compile. Sadly, it won't work. If you put this code into your application, you would get a System.InvalidOperationException upon startup, with a message more or less like this:

Cannot generate proxy implementation for 'IRpc.MethodCall'. All client proxy methods must return 'System.Threading.Tasks.Task'.

So, on the server side, we can write the code we want but it will result in an exception. How about the client side?

Imagine that the client is a generic host based worker service, which is using a BackgroundService to maintain a SignalR connection with the server. An implementation of such a BackgroundService for our IRpc interface could look something like this.

public class RpcClientBackgroundService : BackgroundService
{
    private readonly HubConnection _rpcResponseHubConnection;

    public RpcClientBackgroundService()
    {
        _rpcResponseHubConnection = new HubConnectionBuilder()
            .WithUrl("...")
            .Build();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        IDisposable _methodCallHandler = _rpcResponseHubConnection.On(nameof(IRpc.MethodCall), new[] { typeof(MethodParams) },
            async (methodParamsArray) =>
            {
                ...
            });

        await _rpcResponseHubConnection.StartAsync();

        await WaitForCancellationAsync(stoppingToken);

        _methodCallHandler.Dispose();

        await _rpcResponseHubConnection.DisposeAsync();
    }

    ...
}

Here we can't even implement what we want. The only overload of the On method which takes a Func forces the return type to be Task. The other overloads take an Action.

I've spent a lot of lines proving that we can't have what we want. Is there something we can have?

What We Can Have

The only way for a client to return something to a server is to invoke the server back. So the only possible approach, from the SignalR perspective, is to have two invocations, which we can represent with two interfaces.

public interface IRpcCalls
{
    Task MethodCall(MethodParams methodParams);
}

public interface IRpcResponseHandlers
{
    Task MethodResponseHandler(MethodResponse response);
}

Another consequence is that we will have to correlate those invocations. We need something in the payloads that will allow us to do that.

public class MethodParams
{
    public Guid MethodCallId { get; set; }

    ...
}

public class MethodResponse
{
    public Guid MethodCallId { get; set; }

    ...
}

Now we can change the client BackgroundService to perform an invocation of MethodResponseHandler when the processing of MethodCall is finished.

public class RpcBackgroundService : BackgroundService
{
    ...

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        IDisposable _methodCallHandler = _rpcResponseHubConnection.On<MethodParams>(nameof(IRpcCalls.MethodCall),
            methodParams =>
            {
                ...

                _rpcResponseHubConnection.InvokeAsync(nameof(IRpcResponseHandlers.MethodResponseHandler), new MethodResponse
                {
                    MethodCallId = methodParams.MethodCallId,
                    ...
                });
            });

        ...
    }

    ...
}

That was the easy part. Changing the server part is a little bit harder. The BackgroundService on the server side will no longer be able to use IHubContext directly; we need to introduce something that will hide the split into two invocations. We want that something to be similar to IHubContext in that it's a generic where the type of the hub is used as the type parameter. This way we can enforce that the hub implements the IRpcResponseHandlers interface and is a strongly typed one, with the client methods represented by the IRpcCalls interface.

public interface IRpcCaller<THub> : IRpc, IRpcResponseHandlers
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{ }

public class RpcCaller<THub> : IRpcCaller<THub>
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    private readonly IHubContext<THub, IRpcCalls> _hubContext;

    public RpcCaller(IHubContext<THub, IRpcCalls> hubContext)
    {
        _hubContext = hubContext;
    }

    ...
}

Why is IRpcCaller composed of IRpc and IRpcResponseHandlers? The IRpc part will be needed by the BackgroundService, but we also need a way for the hub to pass the invocation response back to IRpcCaller.

Before we can start the implementation, we also need to make a change to the IRpc interface. As the implementation will hide IHubContext, the user identifier needs to be passed as a parameter.

public interface IRpc
{
    Task<MethodResponse> MethodCall(string userId, MethodParams methodParams);
}

So, how do we synchronize those two invocations and make the method wait until the response is available? One way to implement TAP (Task-based Asynchronous Pattern) manually is with the help of TaskCompletionSource. As we can have a number of simultaneous calls, we will need a ConcurrentDictionary of those. Whenever a new call is made, we will create a TaskCompletionSource, add it to the dictionary, and use its Task.

public class RpcCaller<THub> : IRpcCaller<THub>
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    ...
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<MethodResponse>> _pendingMethodCalls =
        new ConcurrentDictionary<Guid, TaskCompletionSource<MethodResponse>>();

    ...

    public async Task<MethodResponse> MethodCall(string userId, MethodParams methodParams)
    {
        methodParams.MethodCallId = Guid.NewGuid();

        TaskCompletionSource<MethodResponse> methodCallCompletionSource = new TaskCompletionSource<MethodResponse>();
        if (_pendingMethodCalls.TryAdd(methodParams.MethodCallId, methodCallCompletionSource))
        {
            await _hubContext.Clients.User(userId).MethodCall(methodParams);

            return await methodCallCompletionSource.Task;
        }

        throw new Exception("Couldn't call the method.");
    }

    ...
}

Here comes the IRpcResponseHandlers part. Calling MethodResponseHandler will remove the TaskCompletionSource from the dictionary and set its result.

public class RpcCaller<THub> : IRpcCaller<THub>
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    ...

    public Task MethodResponseHandler(MethodResponse response)
    {
        if (_pendingMethodCalls.TryRemove(response.MethodCallId, out TaskCompletionSource<MethodResponse> methodCallCompletionSource))
        {
            methodCallCompletionSource.SetResult(response);
        }

        return Task.CompletedTask;
    }
}

One very important thing to remember is that we must be sure that the user with the specified identifier is connected. SignalR does nothing when User(userId).MethodCall is called for a user who is not connected - it will not return any error or throw an exception. In this scenario, that might be a source of potential issues: the TaskCompletionSource will be created and its Task used, but that Task will never complete.
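One way to guard against that - a sketch layered on top of the implementation above, not something SignalR provides - is to bound the wait with a timeout:

public async Task<MethodResponse> MethodCall(string userId, MethodParams methodParams)
{
    methodParams.MethodCallId = Guid.NewGuid();

    TaskCompletionSource<MethodResponse> methodCallCompletionSource = new TaskCompletionSource<MethodResponse>();
    if (_pendingMethodCalls.TryAdd(methodParams.MethodCallId, methodCallCompletionSource))
    {
        await _hubContext.Clients.User(userId).MethodCall(methodParams);

        // Bound the wait so a call to a user who isn't connected doesn't hang forever
        Task completedTask = await Task.WhenAny(methodCallCompletionSource.Task, Task.Delay(TimeSpan.FromSeconds(30)));
        if (completedTask != methodCallCompletionSource.Task)
        {
            _pendingMethodCalls.TryRemove(methodParams.MethodCallId, out _);

            throw new TimeoutException("The method call has timed out (the user might not be connected).");
        }

        return await methodCallCompletionSource.Task;
    }

    throw new Exception("Couldn't call the method.");
}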

With IRpcCaller ready, the hub can be changed to comply with requirements.

public class RpcHub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    private readonly IRpcCaller<RpcHub> _rpcCaller;

    public RpcHub(IRpcCaller<RpcHub> rpcCaller)
    {
        _rpcCaller = rpcCaller;
    }

    public Task MethodResponseHandler(MethodResponse response)
    {
        return _rpcCaller.MethodResponseHandler(response);
    }

    ...
}

What remains is changing the server BackgroundService, which essentially means replacing IHubContext with IRpcCaller.

public class RpcCallerBackgroundService : BackgroundService
{
    private readonly IRpcCaller<RpcHub> _rpcCaller;

    public RpcCallerBackgroundService(IRpcCaller<RpcHub> rpcCaller)
    {
        _rpcCaller = rpcCaller;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (...)
        {
            ...

            MethodResponse response = await _rpcCaller.MethodCall(userId, new MethodParams
            {
                ...
            });
        }
    }
}

This is what we can have.

We Can't Have Nice Things

Is this a nice thing? Well, not really. In order to enforce as much as possible, it's more complicated than it should be. And it still leaves room for human error. But it gets the job done, and in scenarios where enforcing the interface implementations is not that important (for example, when the interfaces are highly unlikely to change), it can be simplified.

It's been a while since I wrote the last post in my Push Notifications and ASP.NET Core series. Since that time I've received a number of questions about the more uncommon aspects of Push Notifications, so I've decided I need to write another one.

In this post, I'm going to focus on special cases which a production-ready application using push notifications should be able to handle.

Handling Out-Of-Control Subscription Changes on the Client Side

There are several things which can happen to a push subscription outside of the application's control. For example, it can be refreshed, lost, or expired, or the permissions can be revoked. When something like this happens, the browser should fire a pushsubscriptionchange event on the service worker registration to inform it about the change. The service worker should use the information provided by this event to update the server.

The pushsubscriptionchange event is often misused. This is because its definition has changed. In the beginning, the event was defined to fire when a subscription had expired. Because of that, many sample implementations available on the web are limited to a resubscribe attempt. Currently, this is not the correct approach. The event has two properties: newSubscription and oldSubscription. Depending on those properties' values, different actions should be taken.

self.addEventListener('pushsubscriptionchange', function (event) {
    let handlePushSubscriptionChangePromise = Promise.resolve();

    if (event.oldSubscription) { }

    if (event.newSubscription) { }

    if (!event.newSubscription) { }

    event.waitUntil(handlePushSubscriptionChangePromise);
});

The value of oldSubscription represents a push subscription that is no longer valid, and it should be removed from the server. This is not a bulletproof mechanism; the value may be null if the browser was not able to provide the full set of details.

self.addEventListener('pushsubscriptionchange', function (event) {
    let handlePushSubscriptionChangePromise = Promise.resolve();

    if (event.oldSubscription) {
        handlePushSubscriptionChangePromise = handlePushSubscriptionChangePromise.then(function () {
            return fetch('push-notifications-api/subscriptions?endpoint=' + encodeURIComponent(event.oldSubscription.endpoint), {
                method: 'DELETE'
            });
        });
    }

    ...
});

The value of newSubscription represents a new, valid push subscription. If the value is there, it should be sent to the server. But, similarly to oldSubscription, the value may be null. This means that the browser didn't establish a new subscription. At this point, the code can attempt to resubscribe after retrieving the public VAPID key from the server (a key change is often the reason why the browser couldn't establish a new subscription). The attempt may fail (for example, if the reason for triggering the event was the user revoking permissions). In such a case, there is nothing more that can be done.

self.addEventListener('pushsubscriptionchange', function (event) {
    let handlePushSubscriptionChangePromise = Promise.resolve();

    ...

    if (event.newSubscription) {
        handlePushSubscriptionChangePromise = handlePushSubscriptionChangePromise.then(function () {
            return fetch('push-notifications-api/subscriptions', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify(event.newSubscription)
            });
        });
    }

    if (!event.newSubscription) {
        handlePushSubscriptionChangePromise = handlePushSubscriptionChangePromise.then(function () {
            return fetch('push-notifications-api/public-key').then(function (response) {
                if (response.ok) {
                    return response.text().then(function (applicationServerPublicKeyBase64) {
                        return urlB64ToUint8Array(applicationServerPublicKeyBase64);
                    });
                } else {
                    return Promise.reject(response.status + ' ' + response.statusText);
                }
            }).then(function (applicationServerPublicKey) {
                return self.registration.pushManager.subscribe({
                    userVisibleOnly: true,
                    applicationServerKey: applicationServerPublicKey
                }).then(function (pushSubscription) {
                    return fetch('push-notifications-api/subscriptions', {
                        method: 'POST',
                        headers: { 'Content-Type': 'application/json' },
                        body: JSON.stringify(pushSubscription)
                    });
                });
            });
        });
    }

    event.waitUntil(handlePushSubscriptionChangePromise);
});

Handling Out-Of-Control Subscription Changes on the Server Side

The pushsubscriptionchange event is not a silver bullet for all potential issues. Even if it's triggered, it may not provide the oldSubscription. In general, it may happen that the application will send a notification to a removed, expired, or otherwise invalid subscription. If that happens, the push service will respond with a 410 or 404 status. The application must be prepared for such a response and discard the subscription. In the case of Lib.Net.Http.WebPush, failed requests end up as a PushServiceClientException. It's enough to catch this exception, check the StatusCode property, and act accordingly.

internal class PushServicePushNotificationService : IPushNotificationService
{
    ...

    public async Task SendNotificationAsync(PushSubscription subscription, PushMessage message, CancellationToken cancellationToken)
    {
        try
        {
            await _pushClient.RequestPushMessageDeliveryAsync(subscription, message, cancellationToken);
        }
        catch (Exception ex)
        {
            await HandlePushMessageDeliveryExceptionAsync(ex, subscription);
        }
    }

    private async Task HandlePushMessageDeliveryExceptionAsync(Exception exception, PushSubscription subscription)
    {
        PushServiceClientException pushServiceClientException = exception as PushServiceClientException;

        if (pushServiceClientException is null)
        {
            _logger?.LogError(exception, "Failed requesting push message delivery to {0}.", subscription.Endpoint);
        }
        else
        {
            if ((pushServiceClientException.StatusCode == HttpStatusCode.NotFound)
                || (pushServiceClientException.StatusCode == HttpStatusCode.Gone))
            {
                // Remove subscription from store
                ...

                _logger?.LogInformation("Subscription has expired or is no longer valid and has been removed.");
            }
        }
    }
}

The implementation may get a little bit complicated. Notifications are usually sent outside of the request scope, from singleton services, and often with a fire-and-forget approach. If the storage service is a scoped one (which is typical when Entity Framework is being used), this will require additional code to manage the scope and the storage service instance. This strongly depends on the particular project's approach, but it can usually be solved without too much trouble.
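A sketch of handling a scoped store from a singleton service could look like this (ISubscriptionStore and DiscardSubscriptionAsync are hypothetical names for whatever abstraction the project uses):

internal class PushServicePushNotificationService : IPushNotificationService
{
    private readonly IServiceScopeFactory _serviceScopeFactory;

    ...

    private async Task DiscardSubscriptionAsync(PushSubscription subscription)
    {
        // A scoped service can't be injected into a singleton directly,
        // so create a scope just for the duration of the store operation
        using (IServiceScope serviceScope = _serviceScopeFactory.CreateScope())
        {
            ISubscriptionStore subscriptionStore = serviceScope.ServiceProvider.GetRequiredService<ISubscriptionStore>();

            await subscriptionStore.DiscardSubscriptionAsync(subscription.Endpoint);
        }
    }
}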

Dealing with Rate Limiting

One more common problem when using push notifications is reaching a rate limit with a push service. When this happens, the push service responds with 429 Too Many Requests, which should include a Retry-After header. What the application should do is wait the given period of time and resend the notification. I believe this kind of functionality should be built into the client, which is why I've added it to Lib.Net.Http.WebPush. It determines whether an attempt to resend should be made based on the status code and the Retry-After header presence.

private bool ShouldRetryAfter(HttpResponseMessage pushMessageDeliveryRequestResponse, out TimeSpan delay)
{
    delay = TimeSpan.MinValue;

    if ((pushMessageDeliveryRequestResponse.StatusCode != (HttpStatusCode)429) || !AutoRetryAfter)
    {
        return false;
    }

    if ((pushMessageDeliveryRequestResponse.Headers.RetryAfter is null)
        || (!pushMessageDeliveryRequestResponse.Headers.RetryAfter.Date.HasValue && !pushMessageDeliveryRequestResponse.Headers.RetryAfter.Delta.HasValue))
    {
        return false;
    }

    if (pushMessageDeliveryRequestResponse.Headers.RetryAfter.Delta.HasValue)
    {
        delay = pushMessageDeliveryRequestResponse.Headers.RetryAfter.Delta.Value;
    }

    if (pushMessageDeliveryRequestResponse.Headers.RetryAfter.Date.HasValue)
    {
        delay = pushMessageDeliveryRequestResponse.Headers.RetryAfter.Date.Value.Subtract(DateTimeOffset.UtcNow);
    }

    return true;
}

The check is used to create an optional waiting loop while sending a notification.

public async Task RequestPushMessageDeliveryAsync(PushSubscription subscription, PushMessage message, VapidAuthentication authentication,
    VapidAuthenticationScheme authenticationScheme, CancellationToken cancellationToken)
{
    HttpRequestMessage pushMessageDeliveryRequest =
        PreparePushMessageDeliveryRequest(subscription, message, authentication, authenticationScheme);

    HttpResponseMessage pushMessageDeliveryRequestResponse =
        await _httpClient.SendAsync(pushMessageDeliveryRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

    while (ShouldRetryAfter(pushMessageDeliveryRequestResponse, out TimeSpan delay))
    {
        await Task.Delay(delay, cancellationToken);

        pushMessageDeliveryRequest =
            SetAuthentication(pushMessageDeliveryRequest, subscription, authentication ?? DefaultAuthentication, authenticationScheme);

        pushMessageDeliveryRequestResponse =
            await _httpClient.SendAsync(pushMessageDeliveryRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken);
    }

    HandlePushMessageDeliveryRequestResponse(pushMessageDeliveryRequest, pushMessageDeliveryRequestResponse, cancellationToken);
}

There is one small but important detail here. For every retry, the authentication is set again. This avoids a situation where the JWT has expired while waiting for a retry.

Footnote

I've updated my demo project with all the above changes; you can grab it here. I hope it will help you use push notifications in your own applications.

One of my talks is about native real-time technologies like WebSockets, Server-Sent Events, and Web Push in ASP.NET Core. In that talk, I briefly go through the subject of scaling applications based on those technologies. For Web Push, I mention that it can be scaled with the help of microservices or functions, because it doesn't require an active HTTP request/response. After the talk, I'm frequently asked for samples of how to do that. This post is me finally putting one together.

Sending Web Push Notifications from Azure Functions

I've written a series of posts about Web Push Notifications. In general, it's good to understand one level deeper than what you're working at (so I encourage you to go through that series), but for this post, it's enough to know that you can grab a client from NuGet. This client is all that is needed to send a notification from an Azure Function, so let's quickly create one.
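
Out of context, sending a single notification with that client boils down to a few lines. A minimal sketch (the keys are placeholders, and the subscription would normally come from the browser and be kept in storage):

PushServiceClient pushClient = new PushServiceClient
{
    DefaultAuthentication = new VapidAuthentication(
        "<Application Server Public Key>",
        "<Application Server Private Key>")
};

// The subscription is provided by the browser and typically stored server-side.
PushSubscription subscription = ...;

await pushClient.RequestPushMessageDeliveryAsync(subscription, new PushMessage("Notification content"));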

The function will use Azure Cosmos DB as a data source. Whenever an insert or update happens in NotificationsCollection, the function will be triggered.

public static class SendNotificationFunction
{
    [FunctionName("SendNotification")]
    public static void Run(
        [CosmosDBTrigger("PushNotifications", "NotificationsCollection",
        LeaseCollectionName = "NotificationsLeaseCollection", CreateLeaseCollectionIfNotExists = true,
        ConnectionStringSetting = "CosmosDBConnection")]
        IReadOnlyList<PushMessage> notifications)
    {

    }
}

You may notice that it's using the extension for the Azure Cosmos DB Trigger described in my previous post, which allows taking a collection of POCOs as an argument.

The second piece of data kept in Cosmos DB is the subscriptions. The function will need to query all of them in order to send the notifications, which is best done by using DocumentClient.

public static class SendNotificationFunction
{
    private static readonly Uri _subscriptionsCollectionUri =
        UriFactory.CreateDocumentCollectionUri("PushNotifications", "SubscriptionsCollection");

    [FunctionName("SendNotification")]
    public static void Run(
        ...,
        [CosmosDB("PushNotifications", "SubscriptionsCollection", ConnectionStringSetting = "CosmosDBConnection")]
        DocumentClient client)
    {
        if (notifications != null)
        {
            IDocumentQuery<PushSubscription> subscriptionQuery =
                client.CreateDocumentQuery<PushSubscription>(_subscriptionsCollectionUri, new FeedOptions
                {
                    EnableCrossPartitionQuery = true,
                    MaxItemCount = -1
                }).AsDocumentQuery();
        }
    }
}

Now the function is almost ready to send notifications. The last missing piece is PushServiceClient. An instance of PushServiceClient internally holds an instance of HttpClient, which means the improper instantiation antipattern must be taken into consideration. The simplest approach is to create a static instance.

public static class SendNotificationFunction
{
    ...
    private static readonly PushServiceClient _pushClient = new PushServiceClient
    {
        DefaultAuthentication = new VapidAuthentication(
            "<Application Server Public Key>",
            "<Application Server Private Key>")
        {
            Subject = "<Subject>"
        }
    };

    [FunctionName("SendNotification")]
    public static void Run(...)
    {
        ...
    }
}

Sending notifications is now nothing more than calling RequestPushMessageDeliveryAsync for every combination of subscription and notification.

public static class SendNotificationFunction
{
    ...

    [FunctionName("SendNotification")]
    public static async Task Run(...)
    {
        if (notifications != null)
        {
            ...

            while (subscriptionQuery.HasMoreResults)
            {
                foreach (PushSubscription subscription in await subscriptionQuery.ExecuteNextAsync())
                {
                    foreach (PushMessage notification in notifications)
                    {
                        // Fire-and-forget
                        _pushClient.RequestPushMessageDeliveryAsync(subscription, notification);
                    }
                }
            }
        }
    }
}

And that's it. A very simple Azure Function taking care of broadcast notifications.

Improvements, Improvements ...

The code above can be better. I'm not thinking about the Cosmos DB part (although there are ways to better utilize partitioning or to implement a fan-out approach with the help of durable functions); it's the usage of PushServiceClient that is far from perfect. The static instance makes configuration awkward and may cause issues due to a static HttpClient not respecting DNS changes. In the past, I've written about using HttpClientFactory and making HttpClient injectable in Azure Functions, and the exact same approach can be used here. I will skip the boilerplate code (it's described in that post) and focus only on the things specific to PushServiceClient.
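
For orientation only, that skipped boilerplate amounts to a startup class along these lines (a rough sketch; the class name PushServiceWebJobsStartup is made up here, and the full version is described in the linked post):

[assembly: WebJobsStartup(typeof(PushServiceWebJobsStartup))]

public class PushServiceWebJobsStartup : IWebJobsStartup
{
    public void Configure(IWebJobsBuilder builder)
    {
        // Makes IHttpClientFactory available for the converter defined below
        builder.Services.AddHttpClient();

        // Registers the extension config provider described later in this post
        builder.AddExtension<PushServiceExtensionConfigProvider>();
    }
}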

The first thing is an attribute. It will allow taking a PushServiceClient instance as a function parameter. It will also help solve the configuration problem by providing properties for the names of the settings holding the needed values.

[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public sealed class PushServiceAttribute : Attribute
{
    [AppSetting]
    public string PublicKeySetting { get; set; }

    [AppSetting]
    public string PrivateKeySetting { get; set; }

    [AppSetting]
    public string SubjectSetting { get; set; }
}

The second thing is a converter, which will use HttpClientFactory and an instance of PushServiceAttribute to create a properly initialized PushServiceClient. Luckily, PushServiceClient has a constructor which takes an HttpClient instance as a parameter, so it's quite simple.

internal class PushServiceClientConverter : IConverter<PushServiceAttribute, PushServiceClient>
{
    private readonly IHttpClientFactory _httpClientFactory;

    public PushServiceClientConverter(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory ?? throw new ArgumentNullException(nameof(httpClientFactory));
    }

    public PushServiceClient Convert(PushServiceAttribute attribute)
    {
        return new PushServiceClient(_httpClientFactory.CreateClient())
        {
            DefaultAuthentication = new VapidAuthentication(attribute.PublicKeySetting, attribute.PrivateKeySetting)
            {
                Subject = attribute.SubjectSetting
            }
        };
    }
}

The last thing is an IExtensionConfigProvider, which will add a binding rule for PushServiceAttribute and tell the Azure Functions runtime to use PushServiceClientConverter for providing PushServiceClient instances.

[Extension("PushService")]
internal class PushServiceExtensionConfigProvider : IExtensionConfigProvider
{
    private readonly IHttpClientFactory _httpClientFactory;

    public PushServiceExtensionConfigProvider(IHttpClientFactory httpClientFactory)
    {
        _httpClientFactory = httpClientFactory;
    }

    public void Initialize(ExtensionConfigContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        //PushServiceClient Bindings
        var bindingAttributeBindingRule = context.AddBindingRule<PushServiceAttribute>();
        bindingAttributeBindingRule.AddValidator(ValidateVapidAuthentication);

        bindingAttributeBindingRule.BindToInput<PushServiceClient>(typeof(PushServiceClientConverter), _httpClientFactory);
    }

    ...
}

I've skipped the code which reads and validates the settings. If you're interested, you can find it on GitHub.
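
For a rough idea, the validator could look something like the sketch below. This assumes the WebJobs SDK convention of validators receiving the resolved attribute and the parameter type; the actual implementation on GitHub may differ.

private static void ValidateVapidAuthentication(PushServiceAttribute attribute, Type parameterType)
{
    // Thanks to [AppSetting], the properties already hold the resolved values at this point,
    // so validation boils down to checking that the required keys are present.
    if (String.IsNullOrEmpty(attribute.PublicKeySetting) || String.IsNullOrEmpty(attribute.PrivateKeySetting))
    {
        throw new InvalidOperationException(
            "The PushService binding requires both the application server public and private key.");
    }
}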

With the above extension in place, the static instance of PushServiceClient is no longer needed.

public static class SendNotificationFunction
{
    ...

    [FunctionName("SendNotification")]
    public static async Task Run(
        ...,
        [PushService(PublicKeySetting = "ApplicationServerPublicKey",
        PrivateKeySetting = "ApplicationServerPrivateKey", SubjectSetting = "ApplicationServerSubject")]
        PushServiceClient pushServiceClient)
    {
        if (notifications != null)
        {
            ...

            while (subscriptionQuery.HasMoreResults)
            {
                foreach (PushSubscription subscription in await subscriptionQuery.ExecuteNextAsync())
                {
                    foreach (PushMessage notification in notifications)
                    {
                        // Fire-and-forget
                        pushServiceClient.RequestPushMessageDeliveryAsync(subscription, notification);
                    }
                }
            }
        }
    }
}

You Don't Have to Reimplement This Yourself

The PushServiceClient binding extension is available on NuGet. I have also pushed the demo project to GitHub. Hopefully it will help you get the best out of Web Push Notifications.

This is my fourth post about Azure Functions extensibility. So far I've written about triggers, inputs, and outputs (maybe I should devote some more time to outputs, but that's something for another time). In this post, I want to focus on something I haven't mentioned yet - extending existing extensions. As usual, I intend to do it in a practical context.

Common problem with the Azure Cosmos DB Trigger

There is a common problem with the Azure Cosmos DB Trigger for Azure Functions. Let's consider the sample from the documentation.

public class ToDoItem
{
    public string Id { get; set; }
    public string Description { get; set; }
}

public static class CosmosTrigger
{
    [FunctionName("CosmosTrigger")]
    public static void Run([CosmosDBTrigger(
        databaseName: "ToDoItems",
        collectionName: "Items",
        ConnectionStringSetting = "CosmosDBConnection",
        LeaseCollectionName = "leases",
        CreateLeaseCollectionIfNotExists = true)]IReadOnlyList<Document> documents, 
        ILogger log)
    {
        if (documents != null && documents.Count > 0)
        {
            log.LogInformation($"Documents modified: {documents.Count}");
            log.LogInformation($"First document Id: {documents[0].Id}");
        }
    }
}

It looks OK, but it avoids the problem by not being interested in the content of the documents. The trigger has a limitation: it works only with IReadOnlyList<Document>. This means that accessing values may not be as easy as one could expect. There is a GetPropertyValue method, which helps with retrieving a single property value.

public static class CosmosTrigger
{
    [FunctionName("CosmosTrigger")]
    public static void Run([CosmosDBTrigger(
        ...)]IReadOnlyList<Document> documents, 
        ILogger log)
    {
        foreach(Document document in documents)
        {
            log.LogInformation($"ToDo: {document.GetPropertyValue<string>("Description")}");
        }
    }
}

This is not perfect. What if there are a lot of properties? If what is truly needed is the entire POCO, then a conversion from Document to POCO must be added. One way is to cast through dynamic.

public static class CosmosTrigger
{
    [FunctionName("CosmosTrigger")]
    public static void Run([CosmosDBTrigger(
        ...)]IReadOnlyList<Document> documents, 
        ILogger log)
    {
        foreach(Document document in documents)
        {
            ToDoItem item = (dynamic)document;
            log.LogInformation($"ToDo: {item.Description}");
            ...
        }
    }
}

Another way is to use JSON deserialization.

public static class CosmosTrigger
{
    [FunctionName("CosmosTrigger")]
    public static void Run([CosmosDBTrigger(
        ...)]IReadOnlyList<Document> documents, 
        ILogger log)
    {
        foreach(Document document in documents)
        {
            ToDoItem item = JsonConvert.DeserializeObject<ToDoItem>(document.ToString());
            log.LogInformation($"ToDo: {item.Description}");
            ...
        }
    }
}

All this is still not perfect, especially if there are many functions like this in a project. What would be perfect is being able to take a collection of POCOs as an argument.

public static class CosmosTrigger
{
    [FunctionName("CosmosTrigger")]
    public static void Run([CosmosDBTrigger(
        ...)]IReadOnlyList<ToDoItem> items, 
        ILogger log)
    {
        foreach(ToDoItem item in items)
        {
            log.LogInformation($"ToDo: {item.Description}");
            ...
        }
    }
}

Can this be achieved?

Extending the Azure Cosmos DB Trigger

As you may already know, the heart of Azure Functions extensibility is ExtensionConfigContext. It allows registering bindings by adding binding rules, but it also allows adding converters. Typically, converters are added as part of a binding rule in the original extension, but this is not the only way. The truth is that the converter manager is centralized and shared across extensions, which means it's possible to add a converter for a type supported by a different extension. The problem with the Azure Cosmos DB Trigger can be solved by a converter from IReadOnlyList<Document> to IReadOnlyList<T>. But first, some standard boilerplate code is needed.

[assembly: WebJobsStartup(typeof(CosmosDBExtensionsWebJobsStartup))]

public class CosmosDBExtensionsWebJobsStartup : IWebJobsStartup
{
    public void Configure(IWebJobsBuilder builder)
    {
        builder.AddExtension<CosmosDBExtensionExtensionsConfigProvider>();
    }
}

[Extension("CosmosDBExtensions")]
internal class CosmosDBExtensionExtensionsConfigProvider : IExtensionConfigProvider
{
    public void Initialize(ExtensionConfigContext context)
    {
        if (context == null)
        {
            throw new ArgumentNullException("context");
        }


    }
}

Now it's time to add the converter. There are two methods for adding converters: AddConverter<TSource, TDestination> and AddOpenConverter<TSource, TDestination>. The former can be used to add a converter from one concrete type to another, while the latter adds a converter with support for generics. But how can TDestination be defined when the Initialize method is not generic? For this purpose, the SDK provides a sentinel type, OpenType, which serves as a placeholder for a generic type.

[Extension("CosmosDBExtensions")]
internal class CosmosDBExtensionExtensionsConfigProvider : IExtensionConfigProvider
{
    public void Initialize(ExtensionConfigContext context)
    {
        ...

        context.AddOpenConverter<IReadOnlyList<Document>, IReadOnlyList<OpenType>>(typeof(GenericDocumentConverter<>));
    }
}

The converter itself is just an implementation of IConverter<TInput, TOutput>.

internal class GenericDocumentConverter<T> : IConverter<IReadOnlyList<Document>, IReadOnlyList<T>>
{
    public IReadOnlyList<T> Convert(IReadOnlyList<Document> input)
    {
        List<T> output = new List<T>(input.Count);

        foreach(Document item in input)
        {
            output.Add(Convert(item));
        }

        return output.AsReadOnly();
    }

    private static T Convert(Document document)
    {
        return JsonConvert.DeserializeObject<T>(document.ToString());
    }
}

That's it. This extension will allow the Azure Cosmos DB Trigger to be used with POCO collections.

This pattern can be reused with any other extension.

I was prompted to write this post by this question. In general, the question is about using ASP.NET Core built-in authorization to restrict access to a middleware. In ASP.NET Core, the authorization mechanism is well exposed for MVC (through AuthorizeAttribute), but for middleware it's a manual job (at least for now). The reason for that might be the fact that there aren't that many terminal middlewares.

This was not the first time I had received this question, so I quickly responded with typical code to achieve the task. But after some thinking, I've decided to put a detailed answer here.

Policy-based authorization

At its core, authorization in ASP.NET Core is based on policies. The other available ways of specifying requirements (roles, claims) are in the end evaluated as policies. This means that it is enough to be able to validate a policy for the current user, which can be easily done with the help of IAuthorizationService. All one needs is a policy name and the HttpContext. The following authorization middleware gets the job done.

public class AuthorizationMiddleware
{
    private readonly RequestDelegate _next;
    private readonly string _policyName;

    public AuthorizationMiddleware(RequestDelegate next, string policyName)
    {
        _next = next;
        _policyName = policyName;
    }

    public async Task Invoke(HttpContext httpContext, IAuthorizationService authorizationService)
    {
        AuthorizationResult authorizationResult =
            await authorizationService.AuthorizeAsync(httpContext.User, null, _policyName);

        if (!authorizationResult.Succeeded)
        {
            await httpContext.ChallengeAsync();
            return;
        }

        await _next(httpContext);
    }
}

Of course, the middleware registration can be encapsulated in an extension method for easier use.

public static class AuthorizationApplicationBuilderExtensions
{
    public static IApplicationBuilder UseAuthorization(this IApplicationBuilder app, string policyName)
    {
        // Null checks removed for brevity
        ...

        return app.UseMiddleware<AuthorizationMiddleware>(policyName);
    }
}

The only thing left is to put this middleware in front of the middleware which should have restricted access (it can be placed multiple times if multiple policies need to be validated).

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        ...

        services.AddAuthorization(options =>
        {
            options.AddPolicy("PolicyName", ...);
        });
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        ...

        app.UseAuthentication();

        app.Map("/policy-based-authorization", branchedApp =>
        {
            branchedApp.UseAuthorization("PolicyName");

            ...
        });

        ...
    }
}

Simple and effective. Goal achieved, right?

Simple authorization, roles and schemes

Despite being my go-to solution, the above approach is far from perfect. It doesn't expose the full capabilities and is not user-friendly. Something more similar to AuthorizeAttribute would be a lot better. This means making full use of policies, roles, and schemes. At first, this might sound like serious work, but the truth is that all the hard work has already been done for us; we just need to go beyond Microsoft.AspNetCore.Authorization and use some services from the Microsoft.AspNetCore.Authorization.Policy package. But before that can be done, a user-friendly way of defining restrictions is needed. This is no challenge, as ASP.NET Core has an interface for that.

internal class AuthorizationOptions : IAuthorizeData
{
    public string Policy { get; set; }

    public string Roles { get; set; }

    public string AuthenticationSchemes { get; set; }
}

This options class is very similar to AuthorizeAttribute, which isn't a surprise, as AuthorizeAttribute also implements IAuthorizeData.

Implementing IAuthorizeData allows the class to be transformed into an AuthorizationPolicy with the help of IAuthorizationPolicyProvider.

public class AuthorizationMiddleware
{
    private readonly RequestDelegate _next;
    private readonly IAuthorizeData[] _authorizeData;
    private readonly IAuthorizationPolicyProvider _policyProvider;
    private AuthorizationPolicy _authorizationPolicy;

    public AuthorizationMiddleware(RequestDelegate next,
        IAuthorizationPolicyProvider policyProvider,
        IOptions<AuthorizationOptions> authorizationOptions)
    {
        // Null checks removed for brevity
        _next = next;
        _authorizeData = new[] { authorizationOptions.Value };
        _policyProvider = policyProvider;
    }

    public async Task Invoke(HttpContext httpContext, IPolicyEvaluator policyEvaluator)
    {
        if (_authorizationPolicy is null)
        {
            _authorizationPolicy =
                await AuthorizationPolicy.CombineAsync(_policyProvider, _authorizeData);
        }

        ...

        await _next(httpContext);
    }

    ...
}

The policy needs to be evaluated. This requires two calls to IPolicyEvaluator: one for authentication and one for authorization.

public class AuthorizationMiddleware
{
    ...

    public async Task Invoke(HttpContext httpContext, IPolicyEvaluator policyEvaluator)
    {
        ...

        AuthenticateResult authenticateResult =
            await policyEvaluator.AuthenticateAsync(_authorizationPolicy, httpContext);
        PolicyAuthorizationResult authorizeResult =
            await policyEvaluator.AuthorizeAsync(_authorizationPolicy, authenticateResult, httpContext, null);

        if (authorizeResult.Challenged)
        {
            await ChallengeAsync(httpContext);
            return;
        }
        else if (authorizeResult.Forbidden)
        {
            await ForbidAsync(httpContext);
            return;
        }

        await _next(httpContext);
    }

    ...
}

The last thing is handling the Challenged and Forbidden scenarios. There are ready-to-use HttpContext extension methods which do that, but it's important to remember to make use of the schemes if they have been provided.

public class AuthorizationMiddleware
{
    ...

    private async Task ChallengeAsync(HttpContext httpContext)
    {
        if (_authorizationPolicy.AuthenticationSchemes.Count > 0)
        {
            foreach (string authenticationScheme in _authorizationPolicy.AuthenticationSchemes)
            {
                await httpContext.ChallengeAsync(authenticationScheme);
            }
        }
        else
        {
            await httpContext.ChallengeAsync();
        }
    }

    private async Task ForbidAsync(HttpContext httpContext)
    {
        if (_authorizationPolicy.AuthenticationSchemes.Count > 0)
        {
            foreach (string authenticationScheme in _authorizationPolicy.AuthenticationSchemes)
            {
                await httpContext.ForbidAsync(authenticationScheme);
            }
        }
        else
        {
            await httpContext.ForbidAsync();
        }
    }
}

Now the registration method can be modified. An important thing to note here is that not setting any of the AuthorizationOptions properties will result in using the default policy (the same as decorating an action or controller with [Authorize]). This case might be worth an overload.

public static class AuthorizationApplicationBuilderExtensions
{
    public static IApplicationBuilder UseAuthorization(this IApplicationBuilder app)
    {
        return app.UseAuthorization(new AuthorizationOptions());
    }

    public static IApplicationBuilder UseAuthorization(this IApplicationBuilder app,
        AuthorizationOptions authorizationOptions)
    {
        if (app == null)
        {
            throw new ArgumentNullException(nameof(app));
        }

        if (authorizationOptions == null)
        {
            throw new ArgumentNullException(nameof(authorizationOptions));
        }

        return app.UseMiddleware<AuthorizationMiddleware>(Options.Create(authorizationOptions));
    }
}

This makes all the capabilities provided by AuthorizeAttribute available to the middleware pipeline. If the application is not using MVC, it's important to remember to add the policy services.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        ...

        services.AddAuthorization(options =>
        {
            options.AddPolicy("PolicyName", ...);
        })
        .AddAuthorizationPolicyEvaluator();
    }

    public void Configure(IApplicationBuilder app, IHostingEnvironment env)
    {
        ...

        app.UseAuthentication();

        app.Map("/simple-authorization", branchedApp =>
        {
            branchedApp.UseAuthorization();

            ...
        });

        app.Map("/role-based-authorization", branchedApp =>
        {
            branchedApp.UseAuthorization(new AuthorizationOptions { Roles = "Employee" });

            ...
        });

        app.Map("/policy-based-authorization", branchedApp =>
        {
            branchedApp.UseAuthorization(new AuthorizationOptions { Policy = "EmployeeOnly" });

            ...
        });

        ...
    }
}

All the code above is a copy-paste solution for when one wants to restrict a middleware from the outside, but it can also be easily adapted to live inside a middleware (which in the end is what I decided to do in the case of my Server-Sent Events middleware).

Small note about the future

The state of authorization in the middleware pipeline should be expected to change. ASP.NET Core 3.0 is supposed to make Endpoint Routing available outside of MVC, and it comes with support for authorization. In ASP.NET Core 2.2 there is already an authorization middleware (quite similar to the one above) which restricts endpoints based on IAuthorizeData from metadata. This means that in 3.0 it may be possible to define a restricted endpoint pointing to a middleware.
