[repost ]Akka actors versus Scala actors : control the message throughput of actors

original:http://www.crosson.org/2012/02/akka-actors-versus-scala-actors-control.html

In the post following this one, I show how to enhance the example code so that it can process a very large number of messages: Akka actors : 10 millions messages processed (1s / message) in 75 seconds !

Scala’s default actor system doesn’t come with any mechanism to control message throughput between actors, so it is up to you to implement such a feature in order to avoid OutOfMemory problems, in particular when actors do not process messages at the same rate.
The Akka framework is very interesting because it brings you the features needed to control and manage throughput. Let’s see how it works through a simple example:

import akka.actor._

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage

object Dummy {
  def main(args:Array[String]) {
    val system=ActorSystem("DummySystem")
    val processor = system.actorOf(Props[MyMessageProcessor],name="default")
    for(i <- 1 to 10000000) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    println("All jobs sent")
  }
}

class MyMessageProcessor extends Actor {
  def receive = {
    case _:DoItMessage => Thread.sleep(1000)
  }
}

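Each message sent takes more than 50 bytes, and the sender enqueues the 10 million messages far faster than the actor, at one message per second, can consume them. A JVM with a maximum heap size of 512 MB will therefore quickly run out of memory.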
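To see why the heap fills up, here is a rough back-of-the-envelope estimate written as a small Scala sketch. The object name HeapEstimate and its helpers are made up for this illustration; the 10 million messages and the 50 bytes per message come from the text above, and the result is only a lower bound because it ignores per-object and mailbox overhead.

```scala
object HeapEstimate {
  // Lower bound on the heap needed to hold every queued message at once.
  def totalBytes(messages: Long, bytesPerMessage: Long): Long =
    messages * bytesPerMessage

  // Converts a byte count to mebibytes (1 MiB = 1024 * 1024 bytes).
  def toMiB(bytes: Long): Double = bytes / (1024.0 * 1024.0)

  def main(args: Array[String]): Unit = {
    // Figures taken from the text: 10 million messages, 50 bytes each.
    val bytes = totalBytes(10000000L, 50L)
    println("%d bytes, about %.0f MiB".format(bytes, toMiB(bytes)))
  }
}
```

Run as is, it reports 500000000 bytes, about 477 MiB, which already sits close to the 512 MB limit before counting any overhead.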

The good news is that Akka supports bounded actor mailboxes; just add a configuration file with the following content:

dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
         mailbox-capacity = 10000
      }
    }
  }
}

Then make the application take this configuration file into account with the following code update:

    import com.typesafe.config.ConfigFactory
    val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))

This time, if you run the previous code again, you’ll get a stable memory footprint.
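The effect of a bounded mailbox can be illustrated without Akka, using a plain JDK bounded queue. This is only an analogy under stated assumptions: BoundedQueueSketch and produceAndConsume are hypothetical names, the producer simply blocks when the queue is full, and Akka’s own mailbox may handle a full queue differently.

```scala
import java.util.concurrent.ArrayBlockingQueue

object BoundedQueueSketch {
  // Runs a fast producer against a slow consumer through a bounded queue
  // and returns the largest queue size observed by the producer.
  def produceAndConsume(capacity: Int, messages: Int): Int = {
    val queue = new ArrayBlockingQueue[String](capacity)

    val consumer = new Thread(new Runnable {
      def run(): Unit = {
        var i = 0
        while (i < messages) {
          queue.take()     // blocks while the queue is empty
          Thread.sleep(1)  // simulate a slow consumer
          i += 1
        }
      }
    })
    consumer.start()

    var maxSeen = 0
    var i = 1
    while (i <= messages) {
      queue.put("job " + i) // blocks while the queue is full
      maxSeen = math.max(maxSeen, queue.size)
      i += 1
    }
    consumer.join()
    maxSeen
  }

  def main(args: Array[String]): Unit = {
    println("max queue size: " + produceAndConsume(10, 200))
  }
}
```

Whatever the number of messages, the queue never holds more than its capacity, so memory use is bounded by the capacity and not by the total number of messages sent.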

Of course it will take a long time to process, as each message requires 1s (and the actor is blocked during that second because of the Thread.sleep call, which monopolizes a thread for the same duration, a waste of resources)… If we want to reduce the total processing time even though each message requires 1s, we’ll have to load-balance message processing across a large number of actors.

CONTEXT : Scala 2.9.1 / SBT 0.11.2 / AKKA 2.0-rc2 / sbteclipse 2.0.0

[repost ]Akka actors : 10 millions messages processed (1s / message) in 75 seconds !

original:http://www.crosson.org/2012/03/akka-actors-10-millions-messages.html

In the previous post, Akka actors versus Scala actors : control the message throughput of actors, we’ve seen how to control message throughput and avoid OutOfMemory problems, in particular when actors don’t have the same message processing rate.

Now we have to find a better way to simulate our 1 second delay, because using Thread.sleep breaks actor management: it monopolizes the threads of the executor in use. What we would like is to simulate a non-blocking 1s processing time.

So we should remove the following Thread.sleep call :

class MyMessageProcessor extends Actor {
  def receive = {
    case _:DoItMessage => Thread.sleep(1000)
  }
}

The solution is straightforward, as Akka comes with a scheduler which allows you to send a message after a given delay.

class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") 
  }
}
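The idea of a delayed, non-blocking reply can also be sketched with the JDK alone. This is not how Akka implements its scheduler; it is a minimal analogy in which DelayedReplySketch and runAll are hypothetical names, a single timer thread fires every delayed task, and counting down a latch stands in for sending "Done".

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object DelayedReplySketch {
  // Schedules `jobs` delayed replies on one timer thread and returns the
  // elapsed wall-clock time in milliseconds once all replies have fired.
  def runAll(jobs: Int, delayMillis: Long): Long = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val done = new CountDownLatch(jobs)
    val start = System.nanoTime()

    var i = 0
    while (i < jobs) {
      timer.schedule(new Runnable {
        def run(): Unit = done.countDown() // stands in for sending "Done"
      }, delayMillis, TimeUnit.MILLISECONDS)
      i += 1
    }

    done.await()      // wait until every delayed reply has fired
    timer.shutdown()
    (System.nanoTime() - start) / 1000000L
  }

  def main(args: Array[String]): Unit = {
    val elapsed = runAll(1000, 200L)
    println("1000 delayed replies in %d ms".format(elapsed))
  }
}
```

With Thread.sleep on one thread, 1000 jobs of 200 ms each would need 200 seconds. Here the delays overlap, so the total stays close to a single delay.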

Using scheduleOnce, we don’t monopolize system resources: no thread is held while waiting. In fact we achieve a fake processing step which simulates an asynchronous response that comes 1s later. In the following code, the response is even received asynchronously through a future:

package dummy

import akka.actor._
import akka.util.duration._
import akka.util.Timeout
import akka.pattern.ask
import akka.dispatch.Future

sealed trait MyMessage
case class DoItMessage(cmd:String) extends MyMessage
case object DoneMessage extends MyMessage

object Dummy {
  def main(args:Array[String]) {

    val howmanyjob=10*1000000

    import com.typesafe.config.ConfigFactory
    implicit val system=ActorSystem("DummySystem",ConfigFactory.load.getConfig("dummy"))

    val simu = system.actorOf(
        Props(new MySimulator(system))
        .withDispatcher("simu-dispatcher"),
        name="simulator")

    val appManager = system.actorOf(
        Props(new ApplicationManager(system, howmanyjob))
        .withDispatcher("simu-dispatcher"),
        name="application-manager")

    import akka.routing.RoundRobinRouter
    val processor = system.actorOf(
        Props(new MyMessageProcessor(appManager, simu))
        .withDispatcher("workers-dispatcher")
        .withRouter(RoundRobinRouter(10)),
        name="default")

    for(i <- 1 to howmanyjob) {
      processor ! DoItMessage("Do the job with ID#%d now".format(i))
    }
    println("All jobs sent")
  }
}

class MyMessageProcessor(appManager:ActorRef, simu:ActorRef) extends Actor {
  def receive = {
    case msg:DoItMessage =>
      implicit val timeout = Timeout(5 minutes)
      val receivedTime = System.currentTimeMillis()
      val future = simu ? msg
      future.onComplete { 
        case result:Either[Throwable, String] =>
          assert(System.currentTimeMillis()-receivedTime >= 1000)
          appManager ! DoneMessage
      }
  }
}

class MySimulator(system:ActorSystem) extends Actor {
  def receive = {
    case _:DoItMessage =>
      // Fake processing, somewhere, the job is executed and we get 
      // the results 1s later asynchronously
      system.scheduler.scheduleOnce(1000 milliseconds, sender, "Done") 
  }
}

class ApplicationManager(system:ActorSystem, howmanyjob:Int) extends Actor {
  val startedTime = System.currentTimeMillis()
  var count=0
  def receive = {
    case DoneMessage => 
      count+=1
      if (count%(howmanyjob/20)==0) println("%d/%d processed".format(count, howmanyjob))
      if (count == howmanyjob) {
        val now=System.currentTimeMillis()
        println("Everything processed in %d seconds".format((now-startedTime)/1000))
        system.shutdown() 
      }
  }
}
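The RoundRobinRouter(10) used above spreads incoming messages over 10 worker instances in turn. The rotation itself can be sketched in a few lines of plain Scala; RoundRobinSketch and its RoundRobin class are hypothetical names for this illustration and say nothing about how Akka implements its router.

```scala
import java.util.concurrent.atomic.AtomicLong

object RoundRobinSketch {
  // Hands out routees in strict rotation; safe to call from several threads.
  final class RoundRobin[A](routees: IndexedSeq[A]) {
    require(routees.nonEmpty, "at least one routee is needed")
    private val counter = new AtomicLong(0L)

    // Returns the next routee, wrapping around at the end of the list.
    def next(): A = routees((counter.getAndIncrement() % routees.size).toInt)
  }

  def main(args: Array[String]): Unit = {
    val router = new RoundRobin(Vector("worker-1", "worker-2", "worker-3"))
    val picks = (1 to 7).map(_ => router.next())
    println(picks.mkString(", "))
  }
}
```

Each worker receives the same share of the messages, which is a reasonable choice here because every job takes the same simulated time.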

The application configuration is the following (application.conf file) :

dummy {
  akka {
    loglevel = WARNING
    actor {
      default-dispatcher {
      }
    }
    scheduler {
      tick-duration = 50ms
      ticks-per-wheel = 1000
    }
  }

  simu-dispatcher {
    type = Dispatcher
    mailbox-capacity = 100000
  }

  workers-dispatcher {
    mailbox-capacity = 10000
    executor = "fork-join-executor"
    fork-join-executor {
      parallelism-min = 0
      parallelism-max = 6000
      parallelism-factor = 3.0      
    }
  }
}
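To get a feeling for what these values mean on the 6-core test machine, here is a small sketch of the arithmetic. It rests on two assumptions that are not stated in this post: that the fork-join executor sizes its pool as ceil(cores × parallelism-factor) clamped between parallelism-min and parallelism-max, and that the scheduler is a hashed-wheel timer, as its setting names suggest. PoolSizeSketch and its helpers are hypothetical names.

```scala
object PoolSizeSketch {
  // Assumed sizing rule: ceil(cores * factor), clamped to the [min, max] range.
  def parallelism(cores: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)

  // Time span covered by one full rotation of the scheduler wheel.
  def wheelSpanMillis(tickMillis: Long, ticksPerWheel: Int): Long =
    tickMillis * ticksPerWheel

  def main(args: Array[String]): Unit = {
    // 6 cores, factor 3.0, min 0, max 6000, as in the configuration above.
    println("worker threads: " + parallelism(6, 3.0, 0, 6000))
    // tick-duration = 50ms, ticks-per-wheel = 1000.
    println("wheel span (ms): " + wheelSpanMillis(50L, 1000))
  }
}
```

Under those assumptions the workers dispatcher would run 18 threads, far below the parallelism-max ceiling of 6000, and the 1s delay would be resolved with a granularity of one 50 ms tick.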

The build.sbt file (used to fetch dependencies, build, test and run) is the following:

name := "AkkaSandbox"

version := "0.1"

scalaVersion := "2.9.1"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0-RC3"

resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"

Everything is processed in 75 seconds using :

  • linux 3.2.1-gentoo-r2 64bits
  • AMD Phenom(tm) II X6 1090T (6 CPU cores)
  • Java HotSpot(TM) 64-Bit Server 1.6.0.31

CPU usage is ~80% and the Java memory footprint stays below 300 MB; each message requires 50-60 bytes, so the 10 million messages require at least about 480 MB in total (but not simultaneously).
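These figures can be cross-checked with a little arithmetic. The sketch below uses only numbers quoted in this post (10 million messages, 1s per message, 75 seconds in total); ThroughputSketch and its helpers are hypothetical names, and the in-flight estimate applies Little’s law, which only holds as a steady-state average.

```scala
object ThroughputSketch {
  // Average number of messages completed per second.
  def messagesPerSecond(total: Long, seconds: Long): Long = total / seconds

  // Little's law: in-flight = arrival rate * time each message spends in the system.
  def inFlight(ratePerSecond: Long, latencySeconds: Long): Long =
    ratePerSecond * latencySeconds

  // Days needed if every 1s job ran strictly one after the other.
  def sequentialDays(total: Long, secondsPerJob: Long): Double =
    total * secondsPerJob / 86400.0

  def main(args: Array[String]): Unit = {
    val rate = messagesPerSecond(10000000L, 75L)
    println("throughput: %d messages/s".format(rate))
    println("in flight on average: %d messages".format(inFlight(rate, 1L)))
    println("sequential run: %.1f days".format(sequentialDays(10000000L, 1L)))
  }
}
```

Roughly 133,000 messages are being "processed" at any instant, which at 50-60 bytes each is only a few megabytes of payload. That is consistent with a footprint well below the total size of all the messages.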