Microsoft quietly pushed out Reactive Extensions (aka LINQ to Events) for .NET 3.5 SP1 and .NET 4.0 before Christmas. Actually, it wasn't done that quietly, but there hasn't been much fanfare about it compared to other features of .NET 4.0. It's a DevLabs project and official documentation is rather scant at the moment. The main sources are videos, blog posts and an MSDN forum.
Reactive Extensions (Rx) used to be called the Reactive Framework. An earlier incarnation was slipped out with the Silverlight 3 Toolkit in mid-2009. The purpose of Rx is to make asynchronous programming easier. I'm not sure it makes the concepts any easier, but it certainly makes the code more elegant and expressive.
If you want to see a reasonably organised set of code snippets that you can run then I recommend the Rx Wiki which hosts 101 Rx Samples. The Rx Wiki also lists the available Channel 9 videos.
Getting Started
Watch the short Getting Started video.
I downloaded the .NET 3.5 SP1 version. For a bit of conceptual background try reading the first few sections of Introducing Rx. It does contain a few typos though.
Let's start as simply as possible. We're going to call the following method from a Console application and execute it synchronously, then asynchronously the old way, then asynchronously the "reactive" way. We'll make some observations as we go along.
Example 1 - Running Code Synchronously
private static void DoSomething()
{
    Console.WriteLine("Calculating...");
    Thread.Sleep(2000);
    Console.WriteLine("Done.");
}

private static void RunCodeSynchronously()
{
    DoSomething();
}
Result:
Calculating...
Done.
Example 2 - Running Code Asynchronously the Old Way
We define a delegate for our DoSomething method and use BeginInvoke to fire it asynchronously. We also annotate the main and asynchronous threads for clarity.
private delegate void DoSomethingDelegate();

private static void DoSomething()
{
    Console.WriteLine("Async thread {0}.",
        Thread.CurrentThread.GetHashCode());
    Console.WriteLine("Calculating...");
    Thread.Sleep(2000);
    Console.WriteLine("Done.");
}

private static void DoSomethingCompleted(IAsyncResult result)
{
}

For illustration purposes we aren't interested in the AsyncCallback delegate so it's just a no-op. (The Console.ReadKey is introduced to allow the asynchronous thread to complete before the main thread.)
private static void RunCodeAsynchronouslyOldWay()
{
    Console.WriteLine("Main thread {0}.",
        Thread.CurrentThread.GetHashCode());
    DoSomethingDelegate d =
        new DoSomethingDelegate(DoSomething);
    d.BeginInvoke(
        new AsyncCallback(DoSomethingCompleted),
        null
    );
    Console.ReadKey();
    Console.WriteLine("Back in main thread.");
}
Result:
Main thread 1.
Async thread 3.
Calculating...
Done.
Back in main thread.
Example 3 - Running Code Asynchronously using Rx
private static void RunCodeAsynchronouslyRx()
{
    Console.WriteLine("Main thread {0}.",
        Thread.CurrentThread.GetHashCode());
    // Pass an Action delegate representing our operation
    // and invoke asynchronously
    var o = Observable.Start(DoSomething);
    o.Subscribe();
    Console.ReadKey();
    Console.WriteLine("Back in main thread.");
}
Result:
Main thread 1.
Async thread 3.
Calculating...
Done.
Back in main thread.
Here the asynchronous code becomes an observable to which we subscribe. It "pushes" its notifications (a value, an error or completion) to the observer, which here is our invoking code.
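To make the "push" idea a little more concrete, here is a minimal sketch that subscribes an explicit observer instead of calling the parameterless Subscribe. It assumes the current System.Reactive NuGet package (the namespaces in the early DevLabs builds may differ), and LoggingObserver and PushSketch are hypothetical names invented for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;        // Unit (assumes the System.Reactive package)
using System.Reactive.Linq;   // Observable
using System.Threading;

// Hypothetical observer that records every notification pushed to it.
public sealed class LoggingObserver : IObserver<Unit>
{
    private readonly ManualResetEventSlim finished = new ManualResetEventSlim();

    public List<string> Notifications { get; } = new List<string>();

    public void OnNext(Unit value) { Record("OnNext"); }

    public void OnError(Exception error)
    {
        Record("OnError: " + error.Message);
        finished.Set();
    }

    public void OnCompleted()
    {
        Record("OnCompleted");
        finished.Set();
    }

    // Blocks the caller until the observable has terminated.
    public void Wait() { finished.Wait(); }

    private void Record(string text)
    {
        Notifications.Add(text);
        Console.WriteLine(text);
    }
}

public static class PushSketch
{
    public static void DoSomething()
    {
        Console.WriteLine("Calculating...");
        Thread.Sleep(100); // shortened from the post's 2000 ms
        Console.WriteLine("Done.");
    }

    public static void Main()
    {
        var observer = new LoggingObserver();
        Observable.Start(DoSomething).Subscribe(observer);
        observer.Wait(); // stands in for Console.ReadKey
    }
}
```

With an Action, the observable's element type is Unit, so the observer should see a single OnNext followed by OnCompleted once DoSomething returns.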
Example 4 - Running Code Asynchronously the Old Way With an Error
RunCodeAsynchronouslyOldWay remains unchanged from above. DoSomething changes to:
private static void DoSomething()
{
    try
    {
        Console.WriteLine(
            "Async thread {0}.",
            Thread.CurrentThread.GetHashCode()
        );
        Console.WriteLine("Calculating...");
        Thread.Sleep(1000);
        // Introduce a divide-by-zero error
        int a = 1;
        int b = 0;
        int x = a / b;
        Console.WriteLine("Done.");
    }
    catch (Exception ex)
    {
        Console.WriteLine("Error: " + ex.Message);
    }
}
Result:
Main thread 1.
Async thread 3.
Calculating...
Error: Attempted to divide by zero.
Back in main thread.
Example 5 - Running Code Asynchronously using Rx With an Error
This time we don't require a try-catch block in DoSomething.
private static void DoSomething()
{
    Console.WriteLine("Async thread {0}.",
        Thread.CurrentThread.GetHashCode());
    Console.WriteLine("Calculating...");
    Thread.Sleep(1000);
    int a = 1;
    int b = 0;
    int x = a / b;
    Console.WriteLine("Done.");
}

Instead we subscribe to the error from our invoking code.
private static void RunCodeAsynchronouslyRx()
{
    Console.WriteLine("Main thread {0}.",
        Thread.CurrentThread.GetHashCode());
    // Pass an Action delegate representing some operation
    // and invoke asynchronously
    var o = Observable.Start(DoSomething);
    o.Subscribe(
        x => {},
        ex => Console.WriteLine("Error: " + ex.Message)
    );
    Console.ReadKey();
    Console.WriteLine("Back in main thread.");
}
Result:
Main thread 1.
Async thread 3.
Calculating...
Error: Attempted to divide by zero.
Back in main thread.
The Subscribe method is overloaded. In the overload above we are using a value handler and an exception handler. Think of the value handler as corresponding to the DoSomethingCompleted handler from Example 2. It's code we wish to run after the asynchronous code has completed. The value handler is an Action<T> delegate. We wish to do nothing so I've just supplied a no-op.
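The value handler becomes more interesting when the operation returns something. The sketch below is not from the original examples: it assumes the current System.Reactive NuGet package, uses the Observable.Start overload that takes a Func<T>, and adds a third, completion handler. ComputeAnswer and ValueHandlerSketch are hypothetical names.

```csharp
using System;
using System.Reactive.Linq;   // Observable (assumes the System.Reactive package)
using System.Threading;

public static class ValueHandlerSketch
{
    // Hypothetical stand-in for DoSomething that returns a result.
    public static int ComputeAnswer()
    {
        Thread.Sleep(100); // simulate some work
        return 42;
    }

    public static void Main()
    {
        var finished = new ManualResetEventSlim();

        // Going through a Func<int> variable avoids any overload ambiguity
        // between Start(Action) and Start<T>(Func<T>).
        Func<int> work = ComputeAnswer;
        IObservable<int> o = Observable.Start(work);

        o.Subscribe(
            value => Console.WriteLine("Value: " + value),   // value handler
            ex =>
            {
                Console.WriteLine("Error: " + ex.Message);    // exception handler
                finished.Set();
            },
            () =>
            {
                Console.WriteLine("Completed.");              // completion handler
                finished.Set();
            });

        finished.Wait(); // stands in for Console.ReadKey
    }
}
```

This should print "Value: 42" followed by "Completed.". The value handler receives the result, and the completion handler is the closer analogue of a "we're done" callback.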
One last thing: Subscribe returns an IDisposable, so we should call Dispose when we're done or else wrap the subscription in a using statement. I left out those details for simplicity of exposition.
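For completeness, here is roughly how the using approach might look. This is a sketch, again assuming the current System.Reactive NuGet package. DisposeSketch is a hypothetical name, and a ManualResetEventSlim replaces Console.ReadKey so that the example runs unattended.

```csharp
using System;
using System.Reactive.Linq;   // Observable (assumes the System.Reactive package)
using System.Threading;

public static class DisposeSketch
{
    public static void DoSomething()
    {
        Console.WriteLine("Calculating...");
        Thread.Sleep(100); // shortened from the post's 2000 ms
        Console.WriteLine("Done.");
    }

    public static void Main()
    {
        var finished = new ManualResetEventSlim();
        var o = Observable.Start(DoSomething);

        // The subscription is disposed when the using block exits.
        using (o.Subscribe(
            _ => { },
            ex =>
            {
                Console.WriteLine("Error: " + ex.Message);
                finished.Set();
            },
            () => finished.Set()))
        {
            finished.Wait(); // wait for completion or an error
        }

        Console.WriteLine("Back in main thread.");
    }
}
```

As far as I can tell, disposing the subscription only detaches our handlers. It does not cancel work that Observable.Start has already begun, so waiting inside the using block is what guarantees we see the outcome.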
Well, this is just the briefest of introductions. The idea was to use the simplest possible logic so that we can focus on what's new. Often when new concepts are introduced, the business logic of the examples is so non-trivial that it obscures the concepts. Reactive Extensions packs in a lot. The Observable class, for example, is huge. I will continue to explore it and hope to post more in due course.