Rash thoughts about .NET, C#, F# and Dynamics NAV.


"Every solution will only lead to new problems."

Monday, 23. November 2009


Mapping the Reactive Framework (Rx) operators for F#

Filed under: F# — Steffen Forkmann at 13:35 Uhr

The “Reactive Extensions for .NET (Rx)” come with lots of operators for using IObservable<T>. This code mimics the signatures of the default F# sequence combinators and allows you to use observables like sequences. It is a similar approach to Matthew Podwysocki’s blog post about mapping the IParallelEnumerable.

I will update this post from time to time to include more of the operators.

  • Update: 25.11.2009 – new operators mapped
  • Update: 21.11.2009 – Updated to new Rx release

module RxExtensions.Observable

 

open System.Linq

open System

open System.Threading

open System.Windows.Threading

 

/// F#-style abbreviation for IObservable<'a>
type 'a observable = IObservable<'a>

/// F#-style abbreviation for IObserver<'a>
type 'a observer = IObserver<'a>

 

/// Wraps an F# function of type unit -> unit in a System.Action delegate
let asAction f = System.Action(fun () -> f ())

 

/// A System.Action which does nothing when invoked
let doNothing = asAction ignore

 

/// Builds an IObserver from three plain F# callbacks, one per
/// notification kind: next value, error and completion.
let createObserver next error completed =
  { new System.IObserver<_> with
      member __.OnNext(value) = next value
      member __.OnError(ex) = error ex
      member __.OnCompleted() = completed () }

 

/// Creates a new observable whose subscribe logic is the function f.
/// f is run with the subscribing observer; the action handed back to
/// Observable.Create is always `doNothing`.
let create f =
  let onSubscribe observer =
    f observer
    doNothing
  Observable.Create<_>(fun observer -> onSubscribe observer)

 

/// Creates an observable from an async computation.
/// On success the result is pushed with OnNext and the sequence is then
/// closed with OnCompleted; an exception or a cancellation is forwarded
/// to OnError.
let ofAsync async =
  create
    (fun (obs: IObserver<_>) ->
       Async.StartWithContinuations
         (async,
          (fun result ->
             obs.OnNext(result)
             // an async yields exactly one value, so the sequence ends here;
             // without this, operators waiting for completion never finish
             obs.OnCompleted()),
          (fun error -> obs.OnError(error)),
          (fun cancelled -> obs.OnError(cancelled))))

 

/// Gets a DispatcherScheduler for Dispatcher.CurrentDispatcher.
/// The argument is ignored; it only makes this a function, so the
/// dispatcher is looked up on every call.
let getDispatcherScheduler _ =
  let dispatcher = Dispatcher.CurrentDispatcher
  new DispatcherScheduler(dispatcher)

 

/// Generates an observable from an IEvent by forwarding every event
/// argument to the subscribing observer's OnNext.
/// NOTE(review): the handler is attached with event.Add and never removed,
/// and `create` returns `doNothing` as its action - so disposing the
/// subscription presumably leaves the handler attached; confirm.
let fromEvent (event:IEvent<_,_>) =
  create (fun observer -> event.Add(fun args -> observer.OnNext args))

 

/// Generates an empty observable (Observable.Empty) of element type 'a
let empty<'a> = Observable.Empty<'a>()

 

/// Takes the head of the elements (alias for Observable.First)

let head = Observable.First

 

/// Merges the two observables.
/// The arguments are swapped in the call: obs2 is passed to
/// Observable.Merge first, so in `obs2 |> mergeWith obs1` the piped
/// observable becomes the first argument.

let mergeWith obs1 obs2 = Observable.Merge(obs2, obs1)

 

