1. Introduction to Reactive Programming

Reactor is an implementation of the Reactive Programming paradigm, which can be summed up as follows:

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

The reactive programming paradigm is often presented in object-oriented languages as an extension of the Observer design pattern. You can also compare the main reactive streams pattern with the familiar Iterator design pattern, as there is a duality to the Iterable-Iterator pair in all of these libraries. One major difference is that, while an Iterator is pull-based, reactive streams are push-based.

The observer pattern is a software design pattern in which an object, named the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes, usually by calling one of their methods.

In object-oriented programming, the iterator pattern is a design pattern in which an iterator is used to traverse a container and access the container’s elements. The iterator pattern decouples algorithms from containers; in some cases, algorithms are necessarily container-specific and thus cannot be decoupled.

Using an iterator is an imperative programming pattern, even though the method of accessing values is solely the responsibility of the Iterable. Indeed, it is up to the developer to choose when to access the next() item in the sequence. In reactive streams, the equivalent of the above pair is Publisher-Subscriber. But it is the Publisher that notifies the Subscriber of newly available values as they come, and this push aspect is the key to being reactive. Also, operations applied to pushed values are expressed declaratively rather than imperatively: The programmer expresses the logic of the computation rather than describing its exact control flow.

In addition to pushing values, the error-handling and completion aspects are also covered in a well defined manner. A Publisher can push new values to its Subscriber (by calling onNext) but can also signal an error (by calling onError) or completion (by calling onComplete). Both errors and completion terminate the sequence. This can be summed up as follows:

onNext x 0..N [onError | onComplete]

This approach is very flexible. The pattern supports use cases where there is no value, one value, or n values (including an infinite sequence of values, such as the continuing ticks of a clock).

1.1. From Imperative to Reactive Programming

Reactive libraries, such as Reactor, aim to address these drawbacks of "classic" asynchronous approaches on the JVM while also focusing on a few additional aspects:

  • Composability and readability

  • Data as a flow manipulated with a rich vocabulary of operators

  • Nothing happens until you subscribe

  • Backpressure or the ability for the consumer to signal the producer that the rate of emission is too high

  • High level but high value abstraction that is concurrency-agnostic

You can think of data processed by a reactive application as moving through an assembly line. Reactor is both the conveyor belt and the workstations. The raw material pours from a source (the original Publisher) and ends up as a finished product ready to be pushed to the consumer (or Subscriber).

In Reactor, operators are the workstations in our assembly analogy. Each operator adds behavior to a Publisher and wraps the previous step’s Publisher into a new instance. The whole chain is thus linked, such that data originates from the first Publisher and moves down the chain, transformed by each link. Eventually, a Subscriber finishes the process.

In Reactor, when you write a Publisher chain, data does not start pumping into it by default. Instead, you create an abstract description of your asynchronous process (which can help with reusability and composition).

Propagating signals upstream is also used to implement backpressure, which we described in the assembly line analogy as a feedback signal sent up the line when a workstation processes more slowly than an upstream workstation.

1.2. Reactor Core Features

The Reactor project main artifact is reactor-core, a reactive library that focuses on the Reactive Streams specification and targets Java 8.

Reactor introduces composable reactive types that implement Publisher but also provide a rich vocabulary of operators: Flux and Mono. A Flux object represents a reactive sequence of 0..N items, while a Mono object represents a single-value-or-empty (0..1) result.

  • A Flux<T> is a standard Publisher<T> that represents an asynchronous sequence of 0 to N emitted items, optionally terminated by either a completion signal or an error. As in the Reactive Streams spec, these three types of signal translate to calls to a downstream Subscriber’s onNext, onComplete, and onError methods.

    flux
    Figure 1. Flux, an Asynchronous Sequence of 0-N Items
  • A Mono<T> is a specialized Publisher<T> that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).

    mono
    Figure 2. Mono, an Asynchronous 0-1 Result

1.3. Simple Ways to Create a Flux or Mono and Subscribe to It

The easiest way to get started with Flux and Mono is to use one of the numerous factory methods found in their respective classes.

For instance, to create a sequence of String, you can either enumerate them or put them in a collection and create the Flux from it, as follows:

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

Other examples of factory methods include the following:

Mono<String> noData = Mono.empty();  (1)

