Trying to Run RSocket Server with RSocket.NET and ASP.NET Core Primitives for Non-HTTP Servers

I first heard about RSocket about two years ago, in this short (20-minute) talk. If you want a quick overview, you can go and watch the recording right now; I can wait.

RSocket

If you have watched the recording, you can skip this section. If you haven't, I'll try to explain RSocket in a few words. RSocket is a binary protocol inspired by the Reactive Manifesto. It was conceived by Netflix and Facebook in an effort to create an alternative protocol for microservices communication, one that would favor asynchronous messaging and provide more interaction models than just request/response (the protocol also defines fire-and-forget, request/stream, and channel). Now the main contributor seems to be Netifi, which has built an entire platform on top of it.

Since I learned about RSocket, it has proven to be the right tool for the job several times. Unfortunately, in every one of those scenarios, we ended up using Java for that particular microservice, as rolling our own RSocket implementation was simply too expensive. I'm hoping this is about to change.

RSocket.NET

At the beginning of this year, Netifi started contributing a .NET implementation of RSocket. I had wanted to take a look at it for some time, but building a server on top of it requires providing a hosting layer. Because of that (and limited time), I kept postponing my exploration of this library until preview 8 of ASP.NET Core 3.0 was released. Preview 8 includes new networking primitives for non-HTTP servers, which should be a great hosting layer for RSocket.NET. I simplified one of the Java clients I already had (because it was quick and Java is the most mature implementation of RSocket) and started working on a POC server.

Hosting RSocket.NET with Networking Primitives for Non-HTTP Servers

In the case of RSocket.NET, the server logic can be implemented by subclassing RSocketServer. The subclass can call a method like Respond or Stream in its constructor to provide the business logic behind each interaction. A simple echo server for request/response and request/stream interactions can look like this.

internal class EchoServer : RSocketServer
{
    public EchoServer(IRSocketTransport transport, RSocketOptions options = default, int echoes = 2)
        : base(transport, options)
    {
        // Request/Response
        Respond(
            request => request,                                 // requestTransform
            request => AsyncEnumerable.Repeat(request, echoes), // producer
            result => result                                    // resultTransform
        );

        // Request/Stream
        Stream(
            request => request,                                 // requestTransform
            request => AsyncEnumerable.Repeat(request, echoes), // producer
            result => result                                    // resultTransform
        );
    }
}

An instance of RSocketServer represents a client connected through IRSocketTransport. This is the place where RSocket.NET and the networking primitives should be glued together. In the case of the new primitives provided by the ASP.NET Core team, a connected client is represented by ConnectionContext. ConnectionContext exposes, through its Transport property, a PipeReader and a PipeWriter for input and output. This is exactly what IRSocketTransport expects, so a wrapper should be all we need.

internal class ConnectionListenerTransport : IRSocketTransport
{
    public PipeReader Input { get; }

    public PipeWriter Output { get; }

    public ConnectionListenerTransport(ConnectionContext connection)
    {
        Input = connection.Transport.Input;
        Output = connection.Transport.Output;
    }

    public Task StartAsync(CancellationToken cancel = default)
    {
        return Task.CompletedTask;
    }

    public Task StopAsync()
    {
        return Task.CompletedTask;
    }
}

The StartAsync and StopAsync methods require an explanation. Why am I just returning completed tasks from them? ConnectionContext represents an already active connection, so StartAsync has nothing to do. The same goes for StopAsync, as the connection will be managed outside, by the hosting layer.

The hosting layer can be implemented in the same way as in the ASP.NET Core 3.0 Preview 8 announcement post: as a BackgroundService.

internal class RSocketHost : BackgroundService
{
    private readonly IConnectionListenerFactory _connectionListenerFactory;
    private readonly ILogger<RSocketHost> _logger;

    private IConnectionListener _connectionListener;

    public RSocketHost(IConnectionListenerFactory connectionListenerFactory, ILogger<RSocketHost> logger)
    {
        _connectionListenerFactory = connectionListenerFactory;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _connectionListener = await _connectionListenerFactory.BindAsync(new IPEndPoint(IPAddress.Loopback, 6000), stoppingToken);

        while (true)
        {
            ConnectionContext connection = await _connectionListener.AcceptAsync(stoppingToken);

            // AcceptAsync will return null upon disposing the listener
            if (connection == null)
            {
                break;
            }

            // TODO: Ensure all accepted connections are disposed prior to completing
            _ = Accept(connection);
        }
    }

    public override async Task StopAsync(CancellationToken cancellationToken)
    {
        await _connectionListener.DisposeAsync();
    }

    private async Task Accept(ConnectionContext connection)
    {
        try
        {
            IRSocketTransport rsocketTransport = new ConnectionListenerTransport(connection);

            RSocketServer rsocketServer = new EchoServer(rsocketTransport);
            await rsocketServer.ConnectAsync();

            _logger.LogInformation("Connection {ConnectionId} connected", connection.ConnectionId);

            await connection.ConnectionClosed.WaitAsync();
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Connection {ConnectionId} threw an exception", connection.ConnectionId);
        }
        finally
        {
            await connection.DisposeAsync();

            _logger.LogInformation("Connection {ConnectionId} disconnected", connection.ConnectionId);
        }
    }
}

The key part is the Accept method. In that method, every new connection is wrapped in a ConnectionListenerTransport, which is then used to instantiate our EchoServer. After that, the method waits for the connection to terminate.
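
One detail worth noting: ConnectionClosed is a CancellationToken, and CancellationToken has no built-in WaitAsync method, so the call above relies on a small extension method that isn't shown here. A minimal sketch of what such a helper could look like follows; the class name and the implementation are assumptions, not necessarily what the demo project uses.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: turns cancellation of a token into an awaitable Task.
public static class CancellationTokenExtensions
{
    public static Task WaitAsync(this CancellationToken token)
    {
        // Already cancelled: nothing to wait for.
        if (token.IsCancellationRequested)
        {
            return Task.CompletedTask;
        }

        // Run continuations asynchronously so that code awaiting this task
        // does not execute inline on the thread that triggers the cancellation.
        var tcs = new TaskCompletionSource<object>(TaskCreationOptions.RunContinuationsAsynchronously);

        // Complete the task when the token gets cancelled.
        // (For brevity, this sketch never disposes the registration.)
        token.Register(state => ((TaskCompletionSource<object>)state).TrySetResult(null), tcs);

        return tcs.Task;
    }
}
```

With a helper like this in scope, awaiting connection.ConnectionClosed.WaitAsync() completes once the connection is reported as closed.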

This implementation (as it's only a POC) doesn't take care of graceful server shutdown. A production-ready implementation should make sure that, when StopAsync on BackgroundService is called, all connections are disposed before ExecuteAsync exits.
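
One possible way to approach this, sketched under assumptions: keep the task returned by Accept for every connection, and await all of them before ExecuteAsync returns. ConnectionTracker below is a hypothetical helper, not part of ASP.NET Core or RSocket.NET.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper that remembers the tasks of connections still in flight.
public sealed class ConnectionTracker
{
    private readonly ConcurrentDictionary<string, Task> _connections =
        new ConcurrentDictionary<string, Task>();

    // Number of connections currently tracked.
    public int Count => _connections.Count;

    // Registers a connection's task and forgets it once the task finishes.
    public void Track(string connectionId, Task connectionTask)
    {
        _connections[connectionId] = connectionTask;

        connectionTask.ContinueWith(
            t => _connections.TryRemove(connectionId, out _),
            TaskScheduler.Default);
    }

    // Completes when every connection tracked at the time of the call has finished.
    public Task WhenAllCompleteAsync() => Task.WhenAll(_connections.Values);
}
```

In RSocketHost, this would mean replacing the fire-and-forget call with something like _tracker.Track(connection.ConnectionId, Accept(connection)) and awaiting _tracker.WhenAllCompleteAsync() after the accept loop (likely in a finally block, since AcceptAsync may throw when the stopping token is cancelled). The host would still need to ask the open connections to close, for example by calling Abort on each ConnectionContext, otherwise the wait may never finish.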

The last thing to do is to register the above BackgroundService and one of the IConnectionListenerFactory implementations with a host.

public class Program
{
    public static void Main(string[] args)
    {
        CreateHostBuilder(args).Build().Run();
    }

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.TryAddSingleton<IConnectionListenerFactory, SocketTransportFactory>();

                services.AddHostedService<RSocketHost>();
            });
}

Now we can hit F5 and test it.

It Didn't Work...

The client didn't receive any response. From the logs I could see that the connection had been established. I could also verify that the client had sent a request (in my case a request/stream interaction), but there was no response from the server. I could have given up there, but I really wanted this to work. So I forked the RSocket.NET repository, opened the protocol specification, and started debugging. In the end I was (I think) able to identify the issue (a missed big-endian to little-endian conversion, typical for network-related code), fix it, and see the responses being logged by the client. Yes!
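
To illustrate the class of bug (this is not the actual fix made in the fork, which isn't reproduced here): the RSocket specification defines all frame fields as big-endian, while most machines running .NET are little-endian. The sketch below parses the 6-byte frame header (a 31-bit stream ID followed by a 6-bit frame type and 10-bit flags) with an explicit byte order, and shows what the stream ID looks like when it is read in the wrong byte order. FrameHeaderSketch and its method names are made up for this example.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical example type, not part of RSocket.NET.
public static class FrameHeaderSketch
{
    // Layout per the RSocket specification (all big-endian):
    //   bytes 0-3: 1 reserved bit + 31-bit stream ID
    //   bytes 4-5: 6-bit frame type + 10-bit flags
    public static (int StreamId, int FrameType, int Flags) Parse(ReadOnlySpan<byte> header)
    {
        int streamId = BinaryPrimitives.ReadInt32BigEndian(header) & 0x7FFFFFFF;
        ushort typeAndFlags = BinaryPrimitives.ReadUInt16BigEndian(header.Slice(4));

        return (streamId, typeAndFlags >> 10, typeAndFlags & 0x03FF);
    }

    // The same four bytes read in little-endian order, which is what
    // happens when the byte order conversion is forgotten.
    public static int StreamIdWithoutConversion(ReadOnlySpan<byte> header)
        => BinaryPrimitives.ReadInt32LittleEndian(header) & 0x7FFFFFFF;
}
```

For the header bytes 00 00 00 01 18 00 (stream 1, frame type 0x06, which is REQUEST_STREAM, no flags), Parse returns (1, 6, 0), while StreamIdWithoutConversion returns 16777216. A peer that gets the stream ID wrong in this way cannot match a frame to the stream it belongs to, which is one plausible way for responses to never reach the client.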

Conclusion

The new networking primitives for non-HTTP servers look great. They open up interesting capabilities and are easy to use. I'm not as happy with RSocket.NET. It seems to be missing some functionality and to have bugs lurking around. I hope this is because it's still young. I would love to have RSocket available in my toolbox for .NET-based microservices, so I will be keeping an eye on this project. Hopefully, the project will continue to grow and mature. If it does, I will be happy to contribute to it as well. If it doesn't, maybe I'll decide to work on my own implementation.

For those who want to play with what I've described here themselves, I've put the demo project and client gist on GitHub.