Monday, 23. November 2009

Mapping the Reactive Framework (Rx) operators for F#

The “Reactive Extensions for .NET (Rx)” comes with lot’s of operators for using IObservable<T>. This code mimics the signature of the default F# sequence combinators and allows to use observables like sequences. It is a similar approach like Matthews Podwysocki’s blog post about mapping the IParallelEnumerable.

I will update this post from time to time to include more of the operators.

  • Update: 25.11.2009 – new operators mapped
  • Update: 21.11.2009 – Updated to new Rx release

module RxExtensions.Observable


open System.Linq

open System

open System.Threading

open System.Windows.Threading


type ‘a observable = IObservable<‘a>

type ‘a observer = IObserver<‘a>


/// converts a lambda in a System.Action

let asAction f = new System.Action(f)


/// System.Action whichs does nothing

let doNothing = asAction (fun () -> ())


/// Creates an observer

let createObserver next error completed =

  {new System.IObserver<_> with

      member this.OnCompleted() = completed()

      member this.OnError(e) = error e

      member this.OnNext(args) = next args}


/// Creates a new observable

let create f =

  Observable.Create<_>(fun x ->

    f x



/// Creates a observable from a async

let ofAsync async =


    (fun obs ->




/// Gets a dispatcher Schdeuler for the current dispatcher

let getDispatcherScheduler _ =

  new DispatcherScheduler(Dispatcher.CurrentDispatcher)


/// Generates an observable from an IEvent

let fromEvent (event:IEvent<_,_>) = create (fun x -> event.Add x.OnNext)


/// Generates an empty observable

let empty<‘a> = Observable.Empty<‘a>()


/// Takes the head of the elements

let head = Observable.First


/// Merges the two observables

let mergeWith obs1 obs2 = Observable.Merge(obs2, obs1)


/// Merges all observables

let mergeAll (observables:IObservable<IObservable<‘a>>) =

  Observable.Merge observables


/// Merges all observables

let merge (observables:(IObservable<‘a>) seq) =

  Observable.Merge observables


/// Creates a range as an observable

let range start count = Observable.Range(start, count)


/// Converts a seq in an observable

let toObservable (seq: ‘a seq) = Observable.ToObservable seq


/// Converts a observable in a seq

let toEnumerable = Observable.ToEnumerable


/// Subscribes to the Observable with all 3 callbacks

let subscribeComplete next error completed (observable: ‘a observable) =


     (fun x -> next x),

     (fun e -> error e),

     (fun () -> completed()))


/// Subscribes to the Observable with a

/// next and an error-function

let subscribeWithError next error observable =

  subscribeComplete next error (fun () -> ()) observable


/// Subscribes to the Observable with just a next-function

let subscribe next observable =

  subscribeWithError next ignore observable


/// throttles the observable for the given interval

let throttle interval observable =



/// throttles the observable scheduled on the current dispatcher

let throttleOnCurrentDispatcher interval observable =




/// samples the observable at the given interval

let sample interval observable =



/// samples the observable at the given interval

/// scheduled on the current dispatcher

let sampleOnCurrentDispatcher interval observable =




/// returns the observable sequence that reacts first.

let takeFirstOf2Reactions obs1 obs2 =



/// returns the observable sequence that reacts first.

let amb (obs: IObservable<‘a> seq) =

  Observable.Amb obs  


/// returns the observable sequence that reacts first.

let takeFirstReaction (obs: IObservable<‘a> seq) =

  Observable.Amb obs  


/// Matches when both observable sequences

/// have an available value.

let both obs1 obs2 = Observable.And(obs1,obs2)


/// Merges two observable sequences

/// into one observable sequence.

let zip obs1 obs2 =   

   Observable.Zip(obs1, obs2, Func<_,_,_>(fun a b -> a, b))


/// Merges two observable sequences into one observable sequence

/// whenever one of the observable sequences has a new value.

///    ==> More results than zip

let combineLatest obs1 obs2 =   


     obs1, obs2, Func<_,_,_>(fun a b -> a, b))    


/// Concats the two observables to one observable

let concat observable =



    Func<_,_>(fun (x:IObservable<‘a>) -> x))


/// maps the given observable with the given function

let map f observable =



/// maps the given observable with the given function

let mapi f observable =

  Observable.Select(observable,Func<_,_,_>(fun x i ->f i x))  


/// Filters all elements where the given predicate is satified

let filter f observable =

  Observable.Where(observable, Func<_,_>(f))


/// Splits the observable into two observables

