January 26, 2017 | Data Engineering
Suppose you are given the task of writing code that fulfils the following contract: you will receive a sequence of values to process, and must return a sequence of processing results; the incoming sequence may be too large to fit into memory, or even infinitely long.
If you’re familiar with the Java 8 Streams API, you might consider fulfilling this contract using Streams:

- You will receive a Stream of objects to process.
- You will return a Stream of processing results.
- Because the incoming Stream may be infinite, you will never call any terminal method on that Stream (e.g. to gather its contents into a collection).
- You will carry out processing using the Stream API’s map and filter functions.
- Where processing a single input produces multiple results, these will be returned in a Stream; you will use the Stream API’s flatMap function to integrate these results into your results stream.

For example, here’s some code that accepts a Stream of search engine queries, and returns a Stream of search engine results:
public Stream<SearchResult> doSearchEngineLookups(Stream<String> queries) {
    return queries.map(this::parseQuery).flatMap(this::getSearchEngineResults);
}

private SearchEngineQuery parseQuery(String query) {
    // Parse the query string into a search engine query
}

private Stream<SearchResult> getSearchEngineResults(SearchEngineQuery query) {
    // Call out to search engine to find results matching query
}
The only problem with this is that when the client of doSearchEngineLookups
attempts to do something with the Stream
of values you’ve returned, any thread working on the stream will block whenever getSearchEngineResults
blocks obtaining the next value to return in its Stream
of results. It would be nice to have a non-blocking processing pipeline – perhaps we can use CompletableFuture
to accomplish this?
Here’s how using CompletableFutures to fulfil our contract might look:
public CompletableFuture<Stream<SearchResult>> doSearchEngineLookups(
        CompletableFuture<Stream<String>> futureQueries) {
    return futureQueries.thenCompose(queries ->
        queries
            .map(this::parseQuery)
            .map(this::getSearchEngineResults)
            .reduce(
                CompletableFuture.completedFuture(Stream.empty()),
                (f1, f2) -> f1.thenCombine(f2, Stream::concat)));
}

private CompletableFuture<Stream<SearchResult>> getSearchEngineResults(SearchEngineQuery query) {
    // Call out to search engine to find results matching query
}
When the Stream
of query strings arrives, we map each value into a SearchEngineQuery
and then into a CompletableFuture
representing the result of doing the search engine lookup. We then combine all the CompletableFuture
s in the stream into a single CompletableFuture
that will complete when they have all completed, combining the results of all of them together. Now our caller can simply handle a callback from the resulting CompletableFuture
when all the results have been gathered, without having to block at any point. That reduce
statement is a bit difficult to follow, but otherwise this looks fairly elegant – right?
The problem here is that we can no longer fulfil one of the conditions of our contract, which is that the incoming collection may be too large to fit into memory, or even infinitely long. The reduce
method on Stream
is a terminal method – it consumes all of the values in the stream – and in our case what we are doing is repeatedly concat
-ing result streams together to build one final mega-stream that we can return all in one go. This is fine if we know we are only going to have to gather a few result sets together, but it’s likely to perform badly if the input stream is very large, and fail outright if it never terminates.
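To make the combining step concrete, here is a minimal sketch of the same reduce over a small, already-completed input (the string values are placeholders):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ReduceDemo {
    // Collapse a finite stream of futures-of-streams into one future-of-stream.
    static CompletableFuture<Stream<String>> combine(Stream<CompletableFuture<Stream<String>>> futures) {
        return futures.reduce(
            CompletableFuture.completedFuture(Stream.<String>empty()),
            (f1, f2) -> f1.thenCombine(f2, Stream::concat));
    }

    public static void main(String[] args) {
        CompletableFuture<Stream<String>> combined = combine(Stream.of(
            CompletableFuture.completedFuture(Stream.of("a1", "a2")),
            CompletableFuture.completedFuture(Stream.of("b1"))));

        // reduce has already consumed the entire input stream by this point:
        // fine for a couple of futures, impossible for an unbounded stream.
        List<String> results = combined.join().collect(Collectors.toList());
        System.out.println(results); // [a1, a2, b1]
    }
}
```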
What we need here is some abstraction that combines Stream’s modelling of a possibly-infinite sequence of values with CompletableFuture’s modelling of values that may arrive in the future. What we need is a Flux.
Here’s what we can do with a Flux:
public Flux<SearchResult> doSearchEngineLookups(Flux<String> queries) {
    return queries.map(this::parseQuery).flatMap(this::getSearchEngineResults);
}

private SearchEngineQuery parseQuery(String queryString) {
    return new SearchEngineQuery();
}

private Flux<SearchResult> getSearchEngineResults(SearchEngineQuery query) {
    // Call out to search engine to find results matching query
}
How is this different from the earlier Stream
example? The mechanism underlying a Stream
is an object called a Spliterator
, which basically dispatches the next value in the stream (if there is one) to a Consumer
on demand, when its tryAdvance
method is called:
public boolean tryAdvance(Consumer<? super T> action)
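To make this concrete, here is a small sketch driving a Spliterator by hand:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Spliterator;

public class TryAdvanceDemo {
    public static void main(String[] args) {
        Spliterator<String> spliterator = Arrays.asList("a", "b", "c").spliterator();
        List<String> received = new ArrayList<>();

        // Each call dispatches at most one value to the Consumer, and reports
        // whether a value was dispatched; the caller must wait for that answer.
        spliterator.tryAdvance(received::add);            // dispatches "a", returns true
        while (spliterator.tryAdvance(received::add)) { } // drain "b" and "c"

        System.out.println(received); // [a, b, c]
    }
}
```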
When a terminal method is called on a Stream
, an object tasked with summarising values repeatedly calls tryAdvance
to ask for results from that Spliterator
. This is a synchronous call, since the caller must wait for the boolean return value to know whether or not a value was dispatched, so if obtaining and/or calculating the next value would block, the collector must also block. By contrast, the mechanism underlying a Flux
is an object called a Publisher, which negotiates with a Subscriber via a series of non-blocking signals in both directions:
- The Publisher receives a call to subscribe from a Subscriber.
- The Publisher passes a Subscription to the Subscriber, via the Subscriber’s onSubscribe method.
- The Subscriber signals demand for some number of values, using the Subscription’s request method.
- The Publisher dispatches up to the requested number of values to the Subscriber’s onNext method.
- The Subscriber can signal further demand, or cancel its Subscription at any time.
- When no more values remain, the Publisher calls the Subscriber’s onComplete method (or onError, if something went wrong).

This slightly elaborate dance of requests and notifications enables the Publisher and Subscriber to signal to each other independently, across thread boundaries if need be, with neither blocking on the other at any point. Note that every method defined on these interfaces has a void
return type: it’s the responsibility of the receiver to handle the signal, dispatching it asynchronously if necessary, and then return as quickly as possible; the sender should never be waiting on a result before deciding what to do next.
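A minimal sketch of this dance, using the JDK 9+ java.util.concurrent.Flow interfaces (which mirror the Reactive Streams Publisher, Subscriber and Subscription); the RangePublisher and the signal log are illustrative inventions, not library code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class SignalDemo {
    /** Toy Publisher emitting 0..limit-1, dispatching only as much as is requested. */
    static class RangePublisher implements Publisher<Integer> {
        private final int limit;
        RangePublisher(int limit) { this.limit = limit; }

        @Override
        public void subscribe(Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Subscription() {
                private int next = 0;
                private boolean done = false;

                @Override
                public void request(long n) {
                    // Dispatch up to n values, then wait for further demand.
                    for (long i = 0; i < n && next < limit && !done; i++) {
                        subscriber.onNext(next++);
                    }
                    if (next == limit && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Subscribes with demand of one value at a time, recording each signal received. */
    public static List<String> signalLog(int limit) {
        List<String> log = new ArrayList<>();
        new RangePublisher(limit).subscribe(new Subscriber<Integer>() {
            private Subscription subscription;

            @Override public void onSubscribe(Subscription s) {
                log.add("onSubscribe");
                subscription = s;
                s.request(1);            // initial demand
            }
            @Override public void onNext(Integer value) {
                log.add("onNext(" + value + ")");
                subscription.request(1); // ask for the next value
            }
            @Override public void onError(Throwable t) { log.add("onError"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(signalLog(3));
        // [onSubscribe, onNext(0), onNext(1), onNext(2), onComplete]
    }
}
```

Here everything happens on one thread for simplicity, but nothing in the protocol requires that: each side only reacts to the other's signals.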
This gives us a useful combination of the Spliterator’s sequential yielding up of values, and the CompletableFuture’s callback-based handling of values as they arrive (ironically, Spliterator
is almost there already, since the tryAdvance
method uses a Consumer
callback to receive a value if one is available). Building a Flux
around a Publisher
is similar to building a Stream
around a Spliterator
: the fluent map
, filter
and flatMap
methods on Flux
associate a chain of processing operations with the underlying Publisher
in much the same way as Stream
’s methods with the same names.
The major difference is in the way values are eventually handled. With a Stream
, we will normally want to invoke a terminal method to gather up the results of processing into a final value – a count, sum or collection of processing results. With a Flux
, we will ordinarily want to attach it to a final Subscriber
that will handle the onNext
and onComplete
methods to interact with an asynchronous event sink (for example, writing to a java.nio.AsynchronousChannel
). By accepting a Flux
as input, and returning a Flux
as output, we can insert our processing chain into an end-to-end non-blocking execution model.
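Assuming Reactor Core is on the classpath, a sketch of such a chain, with placeholder values, might look like this:

```java
import reactor.core.publisher.Flux;

public class FluxDemo {
    public static void main(String[] args) {
        Flux.just("Foo", "BAR", "baz")                    // placeholder source
            .map(String::toLowerCase)                     // transform each value
            .filter(s -> s.startsWith("b"))               // drop non-matching values
            .flatMap(s -> Flux.just(s + "-1", s + "-2"))  // expand each value into a Flux
            .subscribe(
                System.out::println,                      // onNext
                Throwable::printStackTrace,               // onError
                () -> System.out.println("done"));        // onComplete
    }
}
```

In real use the source would typically be a non-blocking event stream rather than Flux.just, and the subscribe call would hand values to an asynchronous sink rather than System.out.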
In summary, Flux
is an abstraction which brings some of the convenience of the Java Streams API to reactive event processing, making it simple to build fully asynchronous event processors in fairly vanilla-looking Java 8. The subtleties of connecting Flux
es to non-blocking data sources and sinks can be handled by library code (as in Spring 5’s new support for reactive abstractions in handling web requests), leaving the developer to concentrate on the logic of the processing chain.
The Publisher, Subscriber and Subscription interfaces, together with strict rules defining their permitted interactions, belong to the Reactive Streams project. Flux is defined as part of the Reactor project, and is available as part of the Reactor Core library.