Dominic Fox | January 26, 2017 | Data Engineering

Reactive event processing with Reactor Core: a first look

Suppose you are given the task of writing code that fulfils the following contract:

  • You will be given a promise that, at some point in the future, some data – a series of values – will become available.
  • In return, you will supply a promise that, at some point in the future, some data representing the results of processing that data will become available.
  • There may be more values to process than you can fit in memory, or even an infinite series of values.
  • You are allowed to specify what will be done with each individual value, as and when it becomes available; this includes discarding some values.
  • Whenever you want to use some external service to do something with a value, that service can only return you a promise that, at some point in the future, some data representing the result of processing that value will become available.

 


Using Streams

If you’re familiar with the Java 8 Streams API, you might consider fulfilling this contract using Streams:

  • You will be given a Stream of objects to process.
  • In return, you will supply a Stream of processing results.
  • Because the supplied Stream may be infinite, you will never call any terminal method on that Stream (e.g. to gather its contents into a collection).
  • You will define the processing of values using the Streams API’s map and filter functions.
  • If you call out to an external service, that service will return a Stream; you will use the Stream API’s flatMap function to integrate these results into your results stream.

For example, here’s some code that accepts a Stream of search engine queries, and returns a Stream of search engine results:

public Stream<SearchEngineResult> doSearchEngineLookups(Stream<String> queries) {
    return queries.map(this::parseQuery).flatMap(this::getSearchEngineResults);
}

private SearchEngineQuery parseQuery(String query) {
    // Parse the query string into a search engine query
}

private Stream<SearchEngineResult> getSearchEngineResults(SearchEngineQuery query) {
    // Call out to search engine to find results matching query
}

The only problem with this is that when the client of doSearchEngineLookups attempts to do something with the Stream of values you’ve returned, any thread working on the stream will block whenever getSearchEngineResults blocks while obtaining the next value in its Stream of results. It would be nice to have a non-blocking processing pipeline – perhaps we can use CompletableFuture to accomplish this?
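To see the problem concretely, here is a minimal sketch of a hypothetical caller (the queries variable and the printing are ours, purely for illustration). Because forEach is a terminal operation, the calling thread pulls values synchronously, and stalls inside getSearchEngineResults whenever the next result is not yet available:

Stream<String> queries = Stream.of("reactive streams", "reactor core");

// forEach is a terminal operation: this thread blocks each time the
// pipeline has to wait for the search engine to yield the next result.
doSearchEngineLookups(queries)
        .forEach(result -> System.out.println(result));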

Using CompletableFutures

Here’s how using CompletableFutures to fulfil our contract might look:

public CompletableFuture<Stream<SearchEngineResult>> doSearchEngineLookups(
        CompletableFuture<Stream<String>> futureQueries) {
    return futureQueries.thenCompose(queries ->
            queries
                    .map(this::parseQuery)
                    .map(this::getSearchEngineResults)
                    .reduce(
                            CompletableFuture.completedFuture(Stream.empty()),
                            (f1, f2) -> f1.thenCombine(f2, Stream::concat)));
}

private CompletableFuture<Stream<SearchEngineResult>> getSearchEngineResults(SearchEngineQuery query) {
    // Call out to search engine to get a future of the results matching query
}

When the Stream of query strings arrives, we map each value into a SearchEngineQuery and then into a CompletableFuture representing the result of doing the search engine lookup. We then combine all the CompletableFutures in the stream into a single CompletableFuture that will complete when they have all completed, combining the results of all of them together. Now our caller can simply handle a callback from the resulting CompletableFuture when all the results have been gathered, without having to block at any point. That reduce statement is a bit difficult to follow, but otherwise this looks fairly elegant – right?
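If the reduce is hard to follow, the key step is thenCombine, which merges two futures into a single future that completes when both have completed. A toy illustration, using ready-made futures of our own rather than real search results:

CompletableFuture<Stream<Integer>> first = CompletableFuture.completedFuture(Stream.of(1, 2));
CompletableFuture<Stream<Integer>> second = CompletableFuture.completedFuture(Stream.of(3));

// Completes when both inputs have completed, concatenating their
// contents into a single Stream: 1, 2, 3
CompletableFuture<Stream<Integer>> both = first.thenCombine(second, Stream::concat);

The reduce simply applies this pairwise combination across the whole stream of futures, starting from a future of an empty stream.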

The problem here is that we can no longer fulfil one of the conditions of our contract, which is that the incoming collection may be too large to fit into memory, or even infinitely long. The reduce method on Stream is a terminal method – it consumes all of the values in the stream – and in our case what we are doing is repeatedly concat-ing result streams together to build one final mega-stream that we can return all in one go. This is fine if we know we are only going to have to gather a few result sets together, but it’s likely to perform badly if the input stream is very large, and fail outright if it never terminates.

What we need here is some abstraction that combines Stream’s modelling of a possibly-infinite sequence of values with CompletableFuture’s modelling of values that may arrive in the future. What we need is a Flux:

Using Reactor Core’s Flux

Here’s what we can do with a Flux:

public Flux<SearchEngineResult> doSearchEngineLookups(Flux<String> queries) {
    return queries.map(this::parseQuery).flatMap(this::getSearchEngineResults);
}

private SearchEngineQuery parseQuery(String queryString) {
    return new SearchEngineQuery();
}

private Flux<SearchEngineResult> getSearchEngineResults(SearchEngineQuery query) {
    // Call out to search engine to find results matching query
}

How is this different from the earlier Stream example? The mechanism underlying a Stream is an object called a Spliterator, which basically dispatches the next value in the stream (if there is one) to a Consumer on demand, when its tryAdvance method is called:

public boolean tryAdvance(Consumer<? super T> action)

When a terminal method is called on a Stream, an object tasked with summarising values repeatedly calls tryAdvance to ask for results from that Spliterator. This is a synchronous call, since the caller must wait for the boolean return value to know whether or not a value was dispatched, so if obtaining and/or calculating the next value would block, the collector must also block. By contrast, the mechanism underlying a Flux is an object called a Publisher, which negotiates with a Subscriber via a series of non-blocking signals in both directions:

  1. Publisher receives a request to subscribe from a Subscriber.
  2. When the Publisher is ready, it notifies the Subscriber that a Subscription is ready for it.
  3. The Subscriber can then use the Subscription to request either a bounded or an unbounded number of results.
  4. When the Publisher has some results, it sends them, one at a time, to the Subscriber by calling its onNext method.
  5. The Publisher can also signal errors and the end of its data by calling appropriate methods on the Subscriber, while the Subscriber in turn can cancel its Subscription at any time.

This slightly elaborate dance of requests and notifications enables the Publisher and Subscriber to signal to each other independently, across thread boundaries if need be, with neither blocking on the other at any point. Note that every method defined on these interfaces has a void return type: it’s the responsibility of the receiver to handle the signal, dispatching it asynchronously if necessary, and then return as quickly as possible; the sender should never be waiting on a result before deciding what to do next.
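As a concrete illustration of the protocol, here is a minimal sketch of a Subscriber written directly against the Reactive Streams interfaces (the class name and the printing behaviour are ours, purely for illustration). It requests values one at a time – a simple form of bounded demand; requesting Long.MAX_VALUE instead would signal unbounded demand:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

public class PrintingSubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1); // signal demand for the first value
    }

    @Override
    public void onNext(T value) {
        System.out.println("Received: " + value);
        subscription.request(1); // signal demand for the next value
    }

    @Override
    public void onError(Throwable error) {
        error.printStackTrace(); // terminal signal: no more values will arrive
    }

    @Override
    public void onComplete() {
        System.out.println("Done."); // terminal signal: the publisher has finished
    }
}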

This gives us a useful combination of the Spliterator’s sequential yielding up of values, and the CompletableFuture’s callback-based handling of values as they arrive (ironically, Spliterator is almost there already, since the tryAdvance method uses a Consumer callback to receive a value if one is available). Building a Flux around a Publisher is similar to building a Stream around a Spliterator: the fluent map, filter and flatMap methods on Flux associate a chain of processing operations with the underlying Publisher in much the same way as Stream’s methods with the same names.
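For instance, a small pipeline can be assembled with these operators (the values and transformations here are illustrative only). As with a Stream, the chain is lazy: nothing flows until a subscriber attaches, which is the Flux counterpart of invoking a terminal operation:

Flux<String> shouting = Flux.just("alpha", "beta", "gamma")
        .filter(word -> word.length() > 4)  // discard short values
        .map(String::toUpperCase);          // transform each remaining value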

The major difference is in the way values are eventually handled. With a Stream, we will normally want to invoke a terminal method to gather up the results of processing into a final value – a count, sum or collection of processing results. With a Flux, we will ordinarily want to attach it to a final Subscriber that implements the onNext and onComplete callbacks to interact with an asynchronous event sink (for example, writing to a java.nio.channels.AsynchronousChannel). By accepting a Flux as input, and returning a Flux as output, we can insert our processing chain into an end-to-end non-blocking execution model.
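In simple cases we don’t even need to write a Subscriber class ourselves: Flux’s subscribe method has overloads that accept plain lambdas for the onNext, onError and onComplete signals. A sketch of a caller wiring up the pipeline from earlier (queryFlux, a Flux<String> of incoming query strings, is assumed):

doSearchEngineLookups(queryFlux).subscribe(
        result -> System.out.println("Result: " + result),  // onNext
        Throwable::printStackTrace,                         // onError
        () -> System.out.println("All results received"));  // onComplete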

Conclusion and further reading

In summary, Flux is an abstraction which brings some of the convenience of the Java Streams API to reactive event processing, making it simple to build fully asynchronous event processors in fairly vanilla-looking Java 8. The subtleties of connecting Fluxes to non-blocking data sources and sinks can be handled by library code (as in Spring 5’s new support for reactive abstractions in handling web requests), leaving the developer to concentrate on the logic of the processing chain.

The Publisher, Subscriber and Subscription interfaces, together with strict rules defining their permitted interactions, belong to the Reactive Streams project. Flux is defined as part of the Reactor project, and is available as part of the Reactor Core library.
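For reference, reactor-core is published under the io.projectreactor group; a Maven dependency declaration would look something like the following (the version shown is illustrative – check the Reactor site for the current release):

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.0.4.RELEASE</version>
</dependency>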
