Tag Archives: Akka

[repost ]Building A Social Music Service Using AWS, Scala, Akka, Play, MongoDB, And Elasticsearch

original:http://highscalability.com/blog/2014/3/11/building-a-social-music-service-using-aws-scala-akka-play-mo.html

This is a guest repost by Rotem Hermon, former Chief Architect for serendip.me, on the architecture and scaling considerations behind making a startup music service.

serendip.me is a social music service that helps people discover great music shared by their friends, and also introduces them to their “music soulmates” – people outside their immediate social circle who share a similar taste in music.

Serendip is running on AWS and is built on the following stack: Scala (and some Java), Akka (for handling concurrency), the Play framework (for the web and API front-ends), MongoDB and Elasticsearch.

Choosing The Stack

One of the challenges of building serendip was the need to handle a large amount of data from day one, since a main feature of serendip is that it collects every piece of music being shared on Twitter from public music services. So when we approached the question of choosing the language and technologies to use, an important consideration was the ability to scale.

The JVM seemed the right basis for our system, for its proven performance and tooling. It’s also the platform of choice for a lot of open source systems (like Elasticsearch), which enables using their native clients – a big plus.

When we looked at the JVM ecosystem, scala stood out as an interesting language option that allowed a modern approach to writing code, while keeping full interoperability with Java. Another argument in favour of scala was the akka actor framework which seemed to be a good fit for a stream processing infrastructure (and indeed it was!). The Play web framework was just starting to get some adoption and looked promising. Back when we started, at the very beginning of 2011, these were still kind of bleeding edge technologies. So of course we were very pleased that by the end of 2011 scala and akka consolidated to become Typesafe, with Play joining in shortly after.

MongoDB was chosen for its combination of developer friendliness, ease of use, feature set and possible scalability (using auto-sharding). We learned very soon that the way we wanted to use and query our data would require creating a lot of big indexes on MongoDB, which would cause us to hit performance and memory issues pretty fast. So we kept using MongoDB mainly as a key-value document store, also relying on its atomic increments for several features that required counters.
With this type of usage MongoDB turned out to be pretty solid. It is also rather easy to operate, but mainly because we managed to avoid using sharding and went with a single replica set (the sharding architecture of MongoDB is pretty complex).

For querying our data we needed a system with full blown search capabilities. Out of the possible open source search solutions, Elasticsearch came as the most scalable and cloud oriented system. Its dynamic indexing schema and the many search and faceting possibilities it provides allowed us to build many features on top of it, making it a central component in our architecture.

We chose to manage both MongoDB and Elasticsearch ourselves and not use a hosted solution for two main reasons. First, we wanted full control over both systems. We did not want to depend on another element for software upgrades/downgrades. And second, the amount of data we process meant that a hosted solution was more expensive than managing it directly on EC2 ourselves.

Some Numbers

Serendip’s “pump” (the part that processes the Twitter public stream and Facebook user feeds) digests around 5,000,000 items per day. These items are passed through a series of “filters” that detect and resolve music links from supported services (YouTube, SoundCloud, Bandcamp, etc.) and add metadata on top of them. The pump and filters are running as akka actors, and the whole process is managed by a single m1.large EC2 instance. If needed, it can be scaled easily by using akka’s remote actors to distribute the system to a cluster of processors.
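
The post doesn’t show the pump’s code, but a rough, hypothetical sketch of one “filter” stage as an Akka actor might look like the following (the message types, regex and actor names are invented for illustration):

import akka.actor._

// Hypothetical messages flowing through the pump
case class RawItem(source: String, text: String)
case class ValidItem(source: String, musicUrl: String)

// One "filter" stage: detects music links and forwards enriched items downstream
class MusicLinkFilter(downstream: ActorRef) extends Actor {
  private val MusicLink =
    """(https?://(?:soundcloud\.com|youtube\.com|bandcamp\.com)/\S+)""".r

  def receive = {
    case RawItem(source, text) =>
      MusicLink.findFirstIn(text).foreach { url =>
        downstream ! ValidItem(source, url)
      }
  }
}

object PumpSketch extends App {
  val system = ActorSystem("pump")
  // A stand-in for the indexing stage that just prints valid items
  val indexer = system.actorOf(Props(new Actor {
    def receive = { case item: ValidItem => println(item) }
  }), "indexer")
  val filter = system.actorOf(Props(new MusicLinkFilter(indexer)), "musicLinkFilter")

  filter ! RawItem("twitter", "new track! https://soundcloud.com/some-artist/some-track")
}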

Out of these items we get around 850,000 valid items per day (that is, items that really contain relevant music links). These items are indexed in Elasticsearch (as well as in MongoDB for backup and for keeping counters). Since every valid item means updating several objects, we get an index rate of ~40/sec in Elasticsearch.
We keep a monthly index of items (tweets and posts) in Elasticsearch. Each monthly index contains ~25M items and has 3 shards. The cluster is running with 4 nodes, each on a m2.2xlarge instance. This setup has enough memory to run the searches we need on the data.

Our MongoDB cluster gets ~100 writes/sec and ~300 reads/sec as it handles some more data types, counters and statistics updates. The replica set has a primary node running on an m2.2xlarge instance, and a secondary on an m1.xlarge instance.

Building A Feed

When we started designing the architecture for serendip’s main music feed, we knew we wanted the feed to be dynamic and reactive to user actions and input. If a user gives a “rock-on” to a song or “airs” a specific artist, we want that action to reflect immediately in the feed. If a user “dislikes” an artist, we should not play that music again.

We also wanted the feed to be a combination of music from several sources, like the music shared by friends, music by favorite artists and music shared by “suggested” users that have the same musical taste.
These requirements meant that a “fan-out-on-write” approach to feed creation would not be the way to go. We needed an option to build the feed in real time, using all the signals we have concerning the user. The set of features Elasticsearch provides allowed us to build this kind of real-time feed generation.

The feed algorithm consists of several “strategies” for selecting items which are combined dynamically with different ratios on every feed fetch. Each strategy can take into account the most recent user actions and signals. The combination of strategies is translated to several searches on the live data that is constantly indexed by Elasticsearch. Since the data is time-based and the indexes are created per month, we always need to query only a small subset of the complete data.

Fortunately enough, Elasticsearch handles these searches pretty well. It also provides a known path to scaling this architecture – writes can be scaled by increasing the number of shards. Searches can be scaled by adding more replicas and physical nodes.

The process of finding “music soulmates” (matching users by musical taste) is making good use of the faceting (aggregation) capabilities of Elasticsearch. As part of the constant social stream processing, the system is preparing data by calculating the top shared artists for social network users it encounters (using a faceted search on their shared music).

When a serendip user gives out a signal (either by airing music or interacting with the feed), it can trigger a recalculation of the music soulmates for that user. The algorithm finds other users that are top matched according to the list of favorite artists (which is constantly updated), weighing in additional parameters like popularity, number of shares, etc. It then applies another set of algorithms to filter out spammers (yes, there are music spammers…) and outliers.

We found out that this process gives us good enough results while saving us from needing additional systems that can run more complex clustering or recommendation algorithms.

Monitoring And Deployment

Serendip is using ServerDensity for monitoring and alerting. It’s an easy to use hosted solution with a decent feature set and reasonable pricing for start-ups. ServerDensity natively provides server and MongoDB monitoring. We’re also making heavy use of the ability to report custom metrics into it for reporting internal system statistics.

An internal statistic collection mechanism collects events for every action that happens in the system, and keeps them in a MongoDB collection. A timed job reads those statistics from MongoDB once a minute and reports them to ServerDensity. This allows us to use ServerDensity for monitoring and alerting on Elasticsearch as well as on our operational data.
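
As a hedged sketch of what such a timed job could look like with Akka’s scheduler (the helper functions and metric names here are hypothetical, not serendip’s actual code):

import akka.actor.ActorSystem
import scala.concurrent.duration._

object StatsReporterSketch extends App {
  val system = ActorSystem("stats")
  import system.dispatcher // execution context for the scheduled task

  // Hypothetical helpers: read aggregated counters from MongoDB and push them
  // to ServerDensity as custom metrics
  def readStatsFromMongo(): Map[String, Long] = Map("valid_items_indexed" -> 0L)
  def reportToServerDensity(stats: Map[String, Long]): Unit = println(stats)

  // Run once a minute, as described above
  system.scheduler.schedule(1.minute, 1.minute) {
    reportToServerDensity(readStatsFromMongo())
  }
}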

Managing servers and deployments is done using Amazon Elastic Beanstalk. Elastic Beanstalk is AWS’s limited PaaS solution. It’s very easy to get started with, and while it’s not really a full-featured PaaS, its basic functionality is enough for most common use cases. It provides easy auto-scaling configuration and also gives complete access to the underlying EC2 instances.

Building the application is done with a Jenkins instance that resides on EC2. The Play web application is packaged as a WAR. A post-build script pushes the WAR to Elastic Beanstalk as a new application version. The new version is not deployed automatically to the servers – it’s done manually. It is usually deployed first to the staging environment for testing, and once approved is deployed to the production environment.

Takeaways

In conclusion, here are some of the top lessons learned from building serendip, in no particular order.

  1. Know how to scale. You probably don’t need to scale from the first day, but you need to know how every part of your system can scale and to what extent. Give yourself enough time in advance if scaling takes time.
  2. Prepare for peaks. Especially in the life of a start-up, a single lifehacker or reddit post can bring your system down if you’re always running at near top capacity. Keep enough margin so you can handle a sudden load or be ready to scale really fast.
  3. Choose a language that won’t hold you back. Make sure the technologies you want to use have native clients in your language, or at least actively maintained ones. Don’t get stuck waiting for library updates.
  4. Believe the hype. You want a technology that will grow with your product and will not die prematurely. A vibrant and active community and some noise about the technology can be a good indication for its survival.
  5. Don’t believe the hype. Look for flame posts about the technology you’re evaluating. They can teach you about its weak points. But also don’t take them too seriously, people tend to get emotional when things don’t work as expected.
  6. Have fun. Choose a technology that excites you. One that makes you think “oh this is so cool what can I do with it”. After all, that’s (also) what we’re here for.

[repost ]Typesafe Interview: Scala + Akka Is An IaaS For Your Process Architecture

original:http://highscalability.com/blog/2013/5/8/typesafe-interview-scala-akka-is-an-iaas-for-your-process-ar.html

This is an email interview with Viktor Klang, Director of Engineering at Typesafe, on the Scala Futures model and Akka, both topics on which he is immensely passionate and knowledgeable.

How do you structure your application? That’s the question I explored in the article Beyond Threads And Callbacks. An option I did not talk about, mostly because of my own ignorance, is a powerful stack you may not be all that familiar with: Scala and Akka.

To remedy my oversight is our acting tour guide, Typesafe’s Viktor Klang, long time Scala hacker and Java enterprise systems architect. Viktor was very patient in answering my questions and was enthusiastic about sharing his knowledge. He’s a guy who definitely knows what he is talking about.

I’ve implemented several Actor systems along with the messaging infrastructure, threading, async IO, service orchestration, failover, etc, so I’m innately skeptical about frameworks that remove control from the programmer at the cost of latency.

So at the end of the interview am I ready to drink the koolaid? Not quite, but I’ll have a cup of coffee with the idea.

I came to think of Scala + Akka as a kind of IaaS for your process architecture. Toss in Play for the web framework and you have a slick stack, with far more out-of-the-box power than Go, Node, or plaino jaino Java.

The build-or-buy decision is surprisingly similar to every other infrastructure decision you make. Should you use a cloud or build your own? It’s the same sort of calculation you need to go through when deciding on your process architecture. At the extremes you lose functionality and flexibility, but since they’ve already thought of most everything you would need to think about, with examples and support, you gain a tremendous amount too. Traditionally, however, process architecture has been entirely ad hoc. That may be changing.

Now, let’s start the interview with Viktor…

 

HS:  What is an Actor?

So let’s start from the very beginning! An Actor in the Actor Model is composed of 3 distinct pieces:

  • A behavior
  • An address
  • A mailbox

The Address is the thing you send messages to; they are then put into the Mailbox and the Behavior is applied to the messages in the mailbox, one at a time. Since only one message is processed at a time, you can view an Actor as an island of consistency, connected to other actors via their Addresses and by sending and receiving messages from them.

There are 3 core operations that an Actor needs to support in order for it to qualify as an Actor.

  1. CREATE—an Actor has to be able to create new Actors
  2. SEND—an Actor needs to be able to send messages to Actors
  3. BECOME—an Actor needs to be able to change its behavior for the next message

Since what you send messages to is an Address, there is an indirection which allows the Mailbox and Behavior to live essentially anywhere, as long as the message can get routed there. This is also referred to as Location Transparency.
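
A minimal, hypothetical Akka sketch (not part of the interview) of these three operations, using context.actorOf for CREATE, ! for SEND and context.become for BECOME:

import akka.actor._

class Door extends Actor {
  def closed: Receive = {
    case "open"  => context.become(open)   // BECOME: new behavior for the next message
    case "knock" => sender ! "come back later"
  }
  def open: Receive = {
    case "close" => context.become(closed)
    case "knock" => sender ! "come in"
  }
  def receive = closed
}

class Hallway extends Actor {
  val door = context.actorOf(Props[Door], "door")   // CREATE
  def receive = {
    case "visit"       => door ! "knock"            // SEND
    case reply: String => println("the door said: " + reply)
  }
}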

HS: How does Akka implement the Actor model?

Like the Actor Model, but messages are served by a designated thread pool configured on a per-actor basis. This allows for fine-grained control over execution provisioning and a means of bulkheading parts of your application from other parts. Akka also allows you to configure the mailbox implementation on a per-actor basis, which means that some actors might need a bounded one, some might want a priority-based one, some might want a deduplicating one, and some might want fine-tuning of things like overflow protection with head-dropping vs. tail-dropping, etc.
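
A hedged sketch of what that per-actor provisioning can look like (the dispatcher name, pool sizes and message are made-up illustration values, and configuration keys vary between Akka versions):

import akka.actor._
import com.typesafe.config.ConfigFactory

class RiskyWorker extends Actor {
  def receive = { case msg => println("handling " + msg) } // e.g. calls a slow external service
}

object BulkheadSketch extends App {
  // A dedicated thread pool for one segment of the actor hierarchy
  val config = ConfigFactory.parseString("""
    risky-dispatcher {
      type = Dispatcher
      executor = "fork-join-executor"
      fork-join-executor {
        parallelism-min = 2
        parallelism-max = 4
      }
      throughput = 5
    }
  """)

  val system = ActorSystem("app", config.withFallback(ConfigFactory.load()))
  // Only this actor runs on the dedicated pool; the rest of the system uses the default one
  val worker = system.actorOf(Props[RiskyWorker].withDispatcher("risky-dispatcher"), "risky")
  worker ! "some job"
}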

Compared with Threads, Akka Actors are extremely light-weight, clocking in at around 500 bytes per instance, allowing for running many millions of actors on a commodity machine. Like Erlang Processes, Akka Actors are location transparent, which means that it is possible to scale out to multiple machines without changing the way the code is written.

Akka Actors do not block a thread when they have nothing to process, which allows for high throughput at low latency since the wake-up lag for threads can be avoided. It is possible to configure the number of messages to process before handing the thread back to the pool, and it is also possible to specify a time slice, which allows the actor to keep processing new messages as long as it hasn’t run out of its time slice before handing the thread back to the pool.

This allows tuning for fairness or for throughput. Akka Actors will not be preempted when a higher-priority message arrives, but it is possible to have multiple actors sharing the same mailbox, which can mitigate this if required.

Inspired by Process Linking from Erlang, Akka Actors form a strict hierarchy, where actors created by an actor form a child-parent relationship in which the parent is responsible for handling the failure of its children by issuing directives on how to deal with the different types of failure that can occur, or by choosing to escalate the problem to its own parent. This has the benefit of creating the same kind of self-healing capabilities exhibited by Erlang. It is also possible for an Akka Actor to observe when another Actor will not be available anymore, and handle that accordingly.

 

HS:  Can you give an example of how Process Linking works in practice?

Actor A receives message B, which entails a potentially risky operation C (it could be contacting an external server or doing a computation that might blow up). Instead of doing that work itself, A may spawn a new actor and let that actor do the risky operation. If the operation fails, the exception is propagated to A (being the “parent”), who can decide to restart the failed actor to retry, or perhaps log that it failed. Whether it fails or not, A has not been at risk, as the dangerous operation was delegated and managed. In the case of a more serious error that A cannot manage, A would escalate that error to its own parent, who might then act upon it instead.
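
A hedged Akka 2.x sketch of this delegation pattern (the actor and message names are invented for illustration): the parent’s supervisor strategy decides what happens when the child’s risky operation blows up.

import akka.actor._
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._

case class Fetch(url: String)

// The "child" that performs the dangerous operation and may throw
class RiskyChild extends Actor {
  def receive = {
    case Fetch(url) => sender ! fetch(url)
  }
  def fetch(url: String): String =
    throw new RuntimeException("external service down") // simulated failure
}

// The "parent" (actor A): delegates the work and handles the child's failures
class Parent extends Actor {
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 3, withinTimeRange = 1.minute) {
      case _: RuntimeException => Restart  // retry by restarting the failed child
      case _: Exception        => Escalate // anything else goes up to our own parent
    }

  val child = context.actorOf(Props[RiskyChild], "risky")

  def receive = {
    case request: Fetch => child ! request          // delegate instead of doing it ourselves
    case body: String   => println("got result: " + body)
  }
}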

 

HS: Can you go into some more detail about bulkheading, why is it important and how it’s  accomplished in Akka?

The Bulkhead stability pattern is from Michael Nygard’s Release It!. It’s about gaining stability by compartmentalization, just like bulkheads for a boat.

Bulkheading of Threads in Akka is accomplished by assigning different thread pools to different segments of your actor hierarchy, which means that if one thread pool is overloaded (by high load, a DoS attempt, or a logic error creating an infinite loop, for instance), other parts of the application can proceed, since their Threads cannot be “infected” by the failing thread pool.

HS: Tail-dropping?

When dealing with asynchronous message-passing systems one needs to decide which contention management policies to use. Back-pressure is one policy, dropping messages is another; and if you decide to drop messages, which ones do you drop? Usually this needs to be decided on a “per service” basis: either you drop the oldest (the one at the front of the queue, i.e. front-dropping) or the newest (tail-dropping). Sometimes one wants a priority queue so that the important messages end up at the front of the queue.

 

HS: What about these abilities helps programmers develop better/faster/more robust systems?

In any system, when load grows to surpass the processing capability, one must decide how to deal with the situation. With configurable mailbox implementations you as the developer can decide how to deal with this problem on a case-by-case basis, exploiting business knowledge and constraints to make sure that performance and scalability are not compromised to get robustness (which is more than likely the case for a one-size-fits-all solution like backpressure).

 

HS: How does the location transparency work?

Each Akka Actor is identified by an ActorRef, which is similar to an Erlang PID: a level of indirection between the instance of the Actor and the senders. So senders only ever interact with ActorRefs, which allows the underlying Actor instance to live essentially anywhere (in the world, potentially).

 

HS: Is there latency involved in scheduling an Akka actor to execute?

When an Actor doesn’t have any messages it is not scheduled for execution, and when it gets a message it will attempt to schedule itself with the thread pool if it hasn’t already done so. The latency is completely up to the implementation of the Thread Pool used, and this is also configurable and extensible/user replaceable. By default Akka uses a state-of-the-art implementation of a thread pool without any single point of contention.

 

HS: Given you can configure the number of messages to process before handing back the thread to the pool, that makes it a sort of run-to-completion model and the CPU time isn’t bounded?

Exactly.

 

HS:  Can it be interrupted?

No, but as soon as one message is done, it will check if it still has time left, and if so it will pick the next message.

 

HS: Can you ensure some sort of fair scheduling so some work items can make some progress?

That is up to the ThreadPool implementation and the OS Scheduler, fortunately the user can affect both.

 

HS: When multiple Actors share the same mailbox, if some actor has the CPU, it won’t give up the CPU for the higher priority message to be executed? How does this work on multiple CPUs?

If you have 10 Actors sharing a single priority mailbox and a thread pool of 10 Threads, there is more opportunity for an actor to become free to pick up the high-priority work than if a single actor is currently processing a slow, low-priority message. So it’s not a watertight solution, but it improves the processing of high-priority messages under that circumstance.

Placing requirements on the priority of messages increases lock contention and sacrifices throughput for latency.

 

HS: How do Actors know where to start in a distributed fabric?

That is done by configuration so that one can change the production infrastructure without having to rebuild the application, or run the same application on multiple, different infrastructures without building customized distributions.

 

HS: How do Actors know how to replicate and handle failover?

Also in configuration.

HS: How do you name Actors?

When you create an Akka Actor you specify its name, and the address of the actor is a URI of its place in the hierarchy.

Example: “akka.tcp://applicationName@host:port/user/yourActorsParentsName/yourActorsName”

 

HS: How do you find Actors?

There are a couple of different ways depending on the use case/situation: either you get the ActorRef (every Akka Actor is referred to by its ActorRef; this is equivalent to the Address in the Actor Model) injected via the constructor of the Actor, or you get it in a message or as the sender of a message. If you need to look up Actors there are 2 different ways. One is to create an ActorSelection, which can be described as a query of the hierarchy, to which you can send messages; all actors matching the query will get them. Or you can use “actorFor”, which lets you look up a specific actor using its full URI.
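
For illustration, a rough sketch of both lookup styles (the paths are hypothetical, and whether you use actorSelection or actorFor depends on the Akka version):

import akka.actor._

class Dispatch extends Actor {
  // Query-style lookup: every actor matching the path pattern receives the message
  val workers = context.actorSelection("/user/master/workers/*")

  // Direct lookup of one specific actor by its full path
  val logger = context.actorFor("akka://app/user/logger")

  def receive = {
    case job: String =>
      workers ! job
      logger ! ("dispatched " + job)
  }
}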

 

HS: How do you know what an Actor can do?

You don’t. Well, unless you define such a protocol, which is trivial.

 

HS: Why is indirection an important capability?

The indirection is important because it clearly separates the location of the behavior from the location of the sender. An indirection that can even be rebound at runtime, migrating actors from one physical node to another without impacting the Address itself.

 

HS: How do you not have contention on your thread pools?

Every Thread in that pool has its own task-queue, and there is no shared queue. Tasks are randomly distributed to the work-queues and when a Thread doesn’t have any tasks it will randomly work steal from other Threads. Having no single point of contention allows for much greater scalability.

 

HS: Could you please give a brief intro into Scala and why it’s so wonderful?

Sure!

I come from a C, then C++, then Java background and discovered Scala back in 2007.

For me Scala is about focusing on the business-end of the programming and removing repetition & “ritual” code.

Scala is a unifier of object orientation and functional programming, and it tries to minimize specialized constructs in the language, instead giving powerful & flexible constructs for library authors to add functionality with.

I personally enjoy that Scala is expression oriented rather than statement oriented, which simplifies code by avoiding a lot of mutable state that tends to easily turn into an Italian pasta dish.

A statement doesn’t “return”/”produce” a result (you could say that it returns void), but instead it “side-effects” by writing to memory locations that it knows about, whereas an expression is a piece of code that “returns”/”produces” a value.
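
A tiny illustration of the difference (my example, not Viktor’s):

val errorCount = 0

// Expression-oriented: `if` produces a value, so no mutable temporary is needed
val status = if (errorCount == 0) "ok" else "degraded"

// Statement-oriented equivalent: the result is produced by side-effecting on a var
var status2: String = null
if (errorCount == 0) status2 = "ok" else status2 = "degraded"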

So all in all Scala lets me write less code, with less moving parts making it cheaper to maintain and a joy to write. A great combination in my book!

And not to forget that it allows me to use all good Java libraries out there, and even be consumed by Java (Akka can be used by both Scala and Java as an example).

 

HS: How do Scala futures fit into the scheme of things?

Alright. So I was a co-author of the SIP-14 proposal that was included in Scala 2.10. So the following explanations and discussions will center around that.

A Future is a read-handle for a single value that may be available at some point in time. Once the value is available it cannot and will not be changed.

A Promise is a write-handle for a single value that should be set at some point in time. Once the value is available it cannot and will not be changed.

The value of a Future/Promise may either be a result or an exception.

(You can get the corresponding Future from a Promise (by calling the future()-method on Promise) but not vice versa)
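
A minimal sketch of that read-handle/write-handle relationship with the Scala 2.10 API:

import scala.concurrent.{ Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global

val promise: Promise[Int] = Promise[Int]()
val future: Future[Int]   = promise.future       // the read handle, handed to consumers

future.foreach(value => println("got " + value)) // runs once the promise is completed

promise.success(42)                              // completes the value exactly once
// promise.failure(new Exception("boom"))        // ...or it could have been completed exceptionally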

The strength of this model is that it allows you to program as if you already have the result, and the logic is applied when the result is available, effectively creating a data-flow style of programming, a model which easily can take advantage of concurrent evaluation.

When you program with Futures you need an ExecutionContext, which will be responsible for executing the logic asynchronously; for all intents and purposes this is equivalent to a thread pool.

As an example in Scala:

import scala.concurrent.{ Future, ExecutionContext }

import ExecutionContext.Implicits.global // imports into scope the global default execution context

// lets first define a method that adds two Future[Int]s

// This method uses a Scala for-expression, but it is only sugar for:

// f1.flatMap(left => f2.map(right => left + right))

// it asynchronously and non-blockingly adds the result of future1 to the result of future2

def add(f1: Future[Int], f2: Future[Int]): Future[Int] = for(result1 <- f1; result2 <- f2) yield result1 + result2

 

// Then lets define a method that produces random integers

def randomInteger() = 4 // Determined by fair dice roll

val future1 = Future(randomInteger()) // Internally creates a Promise[Int] and returns its Future[Int] immediately, calls "randomInteger()" asynchronously, and completes the promise with the result, which is then accessible from its Future.

val future2 = Future(randomInteger()) // same as above

 val future3 = add(future1, future2)

None of the code above is blocking any thread, and the code is declarative and doesn’t prescribe _how_ the code will be executed. The ExecutionContext can be switched without changing any of the logic.

So what happens if the value is exceptional?

val future3 = add(Future(throw new BadThingsHappenedException), Future(randomInteger()))

Then the exceptional completion of future1 will be propagated to future3.

So let’s say we know a way to recover from BadThingsHappenedExceptions; let’s use the recover method:

val future1a = Future(throw new BadThingsHappenedException)

val future1b = future1a recover { case e: BadThingsHappenedException => randomInteger() }

val future2 = Future(randomInteger())

val future3 = add(future1b, future2)

So here we first create future1a, which will be completed exceptionally with a BadThingsHappenedException. Then we call the “recover” method on future1a and provide a (partial) function literal that can convert BadThingsHappenedExceptions to an Int by calling our amazing randomInteger() method; the result of “recover” is a new future, which we call future1b.

So here we can observe that futures are only completed once, and the way to transform the results or exceptions of a future is to create a new Future which will hold the result of the transformation.

So from a less contrived example standpoint, we can do things like:

val future1 = Future(callSomeWebService) recover { case _: ConnectException => callSomeBackupWebService() }

val future2 = Future(callSomeOtherWebService) recover { case _: ConnectException => callSomeOtherBackupWebService() }

val future3 = for(firstResult <- future1; secondResult <- future2) yield combineResults(firstResult, secondResult)

future3 map { result => convertToHttpResponse(result) } recover {
  case _ => HttpResponse(400) // underscore means "anything"
} foreach { response => sendResponseToClient(response) }

So what we do here is asynchronously call a couple of web services; if any of them fail with a ConnectException we try to call some backup web service. We then combine the results of those web-service responses into some intermediate result and convert that result into an HttpResponse. If anything exceptional has happened this far, we recover to an HttpResponse with a 400 status, and as the very last step we send our HttpResponse to the client that requested it.

So in our code we never wait for anything; what we do is declare what we want to happen when/if we have a result, and there is a clear flow of data.

 

HS: Is a Future a scalar or can it have structure (arrays, maps, structs, etc)?

It is a single memory slot that can only be written once. So what you write to it should be a value (i.e. immutable) but can be a struct, a Map or what have you.

HS: How do you implement more interesting state machines where results from one state are used in another? I think that’s what I have a problem with a lot of the time. I would prefer to go to a clear error state where errors are handled, for example. In the LinkedIn example they parallelize three separate calls and have a bit of error handling code somewhere that doesn’t seem to know where the error came from or why, which makes crafting a specific error response difficult.

I understand what you mean, but I view it differently. With Futures you deal with the failure where you can, just as you deal with exceptions in Java where you can. This may or may not be in the method that produces the exception, or in the caller, or in the caller’s caller, or elsewhere.

You could view Futures (with exceptional results) as an on-heap version of exception handling (in contrast to plain exception handling, which is on-stack, meaning that any thread can choose to deal with the exception and not only the thread that caused it).

 

HS: A lot of the “never wait for anything” seems normal to me in C++. Send a message. All IO is async. Replies come back. They get dropped into the right actor queue.

I hear you! A lot of the good things we learned from C/C++ still apply, i.e. async IO is more resource efficient than blocking IO, etc.

 

HS: The actor state machine makes sense of what to do. Thread contexts are correct. In your example there’s no shared state, which is the simplest situation, but when shared state is involved it’s not so clean, especially when many of these bits of code are executing simultaneously.

Of course, but it depends on what one means by shared state. Something that I find useful is to ask “what would I do if the actors were people and they were in different locations?”

Sharing state (immutable values) via message-passing is perfectly natural and in reality mimics how we as humans share knowledge (we don’t flip each other’s neurons directly :) )


[repost ]Indexing into ElasticSearch with Akka and Scala

original:http://sujitpal.blogspot.com/2012/11/indexing-into-elasticsearch-with-akka.html
I just completed the Functional Programming Principles with Scala course on Coursera, taught by Dr Martin Odersky, the creator of Scala. Lately I’ve been trying to use Scala (instead of Java) for my personal projects, so I jumped at the opportunity to learn the language from its creator. While I did learn things about Scala I didn’t know before, the greatest gain for me was what I learned about Functional Programming principles. Before the course, I had thought of Scala as a better, leaner Java, but after the course, I am also beginning to appreciate the emphasis on immutability and recursive constructs that one can see in Scala code examples on the Internet.

Some time after I finished the course, I heard about the Typesafe Developer Contest. Developers are invited to submit applications that use one or more of the components in the Typesafe Stack (Scala, Akka and Play). While I have no illusion of winning any prizes here, I figured it would be a good opportunity for me to practice what I had learnt, and to also expand my knowledge of Scala to include two of its most popular libraries.

The application I came up with was a Scala front end to ElasticSearch. ElasticSearch is a distributed, RESTful search engine built on top of Apache Lucene, and has been on my list of things to check out for a while now. Communication with ElasticSearch is via its very comprehensive JSON over HTTP REST interface.

This post will describe the indexing portion of the application. Indexing by nature is embarrassingly parallel, so it’s a perfect candidate for Akka Actor-based concurrency. The system consists of a single Master actor which spawns a fixed number of Worker actors, and a Reaper actor which shuts down the system once all documents are processed.

Data is supplied to the indexing system as an arbitrarily nested tree of files on the local filesystem. For each dataset, one must provide a parser to convert a file into a Map of name-value pairs, a FileFilter that decides which of the files to pick for indexing, and the schema to use for the dataset. I used the Enron Email Dataset for my development and testing.

The Master Actor is responsible for crawling the local filesystem and distributing the list of files among the Worker Actors. Each Worker Actor processes one file at a time and POSTs the results as a JSON string to ElasticSearch, then sends back the result to the Master, which updates its count of successes and failures. Once the number of successes and failures equal the number of original files sent for processing to the workers, the Master sends a signal to the Reaper and shuts itself down. The Reaper then sends a signal to shut down the entire system. The structure is based heavily on the Akka tutorial and the Shutdown Patterns in Akka2 blog post.

The diagram below shows the different actors and the message sequence. Actors are represented by ovals and the solid colored lines represent the messages (and their sequence) being passed to them. The dotted green lines with the diamond heads show how the components are instantiated.

Additionally, I use Play’s WebServices API to do the HTTP POST and PUT requests to ElasticSearch, and Play’s JSON API to parse and create JSON requests out of native Scala data structures. Here is the code for the Actor system.

package esc.index

import java.io.{FileFilter, File}

import scala.Array.canBuildFrom
import scala.collection.immutable.Stream.consWrapper
import scala.io.Source

import akka.actor.actorRef2Scala
import akka.actor.{Props, ActorSystem, ActorRef, Actor}
import akka.routing.RoundRobinRouter
import play.api.libs.json.Json
import play.libs.WS

object Indexer extends App {

  /////////////// start main /////////////////

  val props = properties(new File("conf/indexer.properties"))
  val server0 = List(props("serverName"), 
      props("indexName")).foldRight("")(_ + "/" + _)
  val server1 = List(props("serverName"), 
      props("indexName"), 
      props("mappingName")).foldRight("")(_ + "/" + _)

  indexFiles(props)

  /////////////// end main /////////////////

  def indexFiles(props: Map[String,String]): Unit = {
    val system = ActorSystem("ElasticSearchIndexer")
    val reaper = system.actorOf(Props[Reaper], name="reaper")
    val master = system.actorOf(Props(new IndexMaster(props, reaper)), 
      name="master")
    master ! StartMsg
  }

  //////////////// actor and message definitions //////////////////

  sealed trait EscMsg
  case class StartMsg extends EscMsg
  case class IndexMsg(file: File) extends EscMsg
  case class IndexRspMsg(status: Int) extends EscMsg

  class IndexMaster(props: Map[String,String], reaper: ActorRef) 
      extends Actor {
    val numIndexers = props("numIndexers").toInt
    val schema = Class.forName(props("schemaClass")).
      newInstance.asInstanceOf[Schema]
    val router = context.actorOf(Props(new IndexWorker(props)).
      withRouter(RoundRobinRouter(numIndexers)))

    var nreqs = 0
    var succs = 0
    var fails = 0

    def createIndex(): Int = sendToServer(server0, """
      {"settings": 
        {"index": 
          {"number_of_shards": %s,
           "number_of_replicas": %s}
      }}""".format(props("numShards"), props("numReplicas")), 
      false)

    def createSchema(): Int = sendToServer(server1 + "_mapping", 
      """{ "%s" : { "properties" : %s } }""".
      format(props("indexName"), schema.mappings), false)

    def receive = {
      case StartMsg => {
        val filefilter = Class.forName(props("filterClass")).
          newInstance.asInstanceOf[FileFilter]
        val files = walk(new File(props("rootDir"))).
          filter(f => filefilter.accept(f))
        createIndex()
        createSchema()
        for (file <- files) {
          nreqs = nreqs + 1
          router ! IndexMsg(file) 
        }
      }
      case IndexRspMsg(status) => {
        if (status == 0) succs = succs + 1 else fails = fails + 1
        val processed = succs + fails
        if (processed % 100 == 0)
          println("Processed %d/%d (success=%d, failures=%d)".
            format(processed, nreqs, succs, fails))
        if (nreqs == processed) {
          println("Processed %d/%d (success=%d, failures=%d)".
            format(processed, nreqs, succs, fails))
          reaper ! IndexRspMsg(-1)
          context.stop(self)
        }
      }
    }
  }

  class IndexWorker(props: Map[String,String]) extends Actor {

    val parser = Class.forName(props("parserClass")).
      newInstance.asInstanceOf[Parser]
    val schema = Class.forName(props("schemaClass")).
      newInstance.asInstanceOf[Schema]

    def addDocument(doc: Map[String,String]): Int = {
      val json = doc.filter(kv => schema.isValid(kv._1)).
        map(kv => if (schema.isMultiValued(kv._1)) 
          Json.toJson(kv._1) -> Json.toJson(kv._2.split(",").
            map(e => e.trim).toSeq)
          else Json.toJson(kv._1) -> Json.toJson(kv._2)).
        foldLeft("")((s, e) => s + e._1 + " : " + e._2 + ",")
      sendToServer(server1, "{" + json.substring(0, json.length - 1) + "}", true)
    }

    def receive = {
      case IndexMsg(file) => {
        val doc = parser.parse(Source.fromFile(file))
        sender ! IndexRspMsg(addDocument(doc))
      }
    }
  }

  class Reaper extends Actor {
    def receive = {
      case IndexRspMsg(-1) => {
        println("Shutting down ElasticSearchIndexer")
        context.system.shutdown 
      }
    }  
  }

  ///////////////// global functions ////////////////////

  def properties(conf: File): Map[String,String] = {
    Map() ++ Source.fromFile(conf).getLines().toList.
      filter(line => (! (line.isEmpty || line.startsWith("#")))).
      map(line => (line.split("=")(0) -> line.split("=")(1)))
  }  

  def walk(root: File): Stream[File] = {
    if (root.isDirectory) 
      root #:: root.listFiles.toStream.flatMap(walk(_))
    else root #:: Stream.empty
  }

  def sendToServer(server: String, payload: String, 
      usePost: Boolean): Int = {
    val rsp = if (usePost) WS.url(server).post(payload).get
              else WS.url(server).put(payload).get
    val rspBody = Json.parse(rsp.getBody)
    (rspBody \ "ok").asOpt[Boolean] match {
      case Some(true) => 0
      case _ => -1
    }
  }
}

Of course, the indexing application is intended to be useful beyond the Enron dataset. To that end, I define a set of extension points which can be implemented by someone intending to index some new data with my code above. It’s modeled as a set of traits for which concrete implementations need to be provided.

package esc.index

import scala.io.Source
import play.api.libs.json.Json

/////////////// Parser /////////////////

/**
 * An implementation of the Parser trait must be supplied
 * by the user for each new data source. The parse() method
 * defines the parsing logic for the new content.
 */
trait Parser {

  /**
   * @param s a Source representing a file on local filesystem.
   * @return a Map of field name and value.
   */
  def parse(s: Source): Map[String,String]
}

/////////////// Schema /////////////////

/**
 * An implementation of the Schema trait must be supplied 
 * by the user for each new data source. The mappings() 
 * method is a JSON string containing the fields and their
 * properties. It can be used to directly do a put_mapping
 * call on elastic search. The base trait defines some 
 * convenience methods on the mapping string.
 */
trait Schema {

  /**
   * @return a JSON string representing the field names
   * and properties for the content source.
   */
  def mappings(): String

  /**
   * @param fieldname the name of the field.
   * @return true if field exists in mapping, else false.
   */
  def isValid(fieldname: String): Boolean = {
    lazy val schemaMap = Json.parse(mappings)
    (schemaMap \ fieldname \ "type").asOpt[String] match {
      case Some(_) => true
      case None => false
    }
  }

  /**
   * @param fieldname the name of the field.
   * @return true if field is declared as multivalued, else false.
   */
  def isMultiValued(fieldname: String): Boolean = {
    lazy val schemaMap = Json.parse(mappings)
    (schemaMap \ fieldname \ "multi_field").asOpt[String] match {
      case Some("yes") => true
      case Some("no") => false
      case None => false
    }
  }
}

And finally here are concrete implementations of these traits for the Enron dataset.

package esc.index

import scala.io.Source
import scala.collection.immutable.HashMap
import java.io.FileFilter
import java.io.File
import java.util.Date
import java.text.SimpleDateFormat

/**
 * User-configurable classes for the Enron data. These are
 * the classes that will be required to be supplied by a user
 * for indexing a new data source. 
 */

class EnronParser extends Parser {

  override def parse(source: Source): Map[String,String] = {
    parse0(source.getLines(), HashMap[String,String](), false)
  }

  def parse0(lines: Iterator[String], map: Map[String,String], 
      startBody: Boolean): Map[String,String] = {
    if (lines.isEmpty) map
    else {
      val head = lines.next()
      if (head.trim.length == 0) parse0(lines, map, true)
      else if (startBody) {
        val body = map.getOrElse("body", "") + "\n" + head
        parse0(lines, map + ("body" -> body), startBody)
      } else {
        val split = head.indexOf(':')
        if (split > 0) {
          val kv = (head.substring(0, split), head.substring(split + 1))
          val key = kv._1.map(c => if (c == '-') '_' else c).trim.toLowerCase
          val value = kv._1 match {
            case "Date" => formatDate(kv._2.trim)
            case _ => kv._2.trim
          }
          parse0(lines, map + (key -> value), startBody)
        } else parse0(lines, map, startBody)
      }
    }
  }

  def formatDate(date: String): String = {
    lazy val parser = new SimpleDateFormat("EEE, dd MMM yyyy HH:mm:ss")
    lazy val formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
    formatter.format(parser.parse(date.substring(0, date.lastIndexOf('-') - 1)))
  }
}

class EnronFileFilter extends FileFilter {
  override def accept(file: File): Boolean = {
    file.getAbsolutePath().contains("/sent/")
  }
}

class EnronSchema extends Schema {
  override def mappings(): String = """{
    "message_id": {"type": "string", "index": "not_analyzed", "store": "yes"},
    "from": {"type": "string", "index": "not_analyzed", "store": "yes"},
    "to": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "x_cc": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "x_bcc": {"type": "string", "index": "not_analyzed", "store": "yes", 
           "multi_field": "yes"},
    "date": {"type": "date", "index": "not_analyzed", "store": "yes"},
    "subject": {"type": "string", "index": "analyzed", "store": "yes"},
    "body": {"type": "string", "index": "analyzed", "store": "yes"}
  }"""
}

I did a very basic installation of ElasticSearch, just unzipping the distribution (my version is 0.19.11), setting cluster.name to “sujits_first_es_server” and network.bind_host to 127.0.0.1, then starting it in a separate terminal window with “bin/elasticsearch -f -Des.config.file=config/elasticsearch.yml”. You can verify that it’s up at http://localhost:9200.

This is most likely a misconfiguration on my part, but whenever I had the network up (wired or wireless), ElasticSearch would try to find clusters to replicate with. Ultimately I had to turn off networking on my computer to get all the data to load.

One more thing: while the code runs to completion, it does not exit; I have to manually terminate it with CTRL+C. I suspect it is some Future waiting on the sendToServer method, since if I replace that with a NOOP returning 0, it runs to completion normally. I need to investigate this further.

I will describe the search subsystem in a future post (once I finish it).

Update 2012-11-20 – I made some changes to the indexer to use the current GA version of Play2 (2.9.1/2.0.4) instead of a later, yet-to-be-released version that I was using previously. I also changed the code to use the Scala WebService libs instead of the Java ones that I was using previously. I had hoped that this would allow the application to terminate, but no luck there. I ended up having to start a Play embedded server because the WS calls kept complaining about no running application being found. Somehow the Play WS API seems to be overkill for this application; I am considering switching to spray instead. The latest code can be found on GitHub.

[repost ]Viktor Klang on Akka, Futures and Promises, Scala

original:http://www.infoq.com/interviews/klang-akka

Summary
Viktor Klang talks about the features of Akka 2.x and future releases, Akka’s approach to fault tolerance, the effort to unify Futures in Scala, and the state of functional programming.

Bio
Viktor Klang, Also known as √, is a passionate programmer with a taste for concurrency paradigms and performance optimization. He’s the Tech Lead for the Akka project at Typesafe.

About the conference
Scala Days is the premier event for Scala enthusiasts, researchers, and practitioners. Scala is a general-purpose programming language designed to express common programming patterns in a concise, elegant, and type-safe way. It smoothly integrates features of object-oriented and functional programming.

 

 We are here at Scala Days 2012 in London and we found Viktor Klang. So Viktor who are you? 
 You mentioned Akka, can you give the basic introduction, the elevator pitch for Akka, what is it? 
 So can it be compared to Erlang, are the fault tolerance features inspired by Erlang? 
 So in Erlang you have to set up your own supervisor trees, that is different? 
 So in your talk yesterday, you talked about Futures and Promises and all these things. How do they relate to Actors? 
 This is the standard library of Scala or of Akka? 
 Is it a kind of a channel, can you think of it that way? 
 So you already brought out our favorite word, Monad, everybody loves a Monad? 
 So what is 2012, it’s Zippers, the Kleene-Star (whatever that is), Arrows? 
 Lenses are basically mutable mappings between different states? 
 Going back to the Monad, the Monads will help you with composing these Dataflows, is that one way of using Futures in Scala, I think they have special notation in Scala, to do that? 
 The last question about the Futures, you mentioned yesterday we have dozens of implementations of Futures, why do they exist, do they solve different problems or can you unify them, what is the solution there, or what problem do they solve? 
 It’s a big problem area to take on? 
 So at the moment we are at Akka 2.x basically, so what is the future for Akka? 
 So you have a stable base in Akka 2 basically? 
 Since we are here at the Scala Days we need to chat about Scala a bit. Is Scala an enabler of Akka? 
 So what is Akka write in it? 
 Traits are essentially a sane version of multiple inheritance? 
 You mentioned your epiphany moment, before the moment what were your frustrations, were you frustrated with functional thinking, with static typing, what were your problems? 
 To wrap up we have a special Scala Days question, Viktor Klang if you had to describe yourself as a Monad, what would that Monad be? 

[repost ]Processing 10 million messages with Akka

original:http://www.akkaessentials.in/2012/03/processing-10-million-messages-with.html

Akka Actors promise concurrency. What better way to test that than to see how much time it takes to process 10 million messages using commodity hardware and software, without any low-level tuning. I wrote the entire 10-million-message processing program in Java and the overall results astonished me.

When I ran the program on my iMac with an Intel i5 (4 cores), 4 GB RAM and the JVM heap at 1024 MB, the program processed 10 million messages in 23 secs. I ran the program multiple times and the average time was in the range of 25 secs. So the throughput I achieved was almost in the range of 400K messages per second, which is phenomenal.


The below picture explains the flow used to simulate the load generation scenario.

Caveat: each message sends a response after 1 second, which is not the right simulation for a real-world scenario. The message processing would also consume heap and GC resources that are not accounted for in this scenario.

The program takes its overall direction from the post “Akka actors : 10 millions messages processed (1s / message) in 75 seconds !”, although without any message throttling.

The code base for the program is available at the following location – https://github.com/write2munish/Akka-Essentials

The ApplicationManagerSystem creates the actors and pumps in the traffic to the WorkerActor

private ActorSystem system;
private final ActorRef router;
private final static int no_of_msgs = 10 * 1000000;

public ApplicationManagerSystem() {
    final int no_of_workers = 10;
    system = ActorSystem.create("LoadGeneratorApp");
    final ActorRef appManager = system.actorOf(
            new Props(new UntypedActorFactory() {
                public UntypedActor create() {
                    return new JobControllerActor(no_of_msgs);
                }
            }), "jobController");
    router = system.actorOf(new Props(new UntypedActorFactory() {
        public UntypedActor create() {
            return new WorkerActor(appManager);
        }
    }).withRouter(new RoundRobinRouter(no_of_workers)));
}

private void generateLoad() {
    for (int i = no_of_msgs; i >= 0; i--) {
        router.tell("Job Id " + i + "# send");
    }
    System.out.println("All jobs sent successfully");
}

Once the messages are received by the WorkerActor, the responses are scheduled to be sent after 1000 milliseconds.

public class WorkerActor extends UntypedActor {
    private ActorRef jobController;

    @Override
    public void onReceive(Object message) throws Exception {
        // using the scheduler to send the reply after 1000 milliseconds
        getContext()
                .system()
                .scheduler()
                .scheduleOnce(Duration.create(1000, TimeUnit.MILLISECONDS),
                        jobController, "Done");
    }

    public WorkerActor(ActorRef inJobController) {
        jobController = inJobController;
    }
}

The response messages from the WorkerActor are sent across to the JobControllerActor, which collects all the responses.

public class JobControllerActor extends UntypedActor {
    int count = 0;
    long startedTime = System.currentTimeMillis();
    int no_of_msgs = 0;

    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof String) {
            if (((String) message).compareTo("Done") == 0) {
                count++;
                if (count == no_of_msgs) {
                    long now = System.currentTimeMillis();
                    System.out.println("All messages processed in "
                            + (now - startedTime) / 1000 + " seconds");
                    System.out.println("Total Number of messages processed "
                            + count);
                    getContext().system().shutdown();
                }
            }
        }
    }
}

[repost ]Scaling Out with Scala and Akka on Heroku

original:https://devcenter.heroku.com/articles/scaling-out-with-scala-and-akka

 

Table of Contents

 

This article is a guided tour of an application called Web Words illustrating the use of Scala and Akka on Heroku’s Cedar stack. Web Words goes beyond “Hello, World” to show you a sampler of the technology you might use in a real-world application:

  • organizing a scalable application with Akka actors, rather than with explicit threads and java.util.concurrent
  • the power of functional programming and parallel collections in Scala
  • dividing an application into separate web and worker processes, using RabbitMQ to send jobs to the worker
  • caching worker results in a MongoDB capped collection
  • embedding a Jetty HTTP server
  • forwarding Jetty requests to Akka HTTP to handle them asynchronously without tying up a thread
  • many small details along the way: using a Java library from Scala, using Akka actor pools, using Akka’s Future class, some cute Scala tricks, and more.

Because this sampler application shows so many ideas, you may want to skip around the article to the topics you’re most interested in.

Don’t forget, if the article skips a detail you’d like to know more about, you can also jump to the source code to see what’s what.

The README over on GitHub has instructions for running Web Words locally or on Heroku.

This article is not a ground-up tutorial on any of the technologies mentioned; the idea is to give you a taste. So don’t worry if some of the details remain unclear, just follow the links and dig deeper!

Sample code for the Web Words demo app is available on GitHub.

Web Words

The sample application, Web Words, takes a site URL and does a “shallow spider” (following just a few of the same-site links at the URL). It churns through the HTML on the site, calculating word frequencies and scraping links, then presents results:

Web Words screenshot

Web Words includes both IO-bound and CPU-bound tasks, illustrating how Scala and Akka support both.
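
As a taste of the CPU-bound side, here is a minimal word-frequency sketch using Scala parallel collections (an illustration, not the actual Web Words indexer code):

object WordFreqSketch {
  // Count word occurrences across a set of pages, spreading the work over CPU cores
  def frequencies(pages: Seq[String]): Map[String, Int] =
    pages.par
      .flatMap(_.toLowerCase.split("""\W+""").filter(_.nonEmpty))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }
      .seq
      .toMap

  def main(args: Array[String]): Unit =
    println(frequencies(Seq("Akka loves Scala", "Scala loves the JVM")))
}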

The application is split into a web process, which parses HTTP requests and formats results in HTML, and a worker process called indexer which does the spidering and computes the results.

Overview: a request step-by-step

If you follow an incoming request to Web Words, here’s what the app shows you:

  • an embedded Jetty HTTP server receives requests to spider sites – http://wiki.eclipse.org/Jetty/Tutorial/Embedding_Jetty
  • requests are forwarded to Akka HTTP, which uses Jetty Continuations to keep requests from tying up threads – http://akka.io/docs/akka/1.2/scala/http.html
  • the web process checks for previously-spidered info in a MongoDB capped collection which acts as a cache. This uses the Heroku MongoHQ addon. – http://www.mongodb.org/display/DOCS/Capped+Collections – http://devcenter.heroku.com/articles/mongohq
  • if the spider results are not cached, the web process sends a spider request to an indexer process using the RabbitMQ AMQP addon – http://www.rabbitmq.com/getstarted.html – http://blog.heroku.com/archives/2011/8/31/rabbitmq_add_on_now_available_on_heroku/
  • the app talks to RabbitMQ using Akka AMQP – http://akka.io/docs/akka-modules/1.2/modules/amqp.html
  • the indexer process receives a request from AMQP and shallow-spiders the site using an Akka actor that encapsulates AsyncHttpClient – https://github.com/sonatype/async-http-client
  • the indexer uses Akka, Scala parallel collections, and JSoup to grind through the downloaded HTML taking advantage of multiple CPU cores – http://www.scala-lang.org/api/current/scala/collection/parallel/package.html – http://jsoup.org
  • the indexer stores its output back in the MongoDB cache and sends an AMQP message back to the web process
  • the web process loads the now-cached data from MongoDB
  • the web process unsuspends the Jetty request and writes out the results

Akka: actor and future

Almost all the code in Web Words runs inside Akka actors.

An actor is the fundamental unit of organization in an Akka application. The actor model comes from Erlang, where it’s said to support code with nine “9”s of uptime.

An actor is:

  • an object
  • that sends and receives messages
  • that’s guaranteed to process only one message on one thread at a time

Because actors work with messages, rather than methods, they look like little network servers rather than regular objects. However, since they’re (often) in-process, actors are much lighter-weight than a separate server daemon would be. In fact, they’re much lighter-weight than threads. One JVM process could contain millions of actors, compared to thousands of threads.

Toy actor example

A trivial, toy example could look like:

import akka.actor._

class HelloActor extends Actor {
    override def receive = {
        case "Hello" => self reply "World"
    }
}

This is an actor that receives a string object “Hello” as a message and sends back the string object “World” as a message.

There are only two steps to create an actor:

  • extend the Actor base class
  • override receive to handle your messages

The whole point of an Actor is that receive need not be thread-safe; it will be called for one message at a time so there’s no need for locking on your actor’s member variables, as long as you only touch the actor’s state from inside receive. (If you spawn your own threads and have those touch the actor outside Akka’s control, you are on your own. Don’t do that.)

To use this actor, you could write:

import akka.actor._
import akka.actor.Actor.actorOf

val actor = actorOf[HelloActor]
actor.start
val future = actor ? "Hello"
println("Got: " + future.get)
actor.stop

The method Actor.actorOf creates an ActorRef, which is a handle that lets you talk to an actor. The idea is to forbid you from calling methods on the actor; you can only send messages. (Also: the ActorRef may refer to an actor running on another machine, or due to actor restarts may refer to different actor instances over time.)

Actor references have a start method which registers the actor with the Akka system and a stop method which unregisters the actor.

The operator ? (also known as ask) sends a message to an actor and returns a Future containing the reply to that message. Future.get blocks and waits for the future to be completed (a bad practice! we’ll see later how to avoid it), returning the result contained in the Future.

If you don’t need a Future for the reply, you can use the ! operator (also known as tell) to send a message instead. If you use ! from within another actor, the sending actor will still receive any reply message, but it will be handled by the actor’s receive method rather than sent to a Future.

Real actors and futures in Web Words

Now let’s look at a real example.

URLFetcher actor

In URLFetcher.scala, an actor encapsulates AsyncHttpClient. The actor supports only one message, FetchURL, which asks it to download a URL:

sealed trait URLFetcherIncoming
case class FetchURL(u: URL) extends URLFetcherIncoming

While messages can be any object, it’s highly recommended to use immutable objects (immutable means no “setters” or modifiable state). In Scala, a case class makes an ideal message.

The URLFetcherIncoming trait is optional: it gives you a type shared by all messages coming in to the actor. Because the trait is sealed, the compiler can warn you if a match expression doesn’t handle all message types.

The URLFetcher actor supports only one outgoing message, a reply to FetchURL:

sealed trait URLFetcherOutgoing
case class URLFetched(status: Int, headers: Map[String, String], body: String) extends URLFetcherOutgoing

The actor itself holds an AsyncHttpClient object from the AsyncHttpClient library and uses it to handle FetchURL messages, like this:

class URLFetcher extends Actor {

    private val asyncHttpClient = URLFetcher.makeClient

    override def receive = {
        case incoming: URLFetcherIncoming => {
            val f = incoming match {
                case FetchURL(u) =>
                    URLFetcher.fetchURL(asyncHttpClient, u)
            }

            self.channel.replyWith(f)
        }
    }

    override def postStop = {
        asyncHttpClient.close()
    }
}

Of course the real work happens in URLFetcher.fetchURL(), which maps the AsyncHttpClient API onto an Akka future. Check out URLFetcher.scala to see that code.

postStop is a hook method actors can override to clean up when the actor shuts down; in this case it closes the AsyncHttpClient object.

Akka automatically sets up a self field, referring to an actor’s own ActorRef.

self.channel refers to the current message’s sender. A Channel can receive messages, and may be either a Future or an Actor.

replyWith is a utility method kept in the Web Words common project. It’s added to Akka’s Channel using the so-called “Pimp my Library” pattern, so its implementation illustrates both that pattern and the use of Future:

// Class that adds replyWith to Akka channels
class EnhancedChannel[-T](underlying: Channel[T]) {
    /**
     * Replies to a channel with the result or exception from
     * the passed-in future
     */
    def replyWith[A <: T](f: Future[A])(implicit sender: UntypedChannel) = {
        f.onComplete({ f =>
            f.value.get match {
                case Left(t) =>
                    underlying.sendException(t)
                case Right(v) =>
                    underlying.tryTell(v)
            }
        })
    }
}

// implicitly create an EnhancedChannel wrapper to add methods to the
// channel
implicit def enhanceChannel[T](channel: Channel[T]): EnhancedChannel[T] = {
    new EnhancedChannel(channel)
}

The above code is in the package.scala for the common project. In it, you can see how to set up an onComplete callback to be invoked when a Future is completed.

Important caution: a Future will always invoke callbacks in another thread! To avoid concurrency issues and stick to the actor model, use callbacks only to send messages to actors, keeping the real work in an actor’s receive method.

SpiderActor

URLFetcher doesn’t do all that much; it’s a simple proxy giving theAsyncHttpClient object an Akka-style API.

Let’s look at SpiderActor, which uses the URLFetcher to shallow-spider a site.

Again this actor has one request and one reply to go with it:

sealed trait SpiderRequest
case class Spider(url: URL) extends SpiderRequest

sealed trait SpiderReply
case class Spidered(url: URL, index: Index) extends SpiderReply

Given a site URL, the SpiderActor computes an Index (see Index.scala) to go with it.

SpiderActor delegates to two other actors, one of which is the URLFetcher:

class SpiderActor
    extends Actor {
    private val indexer = actorOf[IndexerActor]
    private val fetcher = actorOf[URLFetcher]

    override def preStart() = {
        indexer.start
        fetcher.start
    }

    override def postStop() = {
        indexer.stop
        fetcher.stop
    }

SpiderActor ties the two other actors to its own lifecycle by overriding preStart and postStop, ensuring that the entire “tree” of actors starts and stops together.

Composing futures

SpiderActor offers a nice illustration of how to use map and flatMap with Future. First, in a fetchBody method, we send a request to the URLFetcher, then use map to convert the URLFetched reply into a simple string:

private def fetchBody(fetcher: ActorRef, url: URL): Future[String] = {
    val fetched = fetcher ? FetchURL(url)
    fetched map {
        case URLFetched(status, headers, body) if status == 200 =>
            body
        case URLFetched(status, headers, body) =>
            throw new Exception("Failed to fetch, status: " + status)
        case whatever =>
            throw new IllegalStateException("Unexpected reply to url fetch: " + whatever)
    }
}

This example does not block. The code after map runs asynchronously, after the URLFetched reply arrives, and extracts the reply body as a string. If something goes wrong and the exceptions here are thrown, the returned Future[String] would be completed with an exception instead of a result.

Once a reply body comes back, SpiderActor will want to index it (a task performed by IndexerActor). Indexing is itself an asynchronous operation. To “chain” two futures, use flatMap.

Both map and flatMap return a new future. With map, you provide a function to convert the original future’s value, when available, into a new value. With flatMap, you provide a function to convert the original future’s value, when available, into yet another future. flatMap is useful if you need to do something else asynchronous, once you have a value from the original future.

This code from SpiderActor uses both map and flatMap to chain the Future[String] from fetchBody (shown above) into a Future[Index].

private def fetchIndex(indexer: ActorRef, fetcher: ActorRef, url: URL): Future[Index] = {
    fetchBody(fetcher, url) flatMap { body =>
        val indexed = indexer ? IndexHtml(url, body)
        indexed map { result =>
            result match {
                case IndexedHtml(index) =>
                    index
            }
        }
    }
}

Nothing here is blocking, because the code never uses Future.await or Future.get. Instead, map and flatMap are used to transform futures… in the future.

The nice thing about this is that map and flatMap are standard methods as seen in Scala’s normal collections library, and as seen in Scala’s Option class. Future is like a one-element collection that automatically keeps itself asynchronous as it’s transformed.

Other collection operations such as filter and foreach work on Future, too!
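
A quick sketch, reusing the fetcher and url names from the code above:

val fetched = fetcher ? FetchURL(url)

// foreach: run a side effect once the future completes successfully
fetched foreach { reply => println("got: " + reply) }

// filter: produce a future that fails unless the value matches the predicate
val only200s = fetched filter {
    case URLFetched(status, _, _) => status == 200
    case _ => false
}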

Actor pools

IndexerActor, used by SpiderActor, is an example of an actor pool. An actor pool is an actor that contains a pool of identical delegate actors. Pools can be configured to determine how they load-balance messages among delegates, and to control when they create and destroy delegates.

In Web Words, actor pools are set up in two abstract utility classes, CPUBoundActorPool and IOBoundActorPool. These pools have settings intended to make sense for delegates that compute something on the CPU or delegates that perform blocking IO, respectively.

Many of the settings defined in these utility classes were not arrived at scientifically; you’d need to run benchmarks on your particular application and hardware to know the ideal settings for sure.

Let’s look at CPUBoundActorPool, then its subclass IndexerActor.

First, CPUBoundActorPool mixes in some traits to select desired policies:

trait CPUBoundActorPool
    extends DefaultActorPool
    with SmallestMailboxSelector
    with BoundedCapacityStrategy
    with MailboxPressureCapacitor
    with Filter
    with BasicRampup
    with BasicBackoff {

Reading from the top down, this actor pool will:

  • SmallestMailboxSelector: send each message to the delegate with the smallest mailbox (least message backlog)
  • BoundedCapacityStrategy: computes the number of delegates within an upper and a lower limit, based on a pressure and a filter method. pressure returns the number of “busy” delegates, while filter computes a change in actual number of delegates based on the current number and the current pressure.
  • MailboxPressureCapacitor: provides a pressure method which counts delegates as “busy” if they have a backlog of messages exceeding a certain threshold
  • Filter: provides a filter method which delegates to rampup and backoff methods. These compute proposed increases and decreases in capacity, respectively.
  • BasicRampup: implements the rampup method to compute a percentage increase in delegates when pressure reaches current capacity.
  • BasicBackoff: implements the backoff method to compute a percentage decrease in delegates when pressure falls below a threshold percentage of capacity.

CPUBoundActorPool configures its mixin traits by overriding methods:

    // Selector: selectionCount is how many pool members to send each message to
    override def selectionCount = 1

    // Selector: partialFill controls whether to pick less than selectionCount or
    // send the same message to duplicate delegates, when the pool is smaller
    // than selectionCount. Does not matter if lowerBound >= selectionCount.
    override def partialFill = true

    // BoundedCapacitor: create between lowerBound and upperBound delegates in the pool
    override val lowerBound = 1
    override lazy val upperBound = Runtime.getRuntime().availableProcessors() * 2

    // MailboxPressureCapacitor: pressure is number of delegates with >pressureThreshold messages queued
    override val pressureThreshold = 1

    // BasicRampup: rampupRate is percentage increase in capacity when all delegates are busy
    override def rampupRate = 0.2

    // BasicBackoff: backoffThreshold is the percentage-busy to drop below before
    // we reduce actor count
    override def backoffThreshold = 0.7

    // BasicBackoff: backoffRate is the amount to back off when we are below backoffThreshold.
    // this one is intended to be less than 1.0-backoffThreshold so we keep some slack.
    override def backoffRate = 0.20

Each message will go to just one delegate. The pool will vary between 1 and (2x number of cores) delegates. We’ll ramp up by 20% if all delegates have a backlog of 1 already. We’ll back off by 20% if only 70% of delegates have a backlog of 1. Again, the exact settings are not scientific; you’d have to tune this in a real application.

To subclass CPUBoundActorPool, IndexerActor has to implement just one more thing, a method called instance which generates a new delegate:

override def instance = Actor.actorOf(new Worker())

Actor pools have a method _route which just forwards to a delegate, so IndexerActor can implement receive with that:

override def receive = _route

Optionally, an actor pool could look at the message and decide whether to send it to _route or do something else instead.
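
A rough sketch of that idea, assuming _route evaluates to an ordinary Receive partial function as the line above implies (FlushStats is a made-up control message, not part of Web Words):

case object FlushStats   // hypothetical pool-wide control message

override def receive = ({
    case FlushStats =>
        // handle a pool-level concern here, without involving any delegate
        println("flushing pool statistics")
}: PartialFunction[Any, Unit]) orElse _route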

akka.conf

Akka has a configuration file akka.conf, automatically loaded from the classpath. Typically you might want to configure the size of Akka’s thread pool and the length of Akka’s timeouts. See the akka.conf for the web process for an example.

Scala

While this article is not an introduction to Scala, the Web Words example does show off some nice properties of Scala that deserve mention.

Working with Java libraries

If you had to rewrite all your Java code, you’d never be able to switch to Scala. Fortunately, you don’t.

For example, IndexerActor uses a Java library, called JSoup, to parse HTML.

In general, you import a Java library and then use it, like this:

import org.jsoup.Jsoup

val doc = Jsoup.parse(docString, url.toExternalForm)

The most common “catch” is that Scala APIs use Scala’s collections library, while Java APIs use Java’s collections library. To solve that, Scala provides two options.

The first one adds explicit asScala and asJava methods to collections, and can be found in JavaConverters:

import scala.collection.JavaConverters._

val anchors = doc.select("a").asScala

The second option, not used in IndexerActor, adds implicit conversions among Scala and Java collections so things “just work”; the downside is that you can’t see by reading the code that there’s a conversion going on. To get implicit conversions, import scala.collection.JavaConversions._ rather than JavaConverters.

The choice between explicit asScala and asJava methods, and implicit conversions, is a matter of personal taste in most cases. There may be some situations where an explicit conversion is required if the Scala compiler can’t figure out which implicit to use.

The converters work efficiently by creating wrappers around the original collection, so in general they should not add much overhead.

Functional programming

With CPUs getting more cores rather than higher clock speeds, functional programming becomes more relevant than ever. Akka’s actor model and Scala’s functional programming emphasis are two tools for developing multithreaded code without error-prone thread management and locking.

(What is it, anyway?)

You may be wondering what “functional programming” means, and why it’s important that Scala offers it.

Here’s a simple definition. Functional programming emphasizes transformation (take a value, return a new value) over mutable state (take a value, change the value in-place). Functional programming contrasts with imperative or procedural programming.

The word function here has the sense of a mathematics-style function. If you think about f(x) in math, it maps a value x to some result f(x); f(x) always represents the same value for a given x. This “always the same output for the same input” property also describes program subroutines that don’t rely upon or modify any mutable state.

In addition to the core distinction between transformation and mutation, “functional programming” tends to imply certain cultural traditions: for example, a map operation that transforms a list by applying a function to each list element.

Functional programming isn’t really a language feature, it’s a pattern that can be applied in any language. For example, here’s how you could use add one to each element in a list in Java, by modifying the list in-place (treating the list as mutable state):

public static void addOneToAll(ArrayList<Integer> items) {
    for (int i = 0; i < items.size(); ++i) {
        items.set(i, items.get(i) + 1);
    }
}

But you could also use a functional style in Java, transforming the list into a new list without modifying the original:

public static List<Integer> addOneToAll(List<Integer> items) {
    ArrayList<Integer> result = new ArrayList<Integer>();
    for (int i : items) {
        result.add(i + 1);
    }
    return result;
}

Unsurprisingly, you can use either style in Scala as well. Imperative style in Scala:

def addOneToAll(items : mutable.IndexedSeq[Int]) = {
    var i = 0
    while (i < items.length) {
        items.update(i, items(i) + 1)
        i += 1
    }
}

Functional style in Scala:

def addOneToAll(items : Seq[Int]) = items map { _ + 1 }

You might notice that the “functional style in Scala” example is shorter than the other three approaches. Not an uncommon situation.

There are several advantages to functional programming:

– it’s inherently parallelizable and thread-safe – it enables many optimizations, such as lazy evaluation – it can make code more flexible and generic – it can make code shorter

Let’s look at some examples in Web Words.

Collection transformation

In SpiderActor, there’s a long series of transformations to choose which links on a page to spider:

// pick a few links on the page to follow, preferring to "descend"
private def childLinksToFollow(url: URL, index: Index): Seq[URL] = {
    val uri = removeFragment((url.toURI))
    val siteRoot = copyURI(uri, path = Some(null))
    val parentPath = new File(uri.getPath).getParent
    val parent = if (parentPath != null) copyURI(uri, path = Some(parentPath)) else siteRoot

    val sameSiteOnly = index.links map {
        kv => kv._2
    } map {
        new URI(_)
    } map {
        removeFragment(_)
    } filter {
        _ != uri
    } filter {
        isBelow(siteRoot, _)
    } sortBy {
        pathDepth(_)
    }
    val siblingsOrChildren = sameSiteOnly filter { isBelow(parent, _) }
    val children = siblingsOrChildren filter { isBelow(uri, _) }

    // prefer children, if not enough then siblings, if not enough then same site
    val toFollow = (children ++ siblingsOrChildren ++ sameSiteOnly).distinct take 10 map { _.toURL }
    toFollow
}

(The syntax { _ != uri } is a function with one parameter, represented by _, that returns a boolean value.)

This illustrates some handy methods found in the Scala collections API.

  • map transforms each element in a collection, returning a new collection of transformed elements. For example, map { new URI(_) } in the above converts a list of strings to a list of URI objects.
  • filter uses a boolean test on each element, including only the elements matching the test in a new collection. For example, filter { _ != uri } in the above includes only those URIs that aren’t the same as the original root URI.
  • sortBy sorts a collection using a function on each element as the key, so to sort by path depth it’s sortBy { pathDepth(_) }.
  • distinct unique-ifies the collection.
  • take picks only the first N items from a collection.

The childLinksToFollow function might be longer and more obfuscated if you wrote it in Java with the Java collections API. The Scala version is also better abstracted: index.links could be any kind of collection (Set or List, parallel or sequential) with few or no code changes.

Better refactoring

First-class functions are a powerful feature for factoring out common code. For example, in the AMQPCheck class (incidentally, another nice example of using an existing Java API from Scala), several places need to close an AMQP object while ignoring possible exceptions. You can quickly and easily do this in Scala:

private def ignoreCloseException(body: => Unit): Unit = {
    try {
        body
    } catch {
        case e: IOException =>
        case e: AlreadyClosedException =>
    }
}

Then use it like this:

 ignoreCloseException { channel.close() }
 ignoreCloseException { connection.close() }

You could also use a more traditional Java-style syntax, like this:

 ignoreCloseException(channel.close())
 ignoreCloseException(connection.close())

In Java, factoring this out to a common method might be clunky enough to keep you from doing it.

Parallel collections

Parallel collections have the same API as regular Scala collections, but operations on them magically take advantage of multiple CPU cores.

Convert any regular (sequential) collection to parallel with the par method and convert any parallel collection to sequential with the seq method. In most situations, parallel and sequential collections are interchangeable, so conversions may not be needed in most code.
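
For example:

val xs = List(1, 2, 3, 4)

val doubled = xs.par map { _ * 2 }   // runs on multiple cores; the result is a parallel collection too
val sequential = doubled.seq         // convert back when a plain sequential collection is needed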

Two important points about Scala’s collections library that may be surprising compared to Java:

  • immutable collections are the default; operations on immutable collections return a new, transformed collection, rather than changing the old one in-place
  • when transforming a collection, the new collection will have the same type as the original collection

These properties are crucial to parallel collections. As you use map, filter, sortBy, etc. on a parallel collection, each new result you compute will itself be parallel. This means you only need to convert to parallel once, with a call to par, to convert an entire chain of computations into parallel computations.

Parallel collections are enabled by functional programming; as long as you only use the functional style, the use of multiple threads doesn’t create bugs or trickiness. Parallel looks just like sequential.

Returning to IndexerActor, you can see parallel collections in action. We want to perform a word count; it’s a parallelizable algorithm. So we split the HTML into a parallel collection of lines:

val lines = s.split("\\n").toSeq.par

(toSeq here converts the array from java.lang.String.split() to a Scala sequence, then par converts to parallel.)

Then for each line in parallel we can break the line into words:

val words = lines flatMap { line =>
        notWordRegex.split(line) filter { w => w.nonEmpty }
    }

The flatMap method creates a new collection by matching each element in the original collection to a new sub-collection, then combining the sub-collections into the new collection. In this case, because lines was a parallel collection, the new collection from flatMap will be too.
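
In miniature:

// Each element maps to a sub-collection; flatMap concatenates the sub-collections.
Seq("a b", "c d") flatMap { _.split(" ").toSeq }   // Seq("a", "b", "c", "d")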

The parallel collection of words then gets filtered to take out boring words like “is”:

splitWords(body.text) filter { !boring(_) }

And then there’s a function to do the actual word count, again in parallel:

private[indexer] def wordCount(words: ParSeq[String]) = {
    words.aggregate(Map.empty[String, Int])({ (sofar, word) =>
        sofar.get(word) match {
            case Some(old) =>
                sofar + (word -> (old + 1))
            case None =>
                sofar + (word -> 1)
        }
    }, mergeCounts)
}

The aggregate method needs two functions. The first argument to aggregate is identical to the one you’d pass to foldLeft: here it adds one new word to a map from words to counts, returning the new map. In fact you could write wordCount with foldLeft, but it wouldn’t use multiple threads since foldLeft has to process elements in sequential order:

// ParSeq can't parallelize foldLeft in this version
private[indexer] def wordCount(words: ParSeq[String]) = {
    words.foldLeft(Map.empty[String, Int])({ (sofar, word) =>
        sofar.get(word) match {
            case Some(old) =>
                sofar + (word -> (old + 1))
            case None =>
                sofar + (word -> 1)
        }
    })
}

The second argument to aggregate makes it different from foldLeft: it allows aggregate to combine two intermediate results. The signature of mergeCounts is:

def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int]

With this available, aggregate can:

  • subdivide the parallel collection (split the sequence of words into multiple sequences)
  • fold the elements in each subdivision together (counting word frequencies per-subdivision in a Map[String,Int])
  • aggregate the results from each subdivision (merging word frequency maps into one word frequency map)
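
A merge function along these lines might look like the following sketch (the real mergeCounts lives in IndexerActor.scala and may differ):

private def mergeCounts(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] = {
    // fold b's counts into a, adding counts for words that appear in both maps
    b.foldLeft(a) { (merged, kv) =>
        val (word, count) = kv
        merged + (word -> (merged.getOrElse(word, 0) + count))
    }
}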

When wordCount returns, IndexerActor computes a list of the top 50 words:

wordCount(words).toSeq.sortBy(0 - _._2) take 50

toSeq here converts the Map[String,Int] to a Seq[(String, Int)]; the result gets sorted in descending order by count; then take 50 takes up to 50 items from the start of the sequence.

Full disclosure: it’s not really a given that using parallel collections forIndexerActor makes sense. That is, it’s completely possible that if you benchmark on a particular hardware setup with some particular input data, using parallel collections here turns out to be slower than sequential. Fortunately, one advantage of the parallel collections approach is that it’s trivial to switch between parallel and sequential collections as your benchmark results roll in.

XML Support

In WebActors.scala you can see an example of Scala’s inline XML support. In this case, it works as a simple template system to generate HTML. Of course there are many template systems available for Scala (plus you can use all the Java ones), but a simple application such as Web Words gets pretty far with the built-in XML support.

Here’s a function from WebActors.scala that returns the page at /words:

def wordsPage(formNode: xml.NodeSeq, resultsNode: xml.NodeSeq) = {
    <html>
        <head>
            <title>Web Words!</title>
        </head>
        <body style="max-width: 800px;">
            <div>
                <div>
                    { formNode }
                </div>
                {
                    if (resultsNode.nonEmpty)
                        <div>
                            { resultsNode }
                        </div>
                }
            </div>
        </body>
    </html>
}

You can just type XML literals into a Scala program, breaking out into Scala code with {} anywhere inside the XML. The {} blocks should return a string (which will be escaped) or a NodeSeq. XML literals themselves are values of type NodeSeq.
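
As a minimal example:

val who = "Web Words"
val node: xml.NodeSeq = <p>Hello from { who }!</p>   // an XML literal is itself a NodeSeq value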

Bridging HTTP to Akka

There are lots of ways to serve HTTP from Scala, even if you only count Scala-specific libraries and frameworks and ignore the many options inherited from Java.

Web Words happens to combine embedded Jetty with Akka’s HTTP support.

Embedded Jetty: web server in a box

Heroku gives you more flexibility than most cloud JVM providers because you can run your own main() method, rather than providing a .war file to be deployed in a servlet container.

Web Words takes advantage of this, using embedded Jetty to start up an HTTP server. Because Web Words on Heroku knows it’s using Jetty, it can rely on Jetty Continuations, a Jetty-specific feature that allows Akka HTTP to reply to HTTP requests asynchronously without tying up a thread for the duration of the request. (Traditionally, Java servlet containers need a thread for every open request.)

There’s very little to this; see WebServer.scala, where we fire up a Jetty Serverobject on the port provided by Heroku (the PORT env variable is picked up inWebWordsConfig.scala):

val server = new Server(config.port.getOrElse(8080))
val handler = new ServletContextHandler(ServletContextHandler.SESSIONS)
handler.setContextPath("/")
handler.addServlet(new ServletHolder(new AkkaMistServlet()), "/*");
server.setHandler(handler)
server.start()

ServletContextHandler is a handler for HTTP requests that supports the standard Java servlet API. Web Words needs a servlet context to add AkkaMistServlet to it. (Akka HTTP is also known as Akka Mist, for historical reasons.) AkkaMistServlet forwards HTTP requests to a special actor known as the RootEndpoint, which is also created in WebServer.scala.

By the way, the use of Jetty here is yet another example of seamlessly using a Java API from Scala.

Akka HTTP

The AkkaMistServlet from Akka HTTP suspends incoming requests using Jetty Continuations and forwards each request as a message to the RootEndpoint actor.

In WebActors.scala, Web Words defines its own actors to handle requests, registering them with RootEndpoint in the form of the following handlerFactory partial function:

private val handlerFactory: PartialFunction[String, ActorRef] = {
    case path if handlers.contains(path) =>
        handlers(path)
    case "/" =>
        handlers("/words")
    case path: String =>
        custom404
}

private val handlers = Map(
    "/hello" -> actorOf[HelloActor],
    "/words" -> actorOf(new WordsActor(config)))

private val custom404 = actorOf[Custom404Actor]

Request messages sent from Akka HTTP are subclasses of RequestMethod; RequestMethod wraps the standard HttpServletRequest and HttpServletResponse, and you can access the request and response directly if you like. There are some convenience methods on RequestMethod for common actions such as returning an OK status:

class HelloActor extends Actor {
    override def receive = {
        case get: Get =>
            get OK "hello!"
        case request: RequestMethod =>
            request NotAllowed "unsupported request"
    }
}

Here OK and NotAllowed are methods on RequestMethod that set a status code and write out a string as the body of the response.

The action begins in WordsActor, which generates HTML for the main /words page of the application, after getting an Index object from a ClientActor instance:

val futureGotIndex = client ? GetIndex(url.get.toExternalForm, skipCache)

futureGotIndex foreach {
    // now we're in another thread, so we just send ourselves
    // a message, don't touch actor state
    case GotIndex(url, indexOption, cacheHit) =>
        self ! Finish(get, url, indexOption, cacheHit, startTime)
}

ClientActor.scala contains the logic to check the MongoDB cache via IndexStorageActor and kick off an indexer job when there’s a cache miss. When the ClientActor replies, the WordsActor sends itself a Finish message with the information necessary to complete the HTTP request.

To handle the Finish message, WordsActor generates HTML:

private def handleFinish(finish: Finish) = {
    val elapsed = System.currentTimeMillis - finish.startTime
    finish match {
        case Finish(request, url, Some(index), cacheHit, startTime) =>
            val html = wordsPage(form(url, skipCache = false), results(url, index, cacheHit, elapsed))

            completeWithHtml(request, html)

        case Finish(request, url, None, cacheHit, startTime) =>
            request.OK("Failed to index url in " + elapsed + "ms (try reloading)")
    }
}

A couple more nice Scala features are illustrated in handleFinish()!

  • named arguments are allowed for parameters: form(url, skipCache = false) is much clearer than form(url, false)
  • pattern matching lets the code distinguish a Finish message with Some(index) from one with None, while simultaneously unpacking the fields in the Finish message

Connecting the web process to the indexer with AMQP

Separating Web Words into two processes, a web frontend and a worker process called indexer, makes it easier to manage the deployed application. The web frontend could in principle serve something useful (at least an error page) while the indexer is down. On a more complex site, some worker processes might be optional. You can also scale the two processes separately as you learn which one will be the bottleneck.

However, having two processes creates a need for communication between them. RabbitMQ, an implementation of the AMQP standard, is conveniently available as a Heroku add-on. AMQP stands for “Advanced Message Queuing Protocol” and that’s what it does: queues messages.

Web Words encapsulates AMQP in two actors, WorkQueueClientActor and WorkQueueWorkerActor. The client actor is used in the web process and the worker actor in the indexer process. Both are subclasses of AbstractWorkQueueActor which contains some shared implementation.

Akka AMQP module

Akka’s AMQP module contains a handy akka.amqp.rpc package, which layers a request-response remote procedure call on top of AMQP. On the server (worker) side, it creates an “RPC server” which replies to requests:

override def createRpc(connectionActor: ActorRef) = {
    val serializer =
        new RPC.RpcServerSerializer[WorkQueueRequest, WorkQueueReply](WorkQueueRequest.fromBinary, WorkQueueReply.toBinary)
    def requestHandler(request: WorkQueueRequest): WorkQueueReply = {
        // having to block here is not ideal
        // https://www.assembla.com/spaces/akka/tickets/1217
        (self ? request).as[WorkQueueReply].get
    }
    // the need for poolSize>1 is an artifact of having to block in requestHandler above
    rpcServer = Some(RPC.newRpcServer(connectionActor, rpcExchangeName, serializer, requestHandler, poolSize = 8))
}

While on the client (web) side, it creates an “RPC client” which sends requests and receives replies:

override def createRpc(connectionActor: ActorRef) = {
    val serializer =
        new RPC.RpcClientSerializer[WorkQueueRequest, WorkQueueReply](WorkQueueRequest.toBinary, WorkQueueReply.fromBinary)
    rpcClient = Some(RPC.newRpcClient(connectionActor, rpcExchangeName, serializer))
}

WorkQueueClientActor and WorkQueueWorkerActor are thin wrappers around these server and client objects.

Akka’s AMQP module offers several abstractions in addition to theakka.amqp.rpc package, appropriate for different uses of AMQP.

In Web Words, the web process does not heavily rely on getting a reply to RPC requests; the idea is that the web process retrieves results directly from the MongoDB cache. The reply to the RPC request just kicks the web process and tells it to check the cache immediately. If an RPC request times out for some reason, but an indexer process did cache a result, a user pressing reload in their browser should see the newly-cached result.

With multiple indexer processes, RPC requests should be load-balanced across them.

Message serialization

To send messages over AMQP you need some kind of serialization; you can use anything – Java serialization, Google protobufs, or in the Web Words case, a cheesy hand-rolled approach that happens to show off some neat Scala features:

def toBinary: Array[Byte] = {
    val fields = this.productIterator map { _.toString }
    WorkQueueMessage.packed(this.getClass.getSimpleName :: fields.toList)
}

Scala case classes automatically extend a trait called Product, which is also extended by tuples (pairs, triples, and so on are tuples). You can walk over the fields in a case class with productIterator, so the above code serializes a case class by converting all its fields to strings and prepending the class name to the list. (To be clear, in “real” code you might want to use a more robust approach.)
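
To see productIterator on its own, here is a throwaway example (not from Web Words):

case class Point(x: Int, y: Int)

Point(3, 4).productIterator.toList                     // List(3, 4)
Point(3, 4).productIterator.map(_.toString).toList     // List("3", "4"), the same trick toBinary uses above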

On the deserialization side, you can see another nice use of Scala’s pattern matching:

override def fromBinary(bytes: Array[Byte]) = {
    WorkQueueMessage.unpacked(bytes).toList match {
        case "SpiderAndCache" :: url :: Nil =>
            SpiderAndCache(url)
        case whatever =>
            throw new Exception("Bad message: " + whatever)
    }
}

The :: operator (“cons” for the Lisp crowd) joins elements in a list, so we’re matching a list with two elements, where the first one is the string "SpiderAndCache". You could also write this as:

case List("SpiderAndCache", url) =>

Checking AMQP connectivity

In AMQPCheck.scala you’ll find some code that uses RabbitMQ’s Java APIdirectly, rather than Akka AMQP. This code exists for two reasons:

  • it verifies that the AMQP broker exists and is properly configured; once Akka AMQP starts, you’ll get a deluge of backtraces if the broker is missing as Akka continuously tries to recover. The code in AMQPCheck.scala gives you one nice error message.
  • it lets the web process block on startup until the indexer starts up, so startup proceeds cleanly without any backtraces.

More on AMQP

AMQP is an involved topic. RabbitMQ has a nice tutorial that’s worth checking out. You can use the message queue in many flexible ways.

Heroku makes it simple to experiment and see what happens if you run multiple instances of the same process, as you architect the relationships among your processes.

Caching results in MongoDB

Web Words uses AMQP as a “control channel” to kick the indexer process to index a new site, and tell a web process when indexing is completed. Actual data doesn’t go via AMQP, however. Instead, it’s stored in MongoDB by the indexer process and retrieved by the web process.

MongoDB is a convenient solution for caching object-like data. It stores collections of JSON-like objects (the format is called BSON since it’s a compact binary version of JSON). A special feature of MongoDB called a capped collection is ideal for a cache of such objects. Capped collections use a fixed amount of storage, or store a fixed number of objects. When the collection fills up, the least-recently-inserted objects are discarded, that is, it keeps whatever is newest. Perfect for a cache! MongoDB is pretty fast, too.

IndexStorageActor

IndexStorageActor encapsulates MongoDB for Web Words. An IndexStorageActor stores Index objects: simple.

IndexStorageActor uses the Casbah library, a Scala-friendly wrapper around MongoDB’s Java driver.

Much of the code in IndexStorageActor.scala deals with converting Index objects to DBObject objects. This code could be replaced with a library such as Salat, but it’s done by hand in Web Words to show how you’d do it manually (and avoid another dependency).

IndexStorageActor is an actor pool, extending IOBoundActorPool. Because Casbah is a blocking API, each worker in the pool will tie up a thread for the duration of the request to MongoDB. This can be dangerous; by default, Akka has a maximum number of threads, and running out of threads could lead to deadlock. At the same time, you don’t want to have too few threads in your IO-bound pool because you can do quite a bit of IO at once (since it doesn’t use up CPU). Tuning this is an application-specific exercise.

IndexStorageActor could override the upperBound method to adjust the maximum size of its actor pool and thus the maximum number of simultaneous outstanding MongoDB requests.
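
For example, a hypothetical override (the limit of 16 is arbitrary, not taken from Web Words):

// Allow at most 16 delegates, and therefore at most 16 simultaneous MongoDB requests.
override lazy val upperBound = 16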

An asynchronous API would be a better match for Akka, and there’s one in development called Hammersmith.

Using a capped collection

IndexStorageActor’s use of MongoDB may be pretty self-explanatory.

To set up a capped collection:

db.createCollection(cacheName,
            MongoDBObject("capped" -> true,
                "size" -> sizeBytes,
                "max" -> maxItems))

To add a new Index object to the collection:

cache.insert(MongoDBObject("url" -> url,
                "time" -> System.currentTimeMillis().toDouble,
                "index" -> indexAsDBObject(index)))

To look up an old Index object:

val cursor =
    cache.find(MongoDBObject("url" -> url))
                .sort(MongoDBObject("$natural" -> -1))
                .limit(1)

The special key "$natural" in the above “sort object” refers to the order in which objects are naturally positioned on disk. For capped collections, this is guaranteed to be the order in which objects were inserted. The -1 means reverse natural order, so the sort retrieves the newest object first.

Build and deploy: SBT, the start-script plugin, and ScalaTest

The build for Web Words illustrates:

  • SBT 0.11 – https://github.com/harrah/xsbt/wiki/
  • xsbt-start-script-plugin – https://github.com/typesafehub/xsbt-start-script-plugin
  • testing with ScalaTest – http://www.scalatest.org/

Here’s a quick tour of each one, as applied to Web Words.

Simple Build Tool (SBT)

SBT build configurations are themselves written in Scala; you can find the Web Words build in project/Build.scala. This is an example of a “full” configuration; there’s another (more concise but less flexible) build file format called “basic” configuration. Full configurations are .scala files while basic configurations are in special .sbt files. While full configurations require more typing, basic configurations have the downside that you need to start over with a full configuration if you discover a need for more flexibility.

SBT build files are concerned with lists of settings that control the build. An SBT build will have a tree of projects, where each project will have its own list of settings.

In project/Build.scala, you can see there are four projects; the project called root is an aggregation of the web, indexer, and common projects that contain the actual code.

Here’s the definition of the common project, which is a library shared between the other two projects:

lazy val common = Project("webwords-common",
                       file("common"),
                       settings = projectSettings ++
                       Seq(libraryDependencies ++= Seq(akka, akkaAmqp, asyncHttp, casbahCore)))

According to this configuration,

  • The project is named “webwords-common”; this name will be used to name the jar if you run sbt package, so the prefix webwords- is intended to avoid a jar called common.jar.
  • The project will be in the directory common (each project directory should contain src/main/scala, src/test/scala, etc. for a Scala project, or src/main/java, src/main/resources, and so on).
  • The project’s settings will include projectSettings (a list of settings defined earlier in the file to be included in all projects), plus some library dependencies.

Settings are defined with some special operators. In project/Build.scala you will see:

  • := sets a setting to a value
  • += adds a value to a list-valued setting
  • ++= concatenates a list of values to a list-valued setting
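
For illustration only (these lines are hypothetical, not copied from Build.scala; akka and akkaAmqp stand for dependency definitions declared elsewhere in the build):

name := "webwords-example"                                 // := replaces the setting's value
libraryDependencies += "org.jsoup" % "jsoup" % "1.6.1"     // += appends a single element
libraryDependencies ++= Seq(akka, akkaAmqp)                // ++= appends a whole list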

xsbt-start-script-plugin

Have a look at the Procfile for Web Words; you’ll see it contains the following:

web: web/target/start
indexer: indexer/target/start

The format is trivial:

NAME OF PROCESS: SHELL CODE TO EXECUTE

Heroku will run the given shell code to create each process. In this case, the Procfile launches a script called start created by SBT for each process.

These scripts are generated by xsbt-start-script-plugin as part of its stage task. stage is a naming convention that could be shared by other plugins and means “prepare the project to be run, in an environment that deploys source trees rather than packages.” In other words, stage does what you want in order to compile and run the application in-place, using the class files generated during the compilation. While sbt package (built in to SBT) creates a .jar and sbt package-war (provided by xsbt-web-plugin) creates a .war, sbt stage gives you something you can execute (from Procfile or its non-Heroku equivalent).

If you run sbt stage and have a look at the generated start script, you’ll see that it’s setting up a classpath and specifying which main class the JVM should run.

The xsbt-start-script-plugin README explains how to use it in a project; in brief, you add its settings to your project, for example the indexer project in Web Words:

lazy val indexer = Project("webwords-indexer",
                          file("indexer"),
                          settings = projectSettings ++
                          StartScriptPlugin.startScriptForClassesSettings ++
                          Seq(libraryDependencies ++= Seq(jsoup))) dependsOn(common % "compile->compile;test->test")

startScriptForClassesSettings defines stage to run a main method found in the project’s .class files. The plugin can also generate a script to run .war files and .jar files, if you’d rather package the project and launch from a package.

ScalaTest

It’s possible to use JUnit with a Scala project, but there are a few popular Scala-based test frameworks (you can use them for Java projects too, by the way). Web Words uses ScalaTest, two other options are Specs2 and ScalaCheck.

ScalaTest gives you a few choices for how to write test files. An example from Web Words looks something like this:

it should "store and retrieve an index" in {
    val storage = newActor
    cacheIndex(storage, exampleUrl, sampleIndex)
    val fetched = fetchIndex(storage, exampleUrl)
    fetched should be(sampleIndex)
    storage.stop
}

ScalaTest provides a “domain-specific language” or DSL for testing. The idea is to use Scala’s flexibility to define a set of objects and methods that map naturally to a problem domain, without having to give up type-safety or write a custom parser. (SBT’s configuration API is another example of a DSL.)

The ScalaTest DSL lets you write:

it should "store and retrieve an index" in

or

fetched should be(sampleIndex)

rather than something more stilted.

There are quite a few tests in Web Words, illustrating one way to go about testing an application. You may find TestHttpServer.scala useful: it embeds Jetty to run a web server locally. Use this to test HTTP client code.

If you declare a project dependency with "compile->compile;test->test", then the tests in that project can use code from the dependency’s tests. For example, in Build.scala, the line

dependsOn(common % "compile->compile;test->test")

enables the web and indexer projects to use TestHttpServer.scala located in the common project.

Often it’s useful to define tests in the same package as the code you’re testing. This allows tests to access types and fields that aren’t accessible outside the package.

Summing it up

A real application has quite a few moving parts. In Web Words, some of those are traditional Java libraries (JSoup, Jetty, the RabbitMQ Java client, AsyncHttpClient) while others are shiny new Scala libraries (Akka, Casbah, ScalaTest).

Scala and Akka are pragmatic tools to pull the JVM ecosystem together and write horizontally scalable code, without the dangers of rolling your own approach to concurrency. Programming in functional style with the actor model naturally scales out, making these approaches a great fit for cloud platforms such as Heroku.

About Typesafe

Typesafe, a company founded by the creators of Scala and Akka, offers the commercially-supported Typesafe Stack. To learn more, or if you just want to hang out and talk Scala, don’t hesitate to look us up at typesafe.com.

 


 

 

[repost ]Async SQL and Akka

original:http://www.gamlor.info/wordpress/2012/05/async-sql-and-akka/

I’ve already written several times about Akka and async programming with it. For example doing access files and webservices with it. So async API stack is complete, right? We’ve got async network and web access via NIO and frameworks like Play 2.0SprayHTTPClient etc. And with NIO 2 we easily can access the file system in asynchronous way. But wait, what about your database? What if tons of your data sits in a good old relational database? Unfortunately the current JDBC model is blocking by definition. No asynchronous operation is allowed.

The standard way of doing async SQL operations is to use some background threads and do the work there. And for many applications this is good enough. But as you do more and more database operations, you start to consume more and more threads, which just sit there waiting on blocking calls. Wouldn’t it be nicer if the database operations were truly asynchronous, like the other operations?
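
That “background threads” approach looks roughly like this (a sketch, assuming Akka 2.0 and a plain JDBC driver on the classpath; the URL and query are placeholders):

import akka.dispatch.Future
import java.sql.DriverManager

// In Akka 2.0 the dispatcher doubles as an ExecutionContext for Future { ... }
implicit val executionContext = actorSystem.dispatcher

val firstNames = Future {
  // This blocks a thread from the pool for the whole database round trip.
  val connection = DriverManager.getConnection("jdbc:mysql://localhost/employees", "root", "")
  try {
    val rs = connection.createStatement()
      .executeQuery("SELECT first_name FROM employees LIMIT 5")
    val names = scala.collection.mutable.ListBuffer[String]()
    while (rs.next()) names += rs.getString("first_name")
    names.toList
  } finally {
    connection.close()
  }
}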

JDBC is nice, but lets you wait

So I’ve looked for an alternative to JDBC. I’ve found two approaches to provide a asynchronous JDBC alternative. The async-mysql-connector and ADBCJ. Unfortunate both are not really actively maintained. Anyhow I decided to go ahead with ADBCJ. More about that later. Let’s first take a look at how we to use everything.

Async SQL in Akka

Let’s take a short look how to use that API in Akka. First we need to configure the database in our Akka configuration:

myapp {
    async-jdbc {
        url = "adbcj:mysql://localhost/employees"
        username = "root"
        password = ""
    }
}

After that we can use the Database Akka extension to create a connection. And with that connection we can do our database operations. The API is very JDBC-like, but with the difference that all operations run asynchronously. Since they return Akka futures, you can compose the operations like you’re used to with Akka futures. Here’s an example:

// grab the extension
val dbSupport = Database(actorSystem)
val result = for {
  // open the connection asynchronously
  connection <- dbSupport.connect()
  // Execute a query request
  newestEmployee <- connection.executeQuery(
    "SELECT first_name,last_name,hire_date FROM employees " +
    "ORDER BY hire_date DESC " +
    "LIMIT 0,5")
  // When the results arrive, close the connection
  _ <- connection.close()
} yield newestEmployee
// Process the results
result.onSuccess {
  case resultSet =>
    resultSet.foreach { row =>
      println(row("first_name").getString
        + " " + row("last_name").getString
        + " since " + row("hire_date").getString)
    }
}.onFailure {
  case e: Exception =>
    e.printStackTrace()
}

A regular query returns an immutable ResultSet. Alternatively you can pass an event handler to the query method. That handler will be called as the data streams in, so you can assemble the data yourself. For example, it can build up a string:

val resultAsString = for {
  // open the connection asynchronously
  connection <- dbSupport.connect()
  // Execute a query request
  dataAsString <- connection.executeQuery(
    "SELECT first_name,last_name,hire_date FROM employees " +
    "ORDER BY hire_date DESC " +
    "LIMIT 0,5", "") {
      // Instead of creating a result set, we can also react directly
      // as the data streams in
      case StartRow(jsonToBuild) => jsonToBuild + "-"
      case AValue(value, jsonToBuild) => jsonToBuild + "," + value.getString
      case EndRow(jsonToBuild) => jsonToBuild + "\n"
    }
  // When the results arrive, close the connection
  _ <- connection.close()
} yield dataAsString
// Process the results
resultAsString.onSuccess {
  case stringData => print(stringData)
}.onFailure {
  case e: Exception =>
    e.printStackTrace()
}

Of course transactions, updates and prepared statements are also supported:

val result = for {
  // open the connection asynchronously
  connection <- dbSupport.connect()
  _ <- connection.beginTransaction()
  insertedInfo <- connection.executeUpdate(
    "INSERT INTO employees (emp_no, birth_date, first_name, last_name, gender, hire_date) " +
    "VALUES (42, '1986-05-07', 'Roman', 'Stoffel', 'M', '2012-05-17')")
  statement <- connection.prepareQuery("SELECT first_name FROM employees WHERE emp_no = ?")
  queryResult <- statement.execute(42)
  _ <- connection.rollback()
  _ <- connection.close()
} yield queryResult

Of course it’s annoying to manually close the connection every time. Therefore a .withConnection method exists. It will open a connection for you, run your code and close the connection afterwards. Everything is done asynchronously, therefore closure has to return a future, and the connection is closed when that future completes:

val result = Database(actorSystem)
  .withConnection { connection =>
    connection.executeQuery(
      "SELECT first_name,last_name,hire_date FROM employees " +
      "ORDER BY hire_date DESC " +
      "LIMIT 0,5")
  }

A .withTransaction method also exists. It does the same for a transaction: it commits the transaction if the given closure finishes normally, and rolls it back if the closure or the returned future fails.

connection.withTransaction { txConn =>
  txConn.executeQuery(
    "SELECT first_name,last_name,hire_date FROM employees " +
    "ORDER BY hire_date DESC " +
    "LIMIT 0,5")
}

ADBCJ calls you back

The ADBCJ Stuff

Now, the stuff above is just a thin wrapper around the ADBCJ API. ADBCJ is basically a JDBC clone where all operations are asynchronous. That also means it requires a driver which implements the operations asynchronously. Originally three drivers were around: a MySQL driver, a PostgreSQL driver, and a bridge to JDBC. Of course the JDBC bridge cannot remove the blocking calls; it just executes them on worker threads, so it doesn’t bring a real benefit.

Now, unfortunately, the ADBCJ implementations were only proofs of concept. So I forked it on GitHub, fixed the worst issues, and implemented a few missing features. But I did that only for MySQL. So what’s the exact state of it?

  • The MySQL driver and the JDBC bridge are up to date. The basics work: transactions, queries, prepared statements, etc.
  • The PostgreSQL driver compiles, but I haven’t maintained it and didn’t bother to run the tests. Prepared statements are not implemented.
  • Many data type implementations are missing.
  • The test suite is not in a good shape. I’ve improved it a lot, but it is still very minimalistic. It just doesn’t cover enough.
  • A connection pool for ADBCJ connections is missing. (There was some implementation, but no tests. I removed it for now, until I have tests etc. in place.)
  • It’s just far away from rock solid.

So for a real world application, ADBCJ would need a lot of work. First to improve the MySQL implementation and second to support other databases. Unfortunately this is tedious database protocol implementation work, which isn’t fun. So unless someone is really desperate, it will never be done.

Hopefully, JDBC version 42.0 will include async operations, and async operations can be done easily ;) .

Get the Stuff

The tiny ‘wrapper’ is on my GitHub repo here: https://github.com/gamlerhart/akka-async-apis. My fork of ADBCJ is here: https://github.com/gamlerhart/adbcj. The artifacts are also on my GitHub-hosted Maven snapshot repository. You can grab them via SBT or Maven:

Repository for Maven: https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots
GroupID: info.gamlor.akkaasync
ArtifactID: akka-dbclient_2.9.1
Version: 1.0-SNAPSHOT

And you also need the MySQL driver:

GroupID: org.adbcj
ArtifactID: mysql-async-driver
Version: 0.3-SNAPSHOT

So via SBT:

resolvers += "Gamlor-Repo" at "https://github.com/gamlerhart/gamlor-mvn/raw/master/snapshots"

libraryDependencies += "com.typesafe.akka" % "akka-actor" % "2.0"

libraryDependencies += "info.gamlor.akkaasync" %% "akka-dbclient" % "1.0-SNAPSHOT"

libraryDependencies += "org.adbcj" % "adbcj-api" % "0.3-SNAPSHOT"

libraryDependencies += "org.adbcj" % "mysql-async-driver" % "0.3-SNAPSHOT"

 

The Future

Well, most applications probably don’t have a real need for an asynchronous JDBC replacement. Connection pools and running enough threads are good enough, and many applications can use enough caching to reduce the pressure of database operations. So the niche for this stuff is small =).

Next Post

Next time I probably shine some more light on ADBCJ itself.