October 1, 2015 | Data Engineering
Akka Streams, the new experimental module under the Akka project, was finally released in July after several months of development and a number of milestone and RC versions. In this series I hope to gently introduce concepts from the library and demonstrate how it can be used to address real-life stream processing challenges.
Akka Streams is an implementation of the Reactive Streams specification on top of the Akka toolkit, which uses an actor-based concurrency model. The Reactive Streams specification was created by a number of companies interested in asynchronous, non-blocking, event-based data processing that can span system boundaries and technology stacks.
The Reactive Streams initiative is notable because it addresses one of the most critical yet problematic challenges in asynchronous processing: aligning the processing speeds of producers and consumers of messages while allowing for efficient use of system resources. Allowing a fast producer to overwhelm a slower consumer with its rate of incoming messages can be catastrophic, as such situations generally lead to resource exhaustion somewhere along the data path unless the source of data is properly back-pressured.
In the past, back pressure has commonly been achieved by blocking the producer while waiting for the consumer to process messages at its own pace. This approach, which depends on synchronous processing of messages between systems, is very inefficient and negates the benefits of asynchronous processing (much greater scalability and better resource utilisation), so a non-blocking solution for implementing back pressure is required. In the context of Reactive Streams, back pressure is an integral part of the asynchronous processing model and is implemented via asynchronous message passing.
Let’s get started with Akka Streams! First, we need to include the library dependency in our SBT project, so let’s add the following to build.sbt:
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-stream-experimental" % "1.0"
)
In order to execute any processing streams we need to create an ActorSystem and an ActorMaterializer:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._

object MyFirstStream {
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem("MyActorSystem")
    implicit val materializer = ActorMaterializer()
    // stream definition and execution code goes here
  }
}
The ActorMaterializer is responsible for creating the actors that will eventually run the processing flow, and it requires an ActorSystem to be available implicitly.
Before we can put together and execute even the most basic stream, let’s review a few basic building blocks that Akka Streams provides. Other, more advanced types of processing nodes will be introduced in subsequent parts of this series.
The source is the starting point of the stream; this is where the data flowing through the stream originates. A source can be anything that can generate messages, such as a collection, a database query or an HTTP request. Akka Streams allows creating sources from a variety of data-producing entities:
import scala.concurrent.Future // needed for the Future-based source below

val sourceFromRange = Source(1 to 10)
val sourceFromIterable = Source(List(1, 2, 3))
val sourceFromFuture = Source(Future.successful("hello"))
val sourceWithSingleElement = Source.single("just one")
val sourceEmittingTheSameElement = Source.repeat("again and again")
val emptySource = Source.empty
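It is worth emphasising that a source is just an immutable blueprint, so it can be transformed and reused freely. As a small sketch (assuming the definitions above), transforming a source simply yields a new source without running anything:

```scala
// Transforming a source creates a new, independent source description
val incrementedSource = sourceFromRange.map(_ + 1)   // will emit 2 to 11 when run
val filteredSource = sourceFromRange.filter(_ > 5)   // the original sourceFromRange is unchanged and reusable
```

Nothing executes at this point; both values are merely descriptions of what will happen once the stream is materialised.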
The sink is the ultimate destination of all the messages flowing through the stream. The library supports a number of out-of-the-box sink implementations:
val sinkPrintingOutElements = Sink.foreach[String](println(_))
val sinkCalculatingASumOfElements = Sink.fold[Int, Int](0)(_ + _)
val sinkReturningTheFirstElement = Sink.head[Int] // explicit type parameter, as there is no upstream here to infer it from
val sinkNoop = Sink.ignore
The flow is a processing step within the stream. It combines one incoming channel and one outgoing channel with some transformation of the messages passing through it. Akka Streams offers a rich DSL for defining different types of simple flows encompassing user-defined behaviour:
import akka.stream.OverflowStrategy // needed for the buffering flow below

val flowDoublingElements = Flow[Int].map(_ * 2)
val flowFilteringOutOddElements = Flow[Int].filter(_ % 2 == 0)
val flowBatchingElements = Flow[Int].grouped(10)
val flowBufferingElements = Flow[String].buffer(1000, OverflowStrategy.backpressure) // back-pressures the source if the buffer is full
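Flows themselves compose with via, so the simple flows above can be chained into larger ones. A minimal sketch, reusing the flows defined above:

```scala
// Chaining flows produces a new flow: elements are doubled first,
// then only the even results are kept
val flowDoublingThenFiltering =
  flowDoublingElements.via(flowFilteringOutOddElements)
```

Like sources, the resulting flow is just a reusable description; it does nothing until attached to a source and a sink and run.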
Streams can represent any arbitrary processing graph or network and Akka Streams allows building such processing graphs very easily using a powerful DSL. Throughout this series we will be exploring various ways to use the library to define increasingly more complex processing graphs, starting with simple linear ones.
The only two mandatory processing steps any stream definition has to include are a source and a sink, so the most trivial stream might be created as:
val streamCalculatingSumOfElements: RunnableGraph[Future[Int]] =
  sourceFromIterable.toMat(sinkCalculatingASumOfElements)(Keep.right)
Unfortunately, putting the stream together like this forces us to deal with the way the materialised value of a processing stage is combined with that of the subsequent stage (a sink in the example above), hence the use of the toMat method, which allows us to specify which materialised value should be exposed during stream materialisation. Using runWith or runFold instead collapses attaching the sink and running the entire stream into a single method call. Stream materialisation and dealing with materialised values is a fairly involved area and definitely deserves a separate post.
Now, if we want to include any transformations in the stream we might include a flow between the source and the sink:
val streamCalculatingSumOfDoubledElements: RunnableGraph[Future[Int]] =
  sourceFromIterable.via(flowDoublingElements).toMat(sinkCalculatingASumOfElements)(Keep.right)
It’s also worth noting that all the basic stream building components are fully composable so a source with an attached flow (or even multiple flows) can be treated as a source itself (more on that later in the series).
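As a small sketch of that composability (reusing the values defined earlier), attaching a flow to a source yields something that can be used wherever a source is expected:

```scala
// A source with a flow attached behaves as a source itself...
val sourceOfDoubledElements = sourceFromIterable.via(flowDoublingElements)

// ...and can be connected to a sink like any other source
val streamFromComposedSource =
  sourceOfDoubledElements.toMat(sinkCalculatingASumOfElements)(Keep.right)
```

This makes it easy to build up reusable pieces of a pipeline independently and assemble them later.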
Now that we have defined the stream (i.e. created a blueprint that describes all the processing steps) we can execute it. This process is called the materialisation of the stream, and involves creating and wiring up Akka actors to create the infrastructure able to process any data generated by the source and finally produce a materialised value of the stream (more on that in future posts in this series).
import system.dispatcher // execution context for the Future callbacks below

val sumOfElements: Future[Int] = streamCalculatingSumOfElements.run()
sumOfElements.foreach(println) // we expect to see 6
val sumOfDoubledElements: Future[Int] = streamCalculatingSumOfDoubledElements.run()
sumOfDoubledElements.foreach(println) // we expect to see 12
Now, this may seem like a lot of code for some basic arithmetic, but I used it merely to demonstrate the basics of defining and running streams with Akka Streams. In fact, the library provides a few different ways of accomplishing this trivial exercise with much less code:
// runs the stream by attaching specified sink
sourceFromIterable.via(flowDoublingElements).runWith(sinkCalculatingASumOfElements).foreach(println)
// runs the stream by attaching sink that folds over elements on a stream
Source(List(1, 2, 3)).map(_ * 2).runFold(0)(_ + _).foreach(println)
In the next blog post in this series we will explore a more realistic scenario that showcases the capabilities of Akka Streams library including using non-linear processing stages.
You can find more details in the official Akka Streams documentation.