Tuesday 30 March 2010

Reactive Extensions, Silverlight and WCF

Here I will illustrate how Reactive Extensions (Rx) can be applied to asynchronous calls to a WCF web service in a Silverlight application.

We start with a simple customer database consisting of a single table of first and last names.

ID  First   Last

1   Fred    McFarlane
4   Susan   McFarlane
6   David   Green
10  Joe     Bloggs

Then we set up a WCF service to return a collection of customers. I used LINQ to SQL to generate the CustomersDemo DataContext instance.

[ServiceContract(Namespace = "")]
[AspNetCompatibilityRequirements(
RequirementsMode =
AspNetCompatibilityRequirementsMode.Allowed)
]
public class CustomersService
{
private string connection =
@"[My connection]";

[OperationContract]
public List<Customers> GetCustomers()
{
CustomersDemo db = new CustomersDemo(connection);
var customers = from customer in db.Customers
select customer;
return customers.ToList();
}
}

Asynchronous Access Without Rx


To consume the service in the Silverlight client…


Make the asynchronous call in the Page constructor.

public Page()
{
InitializeComponent();

CustomersServiceClient client =
new CustomersServiceClient();
client.GetCustomersCompleted +=
new EventHandler<GetCustomersCompletedEventArgs>(
client_GetCustomersCompleted
);
client.GetCustomersAsync();
}

Handle the completed event and display customers in a data grid.

void client_GetCustomersCompleted(
object sender,
GetCustomersCompletedEventArgs e
)
{
if (e.Error == null)
{
var customers = e.Result;
dg.ItemsSource = customers;
}
}

Results:


image


Asynchronous Access With Rx


In this case we dispense with the completed handler and centralise the code in the Page constructor. Instead of having a separate handler we create an IObservable from the GetCustomersCompleted event using the Observable.FromEvent method. The appended Take(1) is for returning a single value from the start of the observable sequence. It also implicitly unsubscribes the observer.

public Page()
{
InitializeComponent();

CustomersServiceClient client =
new CustomersServiceClient();
IObservable<IEvent<GetCustomersCompletedEventArgs>> observable =
Observable.FromEvent<GetCustomersCompletedEventArgs>(
client, "GetCustomersCompleted"
).Take(1);

observable.Subscribe(
e =>
{
if (e.EventArgs.Error == null)
{
var customers = e.EventArgs.Result;
dg.ItemsSource = customers;
}
});
client.GetCustomersAsync();
}
This produces the same results:

image

Concurrent File Processing Using Reactive Extensions

This follows on from my first post on Reactive Extensions for .NET back in January. We now consider a simple scenario of performing a number of operations on a file concurrently. This example was inspired by a post from functional programming guru, Matthew Podwysocki. Here I just flesh out his suggestion at the end of that post.

Here is a simple text file – test.txt – consisting of two lines (there is a new line after the first period)

We're proud to announce the availability of Reactive Extensions for JavaScript.
This port brings the power of Reactive programming to JavaScript.

I use a short file so we can easily check the results.

Now let’s perform three concurrent asynchronous operations on it.

  1. Count the words
  2. Count the letters
  3. Count the vowels

This seems quite simple but was in fact a bit trickier than meets the eye. The first thing to do is load the contents of the file into an enumerable collection. We do this so that we can subsequently convert the IEnumerable into an IObservable to which we’ll subscribe three times, once for each operation. There are various ways of reading the file but it turned out to be easiest to read it into a single item collection.

Then create an observable

List<string> text = new List<string>();
text.Add(File.ReadAllText(@"test.txt"));
var observable = text.ToObservable();

Count the words

private static void CountWords(string text)
{
string[] separator =
new string[] { " ", ",", ".", "\r\n"};

string[] words =
text.Split(
separator,
StringSplitOptions.RemoveEmptyEntries
);
int count = words.Count();
Console.WriteLine("Number of words = {0}", count);
}

Count the letters

private static void CountLetters(string text)
{
string[] separator =
new string[] { " ", ",", ".", "\r\n" };

string[] words =
text.Split(
separator,
StringSplitOptions.RemoveEmptyEntries
);
int count = words.Sum(word => word.Length);
Console.WriteLine("Number of letters = {0}", count);
}

Count the vowels

private static void CountVowels(string text)
{
char[] vowels = {'a', 'e', 'i', 'o', 'u'};
char[] chars = text.ToCharArray();
int count =
chars.Count(
c => vowels.Contains(Char.ToLowerInvariant(c))
);
Console.WriteLine("Number of vowels = {0}", count);
}

Putting it all together…

// Read file and store as single item string collection
List<string> text = new List<string>();
text.Add(File.ReadAllText(@"test.txt"));

// Create an observable from our collection
var observable = text.ToObservable();

// Print some file stats concurrently
using (observable.Subscribe(CountWords))
using (observable.Subscribe(CountLetters))
using (observable.Subscribe(CountVowels))

Results:

Number of words = 21
Number of letters = 123
Number of vowels = 47