Rash thoughts about .NET, C#, F# and Dynamics NAV.


"Every solution will only lead to new problems."

Monday, 23. November 2009


Mapping the Reactive Framework (Rx) operators for F#

Filed under: F# — Steffen Forkmann at 13:35 Uhr

The “Reactive Extensions for .NET (Rx)” comes with lots of operators for working with IObservable<T>. This code mimics the signatures of the default F# sequence combinators and allows you to use observables like sequences. The approach is similar to Matthew Podwysocki’s blog post about mapping the IParallelEnumerable.

I will update this post from time to time to include more of the operators.

  • Update: 25.11.2009 – new operators mapped
  • Update: 21.11.2009 – Updated to new Rx release

module RxExtensions.Observable

 

open System.Linq

open System

open System.Threading

open System.Windows.Threading

 

/// Abbreviation so that signatures can be written as 'a observable.
type 'a observable = IObservable<'a>

/// Abbreviation so that signatures can be written as 'a observer.
type 'a observer = IObserver<'a>

 

/// Wraps a unit -> unit function in a System.Action delegate.
let asAction (f: unit -> unit) = System.Action(f)

 

/// A System.Action which does nothing when invoked.
let doNothing = System.Action(fun () -> ())

 

/// Builds an IObserver from three callbacks:
/// next is called by OnNext, error by OnError and completed by OnCompleted.
let createObserver next error completed =
  { new System.IObserver<_> with
      member __.OnNext(value) = next value
      member __.OnError(ex) = error ex
      member __.OnCompleted() = completed() }

 

/// Creates a new observable whose subscribe logic is f.
/// f is called with the value Observable.Create passes to the lambda
/// (presumably the subscribing observer - confirm against the Rx release in use).
/// The lambda always returns doNothing, so no clean-up action is registered
/// for whatever f sets up.

let create f =

  Observable.Create<_>(fun x ->

    f x

    doNothing)  

 

/// Creates an observable from an async computation.
/// The computation is started for each subscriber. Its result is pushed
/// with OnNext and the sequence is then finished with OnCompleted, so that
/// downstream operators which wait for termination can finish.
/// Exceptions and cancellation are both reported through OnError.
let ofAsync async =
  create
    (fun obs ->
       Async.StartWithContinuations
         (async,
          (fun result ->
             obs.OnNext result
             // a single-value sequence has to be terminated explicitly
             obs.OnCompleted()),
          obs.OnError,
          obs.OnError))

 

/// Gets a DispatcherScheduler for Dispatcher.CurrentDispatcher.
/// The argument is ignored; callers in this file pass ().

let getDispatcherScheduler _ =

  new DispatcherScheduler(Dispatcher.CurrentDispatcher)

 

/// Generates an observable from an IEvent: for every subscription the
/// observer's OnNext is added to the event as a handler.
let fromEvent (event:IEvent<_,_>) =
  create (fun observer -> event.Add(fun args -> observer.OnNext args))

 

/// Generates an empty observable (Observable.Empty).
let empty<'a> = Observable.Empty<'a>()

 

/// Takes the head of the elements; an alias for Observable.First.

let head = Observable.First

 

/// Merges the two observables via Observable.Merge.
/// Note the argument order: obs2 is passed first and obs1 second, so in
/// source |> mergeWith other the piped source is the first Merge argument.

let mergeWith obs1 obs2 = Observable.Merge(obs2, obs1)

 

