Akka Streams with Scala!
Demonstrates Akka StreamsTweet
Demonstrates Akka StreamsTweet
There are several ways to get this template.
akka-stream-scalain the Typesafe Activator UI.
Already have Typesafe Activator (get it
here)? Launch the UI then
akka-stream-scala in the list of
akka-stream-scalaproject as a zip archive
If you haven't installed Activator, you can get the code
by downloading the template bundle
In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue:
Or from a command line:
This will start Typesafe Activator and open this template in your browser.
C:\Users\typesafe\akka-stream-scala> activator ui
akka-stream-scalaproject from the command line
If you have Typesafe Activator, use its command line mode
to create a new project from this template.
activator new PROJECTNAME akka-stream-scala on the command line.
The creator of this template maintains it at https://github.com/typesafehub/activator-akka-stream-scala#master.
We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.
This tutorial contains a few samples that demonstrates Akka Streams.
Akka Streams is an implementation of Reactive Streams, which is a standard for asynchronous stream processing with non-blocking backpressure. Akka Streams is interoperable with other Reactive Streams implementations.
Akka Streams is currently under development and these samples use a preview release, i.e. changes can be expected. Please try it out and send feedback to the Akka mailing list.
Akka Streams provides a way to express and run a chain of asynchronous processing steps acting on a sequence of elements. Every step is processed by one actor to support parallelism. The user describes the “what” instead of the “how”, i.e. things like batching, buffering, thread-safety are handled behind the scenes.
The processing steps are declared with a DSL, a so called
Flow may be connected to a
Source and/or a
Sink. It may also exist without either of these end points, as an "open" flow. Any open flow when connected to a
Source itself becomes a
Source and likewise when connected to a
Sink becomes a
Flow with both a
Sink is called a
RunnableFlow and may be executed.
Source and can be constructed from a collection, an iterator, a future, or a function which is evaluated repeatedly.
Each DSL element produces a new Flow that can be further transformed, building up a description of the complete transformation pipeline. In order to execute this pipeline the
Flow must be runnable (have both
Sink endpoints, and is materialized by calling one of the execution methods which include
Flow involves a process called materialization, which requires a
FlowMaterializer configured for an actor system. This
FlowMaterializer can (and usually is) an implicit value to streamline the calls.
It should be noted that the streams modeled by this library are “hot”, meaning that they asynchronously flow through a series of processors without detailed control by the user. In particular it is not predictable how many elements a given transformation step might buffer before handing elements downstream, which means that transformation functions may be invoked more often than for corresponding transformations on strict collections like
List. An important consequence is that elements that were produced into a stream may be discarded by later processors, e.g. when using the
By default every operation is executed within its own
Actor to enable full pipelining of the chained set of computations. This behavior is determined by the
akka.stream.scaladsl2.FlowMaterializer which is required by those methods that materialize the Flow into a series of
org.reactivestreams.Processor instances that are started and active. Synchronous compaction of steps is possible (but not yet implemented).
What does a
Flow look like?
Here we use an
Iterator over the Array produced by splitting the text using the spaces, as input producer; note that the iterator is an
Iterator[String] and this produces a
Source[String]. The flow written to use this
Source must match the type, so we could not treat the source as a source of
Int for example.
In this sample we convert each read line to upper case and printing it to the console. This is done in the lines
map(_.toUpperCase) takes Strings and produces Strings. Behind the scenes, this constructs a
Transformer[String, String] which is itself a
Flow When this is attached to the
Source[String], the result is a new
Flow that is also a
Source[String]. If the map was over a function that converted, say, String to Int, the result would be a
Source[Int] when attaching it to this
foreach(println) constructs and attaches a
Sink, in this case an implementation called
ForeachDrain and again this is specifically a
ForeachDrain[String] which matches the type of the
Source[String]. The result of attaching this matching
Sink to the
Source creates a
RunnableFlow which is then also run buy the
foreach on a collection (which returns
foreach on a Flow returns a
Future[Unit] instead. Because we get a
Future back, we can use it to shutdown the actor system once the flow is completed. This is accomplished by the final line in the flow:
onComplete(_ => system.shutdown())
Try to run the
sample.stream.BasicTransformation class by selecting it in the 'Main class' menu in the tab and click the 'Start' button.
Try to add additional steps in the flow, for example skip short lines:
filter(line => line.length > 3).
The API is intended to be familiar to anyone used to the collections API in Scala.
All stream manipulation operations can be found in the API documentation.
The mandatory non-blocking backpressure is a key feature of Reactive Streams.
In this sample we use a fast producer and several consumers, with potentially different throughput capacity. To avoid out of memory problems it is important that the producer does not generate elements faster than what can be consumed. Also the speed of the slowest consumer must be taken into account to avoid unbounded buffering in intermediate steps.
Here we use a random number generator as input. The input producer is a block of code which is evaluated repeatedly. It can generate elements very fast if needed.
We filter the numbers through two prime number checks and end up with a stream of prime numbers, which neighbor +2 number is also a prime number. These two flow filter steps can potentially be pipelined, i.e. executed in parallel.
Then we connect that prime number producer to two consumers. One writing to a file, and another printing to the console. To simulate that the file writer is slow we have added an additional sleep.
Try to run the
sample.stream.WritePrimes class by selecting it in the 'Main class' menu in the tab and click the 'Start' button.
Note that speed of the output in the console is limited by the slow file writer, i.e. one element per second.
Open primes.txt to see the file output.
Let us take a look at an example of more advanced stream manipulation.
We want to read a log file and pipe entries of different log level to separate files. For this the
groupBy operator is useful. It demultiplexes the incoming flow into separate source flows, one for each element key. The key is computed for each element using the given function. When a new key is encountered for the first time it is emitted to the downstream consumer together with a fresh producer that will eventually produce all the elements of the substream.
In this sample we group by a regular expression matching the log levels and then write the elements of each group to a separate file.
Try to run the
sample.stream.GroupLogFile class by selecting it in the 'Main class' menu in the tab and click the 'Start' button.
Akka Streams also provides a stream based API on top of Akka I/O.
TcpEcho without parameters it starts both client and server in the same JVM and the client connects to the server over port 6000.
The server is started by sending a
StreamTcp.Bind message to the actor provided by the
akka.stream.io.StreamIO extension. It receives a
StreamTcp.TcpServerBinding message as reply. Each new client connection is represented by a new
IncomingTcpConnection element in the
Publisher[IncomingTcpConnection]. From the connection the server can operate on the
inputStream: Publisher[ByteString] and
In this sample the server sends back the same bytes as it receives.
Source(inStream)constructs a Source from the
Sink(inStream)constructs a Sink from the
You can add transformation of the bytes using a
Flow. For example convert characters to upper case.
Source(testInput). map(byteStr => byteStr.map(_.toChar.toUpper.asInstanceOf[Byte])). connect(Sink(clientBinding.outputStream)).run()
The connection from the client is established by sending a
StreamTcp.Connect message to the actor provided by the
akka.io.IO extension. It receives a
StreamTcp.OutgoingTcpConnection message as reply and from that it can write data to the
Subscriber[ByteString]. The client can read the response from the server via the
In this sample the client sends a sequence of characters one-by-one to the server, aggregates the replies into a collection, and finally prints the contents of this collection.
Try to run the
sample.stream.TcpEcho class by selecting it in the 'Main class' menu in the tab and click the 'Start' button.
That runs the client and server in the same JVM process. It can be more interesting to run them in separate processes. Run the following commands in separate terminal windows.
<path to activator dir>/activator "run-main sample.stream.TcpEcho server 0.0.0.0 6001"
<path to activator dir>/activator "run-main sample.stream.TcpEcho client 127.0.0.1 6001"
You can also interact with the server with telnet:
telnet 127.0.0.1 6001
Type a few characters in the telnet session and press enter to see them echoed back to the terminal.