Typesafe Activator

Distributed workers with Akka and Scala!


typesafehub
July 3, 2014

Illustrates a pattern for distributed workers using Akka cluster features.

How to get "Distributed workers with Akka and Scala!" on your computer

There are several ways to get this template.

Option 1: Choose akka-distributed-workers in the Typesafe Activator UI.

Already have Typesafe Activator (get it here)? Launch the UI, then search for akka-distributed-workers in the list of templates.

Option 2: Download the akka-distributed-workers project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for akka-distributed-workers.

  1. Download the Template Bundle for "Distributed workers with Akka and Scala!"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue.

    Or from a command line:

     C:\Users\typesafe\akka-distributed-workers> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create an akka-distributed-workers project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME akka-distributed-workers on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/typesafehub/activator-akka-distributed-workers#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

The Goal

Some applications need to distribute work across many machines because a single machine has limited resources. This tutorial explains how to implement distributed workers with Akka cluster features.

The solution should support:

  • elastic addition/removal of frontend nodes that receive work from clients
  • elastic addition/removal of worker actors and worker nodes
  • thousands of workers
  • jobs should not be lost, and if a worker fails, the job should be retried

The design is based on Derek Wyatt's blog post Balancing Workload Across Nodes with Akka 2. That post describes the advantages of letting the workers pull work from the master instead of having the master push work to the workers.
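
To make the rest of the tutorial easier to follow, here is a rough sketch of what such a pull-based protocol can look like in Scala. The message names match those used later in this tutorial (RegisterWorker, WorkIsReady, WorkerRequestsWork, WorkIsDone, Ack); the field lists and the exact definitions in the template may differ.

  // A work item submitted by the frontend; the payload type is an assumption.
  case class Work(workId: String, job: Any)

  // Worker -> Master
  case class RegisterWorker(workerId: String)
  case class WorkerRequestsWork(workerId: String)
  case class WorkIsDone(workerId: String, workId: String, result: Any)

  // Master -> Worker / Frontend
  case object WorkIsReady
  case class Ack(workId: String)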

Explore the Code - Master

The heart of the solution is the Master actor that manages outstanding work and notifies registered workers when new work is available.

The Master actor is a cluster singleton among the nodes with the role "backend". This means that there is exactly one active master actor in the cluster at any time. It runs on the oldest of the backend nodes.

You can see how the master singleton is started in the method startBackend in Main.scala
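
For orientation, here is a minimal sketch of what a method like startBackend can look like with the akka-contrib ClusterSingletonManager of the Akka 2.3 era that this template targets. The method lives inside the template's Main object; the system name, the workTimeout value and the configuration handling shown here are assumptions, not the template's exact code.

  import akka.actor.{ ActorSystem, PoisonPill }
  import akka.contrib.pattern.ClusterSingletonManager
  import com.typesafe.config.ConfigFactory
  import scala.concurrent.duration._

  // Sketch: start an ActorSystem with the "backend" role and run the Master
  // as a cluster singleton on the oldest node carrying that role.
  def startBackend(port: Int, role: String): Unit = {
    val conf = ConfigFactory.parseString(
      s"akka.remote.netty.tcp.port=$port\nakka.cluster.roles=[$role]")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("ClusterSystem", conf)
    system.actorOf(
      ClusterSingletonManager.props(
        Master.props(workTimeout = 10.seconds), // props of the singleton actor (parameters assumed)
        "active",                               // name of the singleton instance
        PoisonPill,                             // termination message at hand-over
        Some(role)),                            // run only on nodes with this role
      "master")
  }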

If the master node fails, another master actor is automatically started on a standby node, and the master on the standby node takes over responsibility for outstanding work. Work in progress can continue and will be reported to the new master. The state of the master can be re-created on the standby node using event sourcing. An alternative to event sourcing and the singleton master would be to keep track of all jobs in a central database, but that is more complicated and not as scalable. At the end of the tutorial we will describe how multiple masters can be supported with a small adjustment.

The master actor is made available to workers by registering itself in the ClusterReceptionist.

The frontend actor talks to the master actor via the ClusterSingletonProxy.
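
Both steps are small. The following is a hedged sketch with the Akka 2.3 contrib APIs, assuming the singleton was started as "master"/"active" as above; the comments indicate where each piece would live.

  import akka.contrib.pattern.{ ClusterReceptionistExtension, ClusterSingletonProxy }

  // Inside the Master actor: make the singleton reachable for cluster clients
  // (the workers) via the receptionist.
  ClusterReceptionistExtension(context.system).registerService(self)

  // On a frontend node: a proxy that always routes messages to the
  // currently active master singleton, wherever it runs.
  val masterProxy = system.actorOf(
    ClusterSingletonProxy.props(
      "/user/master/active",   // path of the singleton under the manager
      Some("backend")),        // role of the nodes hosting the singleton
    "masterProxy")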

Later we will explore the implementation of the Master actor in depth, but first we will take a look at the frontend and worker that interact with the master.

Explore the Code - Front End

A typical frontend provides a RESTful API that is used by the clients to submit (POST) jobs. When the service has accepted the job, it returns a 201 Created response code to the client. If it can't accept the job, it returns a failure response code, and the client has to retry or discard the job.

In this example the frontend is, for simplicity, emulated by an ordinary actor, see Frontend.scala, and client requests are simulated by WorkProducer.scala. As you can see, the Frontend actor sends the work to the active master via the ClusterSingletonProxy. It doesn't care about the exact location of the master; somewhere in the cluster there should be one master actor running. The message is sent with ask/? so that the Frontend can reply to the client (WorkProducer) once the job has been accepted or denied by the master.
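
Here is a minimal sketch of that ask/? pattern, reusing the Work message from the protocol sketch under The Goal; the Ok and NotOk replies and the timeout are assumptions, and the template's Frontend.scala differs in detail.

  import akka.actor.{ Actor, ActorRef }
  import akka.pattern.{ ask, pipe }
  import akka.util.Timeout
  import scala.concurrent.duration._

  case object Ok
  case object NotOk

  // Sketch: forward work to the master proxy and translate the master's
  // reply (or a timeout) into an answer for the client (WorkProducer).
  class Frontend(masterProxy: ActorRef) extends Actor {
    import context.dispatcher
    implicit val timeout = Timeout(5.seconds)

    def receive = {
      case work: Work =>
        (masterProxy ? work)
          .map(_ => Ok)                  // the master acknowledged the job
          .recover { case _ => NotOk }   // timeout or failure: client may retry
          .pipeTo(sender())
    }
  }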

You can see how the Frontend and WorkProducer actors are started in the method startFrontend in Main.scala

Explore the Code - Worker

We want to support many worker nodes, and we assume that they can be unstable. Therefore we don't let the worker nodes be members of the cluster; instead they communicate with the cluster through the Cluster Client. The worker doesn't have to know exactly where the master is located.

You can see how a worker is started in the method startWorker in Main.scala
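
As a hedged sketch, such a startWorker method can look roughly like this with the Akka 2.3 contrib ClusterClient; the system names, the contact address and the Worker.props signature (a Worker like the one sketched below) are assumptions.

  import akka.actor.ActorSystem
  import akka.contrib.pattern.ClusterClient
  import com.typesafe.config.ConfigFactory

  // Sketch: the worker runs in its own actor system, outside the cluster,
  // and reaches the cluster through a ClusterClient pointed at the
  // receptionist of a configured contact node.
  def startWorker(port: Int): Unit = {
    val conf = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("WorkerSystem", conf)
    val initialContacts = Set(
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"))
    val clusterClient = system.actorOf(ClusterClient.props(initialContacts), "clusterClient")
    system.actorOf(Worker.props(clusterClient), "worker")
  }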

Open Worker.scala.

The worker registers itself periodically with the master, see the registerTask. This has the nice characteristic that master and worker can be started in any order, and in case of a master fail-over the worker re-registers itself with the new master.
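
Here is a trimmed sketch of that registration, using the protocol messages from earlier; the interval, the worker id scheme and the receive logic are simplified assumptions, and the template's Worker.scala does much more.

  import java.util.UUID
  import akka.actor.{ Actor, ActorRef, Props }
  import akka.contrib.pattern.ClusterClient.SendToAll
  import scala.concurrent.duration._

  class Worker(clusterClient: ActorRef) extends Actor {
    import context.dispatcher
    val workerId = UUID.randomUUID().toString

    // Re-sent on a fixed interval, so the master and the worker can be
    // started in any order, and a new master learns about this worker
    // after a fail-over. The path matches the singleton at /user/master/active.
    val registerTask = context.system.scheduler.schedule(
      0.seconds, 10.seconds,
      clusterClient, SendToAll("/user/master/active", RegisterWorker(workerId)))

    override def postStop(): Unit = registerTask.cancel()

    def receive = {
      case WorkIsReady => // ask the master for work; omitted in this sketch
    }
  }

  object Worker {
    def props(clusterClient: ActorRef): Props = Props(classOf[Worker], clusterClient)
  }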

The Frontend actor sends the work to the master actor.

When the worker receives work from the master, it delegates the actual processing to a child actor, WorkExecutor, to keep the worker responsive while executing the work.
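
As a sketch of that split, a trivial stand-in for such a child actor could look like this; the actual WorkExecutor in the template and the messages it exchanges with the Worker differ.

  import akka.actor.{ Actor, Props }

  // A deliberately trivial executor: the "work" here is just squaring a number.
  // The Worker would create and watch it with something like
  //   val workExecutor = context.watch(context.actorOf(Props[WorkExecutor], "exec"))
  // and forward the job to it when work arrives, staying free to answer
  // the master and to detect a crashed executor via Terminated.
  class WorkExecutor extends Actor {
    def receive = {
      case n: Int => sender() ! (n * n)
    }
  }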

Explore the Code - Master Revisited

Now that we know more about the Worker and Frontend that interact with the Master, it is time to take a closer look at Master.scala.

Workers register themselves with the master using RegisterWorker. Each worker has a unique identifier, and the master keeps track of the workers, including the current ActorRef (the sender of the RegisterWorker message) that can be used for sending notifications to the worker. This ActorRef is not a direct link to the worker actor, but messages sent to it will be delivered to the worker. When using the cluster client, messages are tunneled via the receptionist on some node in the cluster to avoid inbound connections from other cluster nodes to the client.

When the master receives Work from the frontend, it adds the work item to the queue of pending work and notifies idle workers with a WorkIsReady message.

To be able to restore the same state in case of fail-over to a standby master actor, the changes (domain events) are stored in an append-only transaction log and can be replayed when the standby actor is started. Akka Persistence is used for that. Master extends PersistentActor, and events are stored with calls to the persist method. When the domain event has been saved successfully, the master replies with an acknowledgement message (Ack) to the frontend. The master also keeps track of accepted work identifiers to be able to discard duplicates sent from the frontend.
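
Here is a trimmed sketch of that event-sourced part, reusing the Work and Ack messages from the protocol sketch under The Goal; the WorkAccepted event, the state representation and the persistenceId are assumptions, and the template's Master.scala keeps considerably more state.

  import akka.persistence.PersistentActor

  // Domain event: the master decided to accept a work item.
  case class WorkAccepted(work: Work)

  class Master extends PersistentActor {
    override def persistenceId: String = "master"

    // State rebuilt from the event log: accepted work ids (for de-duplication)
    // and the queue of pending work.
    var acceptedWorkIds = Set.empty[String]
    var pendingWork = Vector.empty[Work]

    override def receiveRecover: Receive = {
      case WorkAccepted(work) =>
        acceptedWorkIds += work.workId
        pendingWork :+= work
    }

    override def receiveCommand: Receive = {
      case work: Work if acceptedWorkIds(work.workId) =>
        sender() ! Ack(work.workId) // duplicate from the frontend: just re-acknowledge
      case work: Work =>
        persist(WorkAccepted(work)) { event =>
          acceptedWorkIds += event.work.workId
          pendingWork :+= event.work
          sender() ! Ack(event.work.workId) // reply only after the event is stored
          // ...then notify idle workers with WorkIsReady (omitted in this sketch)
        }
    }
  }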

When a worker receives WorkIsReady it sends WorkerRequestsWork back to the master, which hands out the work, if any, to the worker. The master marks the worker as busy and expects a result within a deadline. For long-running jobs the worker could send progress messages, but that is not implemented in the example.

When the worker sends WorkIsDone, the master updates its state for the worker and sends an acknowledgement back to the worker. The handling of this message must also be idempotent, as the worker will re-send it if it doesn't receive the acknowledgement.

Summary

The Master actor is a Cluster Singleton and registers itself in the Cluster Receptionist. The Master uses Akka Persistence to store incoming jobs and state.

The Frontend actor sends work to the master via the ClusterSingletonProxy.

The Worker communicates with the cluster and its master through the Cluster Client.

Run the Application

Open the Run tab and select worker.Main followed by Restart. On the left-hand side we can see the console output, which shows logging output from nodes joining the cluster, the simulated work, and the results.

The worker.Main starts three actor systems in the same JVM process. It can be more interesting to run them in separate processes. Stop the application in the Run tab and then open three terminal windows.

In the first terminal window, start the first seed node with the following command:


<path to activator dir>/activator "runMain worker.Main 2551"

2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'.

In the second terminal window, start the frontend node with the following command:


<path to activator dir>/activator "runMain worker.Main 3001"		

3001 is the port of the node. In the log output you see that the cluster node has been started, joins the 2551 node, and becomes a member of the cluster. Its status changes to 'Up'.

Switch over to the first terminal window and see in the log output that the member joined. So far, no Worker has been started, i.e. jobs are produced and accepted but not processed.

In the third terminal window, start a worker node with the following command:


<path to activator dir>/activator "runMain worker.Main 0"

Here you don't need to specify a port number; 0 means that a random available port will be used. This worker node is not part of the cluster, but it connects to one of the configured cluster nodes via the ClusterClient. Look at the log output in the different terminal windows. In the second window (frontend) you should see that the produced jobs are processed and logged as "Consumed result".

Take a look at the logging that is done in WorkProducer.scala, Master.scala and Worker.scala. Identify the corresponding log entries in the three terminal windows.

Shut down the worker node (third terminal window) with ctrl-c. Observe how the "Consumed result" logs in the frontend node (second terminal window) stop. Start the worker node again:


<path to activator dir>/activator "runMain worker.Main 0"

You can also start more such worker nodes in new terminal windows.

You can start more cluster backend nodes using port numbers between 2000 and 2999:


<path to activator dir>/activator "runMain worker.Main 2552"

You can start more cluster frontend nodes using port numbers between 3000 and 3999:


<path to activator dir>/activator "runMain worker.Main 3002"		

Note that this sample runs the shared LevelDB journal on the node with port 2551. This is a single point of failure, and should not be used in production. A real system would use a distributed journal.

The files of the shared journal are saved in the target directory, and when you restart the application the state is recovered. You can clean the state with:


<path to activator dir>/activator clean

Many Masters

If the singleton master becomes a bottleneck we can start several master actors and shard the jobs among them. For each shard of master/standby nodes we use a separate cluster role name, e.g. "backend-shard1", "backend-shard2". Implementation of this is left as an exercise.

Next Steps

In this example we have used Cluster Singleton, Cluster Client and Distributed Publish Subscribe.

More in depth documentation can be found in the Cluster Specification and in the Cluster Usage documentation.