/// Merges all inner observables of an observable of observables
let mergeAll (observables:IObservable<IObservable<'a>>) =
  Observable.Merge observables

 

/// Merges all observables of the given seq
let merge (observables:(IObservable<'a>) seq) =
  Observable.Merge observables

 

/// Creates a range as an observable
/// (start and count are passed straight through to Observable.Range)

let range start count = Observable.Range(start, count)

 

/// Converts a seq into an observable
let toObservable (seq: 'a seq) = Observable.ToObservable seq

 

/// Converts an observable into a seq (alias for Observable.ToEnumerable)

let toEnumerable = Observable.ToEnumerable

 

/// Subscribes to the observable with all 3 callbacks.
/// next is called with each value, error with the exception and
/// completed with unit; the result of Subscribe is returned unchanged.
let subscribeComplete next error completed (observable: 'a observable) =
   observable.Subscribe(
     (fun x -> next x),
     (fun e -> error e),
     (fun () -> completed()))

 

/// Subscribes to the observable with a next- and an error-function;
/// the completion notification is ignored.
let subscribeWithError next error observable =
  subscribeComplete next error ignore observable

 

/// Subscribes to the observable with just a next-function.
/// Errors are passed to a handler that discards them.
let subscribe next observable =
  subscribeWithError next (fun _ -> ()) observable

 

/// Throttles the observable for the given interval
/// (curried, pipeline-friendly wrapper around Observable.Throttle)

let throttle interval observable =

  Observable.Throttle(observable,interval)   

 

/// Throttles the observable for the given interval, passing the
/// scheduler returned by getDispatcherScheduler() to Observable.Throttle

let throttleOnCurrentDispatcher interval observable =

  Observable.Throttle(

     observable,getDispatcherScheduler(),interval)

 

/// Samples the observable at the given interval
/// (curried, pipeline-friendly wrapper around Observable.Sample)

let sample interval observable =

  Observable.Sample(observable,interval)   

 

/// Samples the observable at the given interval, passing the
/// scheduler returned by getDispatcherScheduler() to Observable.Sample

let sampleOnCurrentDispatcher interval observable =

  Observable.Sample(

    observable,getDispatcherScheduler(),interval)

 

/// Returns the observable sequence that reacts first
/// (two-argument form of Observable.Amb)

let takeFirstOf2Reactions obs1 obs2 =

  Observable.Amb(obs1,obs2)

 

/// Returns the observable sequence that reacts first
/// (seq form of Observable.Amb)
let amb (obs: IObservable<'a> seq) =
  Observable.Amb obs

 

/// Returns the observable sequence that reacts first.
/// Descriptive alias for `amb`.
let takeFirstReaction (obs: IObservable<'a> seq) =
  amb obs

 

/// Matches when both observable sequences have an available value
/// (wrapper around Observable.And)

let both obs1 obs2 = Observable.And(obs1,obs2)

 

/// Zips two observable sequences into one observable sequence of
/// tuples, using Observable.Zip with a tupling selector.
let zip obs1 obs2 =
  let pair = Func<_,_,_>(fun left right -> (left, right))
  Observable.Zip(obs1, obs2, pair)

 

/// Combines two observable sequences into one observable sequence of
/// tuples whenever one of the observable sequences has a new value.
///    ==> More results than zip
let combineLatest obs1 obs2 =
  let pair = Func<_,_,_>(fun left right -> (left, right))
  Observable.CombineLatest(obs1, obs2, pair)

 

/// Flattens an observable of observables into one observable,
/// using Observable.SelectMany with the identity selector.
/// NOTE(review): SelectMany presumably interleaves the inner sequences
/// rather than appending them in order - confirm that this is intended
/// for a function named `concat`.
let concat observable =
  Observable.SelectMany(
    observable,
    Func<_,_>(fun (x:IObservable<'a>) -> x))

 

/// Maps the given observable with the given function
/// (wrapper around Observable.Select)

let map f observable =

  Observable.Select(observable,Func<_,_>(f))  

 

/// Maps the given observable with the given function, which also
/// receives the element index: Select supplies (element, index) and
/// f is called as `f index element`, like Seq.mapi.

let mapi f observable =

  Observable.Select(observable,Func<_,_,_>(fun x i ->f i x))  

 

/// Filters all elements where the given predicate is satisfied
/// (wrapper around Observable.Where)

let filter f observable =

  Observable.Where(observable, Func<_,_>(f))

 

/// Splits the observable into a pair of observables: the first holds
/// the elements for which the predicate returns true, the second those
/// for which it returns false. Both are independent filters over the
/// same source observable.
let partition predicate observable =
  let isRejected item = not (predicate item)
  let accepted = filter predicate observable
  let rejected = filter isRejected observable
  accepted, rejected

 

/// Skips n elements (wrapper around Observable.Skip)

let skip n observable = Observable.Skip(observable, n)

 

/// Skips elements while the predicate is satisfied
/// (wrapper around Observable.SkipWhile)

let skipWhile f observable =

  Observable.SkipWhile(observable, Func<_,_>(f))

 

/// Runs all observable sequences in parallel
/// and combines their first values.
let forkJoin (observables: ('a observable) seq) =
  Observable.ForkJoin observables

 

/// Counts the elements (alias for Observable.Count)

let length = Observable.Count

 

/// Takes n elements (wrapper around Observable.Take)

let take n observable =

  Observable.Take(observable, n)  

 

/// Determines whether the given observable is empty
/// (wrapper around Observable.IsEmpty)

let isEmpty observable = Observable.IsEmpty observable

 

/// Determines whether the given observable is not empty
/// (the negation of isEmpty)
let isNotEmpty observable = observable |> isEmpty |> not

 

/// Determines whether an observable sequence contains a value which
/// satisfies the given predicate: elements are skipped while the
/// predicate fails, then the remainder is tested for non-emptiness.
let exists predicate observable =
  let fails item = not (predicate item)
  isNotEmpty (skipWhile fails observable)

 

/// Continues an observable sequence that is terminated
/// by an exception with the next observable sequence.
/// The fallback comes first so that the failing observable can be
/// piped in: `failing |> catch fallback`.
let catch (newObservable:IObservable<'a>) failingObservable =
  Observable.Catch(failingObservable,newObservable)

 

/// Takes elements while the predicate is satisfied
/// (wrapper around Observable.TakeWhile)

let takeWhile f observable =

  Observable.TakeWhile(observable, Func<_,_>(f))

 

/// Iterates through the observable and performs the given side-effect
/// (wrapper around Observable.Do; f is called with each element)

let perform f observable =

  Observable.Do(observable,fun x -> f x)

 

/// Invokes f after the source observable sequence terminates normally
/// or by an exception (wrapper around Observable.Finally; f is called
/// with unit)

let performFinally f observable =

  Observable.Finally(observable,fun _ -> f())

 

/// Folds the observable with f, starting from seed
/// (wrapper around Observable.Aggregate)

let fold f seed observable =

  Observable.Aggregate(observable, seed, Func<_,_,_>(f))  

 

/// Returns an observable from a Begin/End async pattern pair.
/// beginF is called with a tuple of the two arguments supplied by
/// FromAsyncPattern, endF with the single one; the function produced by
/// FromAsyncPattern is invoked immediately with no arguments.

let fromAsync beginF endF =

   Observable.FromAsyncPattern<_>(

     Func<_,_,_>(fun x y -> beginF(x,y)),

       (fun x -> endF x)).Invoke()

 

/// Subscribes the given next-function to every observable of the seq
/// and returns the results of the individual `subscribe` calls as a list.
/// Seq.toList forces the lazy Seq.map, so all subscriptions are made
/// before this function returns.

let subscribeAll next observables =

  observables |> Seq.map (subscribe next) |> Seq.toList     

 

type IObservable<‘a> with

  /// Subscribes to the Observable with just a next-function

  member this.Subscribe(next) =

    subscribe next this

 

  /// Subscribes to the Observable with a next

  /// and an error-function

  member this.Subscribe(next,error) =

    subscribeWithError next error this

 

  /// Subscribes to the Observable with all 3 callbacks

  member this.Subscribe(next,error,completed) =

    subscribeComplete next error completed this

 

open System.Net

 

/// Observable-returning extension members for WebRequest,
/// built on the Begin/End pairs via fromAsync.
type WebRequest with
  /// Observable over BeginGetRequestStream/EndGetRequestStream
  member this.GetRequestStreamAsync() =
    fromAsync
      this.BeginGetRequestStream
      this.EndGetRequestStream

  /// Observable over BeginGetResponse/EndGetResponse
  member this.GetResponseAsync() =
    fromAsync
      this.BeginGetResponse
      this.EndGetResponse

  /// Observable yielding the stream of the response.
  /// WebRequest has no Begin/End pair for the response stream, so the
  /// response is requested first and its stream is taken from it.
  /// (This member used to wrap the *request* stream pair by mistake.)
  member this.GetResponseStreamAsync() =
    fromAsync
      this.BeginGetResponse
      this.EndGetResponse
    |> map (fun (response: WebResponse) -> response.GetResponseStream())

 

type Async<'a> with
  /// Converts this async computation into an observable (see ofAsync)
  member this.ToObservable() = ofAsync this

Tags: , , ,

Friday, 24. October 2008


Using PLINQ in F# – Parallel Map and Reduce (Fold) functions – part 2

Filed under: .NET 3.0,English posts,F#,Informatik,PLINQ — Steffen Forkmann at 18:00 Uhr

Last time I showed how it is possible to use parallel map and fold functions to compute the sum of all factorials between 1 and 3000. The result was a nearly perfect load balancing for this task on a two processor machine. This time I will derive a generic function that computes partial results in parallel and folds them to a final result.

Let’s consider our F# example:

// Adds two values; used below as the fold function for the bigint sums.
let add a b = a + b  
// Factorial of x: the product of all bigints in [1I..x].
let fac (x:bigint) =
  List.fold_left (fun product n -> product * n) 1I [1I..x]
// Sum of the factorials of 1 to 3000, computed sequentially.
let sequential() =
  let factorials = List.map fac [1I..3000I]
  List.fold_left add 0I factorials

This is the same as:

// Sum of the factorials of all numbers in [min..max].
let calcFactorialSum min max =
  let factorials = List.map fac [min..max]
  List.fold_left add 0I factorials
 
// Six partial sums whose ranges together cover 1..3000 without overlap.
// f1 spans 2000 numbers, f2-f6 span 200 each.
let f1() = calcFactorialSum    1I 2000I
let f2() = calcFactorialSum 2001I 2200I
let f3() = calcFactorialSum 2201I 2400I
let f4() = calcFactorialSum 2401I 2600I
let f5() = calcFactorialSum 2601I 2800I
let f6() = calcFactorialSum 2801I 3000I
 
// Runs the six partial sums one after another and adds up their results.
let sequential2() =
  [f1; f2; f3; f4; f5; f6]
    |> List.fold_left (fun total part -> total + part ()) 0I

We split the summation into 6 independent tasks and computed the sum of the partial results. This has nearly no bearing on the runtime.

But with the help of PLINQ we can compute each task in parallel:

// Turns an F# list into a parallel query via the AsParallel extension method.
let asParallel (list: 'a list) = 
  list.AsParallel<'a>()

// Invokes every function of the list through ParallelEnumerable.Select
// and returns the parallel sequence of their results.
let runParallel functions = 
    ParallelEnumerable.Select(
      asParallel functions, (fun f ->  f() ) )
 
// Parallel fold: aggregates data with foldF, starting from seed,
// via ParallelEnumerable.Aggregate.
let pFold foldF seed (data:IParallelEnumerable<'a>)=
  ParallelEnumerable.Aggregate<'a,'b>(
    data, seed, new Func<'b,'a,'b>(foldF))
 

// Runs the six partial sums through runParallel and folds the
// partial results with add.
let calcFactorialsParallel() =
  [f1; f2; f3; f4; f5; f6]
    |> runParallel
    |> pFold add 0I

This time we build a list of functions (f1, f2, f3, f4, f5, f6) and run them in parallel. “runParallel” gives us back a list of the partial results, which we can fold with the function “add” to get the final result.

On my Core 2 Duo E6550 with 2.33 GHz and 3.5 GB RAM I get the following results:

Time Normal: 26.576s

Time Sequential2: 26.205s (Ratio: 0.99)

Time “Parallel Functions”: 18.426s (Ratio: 0.69)

Time PLINQ: 14.990s (Ratio: 0.56) (Last post)

Same Results: true

We can see that the parallel computation of the functions f1 – f6 is much faster than the sequential.

But why is the PLINQ-version (see last post) still faster? We can easily see that each partial function needs a different runtime (e.g. it’s much harder to calculate the factorials between 2800 and 3000 than between 2000 and 2200). On my machine I get:

Time F1: 8.738s

Time F2: 2.663s

Time F3: 3.119s

Time F4: 3.492s

Time F5: 3.889s

Time F6: 4.442s

The problem is that the Parallel Framework can only guess each runtime amount in advance. So the load balancing for 2 processors will not be optimal in every case. In the original PLINQ-version there are only small tasks, and the difference between each runtime is smaller. So it is easier to compute the load balancing.

But of course we can do better if we split f1 into two functions f7 and f8:

// f1's range (1..2000) split into two smaller tasks.
let f7() = calcFactorialSum    1I 1500I
let f8() = calcFactorialSum 1501I 2000I

So we can get a better load balancing:

Time F1: 8.721s

Time F7: 4.753s

Time F8: 4.829s

Time Normal: 26.137s

Time “Parallel Functions”: 16.138s (Ratio: 0.62)

Same Results: true

Tags: , , , , , ,

Thursday, 23. October 2008


Using PLINQ in F# – Parallel Map and Reduce (Fold) functions – part 1

Filed under: .NET 3.0,C#,F# — Steffen Forkmann at 18:25 Uhr

If you’re wondering how Google computes query results in such a short time, you have to read the famous “MapReduce” paper by Jeffrey Dean and Sanjay Ghemawat (2004). It shows how one can split large tasks into a mapping and a reduce step which can then be processed in parallel.

With PLINQ (part of the Parallel Extensions to the .NET Framework) you can easily use the “MapReduce” pattern in .NET and especially F#. PLINQ will take care of all the multithreading and load balancing stuff. You only have to give PLINQ a map and a reduce (or fold) function.

Let’s consider a small example. Someone wants to compute the sum of the factorials of all integers from 1 to 3000. With List.map and List.fold_left this is a very easy task in F#:

#light
open System

// Adds two values; used below as the fold function for the bigint sum.
let add a b = a + b
// Factorial of x: folds multiplication over the list [1I..x].
let fac (x:bigint) = [1I..x] |> List.fold_left (*) 1I

// Sum of the factorials of 1 to 3000, computed sequentially.
let sum =
  [1I..3000I]
    |> List.map fac
    |> List.fold_left add 0I

printfn "Sum of Factorials: %A" sum

Of course you could do much much better if you don’t compute every factorial on its own (I will show this in one of the next parts) – but for this time I need an easy function that is time consuming.

This simple Task needs 27 sec. on my Core 2 Duo E6550 with 2.33 GHz and 3.5 GB RAM.

But we can do better if we use parallel map and fold functions with help of PLINQ:

// Parallel map: applies mapF to every element via ParallelEnumerable.Select.
let pMap (mapF:'a -> 'b) (data:IParallelEnumerable<'a>) =
  ParallelEnumerable.Select(data, mapF)

// Parallel fold: aggregates data with foldF, starting from seed,
// via ParallelEnumerable.Aggregate.
let pFold foldF seed (data:IParallelEnumerable<'a>)=
  ParallelEnumerable.Aggregate<'a,'b>(
    data, seed, new Func<'b,'a,'b>(foldF))

Now we can easily transform our calculation to a parallel version:

// Same calculation as the sequential sum, but mapped and folded
// through the parallel pMap/pFold helpers.
let sum =
  [1I..3000I].AsParallel<bigint>()
    |> pMap fac 
    |> pFold add 0I

Putting it all together we can write a small test application:

#light 
open System
open System.Linq
open System.Diagnostics

// Runs f once and returns a tuple of its result and the elapsed time.
let testRuntime f =
  let watch = Stopwatch.StartNew()
  // run f in its own binding so the measurement does not depend on the
  // evaluation order of the tuple elements
  let result = f()
  watch.Stop()
  (result, watch.Elapsed)

// Adds two values; used below as the fold function for the bigint sums.
let add a b = a + b
// Factorial of x: folds multiplication over the list [1I..x].
let fac (x:bigint) = [1I..x] |> List.fold_left (*) 1I

// Input shared by the sequential and the PLINQ run.
let list = [1I..3000I]

// Parallel map: applies mapF to every element via ParallelEnumerable.Select.
let pMap (mapF:'a -> 'b) (data:IParallelEnumerable<'a>)=
  ParallelEnumerable.Select(data, mapF)

// Parallel fold: aggregates data with foldF, starting from seed,
// via ParallelEnumerable.Aggregate.
let pFold foldF seed (data:IParallelEnumerable<'a>)=
  ParallelEnumerable.Aggregate<'a,'b>(
    data, seed, new Func<'b,'a,'b>(foldF))

// Sum of factorials over `list`, using the parallel pMap/pFold helpers.
let PLINQ() =
  list.AsParallel<bigint>()
    |> pMap fac
    |> pFold add 0I

// Sum of factorials over `list`, using the sequential List functions.
let sequential() =
  list
   |> List.map fac
   |> List.fold_left add 0I

// Time the sequential version and print its duration.
let (sumSequential,timeSequential) =
  testRuntime sequential
printfn "Time Normal: %.3fs" timeSequential.TotalSeconds

// Time the PLINQ version and print its duration.
let (sumPLINQ,timePLINQ) =
  testRuntime PLINQ
printfn "Time PLINQ: %.3fs" timePLINQ.TotalSeconds

// Ratio of parallel to sequential runtime (smaller is better).
timePLINQ.TotalSeconds / timeSequential.TotalSeconds
  |> printfn "Ratio: %.2f"

// Check that both versions computed the same sum; the comparison is
// evaluated first and its bool result is piped into printfn.
sumSequential = sumPLINQ
  |> printfn "Same Results: %A"

On my machine I get the following results:

Time Normal: 27.955s

Time PLINQ: 15.505s

Ratio: 0.55

Same Results: true

This means I get nearly a perfect load balancing on my two processors for this task.

In part II I describe how one can compute a series of functions in parallel.

Tags: , , , , , , ,