
A Simple Akka Cluster Application

Below is a simple Akka Cluster application consisting of Producer and Consumer roles, where each role runs on a separate node in its own JVM.

TL;DR

Akka Cluster is not new, and neither is load balancing. You can find some cool articles here and here. There's also a really nice sample in Akka's own GitHub repo. In this post, I did similar work and created a simple cluster application, as it's going to be the foundation of upcoming posts. You can get the project code from my GitHub repo as well.

Design

The goal of the design is to have a cluster of P x C nodes, where P and C denote the number of Producer and Consumer nodes respectively. As the number of nodes is not known in the beginning and can vary over time, it's useful to have a Seed node to form the cluster. The Seed node is an actor system without any actors, and it's used as an entry point to the cluster for newly created nodes. To make the Seed node reachable, its address is fixed and predefined in all other nodes.

When a Consumer node comes up, each Producer is notified and starts sending messages to the new node. To resolve the Consumer actor properly, I decided to proceed with the assumption that every Consumer node will have a single Consumer actor. However, the Consumer actor itself can behave as a "notifier" in its own actor system and pass messages on to other local actors as well.
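Just to illustrate that idea, a hypothetical "notifier" Consumer (not part of this project; names are illustrative only) could fan incoming messages out to local actors like this:

import akka.actor.{Actor, ActorLogging, ActorRef}

// Hypothetical sketch: a Consumer that forwards each incoming message
// to a set of local subscriber actors.
class NotifierConsumer(localSubscribers: Seq[ActorRef]) extends Actor with ActorLogging {
  override def receive: Receive = {
    case msg =>
      log.info(s"Forwarding $msg to ${localSubscribers.size} local actors")
      localSubscribers.foreach(_ forward msg)
  }
}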

In the first stage, the Producer role will use Round Robin routing. However, the routing mechanism will be kept abstract.

For example, a 2 Producer x 3 Consumer cluster will look as follows:

It's worth mentioning that Akka has a built-in Cluster Aware Router, which provides a router that is aware of the member nodes in the cluster. When used with a Group of Routees, it covers the goal of this design as well. That being said, I've implemented the whole mechanism myself in this project.
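For reference, such a cluster-aware group router could be configured purely in application.conf, roughly like this (illustrative configuration, not used in this project):

akka.actor.deployment {
  /producer/consumerRouter {
    router = round-robin-group
    routees.paths = ["/user/consumer"]
    cluster {
      enabled = on
      use-role = consumer
      allow-local-routees = off
    }
  }
}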

Code

Versions

Scala 2.12.1
Akka 2.4.17
sbt 0.13.13

Structure

All of the roles are placed in a single project. The sbt-assembly plugin is used to generate a standalone JAR file.
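A minimal build definition along those lines might look as follows (the plugin version and settings here are illustrative, not copied from the project; sbt-assembly's defaults then produce a JAR named AkkaClusterDemo-assembly-1.0.jar):

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

// build.sbt
name := "AkkaClusterDemo"
version := "1.0"
scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor"   % "2.4.17",
  "com.typesafe.akka" %% "akka-cluster" % "2.4.17"
)

Running sbt assembly then builds the standalone JAR under target/scala-2.12/.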

Configuration

The only configuration file is application.conf. To make the application container friendly from day zero, I defined several placeholders under the args block. (Containerization of cluster nodes will be demonstrated in upcoming posts.)

args {
  host = "127.0.0.1"
  host = ${?host}
  port = 0
  port = ${?port}
  app-name = "AkkaClusterDemo"
  app-name = ${?app-name}
  seed-host = "127.0.0.1"
  seed-host = ${?seed.host}
  seed-port = 2551
  seed-port = ${?seed.port}
}

In the akka section of the same file, provider is set to cluster, which enables the Cluster extension to be used. Also, seed-nodes is configured to point to a single node address, which is composed of the variables shown above.

akka {
  actor {
    provider = cluster
  }

  cluster {
    seed-nodes = ["akka.tcp://"${args.app-name}"@"${args.seed-host}":"${args.seed-port}]
    # roles = ["role"]
  }

  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = ${args.host}
      port = ${args.port}
    }
  }
}

Note that the roles definition is commented out, as this property is overridden in the main method.

ConsumerActor

ConsumerActor is kept minimal: it only prints the content of incoming messages.

override def receive: Receive = {
  case msg => log.info(s"${sender().path.address} : $msg")
}
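For completeness, the surrounding class is just an actor with logging; a minimal self-contained version might look like this (the class name follows the post, the rest is standard boilerplate):

import akka.actor.{Actor, ActorLogging}

class ConsumerActor extends Actor with ActorLogging {
  // Log the sender node's address together with the message payload.
  override def receive: Receive = {
    case msg => log.info(s"${sender().path.address} : $msg")
  }
}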

ProducerActor

This is the part doing the heavy lifting. As discussed in the Design section, the Producer role should be capable of receiving events about Consumer nodes. To achieve this, the actor subscribes itself to MemberUp notifications, which occur when a new node is accepted into the cluster.

val cluster = Cluster(context.system)
override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
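As a side note, the Akka documentation also recommends unsubscribing from cluster events when the actor stops; a one-line addition of mine (not shown in the original snippet) would be:

// Unsubscribe from cluster events when the actor stops.
override def postStop(): Unit = cluster.unsubscribe(self)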

The preStart function is called just after the actor instance is created (see the Akka docs), so it's the natural place to subscribe to the Cluster. Once the Producer receives the MemberUp notification for a particular Consumer, it needs to watch that node's Consumer actor to catch its termination. Besides, the routees should be updated as the new destination becomes available. To achieve this, the Consumer node is resolved in the receive function, which looks as follows:

override def receive: Receive = {
  case MemberUp(member) if member.roles.contains(Consumer) =>
    log.info(s"""Received member up event for ${member.address}""")
    val consumerRootPath = RootActorPath(member.address)
    val consumerSelection = context.actorSelection(consumerRootPath / "user" / Consumer)

    import scala.concurrent.duration.DurationInt
    import context.dispatcher
    consumerSelection.resolveOne(5.seconds).onComplete(registerConsumer)
  case Terminated(actor) => strategy.removeRoutee(actor)
  case SimpleMessage =>
    strategy.sendMessage(s"#$counter")
    counter += 1
  case _ =>
}

First, let's focus on the first case statement, which is matched when a MemberUp message is received for any node having the Consumer role. As the Address is accessible from the Member object, the ActorPath of the targeted Consumer actor can easily be built and used to resolve the target ActorRef with the help of ActorSelection. This point actually introduces the restriction of having a single Consumer actor per remote node, since it would be unclear to the Producer if there were multiple actors sharing the same path. Now let's look at the registerConsumer function, which runs when resolving completes:

def registerConsumer(refTry: Try[ActorRef]): Unit = refTry match {
  case Success(ref) =>
    context watch ref
    strategy.addRoutee(ref)
  case Failure(_) => log.error("Couldn't find consumer on path!")
}

In the Success case, the actor registers itself for the lifecycle events of the targeted Consumer actor referenced by ref; this is what makes the Terminated case in the receive function possible. Next, ref is used to update the routees. This is done via strategy, which has the type RouterStrategy, defined as follows:

trait RouterStrategy {
  def addRoutee(ref: ActorRef): Unit
  def removeRoutee(ref: ActorRef): Unit
  def sendMessage[M >: Message](msg: M): Unit
}

The current implementor of this trait is RoundRobinStrategy, which utilizes RoundRobinGroup underneath. I added RouterStrategy as an abstraction to easily extend the project with different routing capabilities in the future.
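The implementation itself isn't shown here, but a rough sketch of a RoundRobinGroup-based strategy could look like the following. The constructor parameter, the router name and the use of Akka's routee management messages are my assumptions, not necessarily the project's exact code:

import akka.actor.{ActorRef, ActorRefFactory}
import akka.routing.{ActorRefRoutee, AddRoutee, RemoveRoutee, RoundRobinGroup}

class RoundRobinStrategy(factory: ActorRefFactory) extends RouterStrategy {

  // Group router created without routees; remote Consumer actors are added at runtime.
  private val router: ActorRef =
    factory.actorOf(RoundRobinGroup(Nil).props(), "consumerRouter")

  override def addRoutee(ref: ActorRef): Unit =
    router ! AddRoutee(ActorRefRoutee(ref))

  override def removeRoutee(ref: ActorRef): Unit =
    router ! RemoveRoutee(ActorRefRoutee(ref))

  // Message is the project's message type referenced by RouterStrategy.
  override def sendMessage[M >: Message](msg: M): Unit =
    router ! msg
}

Sending a message is then just a matter of telling the router actor, which forwards it to the next routee in Round Robin order.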

Main

The main method determines the role from the first argument and sets the configuration as follows:

val role = args.headOption.getOrElse(Seed)
val config = ConfigFactory.parseString(s"""akka.cluster.roles = ["$role"]""")
  .withFallback(ConfigFactory.load())
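With the role and config in hand, the rest of the startup might look roughly like this (a sketch of mine; the Props wiring and the Producer/Consumer/Seed constants are assumptions based on how they are used elsewhere in the post):

import akka.actor.{ActorSystem, Props}

val system = ActorSystem(config.getString("args.app-name"), config)

role match {
  case Producer =>
    val producer = system.actorOf(Props[ProducerActor], Producer)
    startMessageGenerator(producer)
  case Consumer =>
    system.actorOf(Props[ConsumerActor], Consumer)
  case _ =>
    // Seed role: no actors needed, the bare ActorSystem acts as the cluster entry point
}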

Once the role is determined, an ActorSystem starts along with a corresponding actor. If the role is producer, a message generator also starts for demonstration purposes. After an initial delay of 10 seconds, a new message is generated every 2 seconds.

def startMessageGenerator(producer: ActorRef): Unit = {
  import scala.concurrent.duration.DurationInt
  import system.dispatcher
  system.scheduler.schedule(10.seconds, 2.seconds, producer, SimpleMessage)
}

Results

In this section, I truncated big portions of output logs for the sake of simplicity.

Let's start a Seed node first. Here the port is set to 2551, as it's the default port for the Seed. Note that this is done by exporting the environment variable.

$ export port=2551
$ java -jar AkkaClusterDemo-assembly-1.0.jar

Next, let’s start a Consumer node.

$ java -jar AkkaClusterDemo-assembly-1.0.jar consumer
[main] [akka.remote.Remoting] Starting remoting
[main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://AkkaClusterDemo@127.0.0.1:55643]
...

As soon as the Consumer starts and joins the cluster, the Seed node, acting as the leader, moves the new member to the Up state:

[akka.cluster.Cluster(akka://AkkaClusterDemo)] Cluster Node [akka.tcp://AkkaClusterDemo@127.0.0.1:2551] - Leader is moving node [akka.tcp://AkkaClusterDemo@127.0.0.1:55643] to [Up]

However, we still need to keep the Seed node running to welcome the next nodes. Also, note that the Consumer node took port 55643, as no particular port was defined. Now let's start a Producer node and see what happens.

$ java -jar AkkaClusterDemo-assembly-1.0.jar producer
[main] [akka.remote.Remoting] Starting remoting
[main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://AkkaClusterDemo@127.0.0.1:55645]
...

And once the Producer node joins the cluster, it receives a MemberUp event for the Consumer node:

[akka.tcp://AkkaClusterDemo@127.0.0.1:55645/user/producer] Received member up event for akka.tcp://AkkaClusterDemo@127.0.0.1:55643

This shows that whenever a node joins the cluster, it also receives member events for the nodes that are already running. Now if we check the output of the Consumer node, we can see that messages have started arriving from the Producer, which runs on port 55645.

[akka.tcp://AkkaClusterDemo@127.0.0.1:55643/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #0
[akka.tcp://AkkaClusterDemo@127.0.0.1:55643/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #1
[akka.tcp://AkkaClusterDemo@127.0.0.1:55643/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #2
[akka.tcp://AkkaClusterDemo@127.0.0.1:55643/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #3
...

That's cool. Right now, we have a 1 Producer x 1 Consumer cluster. What happens if we add two more Consumer nodes to the cluster? Let's start them and see.

$ java -jar AkkaClusterDemo-assembly-1.0.jar consumer
[main] [akka.remote.Remoting] Starting remoting
[main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://AkkaClusterDemo@127.0.0.1:55648]
...
[akka.tcp://AkkaClusterDemo@127.0.0.1:55648/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #12
[akka.tcp://AkkaClusterDemo@127.0.0.1:55648/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #14
[akka.tcp://AkkaClusterDemo@127.0.0.1:55648/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #16
[akka.tcp://AkkaClusterDemo@127.0.0.1:55648/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #18
...

This Consumer node started on port 55648 and began receiving messages immediately after joining the cluster. Note that the message numbers now increase by 2. Why? Because the Producer has started distributing messages between the 2 Consumer nodes in a Round Robin fashion.

Next, let's start another Consumer node. This time the expected behaviour is that every Consumer node will receive messages with numbers increasing by 3.

$ java -jar AkkaClusterDemo-assembly-1.0.jar consumer
[main] [akka.remote.Remoting] Starting remoting
[main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://AkkaClusterDemo@127.0.0.1:55652]
...
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #26
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #29
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #32
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #35
...

And new logs are written at the Producer node showing the events for the new Consumer nodes, as expected:

...
[akka.tcp://AkkaClusterDemo@127.0.0.1:55645/user/producer] Received member up event for akka.tcp://AkkaClusterDemo@127.0.0.1:55648
[akka.tcp://AkkaClusterDemo@127.0.0.1:55645/user/producer] Received member up event for akka.tcp://AkkaClusterDemo@127.0.0.1:55652
...

So far so good. Right now there is one Producer node dispatching messages to 3 Consumer nodes. Let's assume we need one more Producer node in this cluster, so let's start a new Producer and see what happens.

$ java -jar AkkaClusterDemo-assembly-1.0.jar producer
[main] [akka.remote.Remoting] Starting remoting
[main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://AkkaClusterDemo@127.0.0.1:55657]
...
[akka.tcp://AkkaClusterDemo@127.0.0.1:55657/user/producer] Received member up event for akka.tcp://AkkaClusterDemo@127.0.0.1:55652
[akka.tcp://AkkaClusterDemo@127.0.0.1:55657/user/producer] Received member up event for akka.tcp://AkkaClusterDemo@127.0.0.1:55643
[akka.tcp://AkkaClusterDemo@127.0.0.1:55657/user/producer] Received member up event for akka.tcp://AkkaClusterDemo@127.0.0.1:55648
...

The new Producer node started on port 55657 and received MemberUp events immediately after joining the cluster. Now there are 3 Consumer nodes receiving Round Robin distributed messages from 2 different Producer nodes. To be concrete, the output of the latest Consumer node looks as follows:

...
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #41
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55657 : #0
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #43
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55657 : #3
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #46
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55657 : #6
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55645 : #49
[akka.tcp://AkkaClusterDemo@127.0.0.1:55652/user/consumer] akka.tcp://AkkaClusterDemo@127.0.0.1:55657 : #9
...

Conclusion

The presented application provides the core functionality for forming an Akka cluster that can easily scale over time.

In this application, ProducerActor is the only place responsible for Consumer node management, in contrast to Akka's own sample application, where the Frontend receives state messages and informs the Backend with a special message type.

Given this design, one could argue that the Seed node is a single point of failure. However, since the Seed node is only needed when accepting a new node into the cluster, it can even be stopped or restarted the rest of the time, without any impact on messaging between Producer and Consumer nodes.

The next post will build on this application and demonstrate how to collect metrics on Akka clusters.

Cheers!

All the content of this blog post is licensed under Creative Commons Attribution 4.0 International License.