Rash thoughts about .NET, C#, F# and Dynamics NAV.


"Every solution will only lead to new problems."

Thursday, 22. October 2009


IObservable/IObserver – Using the Reactive Framework with F# – part II

Filed under: F# — Steffen Forkmann at 17:23 Uhr

In the last article I showed how to filter and combine events via the Reactive Framework and how to deal with errors. This time we will create our own observables.

I got the idea for this sample from a very good Expert to Expert video (“Reactive Framework (Rx) Under the Hood”) with Erik Meijer and Wes Dyer.

We want to implement an asynchronous dictionary lookup. Whenever sometimes types something into the Textbox, our application starts looking into a dictionary and searches for words starting with the given prefix.

Async Dict

Let’s start with generating this simple form:

open System.Windows.Forms

 

let form = new Form(Visible=true, Text="Async dict",

                       TopMost=true)

let textBox1 =

  new TextBox(

    Location = new System.Drawing.Point(12, 12),

    Size = new System.Drawing.Size(260, 20))

form.Controls.Add textBox1

 

let resultsBox =

  new ListBox(

    Location = new System.Drawing.Point(13, 39),

    Size = new System.Drawing.Size(259, 211))

form.Controls.Add resultsBox

 

// create a list with common words

// this might be very large

let data =

  ["hell"; "Hello"; "Halle"; "Html";

      "Bonn"; "Bonjour"; "Steffen"

 

// create observable for text changes

let textChanged =

  textBox1.TextChanged

    |> Observable.map (fun _ -> textBox1.Text)

Now we have to define a base class for observables. This class will help our dictionary lookup function to use the IObservable<T> interface:

module Observable

 

/// A Observable base class which notifies

/// all observers in parallel 

type ‘a Observable() =

  let mutable observers = []

 

  /// Notifies all observers in parallel about the new value

  let notifyObservers f =

    observers

      |> Seq.map (fun (observer:IObserver<‘a>) –>

                         async { return f observer})

      |> Async.Parallel

      |> Async.RunSynchronously

      |> ignore

 

  interface IObservable<‘a> with

    member observable.Subscribe(observer)  =

      // subscribe observer

      observers <- observer :: observers

      // create Disposable to unsubscribe observer later

      {new IDisposable with

         member this.Dispose() =

            observers <-

                observers |> List.filter ((<>) observer)}

 

  /// Notifies all observers in parallel about the new value

  member observable.OnNext value =

   notifyObservers (fun observer -> observer.OnNext value)

 

  /// Notifies all observers in parallel about the error

  /// and finishes all observations

  member observable.OnError error =

    notifyObservers (fun observer -> observer.OnError error)

    observers <- []

 

  /// Notifies all observers in parallel about the completion

  /// and finishes all observations

  member observable.Completed =

    notifyObservers (fun observer -> observer.OnCompleted())

    observers <- [] 

I hope there will be a similar base class in the .NET Framework 4.0 RTM.

Now we are able to use this class and to build our dictionary lookup observable:

let wordsObservable = new Observable.Observable<_>()

 

let findWords prefix =

  if prefix <> "" then

    let prefix’ = prefix.ToUpper()

    for word in data do

      if word.ToUpper().StartsWith(prefix’) then

        wordsObservable.OnNext (prefix,word)

The last step is to create observers and subscribe them to the observables:

// create observers

let clean =

  textChanged

    |> Observable.subscribe (fun _ -> resultsBox.Items.Clear())

 

let searchForWords =

  // Every time the text changes

  // we start our wordsObservable to push words

  textChanged

    |> Observable.subscribe (fun text -> findWords text)

 

let wordFound =

  // subscribe to the "word found"-event

  wordsObservable   

    |> Observable.subscribe

        (fun (_,word) -> resultsBox.Items.Add word |> ignore)

Tags: , , ,

No Comments »

No comments yet.

RSS feed for comments on this post. | TrackBack URI

Leave a comment

XHTML ( You can use these tags): <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong> .