/// Containing the elements for which the predicate returns

/// true and false respectively

let partition predicate observable =

  filter predicate observable,

  filter (predicate >> not) observable


/// Skips n elements

let skip n observable = Observable.Skip(observable, n)


/// Skips elements while the predicate is satisfied

let skipWhile f observable =

  Observable.SkipWhile(observable, Func<_,_>(f))


/// Runs all observable sequences in parallel

/// and combines their first values.

let forkJoin (observables: (‘a observable) seq) =

  Observable.ForkJoin observables


/// Counts the elements

let length = Observable.Count


/// Takes n elements

let take n observable =

  Observable.Take(observable, n)  


/// Determines whether the given observable is empty 

let isEmpty observable = Observable.IsEmpty observable


/// Determines whether the given observable is not empty 

let isNotEmpty observable = not (Observable.IsEmpty observable)


/// Determines whether an observable sequence

/// contains a specified value

/// which satisfies the given predicate

let exists predicate observable =


    |> skipWhile (predicate >> not)

    |> isNotEmpty


/// Continues an observable sequence that is terminated

/// by an exception with the next observable sequence.

let catch (newObservable:IObservable<‘a>) failingObservable =



/// Takes elements while the predicate is satisfied

let takeWhile f observable =

  Observable.TakeWhile(observable, Func<_,_>(f))


/// Iterates through the observable

/// and performs the given side-effect

let perform f observable =

  Observable.Do(observable,fun x -> f x)


/// Invokes finallyAction after source observable

/// sequence terminates normally or by an exception.

let performFinally f observable =

  Observable.Finally(observable,fun _ -> f())


/// Folds the observable

let fold f seed observable =

  Observable.Aggregate(observable, seed, Func<_,_,_>(f))  


/// Retruns an observable from a async pattern 

let fromAsync beginF endF =


     Func<_,_,_>(fun x y -> beginF(x,y)),

       (fun x -> endF x)).Invoke()


/// Runs all observable sequences in parallel

/// and combines their first values.

let subscribeAll next observables =

  observables |> Seq.map (subscribe next) |> Seq.toList     


type IObservable<‘a> with

  /// Subscribes to the Observable with just a next-function

  member this.Subscribe(next) =

    subscribe next this


  /// Subscribes to the Observable with a next

  /// and an error-function

  member this.Subscribe(next,error) =

    subscribeWithError next error this


  /// Subscribes to the Observable with all 3 callbacks

  member this.Subscribe(next,error,completed) =

    subscribeComplete next error completed this


open System.Net


type WebRequest with

  member this.GetRequestStreamAsync() =





  member this.GetResponseAsync() =





  member this.GetResponseStreamAsync() =





type Async<‘a> with

  member this.ToObservable() = ofAsync this

  1. […] I showed how we can map some of the Rx operators to an API which looks more like the F# base classes. Today I wanted to use these mapped operators in a WPF-application written in […]

    Pingback by Generating an IObservable<T> from an IEvent in F# » Rash thoughts about .NET, C#, F# and Dynamics NAV. — Tuesday, 24. November 2009 um 12:39 Uhr

  2. […] one of my lasts posts I showed how we can transform some of the operators from the Reactive Framework for an easier use in F#. This time I will demonstrate how we can use this API and asynchronous workflows to download a […]

    Pingback by Observing asynchronous downloads with F# and the Reactive Extensions for .NET » Rash thoughts about .NET, C#, F# and Dynamics NAV. — Monday, 21. December 2009 um 18:01 Uhr

  3. […] FSharp.Reactive (this is mostly from Steffen Forkmann) […]

    Pingback by A Diversion | Wizards of Smart — Friday, 23. April 2010 um 16:51 Uhr

  4. As of .NET 4 RTM, you can replace your implementation of fromEvent with this much more efficient version:

    let inline fromEvent (event:IEvent) =
    event :> IObservable

    Comment by Joel — Tuesday, 27. April 2010 um 22:47 Uhr

  5. Bah, it ate my angle brackets.

    Comment by Joel — Tuesday, 27. April 2010 um 22:48 Uhr

  6. Hi,

    What is the link between this and the Fsharpx extensions ?
    I imagine this is outdated.

    How would you change it ?

    Comment by nicolas — Thursday, 29. March 2012 um 9:48 Uhr

  7. Hi nicloas,

    most of the stuff is included in https://github.com/panesofglass/FSharp.Reactive


    Comment by Steffen Forkmann — Thursday, 29. March 2012 um 11:39 Uhr

