Typesafe Activator

Tweetmap Workshop Template

Tweetmap Workshop Template

retroryan
Source
September 17, 2014
basics playframework akka scala starter tweetmap

Starting Template for the Tweetmap Workshop

How to get "Tweetmap Workshop Template" on your computer

There are several ways to get this template.

Option 1: Choose tweetmap-workshop in the Typesafe Activator UI.

Already have Typesafe Activator (get it here)? Launch the UI then search for tweetmap-workshop in the list of templates.

Option 2: Download the tweetmap-workshop project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for tweetmap-workshop.

  1. Download the Template Bundle for "Tweetmap Workshop Template"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open", and if prompted with a warning, click to continue:

    Or from a command line:

     C:\Users\typesafe\tweetmap-workshop> activator ui 
    This will start Typesafe Activator and open this template in your browser.

Option 3: Create a tweetmap-workshop project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME tweetmap-workshop on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/retroryan/activator-tweetmap-workshop#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

Welcome to the Tweetmap Going Reactive Tutorial! This tutorial starts with a basic Play Framework application and uses it to build a reactive tweet map.

Within the Activator UI you can:

  • Browse & edit the code (select Code. To save a file the keyboard shortcut command-s works.)
  • Add & delete files from the code (select Code and then the plus sign. To delete open the file and click on delete)
  • Open the code in IntelliJ IDEA or Eclipse (select Code, then the gear dropdown)
  • See the compile output (select Compile)
  • Test the application (select Test)
  • Run the application (select Run)

View the App

Click on the Run Tab and click on start to start the application running. Activator will automatically update the server when changes are made to the code.

Once the application has been compiled and the server started, your application can be accessed at: http://localhost:9000

Check in Run to see the server status.

Reactive Requests

Play Scala uses Futures to execute asynchronous tasks in the background. The Future is the handle to a future result. A callback function is added to the Future that gets called when the future completes.

The primary way of adding a callback to a Future is to add a map method that essentially means map the result of the Future to a new value - which in this case is a Response.

1. Create a new route that will search twitter by updatingconf/routes with the following route:

GET    /search      controllers.Tweets.search(query: String)

2. Update app/controllers/Tweets.scala to add a reactive request handler (or controller) for /tweets:



import scala.concurrent.Future
import play.api.libs.json.{JsValue, Json}
import play.api.libs.ws._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import play.api.Play.current

/**
* A reactive request is made in Play by returning a Future[Result].  This makes the request asynchronous
* since the server doesn't block waiting for the response.  This frees the server thread to handle other requests.
* The callback to the Future is added as a map, which maps the results to a new value when the Future completes.
* The results of the Future in this example are mapped to a result (HTTP 200 OK) that gets returned to the client.
**/
def search(query: String) = Action.async {
    fetchTweets(query).map(tweets => Ok(tweets))
}

/**
* Fetch the latest tweets and return the Future[JsValue] of the results.
* This fetches the tweets asynchronously and fulfills the Future when the results are returned by calling the function.
* The results are first filtered and only returned if the result status was OK.
* Then the results are mapped (or transformed) to JSON.
**/
def fetchTweets(query: String): Future[JsValue] = {
    val tweetsFuture = WS.url("http://twitter-search-proxy.herokuapp.com/search/tweets").withQueryString("q" -> query).get()
    tweetsFuture
        .filter(response => response.status == play.api.http.Status.OK)
        .map { response =>
                response.json
            } recover {
                case _ => Json.obj("statuses" -> Json.arr(Json.obj("text" -> "Error retrieving tweets")))
            }
}

3. Test it: http://localhost:9000/search?query=typesafe

AngularJS UI

1. The build.sbt file already has dependencies on AngularJS and Bootstrap:

"org.webjars" % "bootstrap" % "3.1.1",
"org.webjars" % "angularjs" % "1.2.16",
        

2. AngularJS has already been enabled in the main twirl template