/// Merges all inner observables of an observable of observables
/// (Observable.Merge).
let mergeAll (observables:IObservable<IObservable<'a>>) =
  Observable.Merge observables

 

/// Merges all observables of the given sequence (Observable.Merge).
let merge (observables:(IObservable<'a>) seq) =
  Observable.Merge observables

 

/// Creates a range as an observable by calling Observable.Range(start, count).

let range start count = Observable.Range(start, count)

 

/// Converts a seq into an observable (Observable.ToObservable).
let toObservable (seq: 'a seq) = Observable.ToObservable seq

 

/// Converts an observable into a seq; an alias for Observable.ToEnumerable.

let toEnumerable = Observable.ToEnumerable

 

/// Subscribes to the observable with all 3 callbacks:
/// next for values, error for exceptions and completed for completion.
/// Returns whatever observable.Subscribe returns.
let subscribeComplete next error completed (observable: 'a observable) =
  observable.Subscribe(
    (fun x -> next x),
    (fun e -> error e),
    (fun () -> completed()))

 

/// Subscribes to the observable with a next- and an error-function.
/// Completion is ignored.
let subscribeWithError next error observable =
  observable |> subscribeComplete next error ignore

 

/// Subscribes to the observable with just a next-function.
/// Errors and completion are ignored.
let subscribe next observable =
  observable |> subscribeWithError next (fun _ -> ())

 

/// Throttles the observable for the given interval by calling
/// Observable.Throttle(observable, interval).
/// The observable comes last so the function can be used with |>.

let throttle interval observable =

  Observable.Throttle(observable,interval)   

 

/// Throttles the observable for the given interval, passing the scheduler
/// obtained from getDispatcherScheduler() to Observable.Throttle.

let throttleOnCurrentDispatcher interval observable =

  Observable.Throttle(

     observable,getDispatcherScheduler(),interval)

 

/// Samples the observable at the given interval by calling
/// Observable.Sample(observable, interval).

let sample interval observable =

  Observable.Sample(observable,interval)   

 

/// Samples the observable at the given interval, passing the scheduler
/// obtained from getDispatcherScheduler() to Observable.Sample.

let sampleOnCurrentDispatcher interval observable =

  Observable.Sample(

    observable,getDispatcherScheduler(),interval)

 

/// Returns the observable sequence (of the two given) that reacts first;
/// delegates to Observable.Amb(obs1, obs2).

let takeFirstOf2Reactions obs1 obs2 =

  Observable.Amb(obs1,obs2)

 

/// Returns the observable sequence that reacts first (Observable.Amb).
let amb (obs: IObservable<'a> seq) =
  Observable.Amb obs

 

/// Returns the observable sequence that reacts first.
/// Same call as amb, under a more descriptive name.
let takeFirstReaction (obs: IObservable<'a> seq) =
  Observable.Amb obs

 

/// Matches when both observable sequences have an available value;
/// delegates to Observable.And(obs1, obs2).

let both obs1 obs2 = Observable.And(obs1,obs2)

 

/// Combines two observable sequences into one observable sequence of
/// tuples via Observable.Zip; the selector pairs the two values as (a, b).

let zip obs1 obs2 =   

   Observable.Zip(obs1, obs2, Func<_,_,_>(fun a b -> a, b))

 

/// Combines two observable sequences into one observable sequence of
/// tuples whenever one of the observable sequences has a new value
/// (Observable.CombineLatest); the selector pairs the values as (a, b).
///    ==> More results than zip

let combineLatest obs1 obs2 =   

   Observable.CombineLatest(

     obs1, obs2, Func<_,_,_>(fun a b -> a, b))    

 

/// Flattens an observable of observables into one observable by passing
/// every inner observable through Observable.SelectMany unchanged.
/// NOTE(review): SelectMany presumably interleaves the inner sequences
/// rather than appending them strictly one after another - confirm.
let concat observable =
  Observable.SelectMany(
    observable,
    Func<_,_>(fun (x:IObservable<'a>) -> x))

 

/// Maps the given observable with the given function (Observable.Select).

let map f observable =

  Observable.Select(observable,Func<_,_>(f))  

 

/// Maps the given observable with a function that takes two arguments.
/// The two-argument Observable.Select selector receives x and i and f is
/// applied as f i x (i is presumably the element index - confirm).

let mapi f observable =

  Observable.Select(observable,Func<_,_,_>(fun x i ->f i x))  

 

/// Filters all elements where the given predicate is satisfied
/// (Observable.Where).

let filter f observable =

  Observable.Where(observable, Func<_,_>(f))

 

/// Splits the observable into a pair of observables: the first contains
/// the elements for which the predicate returns true, the second those
/// for which it returns false. Both are filters over the same source.
let partition predicate observable =
  let matching = observable |> filter predicate
  let rest = observable |> filter (fun x -> not (predicate x))
  matching, rest

 

/// Skips n elements (Observable.Skip).

let skip n observable = Observable.Skip(observable, n)

 

/// Skips elements while the predicate f is satisfied (Observable.SkipWhile).

let skipWhile f observable =

  Observable.SkipWhile(observable, Func<_,_>(f))

 

/// Runs all observable sequences in parallel
/// and combines their first values (Observable.ForkJoin).
let forkJoin (observables: ('a observable) seq) =
  Observable.ForkJoin observables

 

/// Counts the elements; an alias for Observable.Count.

let length = Observable.Count

 

/// Takes n elements (Observable.Take).

let take n observable =

  Observable.Take(observable, n)  

 

/// Determines whether the given observable is empty (Observable.IsEmpty).

let isEmpty observable = Observable.IsEmpty observable

 

/// Determines whether the given observable is not empty.
/// Negates Observable.IsEmpty, whose result is therefore used as a bool here.

let isNotEmpty observable = not (Observable.IsEmpty observable)

 

/// Determines whether the observable sequence contains an element which
/// satisfies the given predicate: leading elements that fail the predicate
/// are skipped and the remainder is tested for being non-empty.
let exists predicate observable =
  let fromFirstMatch =
    skipWhile (fun item -> not (predicate item)) observable
  isNotEmpty fromFirstMatch

 

/// Continues an observable sequence that is terminated
/// by an exception with the next observable sequence (Observable.Catch).
/// The fallback comes first so that failing |> catch fallback reads naturally.
let catch (newObservable:IObservable<'a>) failingObservable =
  Observable.Catch(failingObservable,newObservable)

 

/// Takes elements while the predicate f is satisfied (Observable.TakeWhile).

let takeWhile f observable =

  Observable.TakeWhile(observable, Func<_,_>(f))

 

/// Iterates through the observable and performs the given side-effect f
/// on each element (Observable.Do).

let perform f observable =

  Observable.Do(observable,fun x -> f x)

 

/// Invokes f() after the source observable sequence terminates
/// normally or by an exception (Observable.Finally).

let performFinally f observable =

  Observable.Finally(observable,fun _ -> f())

 

/// Folds the observable with the function f, starting from seed
/// (Observable.Aggregate).

let fold f seed observable =

  Observable.Aggregate(observable, seed, Func<_,_,_>(f))  

 

/// Returns an observable from an async pattern (Begin/End method pair).
/// beginF is called with its two arguments as a tuple and endF with a
/// single argument (presumably the IAsyncResult - confirm).
/// The function produced by Observable.FromAsyncPattern is invoked
/// immediately with no arguments.

let fromAsync beginF endF =

   Observable.FromAsyncPattern<_>(

     Func<_,_,_>(fun x y -> beginF(x,y)),

       (fun x -> endF x)).Invoke()

 

/// Subscribes the next-function to every observable of the sequence
/// and returns the results of the individual subscriptions as a list.
let subscribeAll next observables =
  [ for source in observables -> subscribe next source ]

 

/// Extension members that expose the subscribe helpers of this module
/// as Subscribe overloads on IObservable.
type IObservable<'a> with
  /// Subscribes to the Observable with just a next-function
  member this.Subscribe(next) =
    subscribe next this

  /// Subscribes to the Observable with a next
  /// and an error-function
  member this.Subscribe(next,error) =
    subscribeWithError next error this

  /// Subscribes to the Observable with all 3 callbacks
  member this.Subscribe(next,error,completed) =
    subscribeComplete next error completed this

 

open System.Net

 

/// Extension members that expose the Begin/End pairs of WebRequest
/// as observables via fromAsync.
type WebRequest with
  /// Observable built from BeginGetRequestStream / EndGetRequestStream.
  member this.GetRequestStreamAsync() =
    fromAsync
      this.BeginGetRequestStream
      this.EndGetRequestStream

  /// Observable built from BeginGetResponse / EndGetResponse.
  member this.GetResponseAsync() =
    fromAsync
      this.BeginGetResponse
      this.EndGetResponse

  /// Observable yielding the stream of the response.
  /// WebRequest has no Begin/End pair for the response stream, so the
  /// response is awaited and GetResponseStream() is taken from it.
  /// (The earlier version reused the request-stream pair by mistake.)
  member this.GetResponseStreamAsync() =
    fromAsync
      this.BeginGetResponse
      (fun (asyncResult: IAsyncResult) ->
         this.EndGetResponse(asyncResult).GetResponseStream())

 

/// Extension member that converts an async computation into an observable
/// via ofAsync.
type Async<'a> with
  member this.ToObservable() = ofAsync this

Tags: , , ,

Thursday, 22. October 2009


IObservable/IObserver – Using the Reactive Framework with F# – part II

Filed under: F# — Steffen Forkmann at 17:23 Uhr

In the last article I showed how to filter and combine events via the Reactive Framework and how to deal with errors. This time we will create our own observables.

I got the idea for this sample from a very good Expert to Expert video (“Reactive Framework (Rx) Under the Hood”) with Erik Meijer and Wes Dyer.

We want to implement an asynchronous dictionary lookup. Whenever someone types something into the text box, our application starts looking into a dictionary and searches for words starting with the given prefix.

Async Dict

Let’s start with generating this simple form:

open System.Windows.Forms

 

// Main window of the sample; the properties are set in the order written.
let form =
  new Form(
    Visible = true,
    Text = "Async dict",
    TopMost = true)

// Text box for the search prefix, added to the form right away.
let textBox1 =
  let location = System.Drawing.Point(12, 12)
  let size = System.Drawing.Size(260, 20)
  new TextBox(Location = location, Size = size)
form.Controls.Add(textBox1)

 

// List box which shows the words that were found, added to the form right away.
let resultsBox =
  let location = System.Drawing.Point(13, 39)
  let size = System.Drawing.Size(259, 211)
  new ListBox(Location = location, Size = size)
form.Controls.Add(resultsBox)

 

// create a list with common words
// this might be very large
let data =
  ["hell"; "Hello"; "Halle"; "Html";
   "Bonn"; "Bonjour"; "Steffen"]

 

// Observable which pushes the current content of textBox1
// every time its TextChanged event fires.
let textChanged =
  Observable.map (fun _ -> textBox1.Text) textBox1.TextChanged

Now we have to define a base class for observables. This class will help our dictionary lookup function to use the IObservable<T> interface:

module Observable

 

/// An observable base class which keeps a list of subscribed observers
/// and notifies all of them in parallel.
/// The System interfaces are fully qualified so that the snippet does not
/// depend on an "open System" that is not shown.
type Observable<'a>() =
  let mutable observers : System.IObserver<'a> list = []

  /// Applies f to every observer; each call is wrapped in its own async,
  /// all of them are run with Async.Parallel and this call blocks until
  /// they have finished.
  let notifyObservers (f: System.IObserver<'a> -> unit) =
    observers
      |> Seq.map (fun (observer: System.IObserver<'a>) ->
                         async { return f observer })
      |> Async.Parallel
      |> Async.RunSynchronously
      |> ignore

  interface System.IObservable<'a> with
    member observable.Subscribe(observer) =
      // subscribe observer
      observers <- observer :: observers
      // create Disposable to unsubscribe observer later
      { new System.IDisposable with
          member this.Dispose() =
            observers <-
              observers |> List.filter ((<>) observer) }

  /// Notifies all observers in parallel about the new value
  member observable.OnNext value =
    notifyObservers (fun observer -> observer.OnNext value)

  /// Notifies all observers in parallel about the error
  /// and finishes all observations
  member observable.OnError error =
    notifyObservers (fun observer -> observer.OnError error)
    observers <- []

  /// Notifies all observers in parallel about the completion
  /// and finishes all observations.
  /// This is a property: every read of it triggers the notification.
  member observable.Completed =
    notifyObservers (fun observer -> observer.OnCompleted())
    observers <- []

I hope there will be a similar base class in the .NET Framework 4.0 RTM.

Now we are able to use this class and to build our dictionary lookup observable:

// Observable on which findWords pushes every word it finds;
// the element type is left to type inference.
let wordsObservable = new Observable.Observable<_>()

 

/// Pushes (prefix, word) on wordsObservable for every word in data which
/// starts with the given prefix, compared after upper-casing both sides.
/// An empty prefix pushes nothing.
let findWords prefix =
  if prefix <> "" then
    // upper-case the prefix once instead of once per word
    let upperPrefix = prefix.ToUpper()
    for word in data do
      if word.ToUpper().StartsWith(upperPrefix) then
        wordsObservable.OnNext (prefix,word)

The last step is to create observers and subscribe them to the observables:

// create observers

// Clears the result list every time the text changes.
let clean =
  Observable.subscribe (fun _ -> resultsBox.Items.Clear()) textChanged

 

// Every time the text changes the new text is handed to findWords,
// which makes wordsObservable push the matching words.
let searchForWords =
  Observable.subscribe findWords textChanged

 

// Adds every word pushed by wordsObservable to the result list;
// the prefix part of the pushed pair is ignored.
// NOTE(review): the callback presumably runs off the UI thread, because the
// Observable class dispatches through Async.Parallel - confirm that touching
// resultsBox from there is acceptable.
let wordFound =

  // subscribe to the "word found"-event

  wordsObservable   

    |> Observable.subscribe

        (fun (_,word) -> resultsBox.Items.Add word |> ignore)

Tags: , , ,