Random Ramblings

Getting to grips with RX

Reactive Extensions (Rx for short) is a pretty neat little framework by Erik Meijer. I've sort of been avoiding it, and I don't really know why. But the Copenhagen .Net User Group (CNUG) recently had Tamir Dresher give an introduction to the tech (and blatantly plug his upcoming book on the subject), and it was a bit of an eye opener, so I've decided to dedicate some time to getting to the bottom of it, and to blog about it.

The Limitations of IEnumerable

We all know IEnumerable and its trusty sidekicks, foreach(var foo in bar) and .Select/.Where/.Any/.All/etc. It's wonderfully convenient to have an interface where going through a collection/set/array/list is a matter of letting the compiler do its magic.

foreach(var foo in bar)
    // do stuff with foo

But what if bar contains something exotic, like a bunch of Task<T>s that are currently crunching away on healthy portions of data? We can't really know when any one of them will finish, and we certainly can't count on them finishing in the same order we enumerate them. So we end up waiting on a slow task while faster ones sit there already finished, even if we employ Parallel.ForEach.

Ideally we would like to be able to respond to the tasks completing as events, such that when the first Task completes, we immediately respond to it. We would also like to know when the last Task completes, in order to be able to shut down gracefully.
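Before bringing Rx into it, here is a minimal sketch of that wish using nothing but the Task library: loop on Task.WhenAny and hand each result to a callback as soon as its task finishes. CrunchAsync and ForEachCompletedAsync are names I made up for illustration, and the delays just stand in for real work.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class CompletionOrderDemo
{
    // Hypothetical workload: waits for delayMs, then returns that delay as its result.
    private static async Task<int> CrunchAsync(int delayMs)
    {
        await Task.Delay(delayMs);
        return delayMs;
    }

    // Calls onNext for each result in the order the tasks complete,
    // not in the order they appear in the sequence.
    public static async Task ForEachCompletedAsync(
        IEnumerable<Task<int>> tasks, Action<int> onNext)
    {
        var pending = tasks.ToList();
        while (pending.Count > 0)
        {
            // WhenAny completes as soon as any one of the pending tasks does.
            Task<int> finished = await Task.WhenAny(pending);
            pending.Remove(finished);
            onNext(await finished); // rethrows here if that task faulted
        }
        // Falling out of the loop means the last task has completed.
    }

    public static async Task Main()
    {
        var bar = new[] { CrunchAsync(300), CrunchAsync(100), CrunchAsync(200) };
        await ForEachCompletedAsync(bar, foo => Console.WriteLine($"completed: {foo}"));
        Console.WriteLine("all done, safe to shut down");
    }
}
```

It works, but notice how much plumbing is needed just to say "tell me when something finishes". That plumbing is what Rx tries to take off our hands.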

Another scenario: What if bar is in fact not a collection with a fixed number of elements? Think a mailbox or maybe a performance counter. New elements keep popping into it, which makes foreach-ing over the collection sort of impossible. We're forced to employ a different strategy, probably involving queues and maybe OnNew-events which we need to subscribe to, and then we have to start worrying about which threads are accessing which parts of the application.

IObservable to the Rescue

Rx builds on the Observer pattern: you react to values as they arrive (hence Reactive), and you compose those streams of values with LINQ-style operators implemented as extension methods (hence Extensions). IObservable is mathematically dual to IEnumerable, which could be translated into something like 'the same, but seen from the other side'. Instead of pulling foos out of bar, let bar push foos out to you.
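To make the push side a bit more tangible, here is a rough sketch of a hand-rolled IObservable. IObservable<T> and IObserver<T> are the real interfaces from the System namespace; Mailbox<T>, ConsoleObserver<T>, Publish and Close are names I invented for the example. It is not thread-safe, and Rx ships far better building blocks than this, so treat it as an illustration of the shape of the contract only.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical push source: every value handed to Publish is pushed to all subscribers.
public sealed class Mailbox<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // The dual of GetEnumerator: the consumer hands us an observer
    // instead of asking us for an enumerator.
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        return new Unsubscriber(_observers, observer);
    }

    // The dual of MoveNext/Current: the source decides when a value arrives.
    public void Publish(T value)
    {
        foreach (var observer in _observers.ToArray())
            observer.OnNext(value);
    }

    // The dual of MoveNext returning false: there will be no more values.
    public void Close()
    {
        foreach (var observer in _observers.ToArray())
            observer.OnCompleted();
        _observers.Clear();
    }

    // Disposing the subscription removes the observer again.
    private sealed class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> _observers;
        private readonly IObserver<T> _observer;

        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            _observers = observers;
            _observer = observer;
        }

        public void Dispose() => _observers.Remove(_observer);
    }
}

public sealed class ConsoleObserver<T> : IObserver<T>
{
    public void OnNext(T value) => Console.WriteLine($"got {value}");
    public void OnError(Exception error) => Console.WriteLine($"failed: {error.Message}");
    public void OnCompleted() => Console.WriteLine("done");
}

public static class PushDemo
{
    public static void Main()
    {
        var bar = new Mailbox<string>();
        using (bar.Subscribe(new ConsoleObserver<string>()))
        {
            bar.Publish("first mail");  // prints "got first mail"
            bar.Publish("second mail"); // prints "got second mail"
            bar.Close();                // prints "done"
        }
    }
}
```

The three observer methods line up with the three things a foreach can do to you: hand you the next element, throw, or run out of elements.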

In the case of an IObservable of Task<T>, you would get the results in the order the Tasks complete. In the case of the mailbox or performance counter, the IObservable implementation would simply respond to additions by emitting them to anyone listening, for as long as they keep listening.

bar.Subscribe(foo => { /* do stuff with foo */ });

Oh, and don't get me wrong, it's not really that simple. But the gist of it is this: Rx provides a pattern for solving concurrency issues in a short and elegant way.
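For the curious, here is a slightly fuller sketch of the Task scenario in Rx. It assumes the System.Reactive NuGet package is referenced; CrunchAsync is again a made-up stand-in for real work. Each Task is turned into an observable with ToObservable, Merge combines them into one stream, and Subscribe takes a handler for each value, one for errors and one for completion.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;            // Merge
using System.Reactive.Threading.Tasks; // ToObservable for Task<T>
using System.Threading.Tasks;

public static class RxTaskDemo
{
    // Hypothetical workload: waits for delayMs, then returns that delay as its result.
    private static async Task<int> CrunchAsync(int delayMs)
    {
        await Task.Delay(delayMs);
        return delayMs;
    }

    public static async Task Main()
    {
        var tasks = new[] { CrunchAsync(300), CrunchAsync(100), CrunchAsync(200) };

        // One observable per task, merged into a single stream that emits
        // each result as its task completes.
        IObservable<int> bar = tasks.Select(t => t.ToObservable()).Merge();

        // Used only to keep Main alive until the stream has ended.
        var allDone = new TaskCompletionSource<bool>();

        using (bar.Subscribe(
            foo => Console.WriteLine($"completed: {foo}"), // each result
            error => allDone.TrySetException(error),       // a task faulted
            () => allDone.TrySetResult(true)))             // last task completed
        {
            await allDone.Task;
        }

        Console.WriteLine("all done, safe to shut down");
    }
}
```

The completion handler is the "last Task completed" signal I was asking for earlier, and it comes for free with the pattern.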

I'll be examining Rx in detail in the next couple of weeks.