<html ng-app="tweetMapApp">
<script src="@routes.Assets.versioned("lib/angularjs/angular.min.js")"></script>

3. Add the following to index.js to fetch the tweets:



app.factory('Twitter', function($http, $timeout) {

    var twitterService = {
        tweets: [],
        query: function (query) {
            $http({method: 'GET', url: '/search', params: {query: query}}).
                success(function (data) {
                    twitterService.tweets = data.statuses;
                });
        }
    };

    return twitterService;
});

app.controller('Search', function($scope, $http, $timeout, Twitter) {

    $scope.search = function() {
        Twitter.query($scope.query);
    };

});

app.controller('Tweets', function($scope, $http, $timeout, Twitter) {

    $scope.tweets = [];

    $scope.$watch(
        function() {
            return Twitter.tweets;
        },
        function(tweets) {
            $scope.tweets = tweets;
        }
    );

});

4. Replace the contents of index.scala.html file with:


@(message: String)

@main(message) {

    <div ng-controller="Tweets">
        <ul>
            <li ng-repeat="tweet in tweets">{{tweet.text}}</li>
        </ul>
    </div>
}

5. Run the app, make a query, and verify the tweets show up: http://localhost:9000

WebSockets with Akka Actor

WebSockets provide a bi-directional, full-duplex communications channels over a single TCP connection.

Play provides different mechanisms for handling WebSockets. In this tutorial we are going to use an actor to handle the WebSockets. First create the Actor to handle the WebSockets. First create the actor that will handle the WebSocket communication.

1. Create a new UserActor.scala file in /app/actors containing:



package actors

import akka.actor.{ActorLogging, Actor, ActorRef, Props}
import play.api.libs.json.JsValue
import scala.concurrent.duration._
import play.api.libs.concurrent.Execution.Implicits.defaultContext
import controllers.Tweets

/** The out actor is wired in by Play Framework when this Actor is created.
*   When a message is sent to out the Play Framework then sends it to the client WebSocket.
*
**/
class UserActor(out: ActorRef) extends Actor with ActorLogging {

    //The query is optional so that it starts as a None until the user issues the first query.
    var maybeQuery: Option[String] = None

    //Simulate events by periodically sending a message to self to fetch tweets.
    val tick = context.system.scheduler.schedule(Duration.Zero, 5.seconds, self, UserActor.FetchTweets)

    def receive = {
        //Handle the FetchTweets message to periodically fetch tweets if there is a query available.
        case UserActor.FetchTweets =>
            maybeQuery.map { query =>
                //sending a message to out sends it to the client websocket out by the Play Framework.
                Tweets.fetchTweets(query).map(tweetUpdate =>  out ! tweetUpdate)
            }

            case message: JsValue =>
                maybeQuery = (message \ "query").asOpt[String]
    }

    override def postStop() {
        tick.cancel()
    }

}

object UserActor {
    case object FetchTweets

    def props(out: ActorRef) = Props(new UserActor(out))
}

Wire up the WebSockets

WebSockets are created in Play using a normal route. The difference is the controller returns a WebSocket instead of a Result.

1. Add a route for the WebSockets connection to the routes file:

GET    /ws      controllers.Tweets.ws

2. Add a new controller method to create the Websocket in app/controllers/Tweets.scala:


    import play.api.mvc.WebSocket
    import actors.UserActor

    def ws = WebSocket.acceptWithActor[JsValue, JsValue] { request => out =>
        UserActor.props(out)
    }

3. Update the body of the app.factory section of index.js replacing the var twitterService = ... with :


    var ws = new WebSocket("ws://localhost:9000/ws");

    var twitterService = {
        tweets: [],
        query: function (query) {
            ws.send(JSON.stringify({query: query}));
        }
    };

    ws.onmessage = function(event) {
        $timeout(function() {
            twitterService.tweets = JSON.parse(event.data).statuses;
        });
    };

    return twitterService;

