
[repost] Apache Tez

original:http://hortonworks.com/hadoop/tez/

Apache™ Tez generalizes the MapReduce paradigm to a more powerful framework for executing a complex DAG (directed acyclic graph) of tasks. MapReduce has been the data-processing backbone for Hadoop®, but its batch-oriented nature makes it unsuited for interactive query. Tez will allow projects in the Apache Hadoop® ecosystem, such as Apache Hive and Apache Pig, to meet demands for fast response times and extreme throughput at petabyte scale. The Apache Tez project is part of the Stinger Initiative.

What Tez Does

Tez is the logical next step for Apache Hadoop after Apache Hadoop YARN. With YARN the community generalized Hadoop MapReduce to provide a general-purpose resource management framework wherein MapReduce became merely one of the applications that could process data in a Hadoop cluster. Tez provides a more general data-processing application to the benefit of the entire ecosystem.

Tez will speed up Pig and Hive workloads by an order of magnitude. By eliminating unnecessary tasks, synchronization barriers, and reads from and writes to HDFS, Tez speeds up data processing across both small-scale, low-latency and large-scale, high-throughput workloads.

[Figure: Tez in the Hadoop stack]

Read more on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop in the rest of this series.

How Tez Works

With the emergence of Apache Hadoop YARN as the basis of next-generation data-processing architectures, there is a strong need for an application that can execute a complex DAG of tasks and be shared by other applications in the Hadoop ecosystem. The constrained DAG expressible in MapReduce (one set of maps followed by one set of reduces) often results in multiple MapReduce jobs, which harms latency for short queries and throughput for large-scale queries. Tez provides a more expressive DAG of tasks, within a single application or job, that is better aligned with the required processing task. For example, any given SQL query can be expressed as a single job using Tez.

[repost] Runtime API in Apache Tez

original:http://hortonworks.com/blog/task-api-apache-tez/

This post is the third in our series on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop.

Apache Tez models data processing as a dataflow graph, with the vertices in the graph representing the processing of data and the edges representing the movement of data between processing stages. The user logic that analyzes and modifies the data sits in the vertices. Edges determine the consumer of the data, how the data is transferred, and the dependency between the producer and consumer vertices.

For users of MapReduce (MR), the most primitive functionality that Tez can provide is an ability to run a chain of Reduce stages as compared to a single Reduce stage in the current MR implementation. Via the Task API, Tez can do this and much more by facilitating execution of any form of processing logic that does not need to be retrofitted into a Map or Reduce task and also by supporting multiple options of data transfer between different vertices that are not restricted to the MapReduce shuffle transport mechanism.

The Building Blocks of Tez

The Task API provides the building blocks for a user to plug in their logic to analyze and modify data in the vertex and to augment this processing logic with the necessary plugins to transfer and route data between vertices.

Tez models the user logic running in each vertex as a composition of a set of Inputs, a Processor and a set of Outputs.

  • Input: An input represents a pipe through which a processor can accept input data from a data source such as HDFS or the output generated by another vertex.
  • Processor: The entity responsible for consuming one or more Inputs and producing one or more Outputs.
  • Output: An output represents a pipe through which a processor can send output data, either to another vertex for consumption or to a data sink such as HDFS.
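
To make the composition concrete, here is a minimal sketch of a task as a set of Inputs, one Processor and a set of Outputs. The interfaces are deliberately simplified, hypothetical stand-ins, not the exact Tez types:

```java
import java.util.List;

// Hypothetical sketch of the composition described above: a Tez task wires
// together a set of Inputs, one Processor, and a set of Outputs.
interface Input  {}  // pipe for data from HDFS or an upstream vertex's Output
interface Output {}  // pipe for data to HDFS or a downstream vertex's Input

interface Processor {
  // Consumes one or more Inputs and produces one or more Outputs.
  void run(List<Input> inputs, List<Output> outputs) throws Exception;
}
```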

An edge in a DAG is a logical entity that represents a number of physical connections between the tasks of two connected vertices. To make it easier for a developer to implement a new Processor, there are two kinds of Inputs and Outputs that either expose or hide this level of complexity:

  • Logical: A corresponding pair of a LogicalInput and a LogicalOutput represent the logical edge between two vertices. The implementation of Logical objects hides all the underlying physical connections and exposes a single view of the data.
  • Physical: A corresponding pair of a PhysicalInput and a PhysicalOutput represents the connection between a task of the source vertex and a task of the destination vertex.

An example of the Reduce stage within an MR job would be a Reduce Processor that receives data from the maps via ShuffleInput and generates output to HDFS. Likewise, an intermediate Reduce stage in an MRR chain would be quite similar to the final Reduce stage except for the difference in the Output type.


Tez Runtime API


To implement a new Input, Processor or Output, a user needs to implement the appropriate interfaces mentioned above. All objects are given a Context object in their initialize functions. This context is the hook for these objects to communicate with the Tez framework. The Inputs and Outputs are expected to provide implementations for their respective Readers and Writers, which are then used by the Processor to read and write data. In a task, after the Tez framework has initialized all the necessary Inputs, Outputs and the Processor, the framework invokes the Processor’s run function and passes the appropriate handles to all the Inputs and Outputs for that particular task.
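
Putting the pieces together, a task's flow might look like the following sketch. The interfaces here are simplified, hypothetical stand-ins; the exact Tez signatures differ in detail:

```java
import java.util.Map;

// Simplified, hypothetical stand-ins for the runtime interfaces.
interface Context {}                      // hook back into the Tez framework
interface Reader  { boolean next() throws Exception; Object current(); }
interface Writer  { void write(Object datum) throws Exception; }
interface LogicalInput  { void initialize(Context ctx); Reader getReader() throws Exception; }
interface LogicalOutput { void initialize(Context ctx); Writer getWriter() throws Exception; }

// A trivial processor: the framework initializes the Inputs/Outputs, then
// invokes run() with handles to all of them for this particular task.
class PassThroughProcessor {
  void initialize(Context ctx) { /* stash the context for framework calls */ }

  void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs)
      throws Exception {
    Reader reader = inputs.values().iterator().next().getReader();
    Writer writer = outputs.values().iterator().next().getWriter();
    while (reader.next()) {
      writer.write(reader.current());  // real user logic would transform here
    }
  }
}
```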

Tez allows all Inputs and Outputs to be pluggable. This requires support for passing information from the Output of a source vertex to the Input of the destination vertex. For example, suppose the Output of a source vertex writes all of its data to a key-value store. The Output would need to communicate the “key” to the Input of the next stage so that the Input can retrieve the correct data from the key-value store. To facilitate this, Tez uses Events.

Events in Tez

Events in Tez are a way to pass information amongst different components.

  • The Tez framework uses Events to pass information about system events, such as task failures, to the required components.
  • Inputs of a vertex can inform the framework of any failures encountered when trying to retrieve data from the source vertex’s Output; the framework can in turn use this information to take failure-recovery measures.
  • An Output can pass information about the location of the data it generates to the Inputs of the destination vertex. An example of this is described in the Shuffle Event diagram, which shows how the Output of a Map stage informs the Shuffle Input of the Reduce stage of the location of its output via a Data Movement Event.
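
As a sketch of this pattern: the payload handling and class shape below are illustrative simplifications, not Tez's actual event classes:

```java
// Hypothetical sketch of the event pattern described above: a source task's
// Output announces where its data lives, and the framework routes the event
// to the matching Input of the destination vertex.
class DataMovementEventSketch {
  final int sourceTaskIndex;  // which producer task generated the data
  final byte[] payload;       // opaque to Tez, e.g. a serialized location/URI

  DataMovementEventSketch(int sourceTaskIndex, byte[] payload) {
    this.sourceTaskIndex = sourceTaskIndex;
    this.payload = payload;
  }
}

// Producer side: the Output emits the event once its data is ready, e.g.
//   context.sendEvent(new DataMovementEventSketch(taskIndex, location.getBytes()));
// Consumer side: the destination Input receives the routed event and fetches:
//   void handleEvent(DataMovementEventSketch e) { fetch(new String(e.payload)); }
```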

[Figure: Shuffle Event diagram]

Another use of Events is to enable run-time changes to the DAG execution plan. For example, based on the amount of data being generated by a Map stage, it may be more efficient to run fewer reduce tasks in the following Reduce stage. Events generated by Outputs are routed to the pluggable Vertex/Edge management modules, allowing them to make the necessary decisions to modify run-time parameters as needed.

Available implementations of Inputs/Processors/Outputs

The flexibility of Tez allows anyone to implement their own Inputs and Outputs, whether they use blocking or non-blocking transport protocols, handle data in the form of raw bytes, records or key-value pairs, etc., and to build Processors to handle this variety of Inputs and Outputs.

There is already a small repository of various implementations of Inputs/Outputs/Processors:

  • MRInput and MROutput: Basic Input and Output for handling data to/from HDFS. They are MapReduce-compatible, as they use MapReduce constructs such as InputFormat, RecordReader, OutputFormat and RecordWriter.
  • OnFileSortedOutput and ShuffleMergedInput: A pair of key-value based Input and Output that use the local disk for all I/O and provide the same sort+merge functionality that is required for the “shuffle” edge between the Map and Reduce stages in a MapReduce job.
  • OnFileUnorderedKVOutput and ShuffledUnorderedKVInput: These are similar to the shuffle pair mentioned earlier except that the data is not sorted implicitly. This can be a big performance boost in various situations.
  • MapProcessor and ReduceProcessor: As the names suggest, these processors are available for anyone trying to run a MapReduce job on the Tez execution framework. They can be used to run an MRR chain too.
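
As a quick illustration of how these components line up, here is a sketch of a plain MapReduce job and an MRR chain assembled from the classes above. The helper types are illustrative, not the Tez DAG API:

```java
import java.util.List;

// Illustrative sketch (not the Tez DAG API): how the components listed above
// assemble into a classic MapReduce job, and into an MRR chain, on Tez.
class VertexSketch {
  final String input, processor, output;
  VertexSketch(String input, String processor, String output) {
    this.input = input; this.processor = processor; this.output = output;
  }
}

class MapReduceOnTez {
  // Classic MR: map reads HDFS, shuffles sorted output to a single reduce stage.
  static final List<VertexSketch> MR_JOB = List.of(
      new VertexSketch("MRInput",            "MapProcessor",    "OnFileSortedOutput"),
      new VertexSketch("ShuffleMergedInput", "ReduceProcessor", "MROutput"));

  // An MRR chain simply inserts intermediate reduce stages whose Output is
  // another shuffle rather than MROutput.
  static final List<VertexSketch> MRR_JOB = List.of(
      new VertexSketch("MRInput",            "MapProcessor",    "OnFileSortedOutput"),
      new VertexSketch("ShuffleMergedInput", "ReduceProcessor", "OnFileSortedOutput"),
      new VertexSketch("ShuffleMergedInput", "ReduceProcessor", "MROutput"));
}
```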

As the Hive and Pig projects adapt to use Tez, we hope this repository will grow to house a common set of building blocks for use across the different projects.

[repost] Apache Tez: Dynamic Graph Reconfiguration

original:http://hortonworks.com/blog/apache-tez-dynamic-graph-reconfiguration/

This post is the fifth in our series on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop.

Case Study: Automatic Reduce Parallelism

Motivation

Distributed data processing is dynamic by nature, and it is extremely difficult to determine optimal concurrency and data-movement methods statically, a priori. More information is available at runtime, such as data samples and sizes, which may help optimize the execution plan further. We also recognize that Tez by itself cannot always have the smarts to perform these dynamic optimizations. The design of Tez therefore includes support for pluggable vertex-management modules that collect relevant information from tasks and change the dataflow graph at runtime to optimize for performance and resource usage. The diagram shows how we can determine an appropriate number of reducers in a MapReduce-like job by observing the actual data output produced and the desired load per reduce task.

Performance & Efficiency via Dynamic Graph Reconfiguration

Tez envisions running computation by the most resource-efficient and high-performance means possible, given the runtime conditions in the cluster and the results of the previous steps of the computation. This functionality is constructed using a couple of basic building blocks:

  • Pluggable Vertex Management Modules: The control flow architecture of Tez incorporates a per-vertex pluggable module for user logic that deeply understands the data and computation. The vertex state machine invokes this user module at significant transitions of the state machine such as vertex start, source task completion etc. At these points the user logic can examine the runtime state and provide hints to the main Tez execution engine on attributes like vertex task parallelism.
  • Event Flow Architecture: Tez defines a set of events by which different components like vertices, tasks etc. can pass information to each other. These events are routed from source to destination components based on a well-defined routing logic in the Tez control plane. One such event is the VertexManager event that can be used to send any kind of user-defined payload to the VertexManager of a given vertex.

Case Study: Reduce task parallelism and Reduce Slow-start

Determining the correct number of reduce tasks has been a long-standing issue for MapReduce jobs. The output produced by the map tasks is not known a priori, and thus determining that number before job execution is hard. This becomes even more difficult when there are several stages of computation and the reduce parallelism needs to be determined for each stage. We take this as a case study to demonstrate the graph-reconfiguration capabilities of Tez.

Reduce Task Parallelism: Tez has a ShuffleVertexManager that understands the semantics of hash-based partitioning performed over a shuffle transport layer, as used in MapReduce. Tez defines a VertexManager event that can be used to send an arbitrary user payload to the vertex manager of a given vertex. The partitioning tasks (say, the Map tasks) use this event to send statistics, such as the size of the output partitions produced, to the ShuffleVertexManager for the reduce vertex. The manager receives these events and tries to model the final output statistics that would be produced by all the tasks. It can then advise the vertex state machine of the Reduce vertex to decrease the parallelism of the vertex if needed. The idea is to first over-partition and then determine the correct number at runtime. The vertex controller can cancel extra tasks and proceed as usual.
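
The arithmetic at the heart of that decision can be sketched as follows. This is a minimal sketch: the real ShuffleVertexManager heuristics are more involved, and the class and field names here are illustrative:

```java
// Sketch of the auto-parallelism idea: over-partition first, then shrink the
// reduce vertex once the real output size is known. Names are illustrative.
class ReduceParallelismEstimator {
  final long desiredBytesPerReducer;  // target load per reduce task, e.g. 1 GB
  final int  configuredParallelism;   // the over-partitioned initial count

  ReduceParallelismEstimator(long desiredBytesPerReducer, int configuredParallelism) {
    this.desiredBytesPerReducer = desiredBytesPerReducer;
    this.configuredParallelism = configuredParallelism;
  }

  // bytesReportedSoFar: partition sizes reported by completed map tasks via
  // VertexManager events; completedFraction: the fraction of maps reporting.
  int estimate(long bytesReportedSoFar, double completedFraction) {
    if (completedFraction <= 0) return configuredParallelism;  // no data yet
    long projectedTotal = (long) (bytesReportedSoFar / completedFraction);
    int needed = (int) Math.ceil((double) projectedTotal / desiredBytesPerReducer);
    // Only ever shrink: cancelling extra tasks is cheap, while adding partitions
    // is not, because the maps have already hash-partitioned their output.
    return Math.max(1, Math.min(needed, configuredParallelism));
  }
}
```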

Reduce Slow-start/Pre-launch: Slow-start is a MapReduce feature wherein the reduce tasks are launched before all the map tasks complete. The hypothesis is that reduce tasks can start fetching the completed map outputs while the remaining map tasks complete. Determining when to pre-launch the reduce tasks is tricky because it depends on the output data produced by the map tasks. It would be inefficient to launch reduce tasks so early that they finish fetching the available data and sit idle while the remaining maps are still running. In Tez, the slow-start logic is embedded in the ShuffleVertexManager. The vertex state controller informs the manager whenever a source task (here, a Map task) completes. The manager uses this information to determine when to pre-launch the reduce tasks and how many to pre-launch. It then advises the vertex controller.
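
A minimal sketch of such a policy, assuming a linear ramp between a minimum and maximum completed-source fraction. The thresholds and names here are illustrative, not Tez's actual configuration:

```java
// Sketch of slow-start: ramp reduce launches up linearly between a minimum and
// maximum fraction of completed source (map) tasks. Names are illustrative.
class SlowStartPolicy {
  final double minSourceFraction = 0.25;  // start launching reducers here
  final double maxSourceFraction = 0.75;  // all reducers launchable here

  // Returns how many of totalReducers should be running, given map progress.
  int reducersToLaunch(int completedMaps, int totalMaps, int totalReducers) {
    double done = (double) completedMaps / totalMaps;
    if (done < minSourceFraction) return 0;
    if (done >= maxSourceFraction) return totalReducers;
    double ramp = (done - minSourceFraction) / (maxSourceFraction - minSourceFraction);
    return (int) Math.ceil(ramp * totalReducers);
  }
}
```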

It’s easy to see how the above can be extended to determine the correct parallelism for range-partitioning scenarios. Data samples could be sent via VertexManager events to the vertex manager, which can build the key-range histogram, determine the correct number of partitions, and then assign the appropriate key range to each partition. Thus, in Tez, this operation can be achieved without the overhead of a separate sampling job.

Learn more about Apache Tez here. 

[repost] Re-Using Containers in Apache Tez

original:http://hortonworks.com/blog/re-using-containers-in-apache-tez/

This post is the sixth in our series on the motivations, architecture and performance gains of Apache Tez for data processing in Hadoop.

Motivation

Tez follows the traditional Hadoop model of dividing a job into individual tasks, all of which are run as processes via YARN on the users’ behalf, for isolation among other reasons. This model comes with inherent costs, some of which are listed below.

  • Process startup and initialization costs, especially for a Java process, are fairly high. For short-running tasks, this initialization cost ends up being a significant fraction of the actual task runtime. Re-using containers can significantly reduce this cost.
  • Stragglers have typically been another problem for jobs, where a job’s runtime is limited by its slowest task. With reduced static cost per task, it becomes possible to run more tasks, each with a smaller work-unit. This reduces the runtime of stragglers (smaller work-units), while allowing faster tasks to process additional work-units that can overlap with the stragglers.
  • Re-using containers has the additional advantage of not needing to allocate each container via the YARN ResourceManager (RM).

Beyond helping to address these existing concerns, re-using containers provides additional opportunities for optimization, where data can be shared between tasks.

Considerations for Re-Using Containers

Compatibility of containers

Each vertex in Tez specifies the parameters used when launching its containers. These include the requested resources (memory, CPU, etc.), YARN LocalResources, the environment, and the command-line options for tasks belonging to that vertex. When a container is first launched, it is launched for a specific task and uses the parameters specified for the task (or vertex); this then becomes the container’s signature. An already running container is considered compatible with another task when the running container’s signature is a superset of what the task requires.
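
A minimal sketch of that superset check, with the signature fields mirroring the parameters listed above. The class and method names are illustrative, not the Tez implementation:

```java
import java.util.Map;
import java.util.Set;

// Illustrative container "signature": the launch parameters listed above.
class ContainerSignature {
  int memoryMb;
  int vcores;
  Set<String> localResources;       // YARN LocalResources (by name)
  Map<String, String> environment;  // environment variables

  // A running container can host a task if its signature is a superset of
  // what the task's vertex requires.
  boolean isCompatibleWith(ContainerSignature required) {
    return memoryMb >= required.memoryMb
        && vcores >= required.vcores
        && localResources.containsAll(required.localResources)
        && environment.entrySet().containsAll(required.environment.entrySet());
  }
}
```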

Scheduling

Initially, when no containers are available, the Tez AM will request containers from the RM with location information specified, and rely on YARN’s scheduler for locality-aware assignments. However, for containers which are being considered for re-use, the scheduling smarts offered by YARN are no longer available.

The Tez scheduler works with several parameters when making task-assignment decisions: task-locality requirements, compatibility of containers as described above, total available resources on the cluster, and the priority of pending task requests.

When a task completes and the container running it becomes available for re-use, a task may not be assigned to it immediately, since there may be no pending task whose data is local to the container’s node. The Tez scheduler first attempts to find a task for which the data would be local to the container. If no such task exists, the scheduler holds on to the container for a specific time before allocating any pending tasks to it. The expectation is that more tasks will complete in the meantime, giving additional opportunities for scheduling tasks on nodes close to their data. Going forward, non-local containers may be used in a speculative manner.
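
A minimal sketch of that decision flow. The hold duration and matching logic in the real scheduler are richer; all names here are illustrative:

```java
import java.util.List;

// Illustrative sketch of the container re-use decision described above.
class ReuseScheduler {
  static final long LOCALITY_HOLD_MS = 250;  // illustrative hold before relaxing locality

  interface Task { boolean isDataLocalTo(String node); }

  // Called when a container finishes its task and becomes idle.
  Task assign(String containerNode, List<Task> pending, long idleMs) {
    // First preference: a pending task whose data lives on this container's node.
    for (Task t : pending) {
      if (t.isDataLocalTo(containerNode)) return t;
    }
    // Otherwise hold briefly; more tasks may finish and open up local matches.
    if (idleMs < LOCALITY_HOLD_MS) return null;  // keep the container idle for now
    // After the hold, fall back to any pending task (non-local assignment).
    return pending.isEmpty() ? null : pending.get(0);
  }
}
```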

Priority of pending tasks (across different vertices), compatibility and cluster resources are considered to ensure that tasks which are deemed to be of higher priority (either due to a must-run-before relationship, failure, or due to specific scheduling policies) have an available container.

In the future, affinity will become part of the scheduling decision. This could be dictated by common resources shared between tasks, which need only be loaded by the first task running in a container, or by the data generated by the first task, which can then directly be processed by subsequent tasks, without needing to move/serialize the data – especially in the case of One-to-One edges.

Beyond simple JVM Re-Use

Cluster Dependent Work Allocation

At the moment, the number of tasks for a vertex, and their corresponding ‘work-units’, are determined up front. Going forward, this is likely to change to a model where a certain number of tasks is set up front based on cluster resources, while work-units for these tasks are determined at runtime. This allows additional optimizations, where tasks that complete early are given additional work, and also allows for better locality-based assignment of work.

Object Registry

Each Tez JVM (or container) contains an object cache, which can be used to share data between different tasks running within the same container. This is a simple Key-Object store, with different levels of visibility/retention. Objects can be cached for use within tasks belonging to the same Vertex, for all tasks within a DAG, and for tasks running across a Tez Session (more on Sessions in a subsequent post). The resources being cached may, in the future, be made available as a hint to the Tez Scheduler for affinity based scheduling.

Examples of usage:

1) Hive makes use of this object registry to cache data for Broadcast Joins; the data is fetched and computed once by the first task and used directly by the remaining tasks that run in the same JVM.

2) The sort buffer used by OnFileSortedOutput can be cached, and re-used across tasks.
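
A minimal sketch of such a key-object registry and the broadcast-join usage pattern. The method and scope names are illustrative; the actual Tez ObjectRegistry interface may differ:

```java
import java.util.concurrent.ConcurrentHashMap;

// Illustrative key-object registry with the visibility levels described above.
class ObjectRegistrySketch {
  enum Scope { VERTEX, DAG, SESSION }

  private final ConcurrentHashMap<String, Object> store = new ConcurrentHashMap<>();

  void cache(Scope scope, String key, Object value) {
    store.put(scope + ":" + key, value);  // retention policy per scope elided
  }

  Object get(Scope scope, String key) {
    return store.get(scope + ":" + key);
  }
}

// Broadcast-join usage: the first task in the JVM builds the hash table,
// later tasks in the same container simply re-use it.
//   Object table = registry.get(Scope.DAG, "broadcast-table");
//   if (table == null) {
//     table = buildHashTable();  // hypothetical helper
//     registry.cache(Scope.DAG, "broadcast-table", table);
//   }
```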

Learn more about Apache Tez here. 

[repost] Apache Tez: A Generalization of MapReduce Data Processing

original:http://www.infoq.com/cn/news/2013/09/TEZ

As discussed in a recent InfoQ article, Hortonworks’ new Stinger Initiative relies heavily on Tez, a brand-new Hadoop data-processing framework.

The blog post Apache Tez: A New Chapter in Hadoop Data Processing writes:

“Higher-level data-processing applications like Hive and Pig need an execution framework that can express their complex query logic in an efficient manner and execute those queries with high performance. Apache Tez ... offers an alternative to traditional MapReduce that allows jobs to meet demands for fast response times and extreme throughput at petabyte scale.”

To achieve this goal, Tez models data processing not as a single job, but as a dataflow graph:

... vertices in the graph represent application logic, while edges represent data movement. A rich dataflow-definition API lets users express complex query logic in an intuitive manner. It is a natural fit for the query plans generated by higher-level declarative applications such as Hive and Pig ... A dataflow pipeline can be expressed as a single Tez job that runs the entire computation, and Tez takes care of expanding this logical graph into a physical graph of tasks and executing it.

At a Tez vertex, the specific user logic is modeled as Input, Processor and Output modules. The Input and Output modules define the input and output data (including format, access method and location), while the Processor module defines the data-transformation logic, which can take the form of a MapReduce map or reduce task. Although Tez does not explicitly impose any data-format restrictions, it requires that Inputs, Outputs and Processors be compatible with one another. Similarly, an Input/Output pair connected by an edge must be compatible in format and location.

The blog post Data Processing API in Apache Tez outlines a simple Java API used to express a DAG (directed acyclic graph) of data processing. The API has three components:

  • DAG defines the overall job. The user creates a DAG object for each data-processing job.
  • Vertex defines the user logic, together with the resources and environment needed to execute it. The user creates a Vertex object for each step in the job and adds it to the DAG.
  • Edge defines the connection between producer and consumer vertices. The user creates an Edge object to connect a producer vertex and a consumer vertex.

The edge properties defined by Tez enable it to instantiate user tasks, configure their inputs and outputs, schedule them appropriately, and define how data is routed between tasks. Tez also supports defining the concurrency of each vertex’s execution based on user guidance, data size and resources.

  • Data movement: defines how data is routed between tasks.
    • One-to-One: data from the i-th producer task routes to the i-th consumer task.
    • Broadcast: data from a producer task routes to all consumer tasks.
    • Scatter-Gather: producer tasks scatter data into shards and consumer tasks gather the shards; the i-th shard from each producer task routes to the i-th consumer task.
  • Scheduling: defines when a consumer task is scheduled.
    • Sequential: a consumer task may be scheduled after a producer task completes.
    • Concurrent: a consumer task must run concurrently with a producer task.
  • Data source: defines the lifetime/reliability of a task’s output.
    • Persisted: the output remains available after the task exits; it may be discarded later.
    • Persisted-Reliable: the output is stored reliably and will always be available.
    • Ephemeral: the output is available only while the producer task is running.
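
A minimal sketch of assembling a Map-to-Reduce graph with this API. The class and method names follow the description above and are illustrative, not necessarily the exact Tez signatures:

```java
// Illustrative sketch of the DAG/Vertex/Edge assembly described above.
// Names follow the description, not necessarily the exact Tez API.
class DagSketch {
  enum DataMovement { ONE_TO_ONE, BROADCAST, SCATTER_GATHER }
  enum Scheduling   { SEQUENTIAL, CONCURRENT }
  enum DataSource   { PERSISTED, PERSISTED_RELIABLE, EPHEMERAL }

  record Vertex(String name, String processorClass, int parallelism) {}
  record Edge(Vertex producer, Vertex consumer,
              DataMovement movement, Scheduling scheduling, DataSource source) {}

  public static void main(String[] args) {
    // A Map stage feeding a Reduce stage over a classic shuffle edge.
    Vertex map    = new Vertex("map",    "MapProcessor",    100);
    Vertex reduce = new Vertex("reduce", "ReduceProcessor",  10);
    Edge shuffle  = new Edge(map, reduce,
        DataMovement.SCATTER_GATHER,  // i-th shard of every map goes to reducer i
        Scheduling.SEQUENTIAL,        // reducers scheduled after producers complete
        DataSource.PERSISTED);        // map output survives task exit
    System.out.println(shuffle);
  }
}
```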

For more details on the Tez architecture, see the Tez design document.

The idea of expressing data processing as a dataflow is hardly new: it is the foundation of Cascading, and many applications built on Oozie achieve the same goal. The advantage of Tez, by contrast, is that it puts all of this into a single framework that is optimized for resource management (based on Apache Hadoop YARN), data transfer and execution. In addition, the Tez design provides support for pluggable vertex-management modules that collect relevant information from tasks and change the dataflow graph at runtime to optimize for performance and resource usage.

View the original English article: Apache Tez – a Generalization of the MapReduce Data Processing