[repost] Analyzing Twitter Data with Apache Hadoop, Part 2: Gathering Data with Flume

original:http://blog.cloudera.com/blog/2012/10/analyzing-twitter-data-with-hadoop-part-2-gathering-data-with-flume/

This is the second article in a series about analyzing Twitter data using some of the components of the Hadoop ecosystem available in CDH, Cloudera’s open-source distribution of Apache Hadoop and related projects. In the first article, you learned how to pull CDH components together into a single cohesive application, but to really appreciate the flexibility of each of these components, we need to dive deeper.

Every story has a beginning, and every data pipeline has a source. So, to build Hadoop applications, we need to get data from a source into HDFS.

Apache Flume is one way to bring data into HDFS using CDH. The Apache Flume website describes Flume as “a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data.” At the most basic level, Flume enables applications to collect data from its origin and send it to a resting location, such as HDFS. At a slightly more detailed level, Flume achieves this goal by defining dataflows consisting of three primary structures: sources, channels, and sinks. The pieces of data that flow through Flume are called events, and the processes that run the dataflow are called agents.

In the Twitter example, we used Flume to collect data from the Twitter Streaming API and forward it to HDFS. Looking closer at the Flume pipeline from that example, we come away with a system like this:

[Figure: the Flume dataflow from the example, with a TwitterSource feeding a memory channel that is drained by an HDFS sink]

In the rest of this post, we’ll take an in-depth look at the pieces of Flume that are used to build dataflows, and specifically, how they were used in the example.

Sources

A source is just what it sounds like: the part of Flume that connects to a source of data and starts the data along its journey through a Flume dataflow. A source processes events and moves them along by sending them into a channel. Sources operate by gathering discrete pieces of data, translating the data into individual events, and then using the channel to process events one at a time, or as a batch.

Sources come in two flavors: event-driven or pollable. The difference between event-driven and pollable sources is how events are generated and processed. Event-driven sources typically receive events through mechanisms like callbacks or RPC calls. Pollable sources, in contrast, operate by polling for events every so often in a loop. Another good way to frame this differentiation is as a push-versus-pull model, where event-driven sources have events pushed to them, and pollable sources pull events from a generator.
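For comparison, here is a minimal sketch of what a pollable source looks like. This is not code from the Twitter example; the class name and the pollForData() helper are hypothetical, and error handling is omitted. Flume's runner thread calls process() in a loop, backing off when the source reports that nothing is available.

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;

/**
 * A hypothetical pollable source. Flume calls process() repeatedly;
 * returning BACKOFF tells the runner to wait before polling again.
 */
public class BoilerplatePollableSource extends AbstractSource
    implements PollableSource, Configurable {

  @Override
  public void configure(Context context) {
    // Read configuration parameters with context.get* methods
  }

  @Override
  public Status process() throws EventDeliveryException {
    // Pull the next piece of data from whatever this source reads from.
    byte[] data = pollForData(); // hypothetical helper
    if (data == null) {
      return Status.BACKOFF;     // nothing available right now
    }
    Event event = EventBuilder.withBody(data);
    getChannelProcessor().processEvent(event);
    return Status.READY;
  }

  private byte[] pollForData() {
    // In a real source this would read from a file, a socket, an API, etc.
    return null;
  }
}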

Examining the TwitterSource

In our Twitter analysis example, we built a custom source called TwitterSource. To understand how sources operate more thoroughly, let’s look at how the TwitterSource was built. We can start with a very generic piece of boilerplate code:

import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.source.AbstractSource;

/**
 * A template for a custom, configurable Flume source
 */
public class BoilerplateCustomFlumeSource extends AbstractSource
    implements EventDrivenSource, Configurable {

  /**
   * The initialization method for the Source. The context contains all the
   * Flume configuration info, and can be used to retrieve any configuration
   * values necessary to set up the Source.
   */
  @Override
  public void configure(Context context) {
    // Get config params with context.get* methods
    // Example: stringParam = context.getString("stringParamName")
  }

  /**
   * Start any dependent systems and begin processing events.
   */
  @Override
  public void start() {
    // For an event-driven source, the start method should spawn
    // a thread that will receive events and forward them to the
    // channel
    super.start();
  }

  /**
   * Stop processing events and shut any dependent systems down.
   */
  @Override
  public void stop() {
    super.stop();
  }
}

With this code, we have a configurable source that we can plug into Flume, although at this stage, it won’t do anything.

The start() method contains the bulk of the source’s logic. In the TwitterSource, the twitter4j library is used to get access to the Twitter Streaming API, using this block of code:

// The StatusListener is a twitter4j API, which can be added to a Twitter
// stream, and will execute callback methods every time a message comes in
// through the stream.
StatusListener listener = new StatusListener() {
  // The onStatus method is a callback executed when a new tweet comes in.
  public void onStatus(Status status) {
    Map<String, String> headers = new HashMap<String, String>();
    // The EventBuilder is used to build an event using the headers and
    // the raw JSON of a tweet
    headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
    Event event = EventBuilder.withBody(
        DataObjectFactory.getRawJSON(status).getBytes(), headers);

    try {
      getChannelProcessor().processEvent(event);
    } catch (ChannelException e) {
      // If we catch a channel exception, it’s likely that the memory channel
      // does not have a high enough capacity for our rate of throughput, and
      // we tried to put too many events in the channel. Error handling or
      // retry logic would go here.
      throw e;
    }
  }

  // This listener will ignore everything except for new tweets
  public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) {}
  public void onTrackLimitationNotice(int numberOfLimitedStatuses) {}
  public void onScrubGeo(long userId, long upToStatusId) {}
  public void onException(Exception ex) {}
};

The StatusListener implements a set of callbacks that will be called when receiving a new tweet, represented by a Status object. There are other callbacks available, but for the purposes of this source, we’re only concerned with new tweets. As can be seen in the TwitterSource, the StatusListener is created and registered in the start() method.
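Conceptually, the start() method just wires this listener into a twitter4j stream. A simplified sketch of that wiring is shown below; it assumes the credential fields have already been populated by configure(), omits the keyword filtering, and glosses over the jsonStoreEnabled setting that DataObjectFactory.getRawJSON() relies on.

// Simplified sketch of start(), inside the source class.
// (Uses twitter4j.TwitterStream, twitter4j.TwitterStreamFactory and
// twitter4j.auth.AccessToken.)
@Override
public void start() {
  TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
  twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
  twitterStream.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret));
  twitterStream.addListener(listener);
  twitterStream.sample(); // or twitterStream.filter(...) to track keywords
  super.start();
}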

Looking a bit closer, we can pick out the code that actually builds an event out of a tweet:

headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));
Event event = EventBuilder.withBody(
      DataObjectFactory.getRawJSON(status).getBytes(), headers);

The EventBuilder interface takes a byte array and an optional set of headers, and creates an event. The source processes events as they come in and passes them along to the channel processor:

getChannelProcessor().processEvent(event);

In order to connect to the Twitter APIs, we need access to some application-specific secrets. In the TwitterSource, these are variables like the consumerKey and consumerSecret, which are used to set up the Twitter stream:

twitterStream.setOAuthConsumer(consumerKey, consumerSecret);

So, where did the consumerKey and consumerSecret get defined? For this source, these variables are configuration parameters. Taking a look at the configure() method, we can see where the variables are defined:

consumerKey = context.getString(TwitterSourceConstants.CONSUMER_KEY_KEY);
consumerSecret = context.getString(TwitterSourceConstants.CONSUMER_SECRET_KEY);

The context object contains all the configuration parameters for the source, which can be pulled out and stored in instance variables using a variety of get accessors.
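Beyond getString, the Context class has typed accessors, most of which accept a default value as a second argument. For example (the parameter names here are hypothetical, not ones used by the TwitterSource):

// Inside configure(); "batchSize", "verbose" and "keywords" are made-up
// parameter names, and the second argument is the default value used
// when the parameter is absent from the configuration.
int batchSize = context.getInteger("batchSize", 100);
boolean verbose = context.getBoolean("verbose", false);
String keywords = context.getString("keywords", "");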

With this code in place, the custom source will be able to process tweets as events. The next step is to define where those events should go and how they should get there.

Configuring the Flume Agent

Before we discuss how to actually configure a Flume agent, we need to know what a configuration looks like. For the Twitter analysis example, we used this configuration:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = [required]
TwitterAgent.sources.Twitter.consumerSecret = [required]
TwitterAgent.sources.Twitter.accessToken = [required]
TwitterAgent.sources.Twitter.accessTokenSecret = [required]
TwitterAgent.sources.Twitter.keywords = hadoop, big data, analytics, bigdata, cloudera, data science, data scientist, business intelligence, mapreduce, data warehouse, data warehousing, mahout, hbase, nosql, newsql, businessintelligence, cloudcomputing

TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

The first three lines of the configuration define an agent named TwitterAgent, with a source named Twitter, a channel named MemChannel, and a sink named HDFS. Each object that is defined will be referenced by these names throughout the rest of the configuration. Most Flume configuration entries follow a format very similar to the configuration of log4j appenders. An entry will look like [agent_name].[object_type].[object_name].[parameter_name], where [object_type] is one of sources, channels, or sinks. For example, TwitterAgent.sources.Twitter.consumerKey sets the consumerKey parameter on the Twitter source of the TwitterAgent agent.

Channels

Channels act as a pathway between the sources and sinks. Events are added to channels by sources, and later removed from the channels by sinks. Flume dataflows can actually support multiple channels, which enables more complicated dataflows, such as fanning out for replication purposes.

In the case of the Twitter example, we’ve defined a memory channel:

TwitterAgent.channels.MemChannel.type = memory

Memory channels use an in-memory queue to store events until they’re ready to be written to a sink. Memory channels are useful for dataflows that have a high throughput; however, since events are stored in memory in the channel, they may be lost if the agent experiences a failure. If that risk of data loss is not tolerable, it can be remedied by using a different type of channel, such as a FileChannel, which provides stronger guarantees of data durability.
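For instance, a rough sketch of what swapping in a file channel might look like (the directory paths below are placeholders, not values from the example):

TwitterAgent.channels.FileChannel.type = file
TwitterAgent.channels.FileChannel.checkpointDir = /var/lib/flume-ng/checkpoint
TwitterAgent.channels.FileChannel.dataDirs = /var/lib/flume-ng/data

The agent’s channels list, the source’s channels setting, and the sink’s channel setting would then reference FileChannel instead of MemChannel.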

Sinks

The final piece of the Flume dataflow is the sink. Sinks take events and send them to a resting location or forward them on to another agent. In the Twitter example, we utilized an HDFS sink, which writes events to a configured location in HDFS.

The HDFS sink configuration we used does a number of things: First, it defines the size of the files with the rollCount parameter, so each file will end up containing 10,000 tweets. It also retains the original data format, by setting the fileType to DataStream and setting writeFormat to Text. This is done instead of storing the data as a SequenceFile or some other format. The most interesting piece, however, is the file path:

TwitterAgent.sinks.HDFS.hdfs.path = hdfs://hadoop1:8020/user/flume/tweets/%Y/%m/%d/%H/

The file path, as defined, uses Flume’s escape sequences to specify that the files will end up in a series of directories for the year, month, day and hour during which the events occur. For example, an event that comes in at 9/20/2012 3:00PM will end up in HDFS at hdfs://hadoop1:8020/user/flume/tweets/2012/09/20/15/.

Where does the timestamp information come from? If you’ll recall, we added a header to each event in the TwitterSource:

headers.put("timestamp", String.valueOf(status.getCreatedAt().getTime()));

This timestamp header is used by Flume to determine the timestamp of the event, and is used to resolve the full path where the event should end up.

Starting the Agent

Now that we understand the configuration of our source, channel and sink, we need to start up the agent to get the dataflow running. Before we actually start the agent, we need to set the agent to have the appropriate name as defined in the configuration.

The file /etc/default/flume-ng-agent defines a single environment variable, FLUME_AGENT_NAME. In a production system, for simplicity, the FLUME_AGENT_NAME will typically be set to the hostname of the machine on which the agent is running. However, in this case, we set it to TwitterAgent, and we’re ready to start up the process.

We can start the process by executing

$ /etc/init.d/flume-ng-agent start

Once it’s going, we should start to see files showing up in our /user/flume/tweets directory:

natty@hadoop1:~/source/cdh-twitter-example$ hadoop fs -ls /user/flume/tweets/2012/09/20/05
  Found 2 items
  -rw-r--r--   3 flume hadoop   255070 2012-09-20 05:30 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893253
  -rw-r--r--   3 flume hadoop   538616 2012-09-20 05:39 /user/flume/tweets/2012/09/20/05/FlumeData.1348143893254.tmp

As more events are processed, Flume writes to files in the appropriate directory. A temporary file, suffixed with .tmp, is the file currently being written to. That .tmp suffix is removed when Flume determines that the file contains enough events or enough time has passed to roll the file. Those thresholds are determined in the configuration of the HDFS sink, as we saw above, by the rollCount and rollInterval parameters, respectively.

Conclusion

In this article, you’ve seen how to develop a custom source and process events from Twitter. The same approach can be used to build custom sources for other types of data. Also, we’ve looked at how to configure a basic, complete dataflow for Flume, to bring data into HDFS as it is generated. A distributed filesystem isn’t particularly useful unless you can get data into it, and Flume provides an efficient, reliable way to achieve just that.

In Part 3, we’ll examine how to query this data with Apache Hive.

Jon Natkins (@nattybnatkins) is a Software Engineer at Cloudera

[repost] The Apache Projects – The Justice League Of Scalability

In this post I will define what I believe to be the most important projects within the Apache Projects for building scalable web sites and generally managing large volumes of data.

If you are not aware of the Apache Projects, then you should be. Do not mistake this for The Apache Web (HTTPD) Server, which is just one of the many projects in the Apache Software Foundation. Each project is its own open-source movement and, while Java is often the language of choice, it may have been created in any language.

I often see developers working on solutions that can be easily solved by using one of the tools in the Apache Projects toolbox. The goal of this post is to raise awareness of these great pieces of open-source software. If this is not new to you, then hopefully it will be a good recap of some of the key projects for building scalable solutions.

The Apache Software Foundation
provides support for the Apache community of open-source software projects.

You have probably heard of many of these projects, such as Cassandra or Hadoop, but maybe you did not realize that they all come under the same umbrella. This umbrella is known as the Apache Projects and is a great place to keep tabs on existing and new projects. It is also a gold seal of approval in the open-source world. I think of these projects, or tools, as the superheroes of building modern scalable websites. By day they are just a list of open-source projects listed on a rather dry and hard to navigate website at http://www.apache.org. But by night… they do battle with some of the world’s most gnarly datasets. Terabytes and even petabytes are nothing to these guys. They are the nemesis of high throughput, the digg effect, and the dreaded query-per-second upper limit. They laugh in the face of limited resources. Ok, maybe I am getting carried away here. Let’s move on…

Joining The Justice League

Prior to a project joining the Apache Projects it will first be accepted into the Apache Incubator. For instance, Deltacloud has recently been accepted into the Apache Incubator.

The Apache Incubator has two primary goals:

  • Ensure all donations are in accordance with the ASF legal standards
  • Develop new communities that adhere to our guiding principles

– Apache Incubator Website

You can find a list of all the projects currently in the Apache Incubator here.

Our Heroes

Here is a list, in no particular order, of tools you will find in the Apache Software Foundation Projects.

Kryptonite

While each has its major benefits, none is a “silver bullet” or a “golden hammer”. When you design software that scales or does any job very well, you have to make certain choices. If you are not using the software for the task it was designed for, then you will obviously find its weakness. Understanding and balancing the strengths and weaknesses of the various solutions will enable you to better design your scalable architecture. Just because Facebook uses Cassandra to do battle with its petabytes of data does not necessarily mean it will be a good solution for your “simple” terabyte-wrestling architecture. What you do with the data is often more important than how much data you have. For instance, Facebook has decided that HBase is now a better solution for many of their needs. [This is the cue for everyone to run to the other side of the boat].

Kryptonite is one of the few things that can kill Superman.

The word kryptonite is also used in modern speech as a synonym for Achilles’ heel, the one weakness of an otherwise invulnerable hero.

– “Kryptonite” by Wikipedia

Now, let’s look in more detail at some of the projects that can fly faster than a speeding bullet and leap tall datasets in a single [multi-server, distributed, parallelized, fault-tolerant, load-balanced and adequately redundant] bound.

Apache Cassandra

Cassandra was built by Facebook to hold its ridiculous volumes of data within its email system. It is much like a distributed key-value store, but with a hierarchy. The model is very similar to most NoSQL databases. The data-model consists of columns, column-families and super-columns. I will not go into detail here about the data-model, but there is a great intro (see “WTF is a SuperColumn? An Intro to the Cassandra Data Model“) that you can read.

Cassandra can handle fast writes and reads, but its Kryptonite is consistency. It takes time to make sure all the nodes serving the data have the same value. For this reason Facebook is now moving away from Cassandra for its new messaging system, to HBase. HBase is a NoSQL database built on-top of Hadoop. More on this below.

Apache Hadoop

Apache Hadoop, son of Apache Nutch and later raised under the watchful eye of Yahoo, has since become an Apache Project. Nutch is an open-source web search project built on Lucene. The component of Nutch that became Hadoop gave Nutch its “web-scale”.

Hadoop’s goal was to manage large volumes of data on commodity hardware, such as thousands of desktop hard drives. Hadoop takes much of its design from papers published by Google on the Google File System and MapReduce (Google’s Bigtable paper later inspired HBase). It stores data on the Hadoop Distributed File-System (a.k.a. “HDFS”) and manages the running of distributed Map-Reduce jobs. In a previous post I gave an example using Ruby with Hadoop to perform Map-Reduce jobs.

Map-Reduce is a way to crunch large datasets using two simple algorithms (“map” and “reduce”). You write these algorithms specific to the data you are processing. Although your map and reduce code can be extremely simple, it scales across the entire dataset using Hadoop. This applies even if you have petabytes of data across thousands of machines. Your resulting data can be found in a directory on your HDFS disk when the Map-Reduce job completes. Hadoop provides some great web-based tools for visualizing your cluster and monitoring the progress of any running jobs.
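To make the map/reduce split concrete, here is the canonical word-count example sketched against the Hadoop Java API. It is a generic illustration, not code from the post mentioned above: the mapper emits a (word, 1) pair for every word it sees, and the reducer sums the counts for each word.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class WordCount {

  // Map: emit (word, 1) for every word in an input line.
  public static class TokenizerMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      for (String token : value.toString().split("\\s+")) {
        if (!token.isEmpty()) {
          word.set(token);
          context.write(word, ONE);
        }
      }
    }
  }

  // Reduce: sum the counts emitted for each word.
  public static class SumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable count : values) {
        sum += count.get();
      }
      context.write(key, new IntWritable(sum));
    }
  }
}

Hadoop takes care of splitting the input, shuffling the (word, 1) pairs so that all counts for a given word reach the same reducer, and writing the summed output back to HDFS.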

Hadoop deals with very large chunks of data. You can tell Hadoop to Map-Reduce across everything it finds under a specific directory within HDFS and then output the results to another directory within HDFS. Hadoop likes [really really] large files (many gigabytes) made from large chunks (e.g. 128 MB), so it can stream through them quickly, without many disk-seeks, and manage distributing the chunks effectively.

Hadoop’s Kryptonite would be its brute force. It is designed for churning through large volumes of data, rather than being real-time. A common use-case is to spool up data for a period of time (an hour, say) and then run your map-reduce jobs on that data. By doing this you can very efficiently process vast amounts of data, but you would not have real-time results.

Recommended Reading
Hadoop: The Definitive Guide by Tom White

Apache HBase

Apache HBase is a NoSQL layer on-top of Hadoop that adds structure to your data. HBase uses write-ahead logging to manage writes, which are then merged down to HDFS. A client request is responded to as soon as the update is written to the write-ahead log and the change is made in memory, which means that updates are very fast. The read side is also fast, because data is stored on disk in key order, so scans across sequential keys require few disk seeks. Larger scans are not currently possible.
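To illustrate, here is a small, hedged sketch of the HBase client API of that era showing the fast-write and sequential-scan pattern described above. The table name, column family, row keys and values are all made up.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "tweets");  // hypothetical table

    // Write: acknowledged once it hits the write-ahead log and memory,
    // so it returns quickly.
    Put put = new Put(Bytes.toBytes("user123-1348143893"));
    put.add(Bytes.toBytes("t"), Bytes.toBytes("text"), Bytes.toBytes("hello hbase"));
    table.put(put);

    // Sequential scan: rows are stored sorted by key, so scanning a key
    // range requires few disk seeks.
    Scan scan = new Scan(Bytes.toBytes("user123"), Bytes.toBytes("user124"));
    ResultScanner scanner = table.getScanner(scan);
    for (Result row : scanner) {
      System.out.println(Bytes.toString(row.getRow()));
    }
    scanner.close();
    table.close();
  }
}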

HBase’s Kryptonite would be similar to that of most NoSQL databases out there: many use-cases still benefit from using a relational database, and HBase is not a relational database.

Look Out For This Book Coming Soon
HBase: The Definitive Guide by Lars George (May 2011)
Lars has an excellent blog that covers Hadoop and HBase thoroughly.

Apache ZooKeeper

Apache ZooKeeper is the janitor of our Justice League. It is being used more and more in scalable applications such as Apache HBase, Apache Solr (see below) and Katta. It manages an application’s distributed needs, such as configuration, naming and synchronization. All these tasks are important when you have a large cluster with constantly failing disks, failing servers, replication and shifting roles between your nodes.
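A minimal, hedged sketch of the ZooKeeper Java client shows the kind of primitive these systems build on: a small piece of shared state that any node in the cluster can write, read and watch. The connection string and znode path below are placeholders.

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class ZooKeeperSketch {
  public static void main(String[] args) throws Exception {
    // Connect to the ensemble; the watcher receives session and znode events.
    ZooKeeper zk = new ZooKeeper("zk1:2181,zk2:2181,zk3:2181", 3000,
        new Watcher() {
          public void process(WatchedEvent event) {
            System.out.println("Event: " + event);
          }
        });

    // Publish a piece of shared configuration as a znode.
    zk.create("/demo-config", "key=value".getBytes(),
        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

    // Read it back, registering a watch so we hear about future changes.
    byte[] data = zk.getData("/demo-config", true, null);
    System.out.println(new String(data));

    zk.close();
  }
}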

Apache Solr

Apache Solr is built on-top of one of my favorite Apache Projects, Apache Lucene [Java]. Lucene is a powerful search engine API written in Java. I have built large distributed search engines with Lucene and have been very happy with the results.

Solr packages up Lucene as a product that can be used stand-alone. It provides various ways to interface with the search engine, such as via XML or JSON requests. Therefore, Java knowledge is not a requirement for using it. It adds a layer to Lucene that makes it more easily scale across a cluster of machines.
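Since the interface is just HTTP, any language will do; for consistency with the rest of this post, here is a hedged Java sketch that queries a local Solr instance and asks for JSON back. The host, port and query are placeholders for whatever your installation uses.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLEncoder;

public class SolrQuerySketch {
  public static void main(String[] args) throws Exception {
    // Query Solr's HTTP search handler and request the response as JSON.
    String q = URLEncoder.encode("hadoop AND flume", "UTF-8");
    URL url = new URL("http://localhost:8983/solr/select?q=" + q + "&wt=json&rows=10");

    BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), "UTF-8"));
    String line;
    while ((line = in.readLine()) != null) {
      System.out.println(line); // raw JSON response from Solr
    }
    in.close();
  }
}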

Apache ActiveMQ

Apache ActiveMQ is a messaging queue much like RabbitMQ or ZeroMQ. I mention ActiveMQ because I have used it, but it is definitely worth checking out the alternatives.

A message queue is a way to quickly collect data, funnel the data through your system and use the same information for multiple services. This provides separation, within your architecture, between collecting data and using it. Data can be entered into different queues (data streams). Different clients can subscribe to these queues and use the data as they wish.

ActiveMQ has two types of queue, “queue” and “topic”.

The queue type “queue” means that each piece of data on the queue can only be read once. If client “A” reads a piece of data off the queue then client “B” cannot read it, but can read the next item on the queue. This is a good way of dividing up data across a cluster. All the clients in the cluster will take a share of the data and process it, but the whole dataset will only be processed once. Faster clients will take a larger share of the data and slow clients will not hold up the queue.

A “topic” means that each client subscribed will see all the data, regardless of what the other clients do. This is useful if you have different services all requiring the same dataset. It can be collected and managed once by ActiveMQ, but utilized by multiple processors. Slow clients can cause this type of queue to back-up.
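A hedged JMS sketch makes the difference concrete: the only thing that changes between the two behaviours is whether the destination is created with createQueue() or createTopic(). The broker URL and destination names below are placeholders.

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class ActiveMQSketch {
  public static void main(String[] args) throws Exception {
    ActiveMQConnectionFactory factory =
        new ActiveMQConnectionFactory("tcp://localhost:61616");
    Connection connection = factory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // createQueue(): each message is delivered to exactly one consumer.
    // createTopic(): each message is delivered to every subscriber.
    Destination destination = session.createQueue("data.stream");
    // Destination destination = session.createTopic("data.stream");

    MessageProducer producer = session.createProducer(destination);
    producer.send(session.createTextMessage("one piece of data"));

    MessageConsumer consumer = session.createConsumer(destination);
    TextMessage received = (TextMessage) consumer.receive(1000);
    if (received != null) {
      System.out.println("Got: " + received.getText());
    }

    connection.close();
  }
}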

If you are interested in messaging queues then I suggest checking out these Message Queue Evaluation Notes by SecondLife, who are heavy users of messaging queues.

Apache Mahout

The son of Lucene and now a Hadoop side-kick, Apache Mahout was born to be an intelligence engine. From the Hindi word for “elephant driver” (Hadoop being the elephant), Mahout has grown into a top-level Apache Project in its own right, mastering the art of Artificial Intelligence on large datasets. While Hadoop can tackle the more heavy-weight datasets on its own, more cunning datasets require a little more algorithmic manipulation. Much of the focus of Mahout is on large datasets using Map-Reduce on-top of Hadoop, but the code-base is optimized to run well on non-distributed datasets as well.
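Much of Mahout’s distributed work runs as Map-Reduce jobs over Hadoop, but its non-distributed “Taste” recommender API is an easy way to get a feel for it. Here is a hedged sketch of a user-based recommender; the preference file, user ID and parameter values are made up.

import java.io.File;
import java.util.List;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class MahoutSketch {
  public static void main(String[] args) throws Exception {
    // prefs.csv: lines of "userID,itemID,preference"
    DataModel model = new FileDataModel(new File("prefs.csv"));
    UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
    UserNeighborhood neighborhood = new NearestNUserNeighborhood(10, similarity, model);
    Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

    // Recommend three items for user 12345.
    List<RecommendedItem> recommendations = recommender.recommend(12345L, 3);
    for (RecommendedItem item : recommendations) {
      System.out.println(item.getItemID() + " : " + item.getValue());
    }
  }
}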

We appreciate your comments

If you found this blog post useful then please leave a comment below. I would like to hear which other Apache Projects you think deserve more attention and if you have ever been saved, like Lois Lane, by one of the above.

Resources

original:http://www.philwhln.com/the-apache-projects-the-justice-league-of-scalability