To verify tweets are showing up it is useful to use a browser inspector and then look under network for the path ws. Under there look at the frame and verify the requests are being sent. In chrome the network inspector has a bug and the websocket calls are not refreshed unless you tab out and back in.

4. Run the app and verify the tweets show up: http://localhost:9000

Update the Twitter Search to add Geo-Coding

1. Create new functions in app/controllers/Tweets.scala to to get (or fake) the location of the tweets:



//update the json imports to these imports:
import play.api.libs.json.{JsObject, JsValue, Json}
import play.api.libs.json.__

import scala.util.Random


private def putLatLonInTweet(latLon: JsValue) = __.json.update(__.read[JsObject].map(_ + ("coordinates" -> Json.obj("coordinates" -> latLon))))

private def tweetLatLon(tweets: Seq[JsValue]): Future[Seq[JsValue]] = {
    val tweetsWithLatLonFutures = tweets.map { tweet =>
        if ((tweet \ "coordinates" \ "coordinates").asOpt[Seq[Double]].isDefined) {
            Future.successful(tweet)
        } else {
            val latLonFuture: Future[(Double, Double)] = (tweet \ "user" \ "location").asOpt[String].map(lookupLatLon).getOrElse(Future.successful(randomLatLon))
            latLonFuture.map { latLon =>
                tweet.transform(putLatLonInTweet(Json.arr(latLon._2, latLon._1))).getOrElse(tweet)
            }
        }
    }

    Future.sequence(tweetsWithLatLonFutures)
}

private def randomLatLon: (Double, Double) = ((Random.nextDouble * 180) - 90, (Random.nextDouble * 360) - 180)

private def lookupLatLon(query: String): Future[(Double, Double)] = {
    val locationFuture = WS.url("http://maps.googleapis.com/maps/api/geocode/json").withQueryString(
        "sensor" -> "false",
        "address" -> query
    ).get()

    locationFuture.map { response =>
        (response.json \\ "location").headOption.map { location =>
                ((location \ "lat").as[Double], (location \ "lng").as[Double])
            }.getOrElse(randomLatLon)
    }
}

2. In app/controllers/Tweets.scala update the fetchTweets function to use the new tweetLatLon function:




def fetchTweets(query: String): Future[JsValue] = {
    val tweetsFuture = WS.url("http://twitter-search-proxy.herokuapp.com/search/tweets").withQueryString("q" -> query).get()
        tweetsFuture.flatMap { response =>
            tweetLatLon((response.json \ "statuses").as[Seq[JsValue]])
        } recover {
            case _ => Seq.empty[JsValue]
        } map { tweets =>
            Json.obj("statuses" -> tweets)
        }
}

    

Add the Tweet Map

1. The webjar dependency on leaflets has already been added to build.sbt

2. The Leaflet CSS and JS have already been added to main.scala.html file:


    <link rel='stylesheet' href='@routes.Assets.versioned("lib/leaflet/leaflet.css")'>
    <script type='text/javascript' src='@routes.Assets.versioned("lib/leaflet/leaflet.js")'></script>
    <script type='text/javascript' src='@routes.Assets.versioned("lib/angular-leaflet-directive/angular-leaflet-directive.min.js")'></script>

3. Above the <ul> in index.scala.html add a map:

<leaflet width="100%" height="500px" markers="markers"></leaflet>

4. Update the first line of index.js with:


    var app = angular.module('tweetMapApp', ["leaflet-directive"]);

