Monday, 21. December 2009

Observing asynchronous downloads with F# and the Reactive Extensions for .NET

In one of my lasts posts I showed how we can transform some of the operators from the Reactive Framework for an easier use in F#. This time I will demonstrate how we can use this API and asynchronous workflows to download a couple of websites asynchronous and in parallel.

First of all we create a synchronous download function:

let SyncHttp (url:string) =

  let request = WebRequest.Create url

  let response = request.GetResponse()

  use stream = response.GetResponseStream()

  use reader = new StreamReader(stream)

  let result = reader.ReadToEnd()


In F# we can easily make this function asynchronously:

let AsyncHttp (url:string) =

  async { // using asynchronous workflows

    let request = WebRequest.Create url

    // async call of GetResponse

    let! response = request.AsyncGetResponse()

    use stream = response.GetResponseStream()

    use reader = new StreamReader(stream)

    // async call of ReadToEnd

    let! result = reader.AsyncReadToEnd()

    return url,result} 

Now we use this AsyncHttp function to create a list of async downloads:

/// Async<string * string> list

let downloads =






    |> List.map AsyncHttp

The next step is to convert the list of async calls into a list of observables and to merge this list into a single IObservable. The effect is that whenever one download is completed we will be notified about the result:

/// IObservable<string * string>

let observableDownloads =


    |> Seq.map Observable.ofAsync

    |> Observable.merge




  |> Observable.subscribe (fun (url,result) ->

       printfn "%A: %d" url result.Length)


If you want to learn more about Observable.merge and see the marble diagram you should watch this video.

