In the last article I showed how to filter and combine events via the Reactive Framework and how to deal with errors. This time we will create our own observables.
I got the idea for this sample from a very good Expert to Expert video (“Reactive Framework (Rx) Under the Hood”) with Erik Meijer and Wes Dyer.
We want to implement an asynchronous dictionary lookup. Whenever someone types something into the TextBox, our application searches a dictionary for words starting with the given prefix.
Let’s start with generating this simple form:
open System.Windows.Forms
let form = new Form(Visible=true, Text="Async dict",
                    TopMost=true)
let textBox1 =
    new TextBox(
        Location = new System.Drawing.Point(12, 12),
        Size = new System.Drawing.Size(260, 20))
form.Controls.Add textBox1
let resultsBox =
    new ListBox(
        Location = new System.Drawing.Point(13, 39),
        Size = new System.Drawing.Size(259, 211))
form.Controls.Add resultsBox
// create a list with common words
// this might be very large
let data =
    ["hell"; "Hello"; "Halle"; "Html";
     "Bonn"; "Bonjour"; "Steffen"]
// create observable for text changes
let textChanged =
    textBox1.TextChanged
    |> Observable.map (fun _ -> textBox1.Text)
Now we have to define a base class for observables. This class will help our dictionary lookup function to use the IObservable<T> interface:
module Observable
open System
/// An Observable base class which notifies
/// all observers in parallel
type 'a Observable() =
    let mutable observers = []
    /// Applies f to all observers in parallel
    let notifyObservers f =
        observers
        |> Seq.map (fun (observer:IObserver<'a>) ->
            async { return f observer })
        |> Async.Parallel
        |> Async.RunSynchronously
        |> ignore
    interface IObservable<'a> with
        member observable.Subscribe(observer) =
            // subscribe observer
            observers <- observer :: observers
            // create Disposable to unsubscribe observer later
            {new IDisposable with
                member this.Dispose() =
                    observers <-
                        observers |> List.filter ((<>) observer)}
    /// Notifies all observers in parallel about the new value
    member observable.OnNext value =
        notifyObservers (fun observer -> observer.OnNext value)
    /// Notifies all observers in parallel about the error
    /// and finishes all observations
    member observable.OnError error =
        notifyObservers (fun observer -> observer.OnError error)
        observers <- []
    /// Notifies all observers in parallel about the completion
    /// and finishes all observations
    member observable.Completed() =
        notifyObservers (fun observer -> observer.OnCompleted())
        observers <- []
I hope there will be a similar base class in the .NET Framework 4.0 RTM.
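To see the subscribe/notify/unsubscribe cycle in isolation, here is a minimal sketch. It assumes a simplified stand-in called SimpleObservable (a hypothetical name, not part of the article's code) that notifies observers one after another instead of in parallel, so the output order is predictable:

```fsharp
open System

// Hypothetical, sequential stand-in for the base class above.
// Assumption: observers are notified one by one, not in parallel.
type SimpleObservable<'a>() =
    let mutable observers : IObserver<'a> list = []
    interface IObservable<'a> with
        member this.Subscribe(observer) =
            // remember the observer
            observers <- observer :: observers
            // hand back a disposable that removes it again
            { new IDisposable with
                member d.Dispose() =
                    observers <-
                        observers
                        |> List.filter (fun o -> not (obj.ReferenceEquals(o, observer))) }
    /// Pushes a value to every subscribed observer
    member this.OnNext value =
        observers |> List.iter (fun o -> o.OnNext value)

let numbers = new SimpleObservable<int>()
let received = ResizeArray<int>()

// Observable.subscribe wraps the callback in an IObserver
// and returns the IDisposable created in Subscribe
let subscription =
    numbers |> Observable.subscribe (fun n -> received.Add n)

numbers.OnNext 1
numbers.OnNext 2
subscription.Dispose()   // unsubscribe
numbers.OnNext 3         // nobody is listening any more

printfn "%A" (List.ofSeq received)   // prints [1; 2]
```

The value 3 never arrives because disposing the subscription removed the observer from the list, which is the same mechanism the parallel base class uses.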
Now we are able to use this class and to build our dictionary lookup observable:
let wordsObservable = new Observable.Observable<_>()
let findWords prefix =
    if prefix <> "" then
        let prefix' = prefix.ToUpper()
        for word in data do
            if word.ToUpper().StartsWith(prefix') then
                wordsObservable.OnNext (prefix,word)
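The prefix matching itself can be tried without any observable. The following sketch uses a hypothetical helper, matchingWords, that returns the hits as a list instead of pushing them; it also swaps the ToUpper comparison for StringComparison.OrdinalIgnoreCase, which is my assumption about an equivalent case-insensitive check for this word list, not what the code above does:

```fsharp
open System

// Same sample data as in the article
let data =
    ["hell"; "Hello"; "Halle"; "Html";
     "Bonn"; "Bonjour"; "Steffen"]

// Hypothetical helper: collects matches instead of calling OnNext
let matchingWords (prefix: string) =
    if prefix = "" then []
    else
        data
        |> List.filter (fun word ->
            word.StartsWith(prefix, StringComparison.OrdinalIgnoreCase))

printfn "%A" (matchingWords "he")   // prints ["hell"; "Hello"]
printfn "%A" (matchingWords "BO")   // prints ["Bonn"; "Bonjour"]
printfn "%A" (matchingWords "")     // prints []
```

An empty prefix yields no results, mirroring the check at the top of findWords.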
The last step is to create observers and subscribe them to the observables:
// create observers
let clean =
    textChanged
    |> Observable.subscribe (fun _ -> resultsBox.Items.Clear())
let searchForWords =
    // Every time the text changes
    // we start our wordsObservable to push words
    textChanged
    |> Observable.subscribe (fun text -> findWords text)
let wordFound =
    // subscribe to the "word found"-event
    wordsObservable
    |> Observable.subscribe
        (fun (_,word) -> resultsBox.Items.Add word |> ignore)
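Each of these bindings (clean, searchForWords, wordFound) holds the IDisposable returned by Observable.subscribe, so a subscription can be cancelled later. Here is a small sketch of that, assuming a plain F# Event<string> named textEvent (hypothetical) as a stand-in for textBox1.TextChanged so it runs without a form:

```fsharp
open System

// Hypothetical stand-in for textBox1.TextChanged
let textEvent = new Event<string>()

// Same shape as the textChanged pipeline: map the raw event to a value
let textChanged =
    textEvent.Publish
    |> Observable.map (fun (text: string) -> text.ToUpper())

let seen = ResizeArray<string>()

// Keep the IDisposable so we can unsubscribe later
let subscription =
    textChanged
    |> Observable.subscribe (fun text -> seen.Add text)

textEvent.Trigger "he"    // delivered to the observer
subscription.Dispose()    // cancel the subscription
textEvent.Trigger "hel"   // no longer delivered

printfn "%A" (List.ofSeq seen)   // prints ["HE"]
```

In the form application, the same Dispose call on clean, searchForWords or wordFound would stop the corresponding reaction to text changes.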