5. Update the body of the app.controller('Tweets' ... section of the index.js file with the following:


    $scope.tweets = [];
    $scope.markers = [];

    $scope.$watch(
        function() {
            return Twitter.tweets;
        },
        function(tweets) {
            $scope.tweets = tweets;

            $scope.markers = tweets.map(function(tweet) {
                return {
                    lng: tweet.coordinates.coordinates[0],
                    lat: tweet.coordinates.coordinates[1],
                    message: tweet.text,
                    focus: true
                }
            });

        }
    );



6. Go to http://localhost:9000 to see the TweetMap!

Test the Controller

1. Update the test/ApplicationSpec.scala file with these tests:

import play.api.libs.json.JsValue
import play.api.test._
import play.api.test.Helpers._

"Application" should {

    "render index template" in new WithApplication {
        val html = views.html.index("Coco")
        contentAsString(html) must include("Coco")
    }

    "render the index page" in new WithApplication{
        val home = route(FakeRequest(GET, "/")).get

        status(home) must be(OK)
        contentType(home) must be(Some("text/html"))
        contentAsString(home) must include("Tweets")
    }

    "search for tweets" in new WithApplication {
        val search = controllers.Tweets.search("typesafe")(FakeRequest())

        status(search) must be(OK)
        contentType(search) must be(Some("application/json"))
        (contentAsJson(search) \ "statuses").as[Seq[JsValue]].length must be > 0
    }

}
 

2. Run the tests in Activator

Achitecture Pitfalls?

What are some of the problems with the current architecture?

1. fetchTweets is inside the Application controller and is called from the UserActor - poor separation of concerns.

2. fetchTweets should be a separate service that can be run and scaled independently from the rest of the application and managed independently.

3. In a production scenario the fetchTweets service would need a twitter api token and would need to be shared between clients. The service should be designed in a way that it can be reused and scaled indepedntly.

Fetch Tweets Service - Akka Actor

Create a new actor that will act as the Global Fetch Tweet Loader. Note that the Play WS API requires a running Play Application - it takes an implicit app: Application. If it where used in the Akka Actor then the actor could not be run independent from the Play Application.

To get around this limitation we use the TweetAPI that is defined in TweetAPI.scala to call the twitter WS API. It creates a WS client without requiring a Play Application.

1. Create a new TweetLoader.scala file in /app/actors with the following actor:

package actors

import akka.actor._
import scala.concurrent.{ExecutionContext, Future}
import play.api.libs.json._
import scala.util._
import scala.util.Failure
import scala.util.Success
import play.api.libs.json.JsObject

/**
 * Tweet Loader Actor
 */
class TweetLoader extends Actor with ActorLogging {

    override def receive: Receive = {
        case TweetLoader.LoadTweet(search) => {
            implicit val ec: ExecutionContext = context.system.dispatcher
            fetchTweets(search) onComplete {
                case Success(respJson) ⇒ {
                    log.info(s"tweet loader returning json")
                    sender() ! TweetLoader.NewTweet(respJson)
                }
                case Failure(f) ⇒ {
                    log.info(s"tweet loader failed!")
                    sender() ! Status.Failure(f)
                }
            }
        }
    }

    // searches for tweets based on a query
    def fetchTweets(query: String)(implicit ec: ExecutionContext): Future[JsValue] = {
        val tweetsFuture = TweetAPI.tweetWS(query)

        tweetsFuture.flatMap { response =>
            tweetLatLon((response.json \ "statuses").as[Seq[JsValue]])
        } recover {
            case errMsg => {
                log.error(s"ERROR Loading Tweets: $errMsg")
                Seq.empty[JsValue]
            }
        } map { tweets =>
            Json.obj("statuses" -> tweets)
        }
    }


    private def putLatLonInTweet(latLon: JsValue) = __.json.update(__.read[JsObject].map(_ + ("coordinates" -> Json.obj("coordinates" -> latLon))))

    private def tweetLatLon(tweets: Seq[JsValue])(implicit ec: ExecutionContext): Future[Seq[JsValue]] = {
        val tweetsWithLatLonFutures = tweets.map { tweet =>
            if ((tweet \ "coordinates" \ "coordinates").asOpt[Seq[Double]].isDefined) {
                Future.successful(tweet)
            } else {
                val latLonFuture: Future[(Double, Double)] = (tweet \ "user" \ "location").asOpt[String]
                    .map(lookupLatLon)
                    .getOrElse(Future.successful(randomLatLon))
                latLonFuture.map { latLon =>
                    tweet.transform(putLatLonInTweet(Json.arr(latLon._2, latLon._1))).getOrElse(tweet)
                }
            }
        }
        Future.sequence(tweetsWithLatLonFutures)
    }

    private def randomLatLon: (Double, Double) = ((Random.nextDouble * 180) - 90, (Random.nextDouble * 360) - 180)

    private def lookupLatLon(query: String)(implicit ec: ExecutionContext): Future[(Double, Double)] = {
        val locationFuture = TweetAPI.geocodeWS(query)

        locationFuture.map { response =>
            (response.json \\ "location").headOption.map { location =>
                ((location \ "lat").as[Double], (location \ "lng").as[Double])
            }.getOrElse(randomLatLon)
        }
    }
}

object TweetLoader {
    case class LoadTweet(search: String)
    case class NewTweet(tweet: JsValue)

    def props(): Props = {
        Props(new TweetLoader())
    }
}

 

Fetch Tweets Service - Play Plugin

Fetch tweets will be a global service that all clients will share. This will be done by creating an Actors Play Plugin that will create a global instance of the actor and then registering it as a Play plugin.

1. The file/actors/Actors.scala was already added that defines the Actor plugin. This file just needs to be updated to load the Tweet Loader Actor - uncomment the two lines that create the reference to the Actor.

        def tweetLoader(implicit app: Application) = actors.tweetLoader
 
        private lazy val tweetLoader = system.actorOf(TweetLoader.props, "tweetLoader")
 

2. To register the plugin the file /conf/play.plugins was already added that configures the Actor plugin. The number represents the plugin loading order, by setting it to > 1000 we make sure it’s loaded after the global plugins.

    1200:actors.Actors
 

Update UserActor

/app/actors/UserActor.scala currently calls back into the main Application methods to fetch tweets. Change the code so that it gets a reference the Tweet Loader Actor and call it to fetch tweets:

1. Update the constructor and props to take a reference to the TweetLoader Actor as parameters:

    class UserActor(out: ActorRef, tweetLoader: ActorRef) .... 

    def props(out: ActorRef, tweetLoader: ActorRef): Props = {
        Props(new UserActor(out, tweetLoader))
    }
 

2. Change the FetchTweets handler to use the tweetLoader actor reference:

    case UserActor.FetchTweets =>
            maybeQuery.foreach { query =>
                tweetLoader ! TweetLoader.LoadTweet(query)
            }
 

3. Add a message handler that processes the new tweets from the Tweet Loader:

        case TweetLoader.NewTweet(tweet) => {
            log.info(s"New Tweet received - sending back to client")
            out ! tweet
        }
 

4. Update the creation of the UserActor in app/controllers/Tweets.scala to reference the tweet loader: .

    def ws = WebSocket.acceptWithActor[JsValue, JsValue] { request => out =>
            UserActor.props(out, actors.Actors.tweetLoader)
    }

5. Test out your changes - warning a subtle bug has been introduced that breaks the code.

Find and Fix the Bug!

Any clues into why the tweets are not being returned back to the client? Where in the message flow are they being lost?

1. Did you notice an error similar to the following in the console?

   [INFO] [09/13/2014 22:58:33.764] [application-akka.actor.default-dispatcher-3] [akka://application/deadLetters] Message [actors.TweetLoader$NewTweet] from Actor[akka://application/user/tweetLoader#1309203340] to Actor[akka://application/deadLetters] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
 

2. Update /app/actors/TweetLoader.scala to print out the sender it is sending the JSON back to:


    log.info(s"tweet loader returning json to ${sender()}")
 

3. /app/actors/TweetLoader.scala is calling the sender() in the callback to a future. When sender() gets called in the future, sender() is no longer valid and instead points to dead letters - the equivalent of /dev/null in Akka. Instead we need to capture the sender() Actor reference and use that instead.

    case TweetLoader.LoadTweet(search) => {
            val querySender = sender()

            implicit val ec: ExecutionContext = context.system.dispatcher

            fetchTweets(search) onComplete {
                case Success(respJson) ⇒ {
                    log.info(s"tweet loader returning json to ${querySender.path}")
                    querySender ! TweetLoader.NewTweet(respJson)
                }
                case Failure(f) ⇒ {
                    log.info(s"tweet loader failed!")
                    querySender ! Status.Failure(f)
                }
            }
        }
 

4. Now test out your changes - everything should be working!

Clustering - Part 1

Separating out the Tweet Loader definitely improved the architecture. Now lets make the Tweet Loader fully reactive by making it a separate service that can be run independently from the main Play Application.

1. First update /conf/application.conf to add configuration for Akka Clustering by adding the following at the end of the file. This enables Akka remoting, defines the actor provider to use cluster and defines the seed nodes used to initiate the cluster.

   # Akka configuration
akka {

  loglevel = INFO

  #This changes the actor provider from being local to being cluster aware.
  actor.provider = "akka.cluster.ClusterActorRefProvider"

  remote.netty.tcp {
    hostname = "127.0.0.1"
    port = 2551
  }

  cluster {
    #seed nodes are the nodes that other nodes look for to join the cluster.
    seed-nodes = ["akka.tcp://application@127.0.0.1:2551", "akka.tcp://application@127.0.0.1:2552"]
    #the default cluster role
    roles = ["frontend"]
    auto-down = on
  }
}

2. Create a new folder /app/backend and add to it a file called MainTweetLoader.scala. This will be used to start a stand-alone Akka cluster node. All that it needs to do is use a Java main to start-up and create an Akka actor system:

    package backend

import akka.actor._
import com.typesafe.config.ConfigFactory
import scala.io.StdIn

/**
 * Main class for starting cluster nodes.
 */
object MainTweetLoader {

    def main(args: Array[String]): Unit = {
        //The first parameter is the port to start the remote listener on.  It defaults to 0
        //which causes akka to start the remote listener on a random port.
        val system = if (args.isEmpty)
            startSystem("0")
        else
            startSystem(args(0))

        commandLoop(system)
    }

    def startSystem(port: String) = {

        val role = "backend-loader"
        val config = ConfigFactory.parseString(s"akka.cluster.roles=[$role]").
            withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)).
            withFallback(ConfigFactory.load())

        // Create an actor system with the name of application - this is the same name
        // that play uses for it's actor system.  The names need to be the same so they
        // can join together in a cluster.
        ActorSystem("application", config)
    }

    def commandLoop(system: ActorSystem): Unit = {
        val line: String = StdIn.readLine()
        if (line.startsWith("s")) {
            system.shutdown()
        } else {
            commandLoop(system)
        }
    }
}

 

