Typesafe Activator

Akka Streams with Scala!

typesafehub
July 1, 2014
akka scala sample

Demonstrates Akka Streams

How to get "Akka Streams with Scala!" on your computer

There are several ways to get this template.

Option 1: Choose akka-stream-scala in the Typesafe Activator UI.

Already have Typesafe Activator? Launch the UI, then search for akka-stream-scala in the list of templates.

Option 2: Download the akka-stream-scala project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-stream-scala.

  1. Download the Template Bundle for "Akka Streams with Scala!"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue.

    Or from a command line:

     C:\Users\typesafe\akka-stream-scala> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create an akka-stream-scala project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-stream-scala on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/typesafehub/activator-akka-stream-scala#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

This tutorial contains a few samples that demonstrate Akka Streams.

Akka Streams is an implementation of Reactive Streams, which is a standard for asynchronous stream processing with non-blocking backpressure. Akka Streams is interoperable with other Reactive Streams implementations.
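To make "non-blocking backpressure" concrete, here is a minimal sketch that does not use Akka Streams at all. It assumes Java 9 or later and uses the JDK's `java.util.concurrent.Flow` interfaces, which mirror the later Reactive Streams 1.0 API (`Publisher`, `Subscriber`, `Subscription`); the preview release used in this tutorial still has the older `Producer`/`Consumer` names. The subscriber signals demand for one element at a time, and the publisher never sends more than was requested. `OneAtATimeSubscriber` and `OneAtATimeDemo` are illustrative names.

```scala
import java.util.concurrent.{CountDownLatch, Flow, SubmissionPublisher, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Subscriber that signals demand for exactly one element at a time (illustrative name).
final class OneAtATimeSubscriber extends Flow.Subscriber[Integer] {
  val received: ArrayBuffer[Int] = ArrayBuffer.empty[Int]
  val done: CountDownLatch = new CountDownLatch(1)
  private var subscription: Flow.Subscription = null

  override def onSubscribe(s: Flow.Subscription): Unit = {
    subscription = s
    s.request(1) // initial demand: one element
  }

  override def onNext(item: Integer): Unit = {
    received += item.intValue()
    subscription.request(1) // ask for the next element only after handling this one
  }

  override def onError(t: Throwable): Unit = done.countDown()
  override def onComplete(): Unit = done.countDown()
}

object OneAtATimeDemo {
  def run(n: Int): List[Int] = {
    val publisher = new SubmissionPublisher[Integer]()
    val subscriber = new OneAtATimeSubscriber
    publisher.subscribe(subscriber)
    // submit() buffers per subscriber and blocks only if that buffer is full
    (1 to n).foreach(i => publisher.submit(Integer.valueOf(i)))
    publisher.close() // onComplete is signalled after the buffered items are delivered
    require(subscriber.done.await(5, TimeUnit.SECONDS), "stream did not complete in time")
    subscriber.received.toList
  }
}
```

`OneAtATimeDemo.run(5)` returns `List(1, 2, 3, 4, 5)`; the elements arrive in order, each one only after the previous `request(1)`.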

Akka Streams is currently under development, and these samples use a preview release, so API changes can be expected. Please try it out and send feedback to the Akka mailing list.

Akka Streams provides a way to express and run a chain of asynchronous processing steps acting on a sequence of elements. Every step is processed by one actor to support parallelism. The user describes the “what” instead of the “how”; concerns such as batching, buffering and thread safety are handled behind the scenes.

The processing steps are declared with a DSL, a so-called Flow. The starting point can be a collection, an iterator, a block of code that is evaluated repeatedly, or an org.reactivestreams.api.Producer. The Producer starting point can be the output of another Akka Streams Flow or a Producer implemented by another library.

Each DSL element produces a new Flow that can be further transformed, building up a description of the complete transformation pipeline. In order to execute this pipeline the Flow must be materialized by calling one of the consume, onComplete, toFuture or toProducer methods on it.
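The idea that a Flow is only a description until it is materialized can be illustrated without Akka. The following is a minimal plain-Scala sketch; `MiniFlow` and its `run` method are hypothetical names invented for this illustration and are not part of the Akka Streams API, where materialization goes through `consume`, `onComplete`, `toFuture` or `toProducer` together with a `FlowMaterializer`.

```scala
/** Hypothetical, minimal stand-in for a flow description; NOT the Akka Streams API.
  * Building the pipeline only records steps; nothing runs until `run()` is called. */
final case class MiniFlow[A](start: () => Iterator[A]) {
  // Each DSL-style step returns a new, immutable description.
  def map[B](f: A => B): MiniFlow[B] = MiniFlow(() => start().map(f))
  def filter(p: A => Boolean): MiniFlow[A] = MiniFlow(() => start().filter(p))

  // "Materialization": only now are elements pulled through the recorded steps.
  def run(): List[A] = start().toList
}
```

Building `MiniFlow(() => Iterator(1, 2, 3)).map(_ * 2).filter(_ > 2)` invokes none of the functions; each call to `run()` starts from a fresh iterator and pulls the elements through the whole pipeline again.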

It should be noted that the streams modeled by this library are “hot”, meaning that they asynchronously flow through a series of processors without detailed control by the user. In particular it is not predictable how many elements a given transformation step might buffer before handing elements downstream, which means that transformation functions may be invoked more often than for corresponding transformations on strict collections like List. An important consequence is that elements that were produced into a stream may be discarded by later processors, e.g. when using the take combinator.
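Why a transformation can run more often than with a strict collection can be seen with a toy buffering stage. In this plain-Scala sketch, the hypothetical `buffered` stage pulls up to `bufferSize` elements from upstream before handing any of them on, loosely mimicking a buffering step in a stream. With `take(2)` downstream, the upstream `map` function still runs once per buffered element, and the surplus elements are discarded. The object and method names are illustrative, and real Akka buffer sizes are not modelled.

```scala
import scala.collection.mutable

object BufferingSketch {
  /** Hypothetical stage that fills a buffer of up to `bufferSize` elements
    * from upstream before handing any of them downstream. */
  def buffered[A](upstream: Iterator[A], bufferSize: Int): Iterator[A] = new Iterator[A] {
    private val buffer = mutable.Queue.empty[A]
    private def refill(): Unit =
      while (buffer.size < bufferSize && upstream.hasNext) buffer.enqueue(upstream.next())
    override def hasNext: Boolean = { if (buffer.isEmpty) refill(); buffer.nonEmpty }
    override def next(): A = {
      if (!hasNext) throw new NoSuchElementException("next on empty iterator")
      buffer.dequeue()
    }
  }

  /** Returns the taken results and how often the upstream function was invoked. */
  def invocationsFor(n: Int, bufferSize: Int): (List[Int], Int) = {
    var calls = 0
    val upstream = Iterator.from(1).map { x => calls += 1; x * 10 }
    val taken = buffered(upstream, bufferSize).take(n).toList
    (taken, calls)
  }
}
```

`BufferingSketch.invocationsFor(2, 8)` returns `(List(10, 20), 8)`: two results were used, but the function ran eight times.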

By default every operation is executed within its own Actor to enable full pipelining of the chained set of computations. This behavior is determined by the akka.stream.FlowMaterializer, which is required by the methods that materialize the Flow into a series of org.reactivestreams.api.Processor instances that are started and active. Synchronous compaction of steps is possible (but not yet implemented). The returned Producer is interoperable with other Reactive Streams implementations.

Basic transformation

What does a Flow look like?

Open BasicTransformation.scala

Here we use a Vector as the input producer; this works because Vector[A] is an Iterable[A], which is the expected type. Since an iterator can also serve as the starting point, it is not necessary to have all the elements known in advance.

In this sample we convert each line to upper case and print it to the console.

Try to run the sample.stream.BasicTransformation class by selecting it in the 'Main class' menu in the Run tab and clicking the 'Start' button.

Try adding more steps to the flow, for example to skip short lines:


filter(line => line.length > 3).
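For comparison, the same two steps (upper-case each line, then skip short lines) can be written against a lazy Scala `Iterator`. This is only an analogue that shows what the steps compute: it runs on the calling thread with no actors or backpressure, and `BasicTransformationSketch` and `transform` are hypothetical names.

```scala
object BasicTransformationSketch {
  // Same steps as the flow in the tutorial, expressed on a lazy Iterator.
  def transform(lines: Iterator[String]): Iterator[String] =
    lines
      .map(line => line.toUpperCase)
      .filter(line => line.length > 3) // skip short lines

  def main(args: Array[String]): Unit = {
    val text = "Lorem Ipsum is simply\nof\nthe printing and typesetting industry"
    transform(text.split("\n").iterator).foreach(println)
  }
}
```

Running `main` prints `LOREM IPSUM IS SIMPLY` and `THE PRINTING AND TYPESETTING INDUSTRY`; the two-character line is filtered out.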

All stream manipulation operations can be found in the API documentation.

Backpressure

The mandatory non-blocking backpressure is a key feature of Reactive Streams.

Open WritePrimes.scala

In this sample we use a fast producer and several consumers, potentially with different throughput capacities. To avoid out-of-memory problems, it is important that the producer does not generate elements faster than they can be consumed. The speed of the slowest consumer must also be taken into account to avoid unbounded buffering in intermediate steps.
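The rule that the slowest consumer sets the pace can be modelled in a few lines. This is a deliberately simplified, single-threaded sketch of demand signalling in the spirit of Reactive Streams, not how Akka implements fan-out; `FanOutModel` and `request` are hypothetical names. The source generates an element only when every subscriber has outstanding demand, so no intermediate buffer can grow.

```scala
/** Simplified, single-threaded model of demand-driven fan-out (not Akka's implementation). */
final class FanOutModel(generate: () => Int, subscriberCount: Int) {
  require(subscriberCount > 0, "need at least one subscriber")

  private val demand = Array.fill(subscriberCount)(0L)
  val received: Array[Vector[Int]] = Array.fill(subscriberCount)(Vector.empty[Int])

  /** Subscriber `sub` signals that it can accept `n` more elements. */
  def request(sub: Int, n: Long): Unit = {
    require(n > 0, "demand must be positive")
    demand(sub) += n
    pump()
  }

  // Emit only while every subscriber still wants more: the slowest one sets the pace.
  private def pump(): Unit =
    while (demand.forall(_ > 0)) {
      val element = generate()
      for (i <- demand.indices) {
        demand(i) -= 1
        received(i) = received(i) :+ element
      }
    }
}
```

If subscriber 0 requests 10 elements and subscriber 1 requests only 3, both receive exactly 3; subscriber 0 gets more only after subscriber 1 signals further demand.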

Here we use a random number generator as input. The input producer is a block of code which is evaluated repeatedly. It can generate elements very fast if needed.

We filter the numbers through two prime number checks and end up with a stream of prime numbers whose neighbor n + 2 is also prime (twin primes). These two filter steps can potentially be pipelined, i.e. executed in parallel.
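The filtering logic itself is independent of Akka. The following plain-Scala sketch uses `Iterator.continually` as the analogue of a block of code that is evaluated repeatedly, followed by the two prime checks. The trial-division `isPrime` and the `maxRandomNumber` bound are assumptions for illustration and may differ from what WritePrimes.scala does.

```scala
import scala.util.Random

object TwinPrimesSketch {
  // Simple trial-division primality test; adequate for numbers below a few million.
  def isPrime(n: Int): Boolean =
    n >= 2 && (2 to math.sqrt(n.toDouble).toInt).forall(d => n % d != 0)

  // Repeatedly evaluated source, then the two filter steps from the tutorial.
  def twinPrimeCandidates(rnd: Random, maxRandomNumber: Int): Iterator[Int] =
    Iterator.continually(rnd.nextInt(maxRandomNumber))
      .filter(isPrime)
      .filter(prime => isPrime(prime + 2))
}
```

`TwinPrimesSketch.twinPrimeCandidates(new Random(42), 1000000).take(5)` yields five primes p for which p + 2 is also prime; the iterator is infinite, so a bound such as `take` is required.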

Then we connect that prime number producer to two consumers: one writing to a file and another printing to the console. To simulate a slow file writer, we have added an additional sleep.

Try to run the sample.stream.WritePrimes class by selecting it in the 'Main class' menu in the Run tab and clicking the 'Start' button.

Note that the speed of the output in the console is limited by the slow file writer, i.e. one element per second.

Open primes.txt to see the file output.

Stream of streams

Let us take a look at an example of more advanced stream manipulation.

Open GroupLogFile.scala

We want to read a log file and pipe entries of different log levels to separate files. The groupBy operator is useful for this. It demultiplexes the incoming stream into separate output streams, one for each element key. The key is computed for each element using the given function. When a new key is encountered for the first time, it is emitted to the downstream consumer together with a fresh producer that will eventually produce all the elements of the substream.

In this sample we group by a regular expression matching the log levels and then write the elements of each group to a separate file.
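A strict, in-memory analogue of grouping log lines by level is sketched below. It assumes a hypothetical log format in which the level appears in square brackets, such as `[ERROR]`; the regular expression used in GroupLogFile.scala may differ. Unlike groupBy, which emits each (key, producer) pair as soon as a key first appears and streams the substream lazily, this sketch collects everything into buffers, but it does preserve the order in which keys were first seen. `GroupBySketch`, `levelOf` and `demux` are illustrative names.

```scala
import scala.collection.mutable

object GroupBySketch {
  // Hypothetical log-level pattern; lines without a level go to "OTHER".
  val LogLevelPattern = """.*\[(DEBUG|INFO|WARN|ERROR)\].*""".r

  def levelOf(line: String): String = line match {
    case LogLevelPattern(level) => level
    case _                      => "OTHER"
  }

  /** Routes each element to the buffer for its key; keys keep first-seen order. */
  def demux[A, K](elems: Iterator[A])(key: A => K): mutable.LinkedHashMap[K, mutable.ArrayBuffer[A]] = {
    val substreams = mutable.LinkedHashMap.empty[K, mutable.ArrayBuffer[A]]
    elems.foreach { e =>
      // getOrElseUpdate opens a fresh "substream" the first time a key is seen
      substreams.getOrElseUpdate(key(e), mutable.ArrayBuffer.empty[A]) += e
    }
    substreams
  }
}
```

Each entry of the returned map could then be written to its own file, which corresponds to what the sample does per group.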

Try to run the sample.stream.GroupLogFile class by selecting it in the 'Main class' menu in the Run tab and clicking the 'Start' button.

Open the input logfile.txt and look at the resulting output log files in the target directory.

TCP Stream

Akka Streams also provides a stream-based API on top of Akka I/O.

Open TcpEcho.scala

When you run TcpEcho without parameters, it starts both the client and the server in the same JVM, and the client connects to the server on port 6000.

The server is started by sending a StreamTcp.Bind message to the actor provided by the akka.stream.io.StreamIO extension. It receives a StreamTcp.TcpServerBinding message in reply. Each new client connection is represented by a new IncomingTcpConnection element in the connectionStream Producer[IncomingTcpConnection]. Through each connection, the server can operate on the inputStream: Producer[ByteString] and the outputStream: Consumer[ByteString].

In this sample the server sends back the same bytes as it receives.


conn.inputStream.produceTo(conn.outputStream)

You can add a transformation of the bytes using a Flow, for example to convert characters to upper case.


Flow(conn.inputStream).
  map(byteStr => byteStr.map(b => b.toChar.toUpper.toByte)). // upper-cases ASCII bytes
  toProducer(materializer).
  produceTo(conn.outputStream)
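The per-byte mapping in the Flow above can be checked in isolation. This sketch applies the same expression to a plain `Array[Byte]` instead of a `ByteString`, an assumption made to keep the example free of Akka dependencies; `UpperCaseBytes` is an illustrative name. Because it maps byte by byte, it only upper-cases ASCII letters; the individual bytes of multi-byte UTF-8 characters are left as they are.

```scala
object UpperCaseBytes {
  // Same per-byte expression as in the Flow: ASCII letters only.
  def upper(bytes: Array[Byte]): Array[Byte] =
    bytes.map(b => b.toChar.toUpper.toByte)
}
```

For example, `new String(UpperCaseBytes.upper("echo 42!".getBytes("US-ASCII")), "US-ASCII")` gives `"ECHO 42!"`.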

The connection from the client is established by sending a StreamTcp.Connect message to the actor provided by the akka.stream.io.StreamIO extension. It receives a StreamTcp.OutgoingTcpConnection message in reply, and from that it can write data to the outputStream Consumer[ByteString]. The client can read the response from the server via the inputStream Producer[ByteString].

In this sample the client sends a sequence of characters one-by-one to the server, aggregates the replies into a collection, and finally prints the contents of this collection.
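The echo protocol itself (send characters one by one, collect the replies) can be tried without Akka. The sketch below uses blocking `java.net` sockets in a single JVM, so unlike the sample it has no stream abstraction and no backpressure. It binds to the loopback address on a port chosen by the operating system rather than 6000, assumes ASCII input, and `EchoSketch`/`roundTrip` are hypothetical names.

```scala
import java.net.{InetAddress, ServerSocket, Socket}

object EchoSketch {
  def roundTrip(message: String): String = {
    // Port 0: let the OS pick a free port on the loopback interface.
    val server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress)
    val serverThread = new Thread(() => {
      val conn = server.accept()
      try {
        val in = conn.getInputStream
        val out = conn.getOutputStream
        var b = in.read()
        while (b != -1) { out.write(b); out.flush(); b = in.read() } // echo each byte back
      } finally conn.close()
    })
    serverThread.start()

    val client = new Socket(InetAddress.getLoopbackAddress, server.getLocalPort)
    try {
      val out = client.getOutputStream
      val in = client.getInputStream
      // Send characters one by one and aggregate the replies into a collection.
      val replies: Vector[Char] = message.toVector.map { c =>
        out.write(c.toInt); out.flush()
        in.read().toChar
      }
      replies.mkString
    } finally {
      client.close()      // the server then reads end-of-stream and closes its side
      serverThread.join()
      server.close()
    }
  }
}
```

`EchoSketch.roundTrip("hello")` returns `"hello"`, the same bytes the server received.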

Try to run the sample.stream.TcpEcho class by selecting it in the 'Main class' menu in the Run tab and clicking the 'Start' button.

That runs the client and server in the same JVM process. It can be more interesting to run them in separate processes. Run the following commands in separate terminal windows.


<path to activator dir>/activator "run-main sample.stream.TcpEcho server 0.0.0.0 6001"		

<path to activator dir>/activator "run-main sample.stream.TcpEcho client 127.0.0.1 6001"		

You can also interact with the server with telnet:


telnet 127.0.0.1 6001

Type a few characters in the telnet session and press enter to see them echoed back to the terminal.
