Rash thoughts about .NET, C#, F# and Dynamics NAV.


"Every solution will only lead to new problems."

Monday, 21. December 2009


Observing asynchronous downloads with F# and the Reactive Extensions for .NET

Filed under: F# — Steffen Forkmann at 18:01 Uhr

In one of my last posts I showed how we can transform some of the operators from the Reactive Framework for easier use in F#. This time I will demonstrate how we can use this API and asynchronous workflows to download a couple of websites asynchronously and in parallel.

First of all we create a synchronous download function:

open System.IO

open System.Net

 

let SyncHttp (url:string) =

  let request = WebRequest.Create url

  let response = request.GetResponse()

  use stream = response.GetResponseStream()

  use reader = new StreamReader(stream)

  let result = reader.ReadToEnd()

  url,result

In F# we can easily make this function asynchronous:

let AsyncHttp (url:string) =

  async { // using asynchronous workflows

    let request = WebRequest.Create url

    // async call of GetResponse

    let! response = request.AsyncGetResponse()

    use stream = response.GetResponseStream()

    use reader = new StreamReader(stream)

    // async call of ReadToEnd

    let! result = reader.AsyncReadToEnd()

    return url,result} 

Now we use this AsyncHttp function to create a list of async downloads:

/// Async<string * string> list

let downloads =

  ["http://www.google.com";

   "http://www.twitter.com";

   "http://www.nytimes.com/";

   "http://www.navision-blog.de/";

   "http://www.nba.com/"]

    |> List.map AsyncHttp
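Before turning to observables, it may help to see what such a list of workflows does when it is run with the standard Async.Parallel combinator. The following is only a minimal sketch and not part of the original sample: fakeDownload and downloadAll are hypothetical names, and fakeDownload stands in for AsyncHttp by simulating latency with Async.Sleep instead of touching the network.

```fsharp
/// Hypothetical stand-in for AsyncHttp: waits a little and returns
/// the url together with a fake "page" (no network access)
let fakeDownload (url: string) =
    async {
        do! Async.Sleep 50              // simulated latency
        return url, "<html>" + url + "</html>" }

/// Runs all workflows in parallel and blocks until every one has finished
let downloadAll (urls: string list) =
    urls
    |> List.map fakeDownload
    |> Async.Parallel                   // Async<(string * string) []>
    |> Async.RunSynchronously

let pages = downloadAll ["http://a.example"; "http://bb.example"]

for (url, content) in pages do
    printfn "%s: %d" url content.Length
```

Async.Parallel hands back the results in input order, but only once all workflows are done. That is the limitation the observable approach avoids: there we get notified as soon as each single download completes.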

The next step is to convert the list of async calls into a list of observables and to merge this list into a single IObservable. The effect is that whenever one download is completed we will be notified about the result:

/// IObservable<string * string>

let observableDownloads =

  downloads

    |> Seq.map Observable.ofAsync

    |> Observable.merge

 

 

observableDownloads

  |> Observable.subscribe (fun (url,result) ->

       printfn "%A: %d" url result.Length)
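Observable.ofAsync and the sequence version of Observable.merge come from the earlier post and are not shown here. As a rough guess at what such helpers might look like, here is a sketch built on Async.StartWithContinuations and the two-argument Observable.merge from FSharp.Core. The names ofAsync and mergeAll are assumptions, and the actual helpers may well be implemented differently.

```fsharp
open System
open System.Threading

/// Hypothetical helper: starts the workflow on subscription and
/// pushes its single result to the observer
let ofAsync (computation: Async<'a>) : IObservable<'a> =
    { new IObservable<'a> with
        member this.Subscribe(observer: IObserver<'a>) =
            let cts = new CancellationTokenSource()
            Async.StartWithContinuations(
                computation,
                (fun value ->
                    observer.OnNext value
                    observer.OnCompleted()),
                (fun error -> observer.OnError error),
                (fun _ -> ()),           // cancelled: nothing to report
                cts.Token)
            // disposing the subscription cancels the workflow
            { new IDisposable with
                member this.Dispose() = cts.Cancel() } }

/// Hypothetical helper: merges a non-empty sequence of observables pairwise
let mergeAll (sources: seq<IObservable<'a>>) : IObservable<'a> =
    sources |> Seq.reduce Observable.merge

let subscription =
    [ async { return "a", 1 }; async { return "b", 2 } ]
    |> Seq.map ofAsync
    |> mergeAll
    |> Observable.subscribe (fun (name, value) -> printfn "%s: %d" name value)
```

Note that in this sketch every subscription starts the workflow again, and Seq.reduce fails on an empty sequence. Both are simplifications that a real helper would probably handle.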

 

If you want to learn more about Observable.merge and see the marble diagram you should watch this video.


Friday, 27. November 2009


Observing the FileSystem

Filed under: Diverses — Steffen Forkmann at 10:38 Uhr

The following class is a wrapper for the System.IO.FileSystemWatcher and converts the FileSystem events into observables. You need to download and reference the Reactive Extensions for .NET (Rx) to use this code:

public class FileSystemObservable

{

  private readonly FileSystemWatcher _fileSystemWatcher;

 

 

  public FileSystemObservable(string directory,

    string filter, bool includeSubdirectories)

  {

    _fileSystemWatcher =

      new FileSystemWatcher(directory, filter)

        {

          EnableRaisingEvents = true,

          IncludeSubdirectories = includeSubdirectories

        };

 

 

    ChangedFiles =

      Observable.FromEvent<FileSystemEventHandler,

                             FileSystemEventArgs>

      (h => h.Invoke,

       h => _fileSystemWatcher.Changed += h,

       h => _fileSystemWatcher.Changed -= h)

      .Select(x => x.EventArgs);

 

    CreatedFiles =

      Observable.FromEvent<FileSystemEventHandler,

                             FileSystemEventArgs>

      (h => h.Invoke,

       h => _fileSystemWatcher.Created += h,

       h => _fileSystemWatcher.Created -= h)

      .Select(x => x.EventArgs);

 

    DeletedFiles =

      Observable.FromEvent<FileSystemEventHandler,

                             FileSystemEventArgs>

      (h => h.Invoke,

       h => _fileSystemWatcher.Deleted += h,

       h => _fileSystemWatcher.Deleted -= h)

      .Select(x => x.EventArgs);

 

    RenamedFiles =

      Observable.FromEvent<RenamedEventHandler,

                             RenamedEventArgs>

      (h => h.Invoke,

       h => _fileSystemWatcher.Renamed += h,

       h => _fileSystemWatcher.Renamed -= h)

      .Select(x => x.EventArgs);

 

    Errors =

      Observable.FromEvent<ErrorEventHandler, ErrorEventArgs>

      (h => h.Invoke,

       h => _fileSystemWatcher.Error += h,

       h => _fileSystemWatcher.Error -= h)

      .Select(x => x.EventArgs);

  }

 

  /// <summary>

  /// Gets or sets the errors.

  /// </summary>

  /// <value>The errors.</value>

  public IObservable<ErrorEventArgs> Errors

           { get; private set; }

 

  /// <summary>

  /// Gets the changed files.

  /// </summary>

  /// <value>The changed files.</value>

  public IObservable<FileSystemEventArgs> ChangedFiles

           { get; private set; }

 

  /// <summary>

  /// Gets the created files.

  /// </summary>

  /// <value>The created files.</value>

  public IObservable<FileSystemEventArgs> CreatedFiles

           { get; private set; }

 

  /// <summary>

  /// Gets the deleted files.

  /// </summary>

  /// <value>The deleted files.</value>

  public IObservable<FileSystemEventArgs> DeletedFiles

           { get; private set; }

 

  /// <summary>

  /// Gets the renamed files.

  /// </summary>

  /// <value>The renamed files.</value>

  public IObservable<RenamedEventArgs> RenamedFiles

           { get; private set; }

}

Now we can use the observable and can easily create meta-events:

IDisposable writer =

    new FileSystemObservable(@"d:\Test\", "*.*", false)

        .CreatedFiles

        .Where(x => (new FileInfo(x.FullPath)).Length > 0)

          // … you can do much more with the combinators

        .Select(x => x.Name)

        .Subscribe(Console.WriteLine);
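In F# the wrapper class is not strictly needed, because .NET events are first-class values that already implement IObservable. The following sketch shows roughly the same pipeline with the Observable module from FSharp.Core. The names nonEmptyFileNames and watchCreated are made up for this example, and the filter shares a weakness with the C# version: FileInfo.Length throws if the file has already disappeared.

```fsharp
open System
open System.IO

/// Hypothetical helper: keeps only events for non-empty files
/// and projects them to the file name
let nonEmptyFileNames (events: IObservable<FileSystemEventArgs>) =
    events
    |> Observable.filter (fun e -> FileInfo(e.FullPath).Length > 0L)
    |> Observable.map (fun e -> e.Name)

/// Hypothetical helper: watches a directory and prints the names of
/// newly created, non-empty files; dispose the result to stop watching
let watchCreated (directory: string) : IDisposable =
    let watcher = new FileSystemWatcher(directory, "*.*")
    watcher.EnableRaisingEvents <- true
    let subscription =
        watcher.Created               // the event itself is an IObservable
        |> nonEmptyFileNames
        |> Observable.subscribe (printfn "%s")
    { new IDisposable with
        member this.Dispose() =
            subscription.Dispose()
            watcher.Dispose() }
```

A call like watchCreated @"d:\Test\" would then correspond to the C# usage above, assuming the directory exists.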


Thursday, 22. October 2009


IObservable/IObserver – Using the Reactive Framework with F# – part II

Filed under: F# — Steffen Forkmann at 17:23 Uhr

In the last article I showed how to filter and combine events via the Reactive Framework and how to deal with errors. This time we will create our own observables.

I got the idea for this sample from a very good Expert to Expert video (“Reactive Framework (Rx) Under the Hood”) with Erik Meijer and Wes Dyer.

We want to implement an asynchronous dictionary lookup. Whenever someone types something into the TextBox, our application looks into a dictionary and searches for words starting with the given prefix.

Async Dict

Let’s start with generating this simple form:

open System.Windows.Forms

 

let form = new Form(Visible=true, Text="Async dict",

                       TopMost=true)

let textBox1 =

  new TextBox(

    Location = new System.Drawing.Point(12, 12),

    Size = new System.Drawing.Size(260, 20))

form.Controls.Add textBox1

 

let resultsBox =

  new ListBox(

    Location = new System.Drawing.Point(13, 39),

    Size = new System.Drawing.Size(259, 211))

form.Controls.Add resultsBox

 

// create a list with common words

// this might be very large

let data =

  ["hell"; "Hello"; "Halle"; "Html";

      "Bonn"; "Bonjour"; "Steffen"]

 

// create observable for text changes

let textChanged =

  textBox1.TextChanged

    |> Observable.map (fun _ -> textBox1.Text)

Now we have to define a base class for observables. This class will help our dictionary lookup function to use the IObservable<T> interface:

module Observable

 

open System

 

/// An Observable base class which notifies

/// all observers in parallel

type 'a Observable() =

  let mutable observers = []

 

  /// Notifies all observers in parallel about the new value

  let notifyObservers f =

    observers

      |> Seq.map (fun (observer:IObserver<'a>) ->

                         async { return f observer})

      |> Async.Parallel

      |> Async.RunSynchronously

      |> ignore

 

  interface IObservable<'a> with

    member observable.Subscribe(observer)  =

      // subscribe observer

      observers <- observer :: observers

      // create Disposable to unsubscribe observer later

      {new IDisposable with

         member this.Dispose() =

            observers <-

                observers |> List.filter ((<>) observer)}

 

  /// Notifies all observers in parallel about the new value

  member observable.OnNext value =

   notifyObservers (fun observer -> observer.OnNext value)

 

  /// Notifies all observers in parallel about the error

  /// and finishes all observations

  member observable.OnError error =

    notifyObservers (fun observer -> observer.OnError error)

    observers <- []

 

  /// Notifies all observers in parallel about the completion

  /// and finishes all observations

  member observable.Completed =

    notifyObservers (fun observer -> observer.OnCompleted())

    observers <- [] 

I hope there will be a similar base class in the .NET Framework 4.0 RTM.
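As an aside, FSharp.Core already ships a small building block that covers part of this: Event<'T>, whose Publish property implements IObservable<'T>. The sketch below swaps it in for the hand-written class. It is a simplification and not a replacement, since Event notifies its subscribers one after another instead of in parallel and knows nothing about OnError or OnCompleted. WordSource is a hypothetical name.

```fsharp
/// Hypothetical push source built on FSharp.Core's Event<'T>
type WordSource() =
    let found = Event<string * string>()

    /// Observable view of the (prefix, word) matches
    member this.Words = found.Publish

    /// Pushes a match to all current subscribers
    member this.Push(prefix: string, word: string) =
        found.Trigger((prefix, word))

let source = WordSource()

// two independent subscribers, both get every match
source.Words |> Observable.add (fun (_, word) -> printfn "first:  %s" word)
source.Words |> Observable.add (fun (prefix, _) -> printfn "second: %s" prefix)

source.Push("he", "Hello")
```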

Now we are able to use this class and to build our dictionary lookup observable:

let wordsObservable = new Observable.Observable<_>()

 

let findWords prefix =

  if prefix <> "" then

    let prefix' = prefix.ToUpper()

    for word in data do

      if word.ToUpper().StartsWith(prefix') then

        wordsObservable.OnNext (prefix,word)
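The matching rule itself can also be looked at in isolation. The following sketch is a hypothetical pure variant of findWords that returns the matches instead of pushing them to an observable, which makes the rule easy to try out. One deliberate difference: it compares with StringComparison.OrdinalIgnoreCase instead of ToUpper, which should behave the same for plain ASCII words like the ones in the sample data.

```fsharp
open System

/// Hypothetical pure variant of findWords: returns all words that
/// start with the prefix (ignoring case), nothing for an empty prefix
let wordsWithPrefix (prefix: string) (words: string list) =
    let startsWithPrefix (word: string) =
        word.StartsWith(prefix, StringComparison.OrdinalIgnoreCase)
    if prefix = "" then []
    else words |> List.filter startsWithPrefix

let sample = ["hell"; "Hello"; "Halle"; "Html"; "Bonn"; "Bonjour"; "Steffen"]

printfn "%A" (wordsWithPrefix "he" sample)   // prints ["hell"; "Hello"]
```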

The last step is to create observers and subscribe them to the observables:

// create observers

let clean =

  textChanged

    |> Observable.subscribe (fun _ -> resultsBox.Items.Clear())

 

let searchForWords =

  // Every time the text changes

  // we start our wordsObservable to push words

  textChanged

    |> Observable.subscribe (fun text -> findWords text)

 

let wordFound =

  // subscribe to the "word found"-event

  wordsObservable   

    |> Observable.subscribe

        (fun (_,word) -> resultsBox.Items.Add word |> ignore)


Tuesday, 20. October 2009


IObservable/IObserver – Using the Reactive Framework with F#

Filed under: F# — Steffen Forkmann at 16:50 Uhr

One of the nice new features in .NET 4.0 beta 2 is the IObservable<T>/IObserver<T> support from the Reactive Framework (“Rx Framework” or sometimes “LinqToEvents”). Developed by Erik Meijer and his team, it is a really powerful way to do reactive programming in .NET, and especially in F#.

If you want to see some of the beautiful math behind the Reactive Framework you should definitely watch this Expert to Expert video on Channel 9. You can see Brian Beckman and Erik Meijer showing that IObservable<T> is the mathematical dual of IEnumerable<T>.

What can I do with the Rx Framework?

Consider this small sample (it is taken from Matthew Podwysocki’s blog): We want to get notified whenever a user clicks on our form and moves the mouse within a special area (XPos and YPos smaller than 100px).

First of all we define our observable by merging and filtering .NET events:

open System.Windows.Forms

 

let form = new Form(Visible=true, TopMost=true)

 

/// Creates two observables

///  - left is triggered when the left mouse button is down

///    and the mouse is in the area (x < 100 && y < 100)

///  - right is triggered when the right mouse button is down

///    and the mouse is in the area (x < 100 && y < 100)

let left,right =

  form.MouseDown

    |> Observable.merge form.MouseMove

    |> Observable.filter (fun args -> 

          args.Button = MouseButtons.Left ||

          args.Button = MouseButtons.Right)

    |> Observable.map (fun args -> args.X, args.Y, args.Button)

    |> Observable.filter (fun (x,y,b) -> x < 100 && y < 100)

    |> Observable.partition (fun (_,_,button) -> button = MouseButtons.Left)
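To see what filter and partition do without opening a form, the same shape of pipeline can be fed by hand. This is only a sketch under simplifying assumptions: Click is a made-up record that stands in for MouseEventArgs, and an Event<Click> plays the role of the form events.

```fsharp
/// Hypothetical simplified mouse event: position and a button name
type Click = { X: int; Y: int; Button: string }

let clicks = Event<Click>()

// same shape as the pipeline above: filter by area, then split by button
let left, right =
    clicks.Publish
    |> Observable.filter (fun c -> c.X < 100 && c.Y < 100)
    |> Observable.partition (fun c -> c.Button = "Left")

let leftSeen = ResizeArray<Click>()
let rightSeen = ResizeArray<Click>()

left  |> Observable.add (fun c -> leftSeen.Add c)
right |> Observable.add (fun c -> rightSeen.Add c)

clicks.Trigger({ X = 10; Y = 20; Button = "Left" })    // inside area, goes to left
clicks.Trigger({ X = 50; Y = 60; Button = "Right" })   // inside area, goes to right
clicks.Trigger({ X = 300; Y = 20; Button = "Left" })   // outside area, dropped

printfn "left: %d, right: %d" leftSeen.Count rightSeen.Count   // prints left: 1, right: 1
```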

Now it’s easy to subscribe a function to this observable:

let leftSubscription =

  left

    |> Observable.subscribe

         (fun (x,y,_) -> printfn "Left (%d,%d)" x y)

If we want to unsubscribe we only have to dispose the object:

// unsubscribe

leftSubscription.Dispose()

We couldn’t unsubscribe this way with “classic” .NET events. Remember that the -= operator in C# cannot remove a handler that was attached as an inline lambda expression, because a second lambda is a different delegate instance.
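To make the unsubscribe behaviour concrete, here is a tiny sketch. An Event<int> serves as a hypothetical stand-in for the form events, so it runs without any UI.

```fsharp
let ticks = Event<int>()
let received = ResizeArray<int>()

let subscription =
    ticks.Publish
    |> Observable.subscribe (fun n -> received.Add n)

ticks.Trigger(1)           // delivered
subscription.Dispose()     // unsubscribe
ticks.Trigger(2)           // not delivered any more

printfn "%A" (List.ofSeq received)   // prints [1]
```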

Exception handling

We have seen an easy way to subscribe and unsubscribe to complicated observables but what should we do if an error occurs? As far as I know this case is not implemented for F# at the moment, but we can easily add this functionality:

module Observable

 

/// Creates an observer with the given functions

let createObserver next error completed =

    {new System.IObserver<_> with

        member this.OnCompleted() = completed()

        member this.OnError(e) = error e

        member this.OnNext(args) = next args}

 

/// Subscribes an observer with the given functions

///   param1: OnNext        (T -> unit)

///   param2: OnError       (Exception -> unit)

///   param3: OnCompleted   (unit -> unit)

///   param4: observable

let subscribeComplete next error completed (observable:System.IObservable<_>) =

  createObserver next error completed

    |> observable.Subscribe

Now we are able to create a complete IObserver<T> object and register the 3 functions:

let rightSubscription =

  right

    |> Observable.subscribeComplete

         (fun (x,y,_) -> printfn "Right (%d,%d)" x y)

         (fun error   -> printfn "Error: %s" error.Message)

         (fun ()      -> printfn "Ready.")
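Mouse events never fail, so the error function above will not fire in practice. To watch the OnError path at work, the sketch below repeats the two helpers from this post and subscribes to a hypothetical hand-written source called failing, which pushes two values and then reports an exception.

```fsharp
open System

/// Same helpers as above, repeated to keep the sketch self-contained
let createObserver next error completed =
    { new IObserver<_> with
        member this.OnCompleted() = completed ()
        member this.OnError(e) = error e
        member this.OnNext(args) = next args }

let subscribeComplete next error completed (observable: IObservable<_>) =
    observable.Subscribe(createObserver next error completed)

/// Hypothetical source: pushes 1 and 2, then fails
let failing =
    { new IObservable<int> with
        member this.Subscribe(observer: IObserver<int>) =
            observer.OnNext 1
            observer.OnNext 2
            observer.OnError(InvalidOperationException("boom"))
            { new IDisposable with member this.Dispose() = () } }

let log = ResizeArray<string>()

let failingSubscription =
    failing
    |> subscribeComplete
         (fun n -> log.Add (sprintf "next %d" n))
         (fun (e: exn) -> log.Add ("error " + e.Message))
         (fun () -> log.Add "completed")

printfn "%A" (List.ofSeq log)   // prints ["next 1"; "next 2"; "error boom"]
```

The completed function is never called here, because an observable sequence ends with either an error or a completion, not both.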
