Processing RabbitMQ messages using Akka Streams

jczuchnowski
December 14, 2014
akka reactive-streams akka-streams rabbitmq scaladays2014 scala

This project aims to show that reactive Akka Streams can provide a very pleasant way of working with RabbitMQ.

How to get "Processing RabbitMQ messages using Akka Streams" on your computer

There are several ways to get this template.

Option 1: Choose rabbitmq-akka-stream in the Typesafe Activator UI.

If you already have Typesafe Activator, launch the UI and search for rabbitmq-akka-stream in the list of templates.

Option 2: Download the rabbitmq-akka-stream project as a zip archive

If you haven't installed Activator, you can get the code by downloading the template bundle for rabbitmq-akka-stream.

  1. Download the Template Bundle for "Processing RabbitMQ messages using Akka Streams"
  2. Extract the downloaded zip file to your system
  3. The bundle includes a small bootstrap script that can start Activator. To start Typesafe Activator's UI:

    In your File Explorer, navigate into the directory that the template was extracted to, right-click on the file named "activator.bat", then select "Open". If prompted with a warning, click to continue.

    Or from a command line:

     C:\Users\typesafe\rabbitmq-akka-stream> activator ui

    This will start Typesafe Activator and open this template in your browser.

Option 3: Create a rabbitmq-akka-stream project from the command line

If you have Typesafe Activator, use its command line mode to create a new project from this template. Type activator new PROJECTNAME rabbitmq-akka-stream on the command line.

Option 4: View the template source

The creator of this template maintains it at https://github.com/jczuchnowski/rabbitmq-akka-stream#master.

Option 5: Preview the tutorial below

We've included the text of this template's tutorial below, but it may work better if you view it inside Activator on your computer. Activator tutorials are often designed to be interactive.

Preview the tutorial

Overview

This project aims to show how reactive Akka Streams combined with Reactive Rabbit can provide an appealing way of working with RabbitMQ.

We are assuming some basic knowledge of RabbitMQ; otherwise you probably wouldn't be here. But if this is one of those rare cases, please take a closer look at this cool piece of technology at http://www.rabbitmq.com/ .

You should have a RabbitMQ server installed locally with the management plug-in enabled.

This application simulates a simplified censorship (yes, censorship :)) procedure that intercepts private messages, analyzes their content, and classifies each message as OK or NOT OK. The scenario is as follows:

Queue --> Akka Stream --> Exchange
  1. Consume a message from a RabbitMQ queue
  2. Process it through an Akka Stream
  3. Publish it back to a RabbitMQ exchange with one of two routing keys (based on a simple decision-making process).

Explore

After starting the application you will see a short trial run that will result in five messages routed to censorship.ok.queue.

Then go to the RabbitMQ management console at http://localhost:15672/ , find an exchange named censorship.inbound.exchange and start publishing messages.

You should observe two effects:

  • The text of your message will be logged to the console,
  • A modified version of your message will land in either censorship.ok.queue or censorship.nok.queue (based on the text content of your message).

You will notice that messages containing the word 'terror' will go to the nok queue. This is the initial censorship filter. Feel free to modify the "forbidden words" list.
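
The routing decision described above can be sketched in plain Scala. This is a minimal illustration rather than the template's actual code: the object name CensorshipFilter, the routing-key strings, and the case-insensitive matching are assumptions made for the example.

```scala
object CensorshipFilter {
  // Hypothetical routing keys; the template's real key names may differ.
  val OkKey = "censorship.ok"
  val NokKey = "censorship.nok"

  // Initial "forbidden words" list; add entries to tighten the filter.
  val forbiddenWords: Set[String] = Set("terror")

  // A message is flagged when its lower-cased text contains any forbidden word
  // (case-insensitive substring matching is an assumption of this sketch).
  def isForbidden(text: String): Boolean = {
    val lower = text.toLowerCase
    forbiddenWords.exists(word => lower.contains(word))
  }

  // Choose the routing key the message would be published with.
  def routingKey(text: String): String =
    if (isForbidden(text)) NokKey else OkKey
}
```

Under these assumptions, adding a word to forbiddenWords is all it takes to send more messages to the nok queue.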

Connecting, consuming, processing, publishing

Below you will find the main steps of this whole process.

Connecting to RabbitMQ

The first thing we do is declare a connection to RabbitMQ. With the Reactive Rabbit library it is as simple as:

import io.scalac.amqp.Connection
val connection = Connection()

Reactive Rabbit provides all the connection defaults, but you can customize them in your application.conf.

The next thing to do is set up all the exchanges and queues. Reactive Rabbit provides all the required methods. Unlike the RabbitMQ Java driver, all methods in Reactive Rabbit are asynchronous and return a Future.

The whole setup is done in ConsumerApp.setupRabbit(). The snippet below should give an idea of the process:


/* declare and bind inbound exchange and queue */
Future.sequence {
  connection.exchangeDeclare(inboundExchange) :: 
  connection.queueDeclare(inboundQueue) :: Nil
} flatMap { _ =>
  connection.queueBind(inboundQueue.name, inboundExchange.name, "")
}
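
The pattern in this snippet (start the independent declarations together with Future.sequence, then bind only once all of them have completed) can be tried without a broker. The sketch below uses only the Scala standard library. The functions declareExchange, declareQueue, and bind are hypothetical stand-ins that merely record what they did; they are not Reactive Rabbit methods.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SetupSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Thread-safe log of finished steps, in completion order.
  val log = new ConcurrentLinkedQueue[String]()

  // Hypothetical stand-ins for asynchronous broker calls.
  def declareExchange(name: String): Future[Unit] =
    Future { log.add(s"exchange:$name"); () }
  def declareQueue(name: String): Future[Unit] =
    Future { log.add(s"queue:$name"); () }
  def bind(queue: String, exchange: String): Future[Unit] =
    Future { log.add(s"bind:$queue->$exchange"); () }

  // Both declarations are started before sequencing; the bind is only
  // scheduled after both of them have completed successfully.
  def setup(exchange: String, queue: String): Future[Unit] =
    Future
      .sequence(List(declareExchange(exchange), declareQueue(queue)))
      .flatMap(_ => bind(queue, exchange))

  // Blocks until setup finishes and returns the recorded steps.
  def run(): List[String] = {
    log.clear()
    Await.result(setup("inbound.exchange", "inbound.queue"), 5.seconds)
    log.toArray(new Array[String](0)).toList
  }
}
```

The two declarations may finish in either order, but the bind entry is always recorded last, which is the ordering guarantee the flatMap provides.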

Declaring the consumer and the publisher

We will be consuming from a queue and publishing to an exchange. For this to happen we declare a consumer and a publisher (surprisingly) by using the appropriate Connection methods. In return they give us a Publisher and a Subscriber, respectively. This might seem counter-intuitive at first, but the RabbitMQ consumer is our stream's Publisher and the RabbitMQ publisher is our stream's Subscriber.

It is worth noting that the Publisher and Subscriber returned from Connection are both interfaces defined by the Reactive Streams specification. That means Reactive Rabbit can be integrated with any implementation of the specification. Here we're using it with Akka Streams by wrapping the Publisher and Subscriber into a Source and a Sink.


val rabbitConsumer = Source(connection.consume("queue_name"))
val rabbitPublisher = Sink(connection.publish("exchange_name"))

Processing messages

To split the work into logical parts we use Flow[In, Out], to which we apply multiple transformations (like map, mapFuture, groupBy). This allows us to define the message processing independently of the publisher and subscriber and connect them later. All the Flows are defined in the FlowFactory trait.

Putting it all together

After everything is set up we can combine all the parts into one processing pipeline and run the flow. Akka Streams provides us with an awesome, readable and clean DSL for this purpose:


val flow = rabbitConsumer via consumerMapping via domainProcessing via publisherMapping to rabbitPublisher
    
flow.run()
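
To get a feel for how independently defined stages compose, here is a plain-Scala analogy of the pipeline above. It is deliberately not Akka Streams code: each stage is an ordinary function, andThen plays the role that via plays for a Flow, and the message types and stage bodies are hypothetical. Only the stage names are borrowed from the snippet.

```scala
object StageSketch {
  // Hypothetical message types, invented for this illustration.
  final case class Incoming(body: String)
  final case class Censored(body: String, ok: Boolean)
  final case class Outgoing(routingKey: String, body: String)

  // Decode the raw payload into a domain message.
  val consumerMapping: Array[Byte] => Incoming =
    bytes => Incoming(new String(bytes, "UTF-8"))

  // Apply the (simplified) content analysis.
  val domainProcessing: Incoming => Censored =
    msg => Censored(msg.body, !msg.body.toLowerCase.contains("terror"))

  // Pick a routing key (assumed names) for the outgoing message.
  val publisherMapping: Censored => Outgoing =
    c => Outgoing(if (c.ok) "censorship.ok" else "censorship.nok", c.body)

  // Stages are defined separately and connected afterwards.
  val pipeline: Array[Byte] => Outgoing =
    consumerMapping.andThen(domainProcessing).andThen(publisherMapping)
}
```

Unlike a real Flow, these functions process one value synchronously and offer no back-pressure; the analogy only covers how typed stages fit together.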

This is it

You just saw a clean functional way of working with RabbitMQ. There are of course many issues that haven't been addressed here, like the RabbitMQ Quality of Service property or rejecting messages at later stages of processing (to name a few). But this should get you going and hopefully this is the way we'll be doing things in the future.
