Server-To-Client RPC with Response in ASP.NET Core SignalR

When using ASP.NET Core SignalR, we can perform invocations which don't return anything, invocations which return a result, and invocations which result in a stream of results. Unfortunately, invocations which return a result or a stream of results are available only when a client invokes the server. If a server wants to invoke a client, the only option is an invocation which doesn't return anything. ASP.NET Core SignalR is supposed to bring streaming from client to server, but again only as part of an invocation from a client to a server. Still, there are scenarios where we would like a client to be able to respond to an invocation: a server-to-client remote procedure call (RPC) with a result.

What We Want

ASP.NET Core SignalR has a concept of strongly typed hubs, which allows for representing client methods as an interface. For a server-to-client RPC with a result, such an interface should look like the one below.

public interface IRpc
{
    Task<MethodResponse> MethodCall(MethodParams methodParams);
}

The corresponding strongly typed hub would look like this.

public class RpcHub : Hub<IRpc>
{
    ...
}

The hub doesn't need to have any methods. Of course, we want to handle connection events to maintain a list of currently connected users, but if the trigger is something from our infrastructure (a message from a queue, a change in a database, etc.) that's all we need. Let's assume that's exactly the case, and that client methods are being invoked from a BackgroundService which is listening for that trigger. Below is the code of such a BackgroundService, limited to the SignalR aspects.

public class RpcCallerBackgroundService : BackgroundService
{
    private readonly IHubContext<RpcHub, IRpc> _rpcHubContext;

    public RpcCallerBackgroundService(IHubContext<RpcHub, IRpc> rpcHubContext)
    {
        _rpcHubContext = rpcHubContext;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (...)
        {
            ...

            MethodResponse response = await _rpcHubContext.Clients.User(userId).MethodCall(new MethodParams
            {
                ...
            });
        }
    }
}

All the code so far we can write right now. It will even compile. Sadly, it won't work. If you put this code into your application, you will get a System.InvalidOperationException upon startup with a message more or less like this:

Cannot generate proxy implementation for 'IRpc.MethodCall'. All client proxy methods must return 'System.Threading.Tasks.Task'.

So, on the server side, we can write the code we want but it will result in an exception. How about the client side?

Imagine that the client is a generic host based worker service which uses a BackgroundService to maintain a SignalR connection with the server. An implementation of such a BackgroundService for our IRpc interface could be something like this.

public class RpcClientBackgroundService : BackgroundService
{
    private readonly HubConnection _rpcResponseHubConnection;

    public RpcClientBackgroundService()
    {
        _rpcResponseHubConnection = new HubConnectionBuilder()
            .WithUrl("...")
            .Build();
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        IDisposable _methodCallHandler = _rpcResponseHubConnection.On(nameof(IRpc.MethodCall), new[] { typeof(MethodParams) },
            async (methodParamsArray) =>
            {
                ...
            });

        await _rpcResponseHubConnection.StartAsync();

        await WaitForCancellationAsync(stoppingToken);

        _methodCallHandler.Dispose();

        await _rpcResponseHubConnection.DisposeAsync();
    }

    ...
}

Here we can't even implement what we want. Every overload of the On method which takes a Func forces the return type to be Task, so there is no way to hand a result back. The other overloads take an Action.

I've spent a lot of lines on proving that we can't have what we want. Is there something we can have?

What We Can Have

The only way for a client to return something to a server is to invoke it back. So the only possible approach, from the SignalR perspective, is to have two invocations, which we can represent by two interfaces.

public interface IRpcCalls
{
    Task MethodCall(MethodParams methodParams);
}

public interface IRpcResponseHandlers
{
    Task MethodResponseHandler(MethodResponse response);
}

Another consequence is that we will have to correlate those invocations. We need something in payloads that will allow us to do that.

public class MethodParams
{
    public Guid MethodCallId { get; set; }

    ...
}

public class MethodResponse
{
    public Guid MethodCallId { get; set; }

    ...
}

Now we can change the client BackgroundService to perform an invocation of MethodResponseHandler when the processing of MethodCall is finished.

public class RpcClientBackgroundService : BackgroundService
{
    ...

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        IDisposable _methodCallHandler = _rpcResponseHubConnection.On<MethodParams>(nameof(IRpcCalls.MethodCall),
            async methodParams =>
            {
                ...

                // Await the invocation so that a failure is observed instead of being left on an unobserved task.
                await _rpcResponseHubConnection.InvokeAsync(nameof(IRpcResponseHandlers.MethodResponseHandler), new MethodResponse
                {
                    MethodCallId = methodParams.MethodCallId,
                    ...
                });
            });

        ...
    }

    ...
}

That was the easy part. Changing the server part is a little bit harder. The BackgroundService on the server side will not be able to use IHubContext directly; we need to introduce something that hides the split into two invocations. We want that something to be similar to IHubContext in that it is generic, with the type of the hub used as a type parameter. This way we can enforce that the hub implements the IRpcResponseHandlers interface and is strongly typed, with client methods represented by the IRpcCalls interface.

public interface IRpcCaller<THub> : IRpc, IRpcResponseHandlers
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{ }

public class RpcCaller<THub> : IRpcCaller<THub>
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    private readonly IHubContext<THub, IRpcCalls> _hubContext;

    public RpcCaller(IHubContext<THub, IRpcCalls> hubContext)
    {
        _hubContext = hubContext;
    }

    ...
}

Why is IRpcCaller composed of IRpc and IRpcResponseHandlers? IRpc will be needed by the BackgroundService, but we also need a way for the hub to pass the invocation response to IRpcCaller.

Before we can start the implementation, we also need to make a change to the IRpc interface. As the implementation will hide IHubContext, the user identifier needs to be passed as a parameter.

public interface IRpc
{
    Task<MethodResponse> MethodCall(string userId, MethodParams methodParams);
}

So, how do we synchronize those two invocations and make the method wait until the response is available? One way to implement TAP (Task-based Asynchronous Pattern) is to do it manually with the help of TaskCompletionSource. As we can have a number of simultaneous calls, we will need a ConcurrentDictionary of those, keyed by MethodCallId. Whenever a new call is made we will create a TaskCompletionSource, add it to the dictionary, and use its Task.

public class RpcCaller<THub> : IRpcCaller<THub>
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    ...
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<MethodResponse>> _pendingMethodCalls =
        new ConcurrentDictionary<Guid, TaskCompletionSource<MethodResponse>>();

    ...

    public async Task<MethodResponse> MethodCall(string userId, MethodParams methodParams)
    {
        methodParams.MethodCallId = Guid.NewGuid();

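        // RunContinuationsAsynchronously keeps the awaiting caller from resuming inline on the thread that sets the result.
        TaskCompletionSource<MethodResponse> methodCallCompletionSource =
            new TaskCompletionSource<MethodResponse>(TaskCreationOptions.RunContinuationsAsynchronously);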
        if (_pendingMethodCalls.TryAdd(methodParams.MethodCallId, methodCallCompletionSource))
        {
            await _hubContext.Clients.User(userId).MethodCall(methodParams);

            return await methodCallCompletionSource.Task;
        }

        throw new Exception("Couldn't call the method.");
    }

    ...
}

Here comes the IRpcResponseHandlers part. Calling MethodResponseHandler will remove the TaskCompletionSource from the dictionary and set its result.

public class RpcCaller<THub> : IRpcCaller<THub>
    where THub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    ...

    public Task MethodResponseHandler(MethodResponse response)
    {
        if (_pendingMethodCalls.TryRemove(response.MethodCallId, out TaskCompletionSource<MethodResponse> methodCallCompletionSource))
        {
            methodCallCompletionSource.SetResult(response);
        }

        return Task.CompletedTask;
    }
}

One very important thing to remember is that we must be sure that the user with the specified identifier is connected. SignalR does nothing when User(userId).MethodCall is called for a user who is not connected: it neither returns an error nor throws an exception. In this scenario that is a real problem, because the TaskCompletionSource will be created and its Task awaited, but that Task will never complete.

With IRpcCaller ready, the hub can be changed to comply with the requirements.
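
One way to guard against a Task that never completes is to fail the pending call after a timeout. The following is only a sketch of that idea, not part of the implementation described here: the PendingCalls class, its method names, and the choice of TimeoutException are all hypothetical, and MethodResponse is reduced to a stand-in so the sample compiles on its own.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the article's MethodResponse payload (assumption: only the id matters here).
public class MethodResponse
{
    public Guid MethodCallId { get; set; }
}

// Hypothetical helper: tracks pending calls and fails them when no response arrives in time.
public class PendingCalls
{
    private readonly ConcurrentDictionary<Guid, TaskCompletionSource<MethodResponse>> _pending =
        new ConcurrentDictionary<Guid, TaskCompletionSource<MethodResponse>>();

    // Registers the call synchronously, then waits for the response or the timeout.
    public async Task<MethodResponse> WaitForResponseAsync(Guid methodCallId, TimeSpan timeout)
    {
        var completionSource = new TaskCompletionSource<MethodResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);

        if (!_pending.TryAdd(methodCallId, completionSource))
        {
            throw new InvalidOperationException("Duplicate method call identifier.");
        }

        using (var timeoutSource = new CancellationTokenSource(timeout))
        using (timeoutSource.Token.Register(() =>
        {
            // Remove the entry first, so that a late response is simply ignored.
            if (_pending.TryRemove(methodCallId, out var timedOut))
            {
                timedOut.TrySetException(new TimeoutException("No response from the client."));
            }
        }))
        {
            return await completionSource.Task;
        }
    }

    // Meant to be called when the client invokes the response handler.
    // Returns false when the call is unknown or has already timed out.
    public bool TryComplete(MethodResponse response)
    {
        return _pending.TryRemove(response.MethodCallId, out var completionSource)
            && completionSource.TrySetResult(response);
    }
}
```

Because the entry is added before the first await, a caller could start WaitForResponseAsync, then send MethodCall to the client, and only then await the returned Task. A user who is not connected would then surface as a TimeoutException instead of a call that hangs forever.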

public class RpcHub : Hub<IRpcCalls>, IRpcResponseHandlers
{
    private readonly IRpcCaller<RpcHub> _rpcCaller;

    public RpcHub(IRpcCaller<RpcHub> rpcCaller)
    {
        _rpcCaller = rpcCaller;
    }

    public Task MethodResponseHandler(MethodResponse response)
    {
        return _rpcCaller.MethodResponseHandler(response);
    }

    ...
}

What remains is to change the server BackgroundService, which essentially means replacing IHubContext with IRpcCaller.

public class RpcCallerBackgroundService : BackgroundService
{
    private readonly IRpcCaller<RpcHub> _rpcCaller;

    public RpcCallerBackgroundService(IRpcCaller<RpcHub> rpcCaller)
    {
        _rpcCaller = rpcCaller;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (...)
        {
            ...

            MethodResponse response = await _rpcCaller.MethodCall(userId, new MethodParams
            {
                ...
            });
        }
    }
}
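
The listings above don't show how IRpcCaller reaches the hub and the BackgroundService. Hub instances are created per invocation, while the dictionary of pending calls must be shared with the BackgroundService, so a singleton registration appears to be the natural choice. A minimal sketch, assuming a classic Startup class (the lifetime and the placement of the registrations are assumptions, not something stated above):

```csharp
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSignalR();

        // Assumption: one shared RpcCaller per hub type, so that RpcHub and the
        // BackgroundService work with the same dictionary of pending calls.
        services.AddSingleton(typeof(IRpcCaller<>), typeof(RpcCaller<>));

        services.AddHostedService<RpcCallerBackgroundService>();
    }
}
```

With a scoped or transient lifetime, the hub and the BackgroundService would each get their own RpcCaller, and responses would never find the matching TaskCompletionSource.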

This is what we can have.

We Can't Have Nice Things

Is this a nice thing? Well, not really. In order to enforce as much as possible, it's more complicated than it should be, and it still leaves room for human error. But it gets the job done, and in scenarios where enforcing the implementation of interfaces is not that important (for example, when the interfaces are highly unlikely to change), it can be simplified.