3. An sbt command alias has already been added to the build in /build.sbt

    addCommandAlias("rb", "runMain backend.MainTweetLoader 2552 -Dakka.remote.netty.tcp.port=2552 -Dakka.cluster.roles.0=backend-loader")
 

4. Test out the changes. First completely shutdown the play server and restart it. Then in a separate terminal window start an instance of the backend service by running:

  sbt rb
 

The backend node can be stopped by simply typing a 's' and hitting enter in the terminal window.

You should see in the log output of the terminal windows the cluster being started and the backend joining into the cluster. In the play log you should see something similiar to:

    Node [akka.tcp://application@127.0.0.1:2551] is JOINING, roles [frontend]
    Leader is moving node [akka.tcp://application@127.0.0.1:2551] to [Up]
    Cluster Node [akka.tcp://application@127.0.0.1:2551] - Node [akka.tcp://application@127.0.0.1:2552] is JOINING, roles [backend-loader]
 

And in the second terminal window you should see something similiar to:

    Cluster Node [akka.tcp://application@127.0.0.1:2552] - Started up successfully
    Cluster Node [akka.tcp://application@127.0.0.1:2552] - Welcome from [akka.tcp://application@127.0.0.1:2551]
 

Clustering - Part 2

1. Update the akka configuration to add a router that uses the tweet loader by updating /conf/application.conf to add:

 actor.deployment {
    # Routing to the tweetLoader using a cluster router
    # By making the router a group it will look for an existing instance of the tweetLoader actor
    # on any nodes with the role "backend-loader" and use it as a routee
    /tweetLoaderClient/router {
      router = round-robin-group
      nr-of-instances = 1000
      cluster {
        enabled = on
        routees-path = "/user/tweetLoader"
        allow-local-routees = off
        use-role = "backend-loader"
      }
    }
  }
 

2. Next we want to create an actor that will run on the main Play server and act as a client into the cluster. We use this by creating a separate Actor that uses that sends messages to the cluster router.

Create a new TweetLoaderClient.scala file in /app/actors with the following actor that routes messages to the tweet loader router:

   package actors

import akka.actor.{ActorLogging, Actor, Props}
import akka.routing.FromConfig

object TweetLoaderClient {
    def props(): Props = Props(new TweetLoaderClient())
}

/**
 * A client for the tweet loader, handles routing of the fetch tweet messages to the actual tweet loader
 */
class TweetLoaderClient extends Actor with ActorLogging {

    val tweetLoaderRouter = context.actorOf(Props.empty.withRouter(FromConfig), "router")

    def receive = {
        case TweetLoader.LoadTweet(search) => {
            log.info(s"sending search for $search to router")
            tweetLoaderRouter ! TweetLoader.LoadTweet(search)
        }
    }
}

3. Update the Actors Plugin to reference the tweet loader client instead of the actual tweet loader:

In the object Actors change the following line to reference tweetLoaderClient:

    def tweetLoaderClient(implicit app: Application) = actors.tweetLoaderClient
 

In the class Actors change the following line to reference tweetLoaderClient:

         private lazy val tweetLoaderClient = system.actorOf(TweetLoaderClient.props, "tweetLoaderClient")
 

4. Update the creation of the UserActor in app/controllers/Tweets.scala to reference the tweet loader client (instead of just the tweet loader): .

        def ws = WebSocket.acceptWithActor[JsValue, JsValue] { request => out =>
                UserActor.props(out, actors.Actors.tweetLoaderClient)
        }
    

5. Update the MainTweetLoader to create an instance of the TweetLoader actor. First add an initialization method and then call it from the main startup block:


     import actors.TweetLoader
     import akka.cluster.Cluster

    def initialize(system: ActorSystem): Unit = {
        //verify that this cluster node is running with the role of "backend-loader"
        if (Cluster(system).selfRoles.exists(r => r.startsWith("backend-loader"))) {
            system.actorOf(TweetLoader.props(), "tweetLoader")
        }
    }

    //in the main method add a call to initialize before the main command loop:
    initialize(system)
 

6. Test out the code - one more bugs remains! This is a much more subtle bug. Print out the sender in the tweetLoader and see if you can find it!

Fixing Clustering - Part 1

The problem is the tweetLoader is sending the results of fetching the tweet back to the TweetLoaderClient and not the UserActor. The solution is straight forward - have the tweetLoaderClient forward the message instead of directly sending it.

1. Change TweetLoaderClient receive block to use a forward instead of tell to send messages to the tweetLoaderRouter

        case TweetLoader.LoadTweet(search) => {
            log.info(s"forwarding search for $search to router")
            tweetLoaderRouter forward TweetLoader.LoadTweet(search)
        }

2. Test out the code - everything should work!

comments powered by Disqus