Reactive Neo4j using .NET

Version 4.0 of Neo4j is being actively worked on, and aside from the new things in the database itself, the drivers get an update as well – and one of the big updates is the addition of a Reactive way to develop against the DB.

Now – I’ve not done reactive programming for a long time. I think I did play around with it when .NET 4 was first released, but I have no idea where that blog post has gone – so I may as well start anew.

I found it! Not the post, but the application – MousePath – which is now on GitHub. Aside from it ‘working’, it’s not performant in any way.

What is Rx/Reactive?

Reactive in .NET is all about the IObservable<T>/IObserver<T> interfaces. They’ve been around since .NET 4, but personally I’ve never really used them. They allow application code to react to data being pushed to it, rather than the more traditional way of requesting the data.
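To make the push idea concrete, here is a minimal sketch using nothing but the two interfaces from the base class library. TitleSource and CollectingObserver are made-up names for illustration only; they have nothing to do with the Neo4j driver.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push source: hands every title to whoever subscribes.
public sealed class TitleSource : IObservable<string>
{
    private readonly string[] _titles;
    public TitleSource(params string[] titles) => _titles = titles;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        foreach (var title in _titles)
            observer.OnNext(title);   // push each item to the observer
        observer.OnCompleted();       // signal that nothing more will arrive
        return new NoopDisposable();  // everything is already sent, so there is nothing to cancel
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: just remembers what it was told.
public sealed class CollectingObserver : IObserver<string>
{
    public List<string> Received { get; } = new List<string>();
    public bool Completed { get; private set; }

    public void OnNext(string value) => Received.Add(value);
    public void OnError(Exception error) => Console.WriteLine("Error: " + error.Message);
    public void OnCompleted() => Completed = true;
}

public static class Program
{
    public static void Main()
    {
        var observer = new CollectingObserver();
        new TitleSource("The Matrix", "Top Gun").Subscribe(observer);

        foreach (var title in observer.Received)
            Console.WriteLine("Got: " + title);
    }
}
```

The observer never asks for anything: the source decides when OnNext is called, which is the opposite of pulling items out of a foreach.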

There’s a good book (Intro to Rx) which I will be using to work this out, which is freely available online: http://introtorx.com/ .

Starting off

For this project, we’re going to need the nuget package – which in this case isn’t Neo4j.Driver – but Neo4j.Driver.Reactive. When we add this to our project – and create a driver in the normal way – we can see we now have an ‘RxSession‘, which is an extension method on IDriver.

So let’s create a reactive session and see what we can see.

[Screenshot: RxSession]

We get IObservables from the RxSession, as opposed to the AsyncSession, which gives us Tasks.

[Screenshot: AsyncSession]

Doing a Run-ner

So, back to our RxSession, let’s do a basic version, just using Run:

var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
rxStatementResult
    .Records()
    .Subscribe(
        record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>())
    );

In here, we’re hooking up to the ol’ classic Movies database, and simply writing the titles to the screen. NB – Driver is a static property of type IDriver I have defined elsewhere.

The first two lines look pretty much like our normal code – the only real difference being the use of the ‘RxSession‘ as opposed to just ‘Session‘.

Run on an RxSession returns an IRxStatementResult – which has 3 methods we’re interested in, (well actually only 1 at the moment) – Records(), Consume() and Keys().

Records() gets us the records from the database, so the stuff we want to do things with, Consume() whips through those records so we can get an IResultSummary telling us what is going on, and Keys() gets us the keys that are returned, in the simple statement I’ve done – ‘m.title‘ is the only key.

Records() is what we’re using, as we want to deal with the data. Records() returns an IObservable<IRecord> and, being an IObservable, we need to Subscribe() to it to get the data. Subscribing means we provide an IObserver that will be notified whenever an IRecord arrives.
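An observer can be told about more than just the next item: it can also hear about errors and completion. Here is a small sketch of that, assuming the System.Reactive package is referenced. It uses an in-memory list of titles as a stand-in for Records(), so there is no database involved, and SubscribeDemo is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class SubscribeDemo
{
    // Subscribes with all three callbacks and returns what was logged.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Stand-in for rxStatementResult.Records(): an observable over an array.
        IObservable<string> titles = new[] { "The Matrix", "Top Gun" }.ToObservable();

        titles.Subscribe(
            title => log.Add("Got: " + title),            // called once per item
            error => log.Add("Failed: " + error.Message), // called if the stream fails
            () => log.Add("Done"));                       // called when the stream ends

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

The single-lambda Subscribe used in the post is the same call with only the first callback supplied.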

In this case, we have the contents being written to the console. Aces.

Quitting

Being a console app – doing tiny amounts of work – I largely don’t need to worry about disposing of my resources, but let’s imagine resource usage is something we do care about. How do you go about disposing of your resources?

IDisposable? INosable! – the IRxSession doesn’t implement IDisposable, instead we have to Close<T>() it – and this is where things have got a little fuzzy for me – I’m not entirely sure I’m closing it correctly.

var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
rxStatementResult
    .Records()
    .Subscribe(
        record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>()));

session.Close<IRecord>();

Now, I expect to get either no results, or a smaller subset (depending on the speed of the code running) – what I get is the close being called, but still getting the full amount of data – I suspect I misunderstand what is going on here.
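One possible explanation, and this is an assumption rather than something I have confirmed against the driver: if Close<T>() hands back an IObservable<T> that is ‘cold’, then calling it does nothing until something subscribes to it. The sketch below shows that behaviour with a made-up FakeClose() built on System.Reactive; it is not the driver’s code.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;

public static class ColdCloseDemo
{
    public static int CloseCalls;

    // Hypothetical stand-in for a Close-style method: the work inside Defer
    // only runs when somebody subscribes to the returned observable.
    public static IObservable<Unit> FakeClose() =>
        Observable.Defer(() =>
        {
            CloseCalls++;                      // the actual "closing" work
            return Observable.Empty<Unit>();   // then complete without emitting anything
        });

    public static void Main()
    {
        var pending = FakeClose();
        Console.WriteLine("After calling FakeClose(): " + CloseCalls);

        pending.Subscribe();
        Console.WriteLine("After subscribing: " + CloseCalls);
    }
}
```

If the real Close<T>() works the same way, the call on its own would be a no-op, and it would need to be subscribed to (or chained onto the records stream) before the session actually closes.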

Let’s say I do want a smaller subset – or to quit – how do I do it? Well, the Subscribe() method actually returns an IDisposable, so we ‘unsubscribe’ by disposing of our subscription:

var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
var subscription = rxStatementResult
    .Records()
    .Subscribe(record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>()));

await Task.Delay(220);
subscription.Dispose();
session.Close<IRecord>();

The ‘delay’ magic number there is enough time to get some records, but not all, anything less than that gave no results, anything more – all the results. ¯\_(ツ)_/¯
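To see the unsubscribe without relying on a timing guess, here is a sketch that swaps the database for a Subject<string> from System.Reactive, so we control exactly when each item is pushed. UnsubscribeDemo and the titles are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class UnsubscribeDemo
{
    // Pushes three titles but unsubscribes after the second one.
    public static List<string> Run()
    {
        var received = new List<string>();
        var titles = new Subject<string>();   // stand-in for the record stream

        IDisposable subscription = titles.Subscribe(title => received.Add(title));

        titles.OnNext("The Matrix");
        titles.OnNext("Top Gun");

        subscription.Dispose();               // unsubscribe

        titles.OnNext("Jerry Maguire");       // still pushed, but nobody is listening
        return received;
    }

    public static void Main()
    {
        foreach (var title in Run())
            Console.WriteLine("Got: " + title);
    }
}
```

Only the first two titles are received, which matches what the delay version does when the timing happens to land mid-stream.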

Ooook So – Why Rx?

It seems more complex right? Subscribe(), unsubscribe – no foreach in sight! What’s the point?

My understanding – and this could be/probably is wrong – is that by using Rx – we’re reducing our overheads – i.e. instead of streaming everything, we can just stream what we’re consuming at the time.

The other key benefits come from things like ‘.Buffer‘ and the other operators (Skip, Last, etc), which let you shape the stream as it arrives.
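As a rough idea of what those operators do, here is a sketch that skips the first two items of a stream and then groups the rest into batches of three. It assumes the System.Reactive package and uses Observable.Range as a stand-in for a stream of records; OperatorsDemo is a made-up name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class OperatorsDemo
{
    // Skips two items, batches the remainder in threes, returns each batch as text.
    public static List<string> Run()
    {
        var batches = new List<string>();

        Observable.Range(1, 10)   // pushes 1..10
            .Skip(2)              // drop the first two items
            .Buffer(3)            // collect items into lists of up to three
            .Subscribe(batch => batches.Add(string.Join(",", batch)));

        return batches;
    }

    public static void Main()
    {
        foreach (var batch in Run())
            Console.WriteLine("Batch: " + batch);
        // prints:
        // Batch: 3,4,5
        // Batch: 6,7,8
        // Batch: 9,10
    }
}
```

Records() is also an IObservable, so the same operators should chain onto it before the Subscribe() call.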

One nice thing about Rx in .NET is that it’s not the same as async – you don’t have to have your entire stack in Rx to get the benefits – you can do bits and pieces where you need to – if you’ve got a lot of data maybe it makes sense for a given query.