Mono<String> data = Mono.just("foo");

Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);  (2)
1 Notice the factory method honors the generic type even though it has no value.
2 The first parameter is the start of the range, while the second parameter is the number of items to produce.

When it comes to subscribing, Flux and Mono make use of Java 8 lambdas. You have a wide choice of .subscribe() variants that take lambdas for different combinations of callbacks, as shown in the following method signatures:

Lambda-based subscribe variants for Flux
subscribe(); (1)

subscribe(Consumer<? super T> consumer);  (2)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer);  (3)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer);  (4)

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer);  (5)
1 Subscribe and trigger the sequence.
2 Do something with each produced value.
3 Deal with values but also react to an error.
4 Deal with values and errors but also run some code when the sequence successfully completes.
5 Deal with values and errors and successful completion but also do something with the Subscription produced by this subscribe call.

2. The Reactive Extensions for .NET (Rx.NET)

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators. Using Rx, developers represent asynchronous data streams with Observables, query asynchronous data streams using LINQ operators, and parameterize the concurrency in the asynchronous data streams using Schedulers. Simply put, Rx = Observables + LINQ + Schedulers.

Using Rx, you can represent multiple asynchronous data streams (that come from diverse sources, e.g., stock quote, tweets, computer events, web service requests, etc.), and subscribe to the event stream using the IObserver<T> interface. The IObservable<T> interface notifies the subscribed IObserver<T> interface whenever an event occurs.

Because observable sequences are data streams, you can query them using standard LINQ query operators implemented by the Observable extension methods. Thus you can filter, project, aggregate, compose and perform time-based operations on multiple events easily by using these standard LINQ operators. In addition, there are a number of other reactive stream specific operators that allow powerful queries to be written. Cancellation, exceptions, and synchronization are also handled gracefully by using the extension methods provided by Rx.

using System.Reactive.Linq;

DateTime thisDate = new DateTime(2007, 3, 10, 0, 0, 0);
DateTime dstDate = new DateTime(2007, 6, 10, 0, 0, 0);
DateTimeOffset thisTime;

thisTime = new DateTimeOffset(dstDate, new TimeSpan(-7, 0, 0));
await ShowPossibleTimeZonesAsync(thisTime);

thisTime = new DateTimeOffset(thisDate, new TimeSpan(+8, 0, 0));
await ShowPossibleTimeZonesAsync(thisTime);

thisTime = new DateTimeOffset(thisDate, new TimeSpan(+1, 0, 0));
await ShowPossibleTimeZonesAsync(thisTime);


static async Task ShowPossibleTimeZonesAsync(DateTimeOffset offsetTime)
{
    Console.WriteLine("{0} could belong to the following time zones:",
                      offsetTime.ToString());
    await TimeZoneInfo.GetSystemTimeZones()
       .Where(tz => tz.GetUtcOffset(offsetTime.DateTime).Equals(offsetTime.Offset))
       .ToObservable()
       .ForEachAsync(_ =>
       {
           Console.WriteLine("   {0}", _.DisplayName);
       });
    Console.WriteLine();
}
// Output:
//
// 06/10/2007 00:00:00 -07:00 could belong to the following time zones:
//    (UTC-08:00) Baja California
//    (UTC-08:00) Pacific Time (US & Canada)
//    (UTC-07:00) Arizona
//    (UTC-07:00) Yukon

// 03/10/2007 00:00:00 +08:00 could belong to the following time zones:
//    (UTC+08:00) Beijing, Chongqing, Hong Kong, Urumqi
//    (UTC+08:00) Irkutsk
//    (UTC+08:00) Kuala Lumpur, Singapore
//    (UTC+08:00) Taipei
//    (UTC+08:00) Ulaanbaatar

// 03/10/2007 00:00:00 +01:00 could belong to the following time zones:
//    (UTC+01:00) Amsterdam, Berlin, Bern, Rome, Stockholm, Vienna
//    (UTC+01:00) Belgrade, Bratislava, Budapest, Ljubljana, Prague
//    (UTC+01:00) Brussels, Copenhagen, Madrid, Paris
//    (UTC+01:00) Sarajevo, Skopje, Warsaw, Zagreb
//    (UTC+01:00) West Central Africa