Trying to Run RSocket Server with RSocket.NET and ASP.NET Core Primitives for Non-HTTP Servers
I've heard about RSocket for the first time about two years ago, from this short (20 minutes) talk. If you want a quick overview, you can go and watch the recording right now, I can wait.
RSocket
If you have watched the recording, you can skip this section. If you didn't I'll try to explain RSocket in few words. RSocket is a binary protocol inspired by Reactive Manifesto. It was conceived by Netflix and Facebook in an effort to create an alternative protocol for microservices communication, which would favor asynchronous messaging and provide more interaction models than request/response. Now the main contributor seems to be Netifi, which has build an entire platform on top of it.
Since I've learned about RSocket, they've proven to be the right tool for the job several times. Unfortunately, in every one of those scenarios, we ended up using Java for particular microservice as rolling our own RSocket implementation was simply too expensive. I'm hoping this is about to change.
RSocket.NET
At the beginning of this year, Netifi started contributing .NET implementation of RSocket. I wanted to take a look at it for some time, but building a server on top of it requires providing a hosting layer. Because of that (and limited time) I was constantly postponing my exploration of this library until preview 8 of ASP.NET Core 3.0 has been released. Preview 8 of ASP.NET Core 3.0 includes new networking primitives for non-HTTP Servers, which should be a great hosting layer for RSocket.NET. I've simplified one of Java clients I already had (because it was quick and Java is the most mature implementation of RSocket) and started working on a POC server.
Hosting RSocket.Net with Networking Primitives for Non-HTTP Servers
In the case of RSocket.NET, the server logic can be implemented by subclassing RSocketServer
. The subclass can call a method like Respond
or Stream
in its constructor, to provide business logic behind interactions. A simple echo server for request/response and request/stream interactions can look like below.
internal class EchoServer : RSocketServer
{
public EchoServer(IRSocketTransport transport, RSocketOptions options = default, int echoes = 2)
: base(transport, options)
{
// Request/Response
Respond(
request => request, // requestTransform
request => AsyncEnumerable.Repeat(request, echoes), // producer
result => result // resultTransform
);
// Request/Stream
Stream(
request => request, // requestTransform
request => AsyncEnumerable.Repeat(request, echoes), // producer
result => result // resultTransform
);
}
}
An instance of RSocketServer
represents a client connected through IRSocketTransport
. This is the place where RSocket.NET and networking primitives should be glued together. In case of the new primitives provided by the ASP.NET Core team, a connected client is represented by ConnectionContext
. ConnectionContext
provides us with PipeReader
and PipeWriter
for input and output. This is exactly what IRSocketTransport
expects, so a wrapper should be all we need.
internal class ConnectionListenerTransport : IRSocketTransport
{
public PipeReader Input { get; }
public PipeWriter Output { get; }
public ConnectionListenerTransport(ConnectionContext connection)
{
Input = connection.Transport.Input;
Output = connection.Transport.Output;
}
public Task StartAsync(CancellationToken cancel = default)
{
return Task.CompletedTask;
}
public Task StopAsync()
{
return Task.CompletedTask;
}
}
The StartAsync
and StopAsync
methods require an explanation. Why I'm just returning completed tasks from them? ConnectionContext
represents an active connection, so StartAsync
has nothing to do. The same in the case of StopAsync
as the connection will be managed outside, by the hosting layer.
The hosting layer can be implemented in the same way as in ASP.NET Core 3.0 Preview 8 announcement post - as a BackgroundService
.
internal class RSocketHost : BackgroundService
{
private readonly IConnectionListenerFactory _connectionListenerFactory;
private readonly ILogger<RSocketHost> _logger;
private IConnectionListener _connectionListener;
public RSocketHost(IConnectionListenerFactory connectionListenerFactory, ILogger<RSocketHost> logger)
{
_connectionListenerFactory = connectionListenerFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_connectionListener = await _connectionListenerFactory.BindAsync(new IPEndPoint(IPAddress.Loopback, 6000), stoppingToken);
while (true)
{
ConnectionContext connection = await _connectionListener.AcceptAsync(stoppingToken);
// AcceptAsync will return null upon disposing the listener
if (connection == null)
{
break;
}
// TODO: Ensure all accepted connections are disposed prior to completing
_ = Accept(connection);
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
{
await _connectionListener.DisposeAsync();
}
private async Task Accept(ConnectionContext connection)
{
try
{
IRSocketTransport rsocketTransport = new ConnectionListenerTransport(connection);
RSocketServer rsocketServer = new EchoServer(rsocketTransport);
await rsocketServer.ConnectAsync();
_logger.LogInformation("Connection {ConnectionId} connected", connection.ConnectionId);
await connection.ConnectionClosed.WaitAsync();
}
catch (Exception e)
{
_logger.LogError(e, "Connection {ConnectionId} threw an exception", connection.ConnectionId);
}
finally
{
await connection.DisposeAsync();
_logger.LogInformation("Connection {ConnectionId} disconnected", connection.ConnectionId);
}
}
}
The key part is the Accept
method. In that method, every new connection is wrapped by ConnectionListenerTransport
and then used to instantiate our EchoServer
. Then the method waits for a connection to terminate.
This implementation (as it's only a POC) doesn't take care of graceful server shutdown. A production-ready implementation should make sure that, when StopAync
on BackgroundService
is called, all connections are disposed before ExecuteAsync
exits.
The last thing to do is registering above BackgroundService
and one of IConnectionListenerFactory
implementations with a host.
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureServices((hostContext, services) =>
{
services.TryAddSingleton<IConnectionListenerFactory, SocketTransportFactory>();
services.AddHostedService<RSocketHost>();
});
}
Now we can hit F5 and test it.
It Didn't Work...
The client didn't receive any response. From logs I could see that connection has been established. I could also verify that the client has sent a request (in my case a request/stream interaction), but there was no response from the server. I could give up there, but I really wanted this to work. So I've forked the RSocket.NET repository, opened the protocol specification, and started debugging. At the end (I think) I was able to identify the issue (typical for network-related code big endian to little endian omission), fix it, and see the responses being logged by the client. Yes!
Conclusion
The new networking primitives for non-HTTP servers look great. They open interesting capabilities and are easy to use. I'm not that happy with RSocket.NET. It seems to be missing some functionality and have bugs lurking around. I hope this is because it's still young. I would love to have RSocket available in my toolbox for .NET based microservices, so I will be keeping an eye on this project. Hopefully, the project will continue to grow and mature. If it does, I will be happy to contribute to it as well. If it doesn't, maybe I'll decide to work on my own implementation.
For those who want to play by themselves with what I've described here, I've put the demo project and client gist on GitHub.