One of the nice new features in .NET 4.0 beta 2 is the IObservable<T>/IObserver<T> support from the Reactive Framework (“Rx Framework” or sometimes “LinqToEvents”). It is a really powerful way to use reactive programming in .NET and especially in F# developed by Erik Meijer and his team.
If you want to see some of the beautiful math behind the Reactive Framework you should definitely watch this Expert to Expert video on Channel 9. You can see Brian Beckman and Erik Meijer showing that IObservable<T> is the mathematical dual of IEnumerable<T>.
What can I do with the Rx Framework?
Consider this small sample (it is taken from Matthew Podwysocki’s blog): We want to get notified whenever a user clicks on our form and moves the mouse within a special area (XPos and YPos smaller than 100px).
First of all we define our observable by merging and filtering .NET events:
open System.Windows.Forms
let form = new Form(Visible=true, TopMost=true)
/// Creates two observables
/// – left is triggered when the left mouse button is down
and the mouse is in the area (x < 100 && y < 100)
/// – right is triggered when the right mouse button is down
and the mouse is in the area (x < 100 && y < 100)
let left,right =
|> Observable.merge form.MouseMove
|> Observable.filter (fun args ->
args.Button = MouseButtons.Left ||
args.Button = MouseButtons.Right)
|> (fun args -> args.X, args.Y, args.Button)
|> Observable.filter (fun (x,y,b) -> x < 100 && y < 100)
|> Observable.partition (fun (_,_,button) -> button = MouseButtons.Left)
Now it’s easy to subscribe a function to this observable:
let leftSubscription =
|> Observable.subscribe
(fun (x,y,_) -> printfn "Left (%d,%d)" x y)
If we want to unsubscribe we only have to dispose the object:
// unsubscribe
We couldn’t unsubscribe this way with “classic” .NET events. Remember the –= operator in C# doesn’t work with lambda expressions.
Exception handling
We have seen an easy way to subscribe and unsubscribe to complicated observables but what should we do if an error occurs? As far as I know this case is not implemented for F# at the moment, but we can easily add this functionality:
module Observable
/// Creates an observer with the given functions
let createObserver next error completed =
{new System.IObserver<_> with
member this.OnCompleted() = completed()
member this.OnError(e) = error e
member this.OnNext(args) = next args}
/// Subscribes an observer with the given functions
/// param1: OnNext (T -> unit)
/// param2: OnError (Exception -> unit)
/// param3: OnCompleted (unit -> unit)
/// param4: observable
let subscribeComplete next error completed (observable:System.IObservable<_>) =
createObserver next error completed
|> observable.Subscribe
Now we are able to create a complete IObserver<T> object
let rightSubscription =
|> Observable.subscribeComplete
(fun (x,y,_) -> printfn "Right (%d,%d)" x y)
(fun error -> printfn "Error: %s" error.Message)
(fun () -> printfn "Ready.")
