
Reactive Extensions (Rx) is a new library for composing asynchronous and event-based programs using observable collections.
Why Reactive Extensions?
Today, when you work with a data source you will typically use an IEnumerable<T>. IEnumerable<T> is synchronous and pull-based: you as a developer have to ask the source for the current element or to move to the next element (the Current property and MoveNext method of its enumerator). The implication is that your application could get stuck (blocked) whilst performing these actions.
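To make the pull model concrete, here is a minimal sketch of roughly what a foreach loop does behind the scenes, using only the BCL. The names PullDemo and Drain are made up for this illustration.

```csharp
using System;
using System.Collections.Generic;

public static class PullDemo
{
    // Pulls every element out of the source by hand, the way foreach does.
    public static List<int> Drain(IEnumerable<int> source)
    {
        var result = new List<int>();
        using (IEnumerator<int> enumerator = source.GetEnumerator())
        {
            // The consumer asks for the next element and waits for the answer.
            while (enumerator.MoveNext())
            {
                result.Add(enumerator.Current);
            }
        }
        return result;
    }

    public static void Main()
    {
        foreach (var value in Drain(new[] { 1, 2, 3 }))
        {
            Console.WriteLine(value);
        }
    }
}
```

If MoveNext has to wait for slow data, the calling thread waits with it; that is the blocking behaviour described above.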
IEnumerable<T> has some great advantages, like the ability to query collections using LINQ, but the fact remains that dealing with asynchronous data gets a lot more complicated. IObservable<T> is the solution to this problem, since it’s the dual of IEnumerable<T>. To make this duality complete, IObservable<T> should be queryable using LINQ as well. This can be achieved using Reactive Extensions.
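As a small illustration of querying an observable with LINQ, the sketch below runs a query expression over an observable range. It assumes the System.Reactive package and its System.Reactive.Linq namespace; QueryDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class QueryDemo
{
    public static List<int> Run()
    {
        var results = new List<int>();

        // The same query syntax you would use on an IEnumerable<int>.
        IObservable<int> query =
            from x in Observable.Range(1, 10)
            where x % 2 == 0
            select x * 10;

        // Values are pushed to us as the query produces them.
        query.Subscribe(value => results.Add(value));
        return results;
    }

    public static void Main()
    {
        foreach (var value in Run())
        {
            Console.WriteLine(value);
        }
    }
}
```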
The building blocks
IObservable<T>
IObservable<T> is one of the two main interfaces for working with Reactive Extensions. It’s not a complicated interface: it has only a Subscribe method. Since version 4 of the .NET Framework this interface is included in the BCL. If you’re still using .NET 3.5 you can use this interface as well, since the Rx team has included these interfaces in a separate assembly for this scenario (System.Observable).
An implementation of this interface can best be described as a collection of elements of type T. The main purpose of the IObservable<T> interface is to provide elements to the observers (subscribers). Thus, an IObservable<Animal> can be thought of as a collection of Animals, where the Animals will be pushed to the subscribed observers.
IObserver<T>
IObserver<T> is the other main interface for working with Reactive Extensions and has also been included in the BCL since .NET Framework 4. The IObserver<T> interface is meant to be the dual of IEnumerator<T>, just as IObservable<T> is the dual of IEnumerable<T>.
Although you will be exposed to the IObservable<T> and IObserver<T> interfaces all the time when using Rx, you will almost never implement them yourself, since Rx provides a lot of implementations out of the box.
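To show how small the two interfaces are, the sketch below implements both by hand using only the BCL (no Rx assembly). FixedObservable, RecordingObserver and PushDemo are hypothetical types written for this illustration; in practice you would use the implementations Rx provides.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hand-written observable that pushes a fixed set of values.
public sealed class FixedObservable : IObservable<int>
{
    private readonly int[] _values;

    public FixedObservable(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Push every value to the observer, then signal completion.
        foreach (var value in _values)
        {
            observer.OnNext(value);
        }
        observer.OnCompleted();
        return new EmptySubscription();
    }

    // Nothing to clean up: all values were pushed before Subscribe returned.
    private sealed class EmptySubscription : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer that records everything it receives.
public sealed class RecordingObserver : IObserver<int>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(int value) { Log.Add("OnNext(" + value + ")"); }
    public void OnError(Exception error) { Log.Add("OnError(" + error.Message + ")"); }
    public void OnCompleted() { Log.Add("OnCompleted"); }
}

public static class PushDemo
{
    public static void Main()
    {
        var observer = new RecordingObserver();
        using (new FixedObservable(1, 2, 3).Subscribe(observer))
        {
            foreach (var entry in observer.Log)
            {
                Console.WriteLine(entry);
            }
        }
    }
}
```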
Overloads of Subscribe
As you know, IObservable<T> exposes only one method, the Subscribe method. Rx adds a set of extension methods that overload Subscribe. The most basic one lets you pass just an Action<T> that will be performed when OnNext is invoked. The other overloads are also very useful and allow you to pass different combinations of delegates for the different events that could occur when subscribing to the source.
public static class ObservableExtensions
{
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext, Action<Exception> onError);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext, Action onCompleted);
    public static IDisposable Subscribe<TSource>(this IObservable<TSource> source,
        Action<TSource> onNext, Action<Exception> onError, Action onCompleted);
}
You will use these overloads a lot, but there are also a lot of other very helpful static operators on the static Observable class. But how did we get this syntax? When we explore the static Observer class we find something very familiar:
public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted);
Putting this together with the fact you need to provide an Observer when Subscribing to an Observable, we should get something like this:
Observable.Subscribe(Observer.Create(Action<T> onNext, Action<Exception> onError, Action onCompleted));
Although this would have made sense, the explicit Observer.Create call isn’t really necessary, so the Rx team has decided to let you omit this step: the Subscribe overloads accept the delegates directly.
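The two forms can be compared side by side. This is a minimal sketch, assuming the System.Reactive package (Observer lives in the System.Reactive namespace there); SubscribeDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class SubscribeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = Observable.Range(1, 2);

        // Long form: build the observer explicitly and hand it to Subscribe.
        IObserver<int> observer = Observer.Create<int>(
            value => log.Add("explicit " + value),
            error => log.Add("explicit error"),
            () => log.Add("explicit completed"));
        source.Subscribe(observer);

        // Short form: pass the same three delegates straight to Subscribe.
        source.Subscribe(
            value => log.Add("short " + value),
            error => log.Add("short error"),
            () => log.Add("short completed"));

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

Both subscriptions receive the same notifications; the short form simply saves you from creating the observer yourself.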
Create and generate observables
The Observable class has plenty of methods that make creating common types of observables easier. We’ll explore some of these methods below and include an example of how to use them. Some of the examples use Subject<T> and its variants. I’ll explain exactly what they do and how they differ in a later post, but I’m sure you can grasp the concept from the code.
Observable.Empty<T>()
This method returns an IObservable<T> that just publishes an OnCompleted.
// Create an empty Observable
var emptyObservable = Observable.Empty<int>();
Observable.Return<T>(T)
The return-method will return an observable that publishes the value that is provided and then publishes OnCompleted.
// Create our Observable
var returnObservable = Observable.Return<int>(1);
Observable.Never<T>()
This method returns an IObservable<T> that never publishes anything, not even OnCompleted.
// Create our Observable
var neverObservable = Observable.Never<int>();
Observable.Throw<T>(Exception)
This method returns an observable that only publishes the specified exception (through OnError).
// Create our Observable
var exceptionObservable = Observable.Throw<int>(new Exception("This should not happen :-)"));
Observable.Create<T>(Func<IObserver<T>, Action>)
This method (and its signature) is a bit more complicated than the above methods. The main idea is that this method lets you specify a delegate that will be executed when a subscription is made. The IObserver<T> that made the subscription will be passed to the delegate you specify, so that you can call the OnNext, OnError and OnCompleted methods as you wish. Your delegate is actually a Func that returns an Action. This action is the method that will be called when a subscriber disposes of their subscription (more on unsubscribing later in this post). Also notice the analogy with the Observer.Create explained above.
var obs = Observable.Create<int>(
    observer =>
    {
        // The parameter is the subscribing observer, so we push values to it
        observer.OnNext(1);
        observer.OnNext(2);
        observer.OnCompleted();
        return () => Console.WriteLine("Our observer has unsubscribed");
    });
obs.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));
Observable.Range(int, int)
This method just returns a range of integers. The first parameter is the initial value, the second is the number of values that should be generated. The example will write the 7 values from 5 to 11.
var rangeObservable = Observable.Range(5, 7);
rangeObservable.Subscribe(Console.WriteLine);
Observable.Interval(TimeSpan)
The Interval method will publish values incrementing from 0 every period that you specify. In the example we’ll publish a value every 500 milliseconds.
var intervalObservable = Observable.Interval(TimeSpan.FromMilliseconds(500));
intervalObservable.Subscribe(Console.WriteLine);
This method can be handy when emulating fluctuating data like the stock market.
// Random generator to emulate fluctuations
var random = new Random();
// Last known stock value; we'll start at 10
var lastStock = 10.0;
// Start our fluctuation every 500 milliseconds
var interval = Observable.Interval(TimeSpan.FromMilliseconds(500))
    .Select(i =>
    {
        // Generate the fluctuation
        var fluctuation = random.NextDouble() - 0.5;
        lastStock += fluctuation;
        return lastStock;
    });
// Subscribe to our observable
interval.Subscribe(Console.WriteLine);
Observable.Start
This method has several signatures. Basically the method allows you to turn a Func<T> or an Action into an Observable. When using the overload with the Func<T>, the code will execute and when the Func returns its value, that value will be published, followed by OnCompleted. An example:
var start = Observable.Start(() =>
{
    Console.Write("Getting some work done");
    for (int i = 0; i < 20; i++)
    {
        Thread.Sleep(100);
        Console.Write(".");
    }
    Console.WriteLine();
    return "My value";
});
start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));
When using the overload that takes an Action, the returned Observable will be of type IObservable<Unit>. Unit can best be described as the void for generic types. In this case, it is just used to publish a notice that the Action is complete. However, this is not very important since OnCompleted will be published immediately after Unit anyway. Let’s write our above example as an Action:
var start = Observable.Start(() =>
{
    Console.Write("Getting some work done");
    for (int i = 0; i < 20; i++)
    {
        Thread.Sleep(100);
        Console.Write(".");
    }
    Console.WriteLine();
});
start.Subscribe(unit => Console.WriteLine("Unit published"),
    () => Console.WriteLine("Action completed"));
An important thing to note when using Observable.Start is that it starts the work at the moment determined by the scheduler, not when an observer subscribes. By default this means ‘now’. The following small example shows this clearly: we reuse part of the above code, but sleep for some milliseconds before subscribing. You’ll see that the function is already printing its progress before we subscribe.
var start = Observable.Start(() =>
{
    Console.Write("Getting some work done");
    for (int i = 0; i < 20; i++)
    {
        Thread.Sleep(100);
        Console.Write(".");
    }
    Console.WriteLine();
    return "My value";
});
Thread.Sleep(500); // Wait before subscribing
Console.WriteLine("Subscribing"); // Make visible when we subscribe
start.Subscribe(Console.WriteLine, () => Console.WriteLine("Action completed"));
ToObservable<T>(this IEnumerable<T>)
This method can be used to convert an IEnumerable<T> into an IObservable<T>. Although this method is pretty straightforward, it also has great value: it makes it very easy to get some quick mocking done. In addition, the ToObservable and ToEnumerable methods are important from a concurrency point of view, since they can be used to convert between synchronous (pull) and asynchronous (push) sequences.
// Create a new IEnumerable
var enumerate = new List<int> { 1, 2, 3 };
// Convert the IEnumerable to IObservable
var observable = enumerate.ToObservable();
// Subscribe to the IObservable
observable.Subscribe(Console.WriteLine);
ToEnumerable<T>(this IObservable<T>)
This method is analogous to the one above but works in the opposite direction: given an IObservable<T>, it converts the observable to an IEnumerable<T>.
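Since the original gives no example for this direction, here is a minimal sketch, assuming the System.Reactive package; ToEnumerableDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class ToEnumerableDemo
{
    public static List<int> Run()
    {
        IObservable<int> observable = Observable.Range(1, 3);

        // Convert the push-based sequence back into a pull-based one.
        IEnumerable<int> enumerable = observable.ToEnumerable();

        // Enumerating blocks while it waits for the next value to arrive.
        return enumerable.ToList();
    }

    public static void Main()
    {
        foreach (var value in Run())
        {
            Console.WriteLine(value);
        }
    }
}
```

Be careful with observables that never complete (such as Observable.Interval): enumerating them to the end will block forever.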
Observable.Generate
Generate is a method for generating sequences from an initial state, a condition, an iteration step and a result selector. Let’s just stick with a basic example and create a sequence of numbers starting from 5, incrementing by 5 while the value is less than 50.
var generatedObservable = Observable.Generate(5, i => i < 50, i => i + 5, i => i);
generatedObservable.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
Check the existence of values
Moving on from methods that create or generate observables, the following methods check whether an observable publishes a certain value (or any value at all).
Any<T>(this IObservable<T>)
This method will either check whether the IObservable will publish any value at all, or if you choose to use the overload (taking a Func<T, bool>) then it will check if any of the values satisfy your predicate. One of the interesting things of Any is that it is non-blocking and thus returns an IObservable<bool>. This will only yield one value.
// Create a new observable
var obs = Observable.Range(10, 15);
// Does the observable contain any value?
obs.Any().Subscribe(Console.WriteLine);
// Does the observable contain any value that passes the specified predicate?
obs.Any(i => i > 100).Subscribe(Console.WriteLine);
// Does Observable.Empty contain any value?
Observable.Empty<int>().Any().Subscribe(Console.WriteLine);
Contains<T>(this IObservable<T>, T)
Contains<T> is a method quite similar to Any() but will only accept a value of T. Just like Any() this method returns an IObservable<bool>.
// Create a new observable
var obs = Observable.Range(10, 15);
// Does the observable contain the value 15?
obs.Contains(15).Subscribe(Console.WriteLine);
// Does Observable.Empty contain the value 5?
Observable.Empty<int>().Contains(5).Subscribe(Console.WriteLine);
IsEmpty<T>(this IObservable<T>)
This extension method only checks whether the stream is empty (so it is the opposite of Any()). This method also returns an IObservable<bool>.
// Create a new observable
var obs = Observable.Range(10, 15);
// Is the observable empty?
obs.IsEmpty().Subscribe(Console.WriteLine);
// Create a new observable and publish a value
var subject = new ReplaySubject<int>();
subject.OnNext(1);
// Is this observable empty?
subject.IsEmpty().Subscribe(Console.WriteLine);
// Is Observable.Empty really empty?
Observable.Empty<int>().IsEmpty().Subscribe(Console.WriteLine);
Filtering and Aggregating
Next we’ll explore some extension methods that filter the stream or aggregate the data that is being returned. You’ll notice that these are similar to those used for IEnumerable<T>. We’ll just explore one of those basic methods, and then give a list of other basic methods that don’t need further explanation.
First<T>
// Create a new observable
var obs = Observable.Range(10, 15);
// And get the first element of this observable
Console.WriteLine(obs.First());
// Try to get the first value of our Observable.Empty
// This will throw an InvalidOperationException("Sequence contains no elements.")
Console.WriteLine(Observable.Empty<int>().First());
The following methods are quite self-explanatory when you have experience with IEnumerable<T>. Notice that some of them return an IObservable<T> instead of T. The reason is that returning an IObservable<T> preserves the asynchronous nature of Rx. If these methods returned just T, the calls would be synchronous and could block. Some methods do just return T (First, FirstOrDefault, Last, LastOrDefault, Single, ..).
- FirstOrDefault
- Last
- LastOrDefault
- Single
- Count
- Min
- Max
- Sum
- Average
- GroupBy
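As a quick illustration of the aggregates that return an observable, the sketch below subscribes to Count, Min, Max and Sum over the same range used elsewhere in this post. It assumes the System.Reactive package; AggregateDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class AggregateDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var obs = Observable.Range(10, 15); // 10 -> 24

        // Each aggregate yields a single value once the source completes.
        obs.Count().Subscribe(count => log.Add("Count: " + count));
        obs.Min().Subscribe(min => log.Add("Min: " + min));
        obs.Max().Subscribe(max => log.Add("Max: " + max));
        obs.Sum().Subscribe(sum => log.Add("Sum: " + sum));

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```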
Where<T>(this IObservable<T>, Func<T, bool> predicate)
When talking about filtering it’s hard to ignore the ultimate filter, Where (which should be pretty familiar). In our example we’ll apply a filter that only prints the numbers that are divisible by 5 (modulo 5 equals 0).
// Create a new observable (10 -> 24)
var obs = Observable.Range(10, 15);
// Print only values where modulo 5 == 0
obs.Where(x => x % 5 == 0).Subscribe(Console.WriteLine);
Take<T>(this IObservable<T>, int)
Take returns the first N publications, where N is the specified integer value. Take(1) can be thought of as First, with one large difference: it returns an IObservable<T>, whereas First just returns T. In our example we’ll only take the first 3 values (being 10, 11 and 12):
// Create a new observable
var obs = Observable.Range(10, 15);
// Take the first 3 items
obs.Take(3).Subscribe(Console.WriteLine);
Skip<T>(this IObservable<T>, int)
Skip ignores the first N publications, where N is the specified integer value. So while Take(3) returned the first 3 values (being 10, 11 and 12), Skip(3) will ignore those and return the rest of the sequence.
// Create a new observable
var obs = Observable.Range(10, 15);
// Ignore the first 3 items
obs.Skip(3).Subscribe(Console.WriteLine);
DistinctUntilChanged<T>(this IObservable<T>)
This method ignores every publication that has the same value as the previous one. In our example, the values 1, 2 and 3 will each be published only once. This method can be very handy when handling user input that has (or can have) duplicates.
// Create a new observable
var obs = new Subject<int>();
// Subscribe to the observable
obs.DistinctUntilChanged().Subscribe(Console.WriteLine);
// Add some values
obs.OnNext(1);
obs.OnNext(1);
obs.OnNext(1);
obs.OnNext(2);
obs.OnNext(2);
obs.OnNext(2);
obs.OnNext(2);
obs.OnNext(3);
obs.OnNext(3);
obs.OnNext(3);
Buffering and time shifting
There are different scenarios where the amount of data provided is so large that you don’t need all the data that is being pushed. Luckily Rx provides some great methods to control the rate at which values are published.
BufferWithCount and BufferWithTime
These methods allow you to buffer a range of values, and republish those once the buffer is full. You can choose whether you want to buffer a number of elements (BufferWithCount) or buffer all the values per timespan (BufferWithTime). Let’s see how we use this:
// Create a new observable
var obs = Observable.Range(10, 15);
// Create buffers of 4 values
obs.BufferWithCount(4).Subscribe(
    enumerable =>
    {
        Console.WriteLine("--- Buffer ---");
        // Print each value in the buffer once
        foreach (var i in enumerable)
        {
            Console.WriteLine(i);
        }
    }, () => Console.WriteLine("Obs Buffer Completed"));
// Create a new observable
var interval = Observable.Interval(TimeSpan.FromMilliseconds(150));
// Create a buffer of 1 second
interval.BufferWithTime(TimeSpan.FromSeconds(1)).Subscribe(
    enumerable =>
    {
        Console.WriteLine("--- Buffer ---");
        foreach (var i in enumerable)
        {
            Console.WriteLine(i);
        }
    }, () => Console.WriteLine("Interval Buffer Completed"));
// Just to keep the console visible :-)
Console.ReadKey();
Delay
This method will delay the entire Observable by the specified TimeSpan, or until the specified DateTime. In the example below we’ll subscribe twice to an Observable. In the first subscription we’ll delay the Observable by 5 seconds. In the second subscription we’ll delay it until one minute from now.
// Create a new observable
var obs = Observable.Range(10, 15);
// Add a 5 second delay
obs.Delay(TimeSpan.FromSeconds(5)).Subscribe(Console.WriteLine);
// Delay observable to now + 1 minute
obs.Delay(DateTime.Now.AddMinutes(1)).Subscribe(Console.WriteLine);
Sample<T>(TimeSpan)
You can use the Sample<T> method to publish only one sample (the most recent value) per specified TimeSpan.
// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(150));
// Only print a sample value every 1 second
obs.Sample(TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine);
Throttle
The Throttle method is quite similar to the above methods, except that it only publishes a value when no other value has followed it within the specified timespan; values that are followed too quickly by a new value are ignored. In the first example we have a publisher that publishes a value every 100 milliseconds. Since we have subscribed with a Throttle of 200 milliseconds, no values are printed.
// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(100));
// Subscribe with a Throttle of 200 milliseconds
obs.Throttle(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);
In the next example we’ll create a new observable (using Observable.Create) that generates new values at varying speeds. Since we still subscribe with a throttle of 200 milliseconds, some values will get printed (namely those that were followed by a Thread.Sleep(..) longer than 200 milliseconds).
// Create a new observable
var obs = Observable.Create<int>(
    o =>
    {
        for (int i = 0; i < 100; i++)
        {
            o.OnNext(i);
            // Pause 100 ms after values ending in 0-4 and 300 ms after the others
            Thread.Sleep(i % 10 < 5 ? 100 : 300);
        }
        return () => { };
    });
// Subscribe with a Throttle of 200 milliseconds
obs.Throttle(TimeSpan.FromMilliseconds(200)).Subscribe(Console.WriteLine);
This method can be handy when programming an auto-complete-like feature. When using Throttle on an Observable event (e.g. from a textbox), you only want to look up the values when the user stops typing for more than some milliseconds, to decrease the number of lookups. We’ll explore this usage in detail in a later post.
Unsubscribing
Up till now we have discovered the basics of the Reactive Framework, allowing us to create an IObservable, subscribe to it and perform some basic operations such as filtering, aggregating, ... In addition to this, we explored some methods dealing with buffering and time shifting. A logical next step is to look at how we can unsubscribe from a subscription.
When you start looking for an Unsubscribe method, you’ll notice that such a method does not exist in the Reactive Framework. Instead of using an Unsubscribe method, Reactive Extensions will return an IDisposable every time a subscription is made. The best way to think of this IDisposable is as the subscription itself, thus disposing of the subscription will effectively unsubscribe as well. This way of working is (when you think of it) actually more logical than the unsubscribe approach.
It’s important to note that when you call Dispose on the result of a subscribe call, this will not affect the underlying IObservable<T>, but just the instance of the subscription. This allows us to add and remove subscriptions to our IObservable<T> as we wish. In our example we start with 2 subscribers and then we dispose of one of the subscribers.
// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(100));
// Add the first subscription
var SubscriptionOne = obs.Subscribe(value => Console.WriteLine("Subscription 1 received: {0}", value));
// Add the second subscription
var SubscriptionTwo = obs.Subscribe(value => Console.WriteLine("Subscription 2 received: {0}", value));
// Let’s wait a bit
Thread.Sleep(500);
// Dispose of the first subscription
SubscriptionOne.Dispose();
// Write a notice to the console
Console.WriteLine("SubscriptionOne is disposed");
// Just to keep the console visible :-)
Console.ReadLine();
When executing the code, the output makes it clear that once we dispose of SubscriptionOne, the second subscription is still subscribed and keeps receiving data from our IObservable.

In my above example it seems that all values of Interval are being generated by the same OnNext call (being only one call for both subscriptions). When we expand our sample though, we can see that each subscription has its own subscription on the Observable.Interval().
// Create a new observable
var obs = Observable.Interval(TimeSpan.FromMilliseconds(100));
// Add the first subscription
var SubscriptionOne = obs.Subscribe(value => Console.WriteLine("Subscription 1 received: {0}", value));
// Let’s wait a bit
Thread.Sleep(500);
// Add the second subscription
var SubscriptionTwo = obs.Subscribe(value => Console.WriteLine("Subscription 2 received: {0}", value));
// Let’s wait a bit
Thread.Sleep(500);
// Dispose of the first subscription
SubscriptionOne.Dispose();
// Write a notice to the console
Console.WriteLine("SubscriptionOne is disposed");
// Just to keep the console visible :-)
Console.ReadLine();
When you look at the corresponding output, it’s clear that each subscription has its own values: the second subscription starts counting from 0 again.

So what are the benefits of using an IDisposable type instead of creating a new type or providing an unsubscribe method? The obvious advantage is that it’s an existing type that most people already know and understand. Secondly, having an unsubscribe method would mean that you have to keep an eye on the state, as with event handlers. That is something you don’t need to worry about when using IDisposable, since this is done for you.
Last (but not least), there’s an advantage of compositionality that we get using IDisposable; let’s assume this example:
var Obs1 = Observable.Empty<int>();
var Obs2 = Observable.Empty<int>();
var Obs3 = Observable.Empty<int>();
var MergeObs = Observable.Merge(Obs1, Obs2, Obs3);
var Disposable = MergeObs.Subscribe(Console.WriteLine);
Disposable.Dispose();
In this code we take 3 different Observables and merge those into the MergeObs Observable (more on merging in a later post). By using the compositionality of IDisposable, our Disposable object will be a CompositeDisposable containing the unsubscription handles for all 3 original Observables, and it handles the disposal of those when the Disposable object is disposed. Another side effect of Rx’s asynchronous nature is that for some of the Observables the subscription may not have been made yet. Calling Dispose would then mean disposing of a subscription that doesn’t exist yet. This is solved by using a MutableDisposable object, which represents an IDisposable that isn’t known yet but can be disposed before it’s known. Once the IDisposable gets set, it gets disposed immediately.
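The composition idea can be tried in isolation. The sketch below is only an illustration of CompositeDisposable, assuming the System.Reactive package (System.Reactive.Disposables namespace); it makes no claim about what Merge does internally, and CompositeDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public static class CompositeDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Two stand-ins for the subscriptions to the original observables.
        IDisposable first = Disposable.Create(() => log.Add("first disposed"));
        IDisposable second = Disposable.Create(() => log.Add("second disposed"));

        // One handle that owns both subscriptions.
        var composite = new CompositeDisposable(first, second);

        // Disposing of the composite disposes of everything it contains.
        composite.Dispose();
        // Disposing again has no further effect.
        composite.Dispose();

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```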
OnError and OnCompleted
Both OnError and OnCompleted signify the completion of the stream. When a stream publishes one of these, it will be the last publication and no further calls to OnNext will be delivered. Let’s try what happens:
// Create a new observable
var obs = new Subject<int>();
// Subscribe to our observable
obs.Subscribe(Console.WriteLine, () => Console.WriteLine("Completed"));
// Publish a value
obs.OnNext(1);
// Complete the stream
obs.OnCompleted();
// Try to publish another value
obs.OnNext(2);
// Just to keep the console visible :-)
Console.ReadLine();
As you’ll notice, our first value gets pushed to us, while the second value (after the OnCompleted) does not get published.
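The same holds for OnError. Here is a minimal sketch of that case, assuming the System.Reactive package (Subject<T> lives in System.Reactive.Subjects there); OnErrorDemo and Run are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class OnErrorDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var subject = new Subject<int>();

        // Subscribe with handlers for values, errors and completion.
        subject.Subscribe(
            value => log.Add("Value: " + value),
            error => log.Add("Error: " + error.Message),
            () => log.Add("Completed"));

        subject.OnNext(1);
        // The error terminates the stream.
        subject.OnError(new InvalidOperationException("Something went wrong"));
        // Everything after the error is ignored.
        subject.OnNext(2);
        subject.OnCompleted();

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```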
Want more?
Make sure to check out the Reactive Extensions homepage, the Rx Forum and stay tuned for more posts about Reactive Extensions.