Reactive Neo4j using .NET
Version 4.0 of Neo4j is being actively worked on, and aside from the new things in the database itself, the drivers get an update as well – and one of the big updates is the addition of a Reactive way to develop against the DB.
Now, I’ve not done reactive programming for a long time. I think I did play around with it when .NET 4 was first released, but I have no idea where that blog post has gone, so I may as well start afresh.
I found it! Not the post, but the application, MousePath, which is now on GitHub. Aside from it ‘working’, it’s not performant in any way.
What is Rx/Reactive?
Reactive in .NET is all about the IObservable<T>/IObserver<T> interfaces. They’ve been around since .NET 4, but personally I’ve never really used them. They allow application code to react to data being pushed to it, rather than the more traditional way of requesting (pulling) the data.
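To make the push model concrete, here’s a minimal sketch using nothing but the two BCL interfaces (no Rx library, no Neo4j). TitleSource and RecordingObserver are hypothetical names made up for illustration: the source drives the loop, calling OnNext for each value and then OnCompleted, while the observer just sits there reacting.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical source: pushes a fixed list of titles to whoever subscribes.
// Only the BCL's IObservable<T>/IObserver<T> are used here, no Rx library.
public sealed class TitleSource : IObservable<string>
{
    private readonly IReadOnlyList<string> _titles;

    public TitleSource(IReadOnlyList<string> titles) => _titles = titles;

    public IDisposable Subscribe(IObserver<string> observer)
    {
        // The source drives the loop and *pushes* each value; the observer never asks.
        foreach (var title in _titles)
        {
            observer.OnNext(title);
        }
        observer.OnCompleted();

        // Everything was pushed synchronously, so there is nothing left to cancel.
        return EmptyDisposable.Instance;
    }

    private sealed class EmptyDisposable : IDisposable
    {
        public static readonly EmptyDisposable Instance = new EmptyDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer: records what it is told so we can inspect it afterwards.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public bool Completed { get; private set; }
    public Exception Error { get; private set; }

    public void OnNext(T value) => Received.Add(value);
    public void OnError(Exception error) => Error = error;
    public void OnCompleted() => Completed = true;
}
```

Calling Subscribe is the only thing the consumer does. Compare that with a foreach over an IEnumerable<string>, where the consumer pulls each item itself via MoveNext().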
There’s a good book, Intro to Rx, which I’ve been using to work this out and which is freely available online: http://introtorx.com/ .
For this project, we’re going to need a NuGet package, which in this case isn’t Neo4j.Driver but Neo4j.Driver.Reactive. When we add this to our project and create a driver in the normal way, we can see we now have an ‘RxSession’ extension method on IDriver, giving us IObservables as opposed to the AsyncSession giving us Tasks.
So let’s create a reactive session and see what we can see.
Doing a Run-ner
So, back to our RxSession: let’s do a basic version, just using Run:
var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
rxStatementResult
    .Records()
    .Subscribe(
        record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>())
    );
In here, we’re hooking up to the ol’ classic Movies database and simply writing the titles to the screen. NB: Driver is a static property of type IDriver that I have defined elsewhere.
The first two lines look pretty much like our normal code, the only real difference being the use of ‘RxSession’ as opposed to our usual session. Calling Run on an RxSession returns an IRxStatementResult, which has 3 methods we’re interested in (well, actually only 1 at the moment):
- Records() gets us the records from the database, so the stuff we want to do things with;
- Consume() whips through those records so we can get an IResultSummary telling us what is going on;
- Keys() gets us the keys that are returned; in the simple statement I’ve done, ‘m.title’ is the only key.
Records() is what we’re using, as we want to deal with the data. Records() returns an IObservable<IRecord>, and as it’s an IObservable we need to Subscribe() to it to get the data. Subscribing means we provide an IObserver that will be notified whenever an IRecord is pushed to it.
In this case, we have the contents being written to the console. Aces.
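But where does the IObserver come from, when all we passed in was a lambda? The lambda overload of Subscribe isn’t part of IObservable<T> itself; it’s an extension method that comes from System.Reactive. The sketch below approximates what it does, under hypothetical names (ActionObserver, SubscribeAction, ListObservable) so it can’t clash with the real thing: wrap the Action<T> in an IObserver<T> and hand that to the source.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for what Rx's Subscribe(Action<T>) overload does:
// adapt a plain lambda into an IObserver<T>.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> _onNext;

    public ActionObserver(Action<T> onNext) => _onNext = onNext;

    public void OnNext(T value) => _onNext(value);       // called once per pushed item
    public void OnError(Exception error) => throw error;  // no error handler given: rethrow
    public void OnCompleted() { }                         // nothing to do on completion
}

public static class ObservableSketch
{
    // Hypothetical name, chosen so it cannot collide with System.Reactive's Subscribe.
    public static IDisposable SubscribeAction<T>(this IObservable<T> source, Action<T> onNext)
        => source.Subscribe(new ActionObserver<T>(onNext));
}

// Minimal synchronous source, standing in for Records() in this sketch.
public sealed class ListObservable<T> : IObservable<T>
{
    private readonly IEnumerable<T> _items;

    public ListObservable(IEnumerable<T> items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items)
        {
            observer.OnNext(item);
        }
        observer.OnCompleted();
        return new NoOp();
    }

    private sealed class NoOp : IDisposable
    {
        public void Dispose() { }
    }
}
```

Usage mirrors the Neo4j snippet: new ListObservable<string>(titles).SubscribeAction(title => Console.WriteLine("Got: " + title)).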
As this is a console app doing tiny amounts of work, I largely don’t need to worry about disposing of my resources, but let’s imagine resource usage is something we do care about. How do you go about disposing of your resources?
IDisposable? INosable! The IRxSession doesn’t implement IDisposable; instead we have to Close<T>() it, and this is where things have got a little fuzzy for me, as I’m not entirely sure I’m closing it correctly:
var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
rxStatementResult
    .Records()
    .Subscribe(
        record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>()));
session.Close<IRecord>();
Now, I’d expect to get either no results or a smaller subset (depending on how fast the code runs). What I actually get is the close being called, but still the full amount of data, so I suspect I misunderstand what is going on here.
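One possible explanation (an assumption on my part, worth checking against the driver docs): Close<T>() returns an IObservable<T>, and like most observables it may be cold, meaning nothing happens until something subscribes to it. If so, calling session.Close<IRecord>() on its own just builds an observable and throws it away, and the session is never actually closed. The sketch below uses a hypothetical FakeSession and DeferredAction<T> to show that behaviour in isolation.

```csharp
using System;

// Sketch of a *cold* observable: constructing it runs nothing, the work happens on Subscribe.
// Assumption: the driver's Close<T>() behaves like this; check the driver docs to confirm.
public sealed class DeferredAction<T> : IObservable<T>
{
    private readonly Action _action;

    public DeferredAction(Action action) => _action = action;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        try
        {
            _action();               // the side effect ("close the session") runs here...
        }
        catch (Exception ex)
        {
            observer.OnError(ex);    // ...any failure is reported to the subscriber...
            return Empty.Instance;
        }
        observer.OnCompleted();      // ...then the sequence completes without any values.
        return Empty.Instance;
    }

    private sealed class Empty : IDisposable
    {
        public static readonly Empty Instance = new Empty();
        public void Dispose() { }
    }
}

// Hypothetical stand-in for an RxSession, only used to see *when* closing happens.
public sealed class FakeSession
{
    public bool IsClosed { get; private set; }

    public IObservable<T> Close<T>() => new DeferredAction<T>(() => IsClosed = true);
}

// Observer that ignores everything; subscribing is all we need to trigger the close.
public sealed class IgnoringObserver<T> : IObserver<T>
{
    public void OnNext(T value) { }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

If that is what’s going on, the real fix would be to subscribe to what Close<IRecord>() returns (with System.Reactive, something like session.Close<IRecord>().Subscribe()), or to compose it onto the end of the query pipeline, which, if I’m reading them right, is what the 4.0 driver manual’s examples do with Rx’s OnErrorResumeNext.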
Let’s say I do want a smaller subset, or to quit early: how do I do it? Well, the Subscribe() method actually returns an IDisposable, so we ‘unsubscribe’ by disposing of our subscription:
var session = Driver.RxSession();
var rxStatementResult = session.Run("MATCH (m:Movie) RETURN m.title");
var subscription = rxStatementResult
    .Records()
    .Subscribe(record => Console.WriteLine("Got: " + record.Values["m.title"].As<string>()));

await Task.Delay(220);    // let some (but not all) of the records arrive
subscription.Dispose();   // unsubscribe: no further records are pushed to our lambda
session.Close<IRecord>();
The ‘delay’ magic number there is just enough time (on my machine) to get some records, but not all: anything less than that gave no results, anything more gave all the results. ¯\_(ツ)_/¯
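The unsubscribe mechanics are easier to see without a database or a timer. Below is a hypothetical hand-rolled source, ManualSource<T>, where values are published from outside: Subscribe hands back an IDisposable, and disposing it removes the observer, so anything published afterwards never reaches it. If what you actually want is ‘the first N records’ rather than ‘whatever turns up in 220ms’, System.Reactive’s Take(n) operator expresses that without a magic number, although I haven’t checked whether it also stops the driver pulling further records from the server.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-rolled source: values are pushed in from outside via Publish.
// Disposing a subscription removes that observer, so it stops receiving values.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Subscription(this, observer);
    }

    public void Publish(T value)
    {
        // Iterate over a copy so an observer can unsubscribe while we are pushing.
        foreach (var observer in _observers.ToArray())
        {
            observer.OnNext(value);
        }
    }

    private sealed class Subscription : IDisposable
    {
        private readonly ManualSource<T> _source;
        private readonly IObserver<T> _observer;

        public Subscription(ManualSource<T> source, IObserver<T> observer)
        {
            _source = source;
            _observer = observer;
        }

        // "Unsubscribe". Safe to call twice: Remove is a no-op once the observer is gone.
        public void Dispose() => _source._observers.Remove(_observer);
    }
}

// Observer that simply keeps everything it is given.
public sealed class CollectingObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();

    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Publishing "The Matrix" and "Top Gun", disposing the subscription, then publishing a third title leaves the observer holding just the first two.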
Ooook So – Why Rx?
It seems more complex, right? Subscribe(), unsubscribe, no foreach in sight! What’s the point?
My understanding (and this could be, and probably is, wrong) is that by using Rx we’re reducing our overheads: instead of streaming everything, we can just stream what we’re consuming at the time.
The other key benefits come from things like ‘.Buffer’ and the other operators (Last, etc.), which let you shape the stream as it arrives, for example batching records up or only keeping the final one.
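To get a feel for what an operator like Buffer does, here’s an illustrative count-based buffer written against the plain interfaces. It’s a sketch, not System.Reactive’s implementation, and BufferObservable, ArraySource and ListObserver are hypothetical names: items are collected into lists of a given size, each full list is pushed downstream, and any partial batch is flushed when the source completes, which matches how I understand Rx’s Buffer(count) to behave.

```csharp
using System;
using System.Collections.Generic;

// Illustrative count-based Buffer: turns a stream of T into a stream of batches of T.
public sealed class BufferObservable<T> : IObservable<IList<T>>
{
    private readonly IObservable<T> _source;
    private readonly int _size;

    public BufferObservable(IObservable<T> source, int size)
    {
        if (size <= 0) throw new ArgumentOutOfRangeException(nameof(size));
        _source = source;
        _size = size;
    }

    public IDisposable Subscribe(IObserver<IList<T>> observer) =>
        _source.Subscribe(new BufferingObserver(observer, _size));

    private sealed class BufferingObserver : IObserver<T>
    {
        private readonly IObserver<IList<T>> _downstream;
        private readonly int _size;
        private List<T> _current = new List<T>();

        public BufferingObserver(IObserver<IList<T>> downstream, int size)
        {
            _downstream = downstream;
            _size = size;
        }

        public void OnNext(T value)
        {
            _current.Add(value);
            if (_current.Count == _size)
            {
                var full = _current;
                _current = new List<T>();   // start a fresh batch before pushing the full one
                _downstream.OnNext(full);
            }
        }

        public void OnError(Exception error) => _downstream.OnError(error);

        public void OnCompleted()
        {
            if (_current.Count > 0) _downstream.OnNext(_current);   // flush the remainder
            _downstream.OnCompleted();
        }
    }
}

// Simple synchronous source so the operator can be tried without a database.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;

    public ArraySource(T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoOp();
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }
}

// Observer that keeps everything it receives and notes completion.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Items { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) => Items.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}
```

With the driver and System.Reactive, the real thing would look something like rxStatementResult.Records().Buffer(100), giving an IObservable<IList<IRecord>> so you could, say, write records out 100 at a time.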
One nice thing about Rx in .NET is that it isn’t like async: you don’t have to have your entire stack in Rx to get the benefits. You can use it in bits and pieces where you need to; if you’ve got a lot of data, maybe it makes sense for a given query.
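As an example of doing ‘bits and pieces’, here’s a hypothetical bridge, CollectAsync, from an IObservable<T> back into a plain Task<List<T>>, so the rest of the stack can stay with ordinary async/await. System.Reactive already ships equivalents (for example, await source.ToList()), so treat this as a sketch of the idea rather than something to copy.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ObservableBridge
{
    // Hypothetical helper: gathers every value an observable pushes, completes the Task
    // when the observable completes, and faults it if the observable errors.
    // The subscription is not kept; a fuller version would support cancellation.
    public static Task<List<T>> CollectAsync<T>(this IObservable<T> source)
    {
        var tcs = new TaskCompletionSource<List<T>>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        source.Subscribe(new CollectingObserver<T>(tcs));
        return tcs.Task;
    }

    private sealed class CollectingObserver<T> : IObserver<T>
    {
        private readonly TaskCompletionSource<List<T>> _tcs;
        private readonly List<T> _items = new List<T>();

        public CollectingObserver(TaskCompletionSource<List<T>> tcs) => _tcs = tcs;

        public void OnNext(T value) => _items.Add(value);
        public void OnError(Exception error) => _tcs.TrySetException(error);
        public void OnCompleted() => _tcs.TrySetResult(_items);
    }
}

// Simple synchronous source so the bridge can be tried without a database.
public sealed class ArraySource<T> : IObservable<T>
{
    private readonly T[] _items;

    public ArraySource(T[] items) => _items = items;

    public IDisposable Subscribe(IObserver<T> observer)
    {
        foreach (var item in _items) observer.OnNext(item);
        observer.OnCompleted();
        return new NoOp();
    }

    private sealed class NoOp : IDisposable { public void Dispose() { } }
}
```

From an async method you’d then write var titles = await someObservable.CollectAsync(); and carry on with a normal List<T>.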