Akka: The Future is here!
by Viktor Klang
Futures are a neat way to use multiple cores without having to worry about managing specific threads or tasks. If you have some work that you want to get done while you get on with something else, then you simply create one or more Future(s) and give them the work to do in the form of a function that returns a result. Later you can test the Future for completion and pick up the result. An elegant way to get work done that you need in the Future!
Akka, part of the Typesafe Stack, has had a Future implementation for a long time. It’s so useful we decided to put a major effort into enhancing the capability, making it faster, more scalable and more intuitive to use. Key parts of the underpinning code used locks and blocking to ensure the integrity of the multi-threaded execution. These do not scale well in multi processor environments, as processors must wait while contentions are resolved. So we spent a lot of time to create a not-so-obvious non-blocking alternative. With the new implementation, things scale better and execution time is faster.
Comprehensions are the way we Scala programmers tend to think about doing things on collections. It is one of the function styles that leads to concise and easy to read code. We re-thought the Futures library to facilitate using Comprehensions and the more typical Scala “foreach” pattern, You can now handle sequences in an intuitive non-blocking way. We have found it a whole lot more powerful way to express our intentions.
Now let’s take a detailed look how the changes look at the code level:
- On
akka.dispatch.Futuresthere are new, Java API methods named "future" to useCallablesthat are executed on another thread to produce the result in aFuture. This means that they are non-blocking and you have the option to specify whichDispatcherwill execute your callable. -
Futures.awaitAllhas been deprecated in favor of:futures.foreach(_.await) -
Futures.awaitOnehas been deprecated in favor of:firstCompletedOf(futures).await. BecausefirstCompletedOfis non-blocking, andawaitis blocking, it's better if this is not encouraged by the API -
Futures.awaitMaphas been deprecated in favor of:futures map { f => fun(f.await) }. This is becauseFuturenow supports a non-blocking map, which wasn't the case before, so you'd have to await before mapping - There is now a Java API version of
Futures.reduce. - New! The
Futures.sequencemethod takes aTraversable[Future[T]]and non-blockingly returns aFuture[Traversable[T]]. - New! The
Futures.traversemethod transforms aTraversable[A]to aFuture[Traversable[B]]using a provided function fromAtoFuture[B]. This is a great way of performing "map" in parallel. - Future has now transformed into being isomorphic to dataflow variables. In recognition of this, we've added the key methods of
DataFlowVariabletoFuture. This includes:The above two methods are intended for usage within theapply: val f: Future[Int] = ... val i = f() // Logically f.await.resultOrException.get <<: val f: CompletableFuture[Int] = ... // Write side of Future, compare with the concept of Promise, or DataFlowVariable f << 5 // Sets the value 5 into f, since CompletableFutures can only be written once, they act like dataFlowVariables f << otherF // You can also set the value to the value of another Future, this will be done when that Future is completed // same behavior as CompletableFuture.completeWith(f: Future[...])Future.flowmethod, like this:This uses Continuation Passing Style with Delimited Continuations under the hood, to be able to write code that looks like it's blocking but it reality it isn't.import Future.flow def add(a: Future[Int], b: Future[Int]): Future[Int] = flow { a() + b() } - New!
Futurenow sports a couple of new methods:get: //Warning! Blocking val f: Future[Int] = ... f.get // Semantically f.await.resultOrException.get but for use in a non "flow" context value: val f: Future[Int] = ... val v: Option[Either[Throwable, Int]] = f.value // The current value of the Future, None if no value, and Left(error) or Right(result) otherwise onResult: val f: Future[Any] = ... f onResult { case "foo" => doSomething case 6 => doSomethingElse case SomeRegex(param) => doSomethingOther case _ => doAnything } // Applies the specified partial function to the result of the future when it is completed with a result recover: val f: Future[Any] = ... val result = f recover { case n: NumberFormatException => 0 } // Returns a new future that when the first future has been completed with an exception, will contain the transformed result" onException: val f: Future[Any] = ... f onException { case npe: NullPointerExcep => doSomething case 6 => doSomethingElse case SomeRegex(param) => doSomethingOther case _ => doAnything } // Applies the specified partial function to the result of the future when it is completed with an exception onTimeout: val f: Future[Any] = ... f onTimeout { future => doSomethingWhenTimeout(future) } - New!
Futureis now fully monadic so it can be used in for-comprehensions; all methods: map, flatMap, filter and foreach are non-blocking
These are exciting times. With Typesafe and Akka, the Future is here!
Browse Recent Blog Posts